すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:Node.js 用 SDK を使用してメッセージを送受信する

最終更新日:Mar 19, 2025

このトピックでは、Node.js 用 SDK を使用して ApsaraMQ for Kafka に接続し、メッセージを送受信する方法について説明します。

環境要件

  • GNU Compiler Collection(GCC)がインストールされていること。詳細については、Installing GCC をご参照ください。

  • Node.js がインストールされていること。詳細については、Downloads をご参照ください。

    重要

    Node.js のバージョンは 4.0.0 以降である必要があります。

  • OpenSSL がインストールされていること。詳細については、Downloads をご参照ください。

C++ ライブラリのインストール

  1. 次のコマンドを実行して、/etc/yum.repos.d/ yum リポジトリディレクトリに切り替えます。

    cd /etc/yum.repos.d/
  2. confluent.repo という名前の yum リポジトリ設定ファイルを作成します。

    [Confluent.dist]
    name=Confluent repository (dist)
    baseurl=https://packages.confluent.io/rpm/5.1/7
    gpgcheck=1
    gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
    enabled=1
    
    [Confluent]
    name=Confluent repository
    baseurl=https://packages.confluent.io/rpm/5.1
    gpgcheck=1
    gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
    enabled=1
  3. 次のコマンドを実行して、C++ ライブラリをインストールします。

    yum install librdkafka-devel

Node.js ライブラリのインストール

  1. 次のコマンドを実行して、プリプロセッサの OpenSSL ヘッダーファイルのパスを指定します。

    # このパラメーターを、オンプレミス マシンにインストールされている OpenSSL ヘッダーファイルのパスに設定します。
    export CPPFLAGS=-I</usr/local/opt/openssl/include>
  2. 次のコマンドを実行して、コネクタの OpenSSL ライブラリファイルのパスを指定します。

    # このパラメーターを、オンプレミス マシンにインストールされている OpenSSL ライブラリファイルのパスに設定します。
    export LDFLAGS=-L</usr/local/opt/openssl/lib>
  3. 次のコマンドを実行して、Node.js ライブラリをインストールします。

    npm install i --unsafe-perm node-rdkafka
説明

whereis openssl コマンドを実行して、OpenSSL ヘッダーファイルのパスと OpenSSL ライブラリファイルのパスを取得します。

設定ファイルの作成

  1. (オプション) Secure Sockets Layer(SSL)ルート証明書をダウンロードします。SSL エンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、証明書をインストールする必要があります。

  2. aliware-kafka-demos ページに移動し、download をクリックしてデモ プロジェクトをオンプレミス マシンにダウンロードし、デモ プロジェクトのパッケージを解凍します。

  3. 解凍したパッケージで、kafka-nodejs-demo フォルダーに移動します。次に、使用するエンドポイントに基づいて対応するフォルダーを開き、フォルダー内の setting.js ファイルを設定します。

    module.exports = {
        'sasl_plain_username': 'XXX',
        'sasl_plain_password': 'XXX',
        'bootstrap_servers': ["XXX"],
        'topic_name': 'XXX',
        'consumer_id': 'XXX'
    }

    パラメーター

    説明

    sasl_plain_username

    Simple Authentication and Security Layer(SASL)ユーザーのユーザー名。デフォルト エンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合、このパラメーターは使用できません。

    説明
    • ApsaraMQ for Kafka インスタンスで ACL 機能が有効になっていない場合は、ユーザー名パスワード設定情報インスタンスの詳細ApsaraMQ for Kafka コンソール の ページの セクションにある パラメーターと パラメーターから、SASL ユーザーのユーザー名とパスワードを取得できます。

    • ApsaraMQ for Kafka インスタンスで ACL 機能が有効になっている場合は、インスタンスを使用してメッセージを送受信する権限が SASL ユーザーに付与されていることを確認してください。詳細については、SASL ユーザーへの権限の付与をご参照ください。

    sasl_plain_password

    SASL ユーザーのパスワード。デフォルト エンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合、このパラメーターは使用できません。

    bootstrap_servers

    ApsaraMQ for Kafka インスタンスの SSL エンドポイント。アクセスポイント情報インスタンスの詳細ApsaraMQ for Kafka コンソール の ページの セクションでエンドポイントを取得できます。

    topic_name

    トピック名。トピック管理ApsaraMQ for Kafka コンソール の [トピック] ページでトピック名を取得できます。

    consumer_id

    グループ ID。Group の管理ApsaraMQ for Kafka コンソール の ページでグループ ID を取得できます。

  4. 必要なパラメーターが構成された後、構成ファイルが配置されているフォルダー内のすべてのファイルを、サーバー上の Node.js 依存関係ライブラリのインストールディレクトリにアップロードします。SSLエンドポイントに対応するフォルダーには、SSLルート証明書ファイルが含まれています。

メッセージの送信

次のコマンドを実行して producer.js を実行し、メッセージを送信します。

node producer.js

