このトピックでは、Node.js 用 SDK を使用して ApsaraMQ for Kafka に接続し、メッセージを送受信する方法について説明します。
環境要件
GNU Compiler Collection(GCC)がインストールされていること。詳細については、Installing GCC をご参照ください。
Node.js がインストールされていること。詳細については、Downloads をご参照ください。
重要Node.js のバージョンは 4.0.0 以降である必要があります。
OpenSSL がインストールされていること。詳細については、Downloads をご参照ください。
C++ ライブラリのインストール
次のコマンドを実行して、/etc/yum.repos.d/ yum リポジトリディレクトリに切り替えます。
cd /etc/yum.repos.d/confluent.repo という名前の yum リポジトリ設定ファイルを作成します。
[Confluent.dist] name=Confluent repository (dist) baseurl=https://packages.confluent.io/rpm/5.1/7 gpgcheck=1 gpgkey=https://packages.confluent.io/rpm/5.1/archive.key enabled=1 [Confluent] name=Confluent repository baseurl=https://packages.confluent.io/rpm/5.1 gpgcheck=1 gpgkey=https://packages.confluent.io/rpm/5.1/archive.key enabled=1次のコマンドを実行して、C++ ライブラリをインストールします。
yum install librdkafka-devel
Node.js ライブラリのインストール
次のコマンドを実行して、プリプロセッサの OpenSSL ヘッダーファイルのパスを指定します。
# このパラメーターを、オンプレミス マシンにインストールされている OpenSSL ヘッダーファイルのパスに設定します。 export CPPFLAGS=-I</usr/local/opt/openssl/include>次のコマンドを実行して、コネクタの OpenSSL ライブラリファイルのパスを指定します。
# このパラメーターを、オンプレミス マシンにインストールされている OpenSSL ライブラリファイルのパスに設定します。 export LDFLAGS=-L</usr/local/opt/openssl/lib>次のコマンドを実行して、Node.js ライブラリをインストールします。
npm install i --unsafe-perm node-rdkafka
whereis openssl コマンドを実行して、OpenSSL ヘッダーファイルのパスと OpenSSL ライブラリファイルのパスを取得します。
設定ファイルの作成
(オプション) Secure Sockets Layer(SSL)ルート証明書をダウンロードします。SSL エンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、証明書をインストールする必要があります。
aliware-kafka-demos ページに移動し、
をクリックしてデモ プロジェクトをオンプレミス マシンにダウンロードし、デモ プロジェクトのパッケージを解凍します。解凍したパッケージで、kafka-nodejs-demo フォルダーに移動します。次に、使用するエンドポイントに基づいて対応するフォルダーを開き、フォルダー内の setting.js ファイルを設定します。
module.exports = { 'sasl_plain_username': 'XXX', 'sasl_plain_password': 'XXX', 'bootstrap_servers': ["XXX"], 'topic_name': 'XXX', 'consumer_id': 'XXX' }パラメーター
説明
sasl_plain_username
Simple Authentication and Security Layer(SASL)ユーザーのユーザー名。デフォルト エンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合、このパラメーターは使用できません。
説明ApsaraMQ for Kafka インスタンスで ACL 機能が有効になっていない場合は、ユーザー名パスワード設定情報インスタンスの詳細ApsaraMQ for Kafka コンソール の ページの セクションにある パラメーターと パラメーターから、SASL ユーザーのユーザー名とパスワードを取得できます。
ApsaraMQ for Kafka インスタンスで ACL 機能が有効になっている場合は、インスタンスを使用してメッセージを送受信する権限が SASL ユーザーに付与されていることを確認してください。詳細については、SASL ユーザーへの権限の付与をご参照ください。
sasl_plain_password
SASL ユーザーのパスワード。デフォルト エンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合、このパラメーターは使用できません。
bootstrap_servers
ApsaraMQ for Kafka インスタンスの SSL エンドポイント。アクセスポイント情報インスタンスの詳細ApsaraMQ for Kafka コンソール の ページの セクションでエンドポイントを取得できます。
topic_name
トピック名。トピック管理ApsaraMQ for Kafka コンソール の [トピック] ページでトピック名を取得できます。
consumer_id
グループ ID。Group の管理ApsaraMQ for Kafka コンソール の ページでグループ ID を取得できます。
必要なパラメーターが構成された後、構成ファイルが配置されているフォルダー内のすべてのファイルを、サーバー上の Node.js 依存関係ライブラリのインストールディレクトリにアップロードします。SSLエンドポイントに対応するフォルダーには、SSLルート証明書ファイルが含まれています。
メッセージの送信
次のコマンドを実行して producer.js を実行し、メッセージを送信します。
node producer.js次のサンプルコードは、consumer.js の例を示しています。
デフォルトのエンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、次のサンプルコードを使用します。
const Kafka = require('node-rdkafka'); const config = require('./setting'); console.log("features:" + Kafka.features); console.log(Kafka.librdkafkaVersion); var producer = new Kafka.Producer({ /*'debug': 'all', */ // デバッグをすべて有効にする 'api.version.request': 'true', 'bootstrap.servers': config['bootstrap_servers'], 'dr_cb': true, 'dr_msg_cb': true }); var connected = false producer.setPollInterval(100); producer.connect(); producer.on('ready', function() { connected = true console.log("connect ok") // 接続成功 }); producer.on("disconnected", function() { connected = false; producer.connect(); }) producer.on('event.log', function(event) { console.log("event.log", event); }); producer.on("error", function(error) { console.log("error:" + error); }); function produce() { try { producer.produce( config['topic_name'], null, new Buffer('Hello Ali Kafka'), null, Date.now() ); } catch (err) { console.error('A problem occurred when sending our message'); // メッセージ送信時に問題が発生しました console.error(err); } } producer.on('delivery-report', function(err, report) { console.log("delivery-report: producer ok"); // デリバリーレポート:プロデューサーOK }); producer.on('event.error', function(err) { console.error('event.error:' + err); }) setInterval(produce,1000,"Interval");SSL エンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、次のサンプルコードを使用します。
const Kafka = require('node-rdkafka'); const config = require('./setting'); console.log("features:" + Kafka.features); console.log(Kafka.librdkafkaVersion); var producer = new Kafka.Producer({ /*'debug': 'all', */ // デバッグをすべて有効にする 'api.version.request': 'true', 'bootstrap.servers': config['bootstrap_servers'], 'dr_cb': true, 'dr_msg_cb': true, 'security.protocol' : 'sasl_ssl', 'ssl.ca.location' : './ca-cert.pem', 'sasl.mechanisms' : 'PLAIN', 'ssl.endpoint.identification.algorithm':'none', 'sasl.username' : config['sasl_plain_username'], 'sasl.password' : config['sasl_plain_password'] }); var connected = false producer.setPollInterval(100); producer.connect(); producer.on('ready', function() { connected = true console.log("connect ok") // 接続成功 }); function produce() { try { producer.produce( config['topic_name'], new Buffer('Hello Ali Kafka'), null, Date.now() ); } catch (err) { console.error('A problem occurred when sending our message'); // メッセージ送信時に問題が発生しました console.error(err); } } producer.on("disconnected", function() { connected = false; producer.connect(); }) producer.on('event.log', function(event) { console.log("event.log", event); }); producer.on("error", function(error) { console.log("error:" + error); }); producer.on('delivery-report', function(err, report) { console.log("delivery-report: producer ok"); // デリバリーレポート:プロデューサーOK }); // Any errors we encounter, including connection errors // 接続エラーを含む、発生したすべてのエラー producer.on('event.error', function(err) { console.error('event.error:' + err); }) setInterval(produce,1000,"Interval");
メッセージの送信
次のコマンドを実行して producer.js を実行し、メッセージを送信します。
node consumer.js次のサンプルコードは、consumer.js の例を示しています。
デフォルトのエンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、次のサンプルコードを使用します。
const Kafka = require('node-rdkafka'); const config = require('./setting'); console.log(Kafka.features); console.log(Kafka.librdkafkaVersion); console.log(config) var consumer = new Kafka.KafkaConsumer({ /*'debug': 'all',*/ // 全てをデバッグ 'api.version.request': 'true', 'bootstrap.servers': config['bootstrap_servers'], 'group.id' : config['consumer_id'] }); consumer.connect(); consumer.on('ready', function() { console.log("connect ok"); consumer.subscribe([config['topic_name']]); consumer.consume(); }) consumer.on('data', function(data) { console.log(data); }); consumer.on('event.log', function(event) { console.log("event.log", event); }); consumer.on('error', function(error) { console.log("error:" + error); }); consumer.on('event', function(event) { console.log("event:" + event); });SSL エンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、次のサンプルコードを使用します。
const Kafka = require('node-rdkafka'); const config = require('./setting'); console.log(Kafka.features); console.log(Kafka.librdkafkaVersion); console.log(config) var consumer = new Kafka.KafkaConsumer({ /*'debug': 'all',*/ // 全てをデバッグ 'api.version.request': 'true', 'bootstrap.servers': config['bootstrap_servers'], 'security.protocol' : 'sasl_ssl', 'ssl.endpoint.identification.algorithm':'none', 'ssl.ca.location' : './ca-cert.pem', 'sasl.mechanisms' : 'PLAIN', 'message.max.bytes': 32000, 'fetch.max.bytes' : 32000, 'fetch.message.max.bytes': 32000, 'max.partition.fetch.bytes': 32000, 'sasl.username' : config['sasl_plain_username'], 'sasl.password' : config['sasl_plain_password'], 'group.id' : config['consumer_id'] }); consumer.connect(); consumer.on('ready', function() { console.log("connect ok"); consumer.subscribe([config['topic_name']]); consumer.consume(); }) consumer.on('data', function(data) { console.log(data); }); consumer.on('event.log', function(event) { console.log("event.log", event); }); consumer.on('error', function(error) { console.log("error:" + error); }); consumer.on('event', function(event) { console.log("event:" + event); });