本文介绍如何使用Node.js SDK通过接入点接入消息队列Kafka版并收发消息。
环境配置
安装C++依赖库
安装Node.js依赖库
说明 在命令行窗口执行whereis openssl命令获取OpenSSL头文件路径和库路径。
准备配置
发送消息
执行如下命令发送消息。
node producer.js
代码示例
- 默认接入点
const Kafka = require('node-rdkafka'); const config = require('./setting'); console.log("features:" + Kafka.features); console.log(Kafka.librdkafkaVersion); var producer = new Kafka.Producer({ /*'debug': 'all', */ 'api.version.request': 'true', 'bootstrap.servers': config['bootstrap_servers'], 'dr_cb': true, 'dr_msg_cb': true }); var connected = false producer.setPollInterval(100); producer.connect(); producer.on('ready', function() { connected = true console.log("connect ok") }); producer.on("disconnected", function() { connected = false; producer.connect(); }) producer.on('event.log', function(event) { console.log("event.log", event); }); producer.on("error", function(error) { console.log("error:" + error); }); function produce() { try { producer.produce( config['topic_name'], null, new Buffer('Hello Ali Kafka'), null, Date.now() ); } catch (err) { console.error('A problem occurred when sending our message'); console.error(err); } } producer.on('delivery-report', function(err, report) { console.log("delivery-report: producer ok"); }); producer.on('event.error', function(err) { console.error('event.error:' + err); }) setInterval(produce,1000,"Interval");
- SSL接入点
const Kafka = require('node-rdkafka'); const config = require('./setting'); console.log("features:" + Kafka.features); console.log(Kafka.librdkafkaVersion); var producer = new Kafka.Producer({ /*'debug': 'all', */ 'api.version.request': 'true', 'bootstrap.servers': config['bootstrap_servers'], 'dr_cb': true, 'dr_msg_cb': true, 'security.protocol' : 'sasl_ssl', 'ssl.ca.location' : './ca-cert.pem', 'sasl.mechanisms' : 'PLAIN', 'sasl.username' : config['sasl_plain_username'], 'sasl.password' : config['sasl_plain_password'] }); var connected = false producer.setPollInterval(100); producer.connect(); producer.on('ready', function() { connected = true console.log("connect ok") }); function produce() { try { producer.produce( config['topic_name'], new Buffer('Hello Ali Kafka'), null, Date.now() ); } catch (err) { console.error('A problem occurred when sending our message'); console.error(err); } } producer.on("disconnected", function() { connected = false; producer.connect(); }) producer.on('event.log', function(event) { console.log("event.log", event); }); producer.on("error", function(error) { console.log("error:" + error); }); producer.on('delivery-report', function(err, report) { console.log("delivery-report: producer ok"); }); // Any errors we encounter, including connection errors producer.on('event.error', function(err) { console.error('event.error:' + err); }) setInterval(produce,1000,"Interval");
订阅消息
执行如下命令消费消息。
node consumer.js
代码示例
- 默认接入点
const Kafka = require('node-rdkafka'); const config = require('./setting'); console.log(Kafka.features); console.log(Kafka.librdkafkaVersion); console.log(config) var consumer = new Kafka.KafkaConsumer({ /*'debug': 'all',*/ 'api.version.request': 'true', 'bootstrap.servers': config['bootstrap_servers'], 'group.id' : config['consumer_id'] }); consumer.connect(); consumer.on('ready', function() { console.log("connect ok"); consumer.subscribe([config['topic_name']]); consumer.consume(); }) consumer.on('data', function(data) { console.log(data); }); consumer.on('event.log', function(event) { console.log("event.log", event); }); consumer.on('error', function(error) { console.log("error:" + error); }); consumer.on('event', function(event) { console.log("event:" + event); });
- SSL接入点
const Kafka = require('node-rdkafka'); const config = require('./setting'); console.log(Kafka.features); console.log(Kafka.librdkafkaVersion); console.log(config) var consumer = new Kafka.KafkaConsumer({ /*'debug': 'all',*/ 'api.version.request': 'true', 'bootstrap.servers': config['bootstrap_servers'], 'security.protocol' : 'sasl_ssl', 'ssl.ca.location' : './ca-cert.pem', 'sasl.mechanisms' : 'PLAIN', 'message.max.bytes': 32000, 'fetch.max.bytes' : 32000, 'fetch.message.max.bytes': 32000, 'max.partition.fetch.bytes': 32000, 'sasl.username' : config['sasl_plain_username'], 'sasl.password' : config['sasl_plain_password'], 'group.id' : config['consumer_id'] }); consumer.connect(); consumer.on('ready', function() { console.log("connect ok"); consumer.subscribe([config['topic_name']]); consumer.consume(); }) consumer.on('data', function(data) { console.log(data); }); consumer.on('event.log', function(event) { console.log("event.log", event); }); consumer.on('error', function(error) { console.log("error:" + error); }); consumer.on('event', function(event) { console.log("event:" + event); });