次のサンプルコードは、consumer.js の例を示しています。

  • デフォルトのエンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、次のサンプルコードを使用します。

    const Kafka = require('node-rdkafka');
    const config = require('./setting');
    console.log("features:" + Kafka.features);
    console.log(Kafka.librdkafkaVersion);
    
    var producer = new Kafka.Producer({
        /*'debug': 'all', */ // デバッグをすべて有効にする
        'api.version.request': 'true',
        'bootstrap.servers': config['bootstrap_servers'],
        'dr_cb': true,
        'dr_msg_cb': true
    });
    
    var connected = false
    
    producer.setPollInterval(100);
    
    producer.connect();
    
    
    producer.on('ready', function() {
      connected = true
      console.log("connect ok") // 接続成功
    });
    
    producer.on("disconnected", function() {
      connected = false;
      producer.connect();
    })
    
    producer.on('event.log', function(event) {
          console.log("event.log", event);
    });
    
    producer.on("error", function(error) {
        console.log("error:" + error);
    });
    
    function produce() {
      try {
        producer.produce(
          config['topic_name'],   
          null,      
          new Buffer('Hello Ali Kafka'),      
          null,   
          Date.now()
        );
      } catch (err) {
        console.error('A problem occurred when sending our message'); // メッセージ送信時に問題が発生しました
        console.error(err);
      }
    
    }
    
    producer.on('delivery-report', function(err, report) {
        console.log("delivery-report: producer ok"); // デリバリーレポート:プロデューサーOK
    });
    
    producer.on('event.error', function(err) {
        console.error('event.error:' + err);
    })
    
    setInterval(produce,1000,"Interval");
  • SSL エンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、次のサンプルコードを使用します。

    const Kafka = require('node-rdkafka');
    const config = require('./setting');
    console.log("features:" + Kafka.features);
    console.log(Kafka.librdkafkaVersion);
    
    var producer = new Kafka.Producer({
        /*'debug': 'all', */ // デバッグをすべて有効にする
        'api.version.request': 'true',
        'bootstrap.servers': config['bootstrap_servers'],
        'dr_cb': true,
        'dr_msg_cb': true,
        'security.protocol' : 'sasl_ssl',
        'ssl.ca.location' : './ca-cert.pem',
        'sasl.mechanisms' : 'PLAIN',
        'ssl.endpoint.identification.algorithm':'none',
        'sasl.username' : config['sasl_plain_username'],
        'sasl.password' : config['sasl_plain_password']
    });
    
    var connected = false
    
    producer.setPollInterval(100);
    
    producer.connect();
    
    producer.on('ready', function() {
      connected = true
      console.log("connect ok") // 接続成功
    
      });
    
    function produce() {
      try {
        producer.produce(
          config['topic_name'],
          new Buffer('Hello Ali Kafka'),
          null,
          Date.now()
        );
      } catch (err) {
        console.error('A problem occurred when sending our message'); // メッセージ送信時に問題が発生しました
        console.error(err);
      }
    }
    
    producer.on("disconnected", function() {
      connected = false;
      producer.connect();
    })
    
    producer.on('event.log', function(event) {
          console.log("event.log", event);
    });
    
    producer.on("error", function(error) {
        console.log("error:" + error);
    });
    
    producer.on('delivery-report', function(err, report) {
        console.log("delivery-report: producer ok"); // デリバリーレポート:プロデューサーOK
    });
    // Any errors we encounter, including connection errors // 接続エラーを含む、発生したすべてのエラー
    producer.on('event.error', function(err) {
        console.error('event.error:' + err);
    })
    
    setInterval(produce,1000,"Interval");

メッセージの送信

次のコマンドを実行して producer.js を実行し、メッセージを送信します。

node consumer.js

次のサンプルコードは、consumer.js の例を示しています。

  • デフォルトのエンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、次のサンプルコードを使用します。

    const Kafka = require('node-rdkafka');
    const config = require('./setting');
    console.log(Kafka.features);
    console.log(Kafka.librdkafkaVersion);
    console.log(config)
    
    var consumer = new Kafka.KafkaConsumer({
        /*'debug': 'all',*/ // 全てをデバッグ
        'api.version.request': 'true',
        'bootstrap.servers': config['bootstrap_servers'],
        'group.id' : config['consumer_id']
    });
    
    consumer.connect();
    
    consumer.on('ready', function() {
      console.log("connect ok");
      consumer.subscribe([config['topic_name']]);
      consumer.consume();
    })
    
    consumer.on('data', function(data) {
      console.log(data);
    });
    
    
    consumer.on('event.log', function(event) {
          console.log("event.log", event);
    });
    
    consumer.on('error', function(error) {
        console.log("error:" + error);
    });
    
    consumer.on('event', function(event) {
            console.log("event:" + event);
    });
  • SSL エンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、次のサンプルコードを使用します。

    const Kafka = require('node-rdkafka');
    const config = require('./setting');
    console.log(Kafka.features);
    console.log(Kafka.librdkafkaVersion);
    console.log(config)
    
    var consumer = new Kafka.KafkaConsumer({
        /*'debug': 'all',*/ // 全てをデバッグ
        'api.version.request': 'true',
        'bootstrap.servers': config['bootstrap_servers'],
        'security.protocol' : 'sasl_ssl',
        'ssl.endpoint.identification.algorithm':'none',
        'ssl.ca.location' : './ca-cert.pem',
        'sasl.mechanisms' : 'PLAIN',
        'message.max.bytes': 32000,
        'fetch.max.bytes' : 32000,
        'fetch.message.max.bytes': 32000,
        'max.partition.fetch.bytes': 32000,
        'sasl.username' : config['sasl_plain_username'],
        'sasl.password' : config['sasl_plain_password'],
        'group.id' : config['consumer_id']
    });
    
    consumer.connect();
    
    consumer.on('ready', function() {
      console.log("connect ok");
      consumer.subscribe([config['topic_name']]);
      consumer.consume();
    })
    
    consumer.on('data', function(data) {
      console.log(data);
    });
    
    
    consumer.on('event.log', function(event) {
          console.log("event.log", event);
    });
    
    consumer.on('error', function(error) {
        console.log("error:" + error);
    });
    
    consumer.on('event', function(event) {
            console.log("event:" + event);
    });