Use the node-rdkafka library to connect a Node.js application to ApsaraMQ for Kafka and produce and consume messages.
Prerequisites
Before you begin, make sure that you have:
GCC installed
Node.js 4.0.0 or later installed
OpenSSL installed
An ApsaraMQ for Kafka instance with a topic and consumer group created
(SSL only) The SSL root certificate downloaded and saved as
ca-cert.pem
Install the C++ library
The node-rdkafka package depends on librdkafka, a native C++ library. Install it through the Confluent repository.
Switch to the yum repository directory:
cd /etc/yum.repos.d/Create a repository configuration file named
confluent.repowith the following content:[Confluent.dist] name=Confluent repository (dist) baseurl=https://packages.confluent.io/rpm/5.1/7 gpgcheck=1 gpgkey=https://packages.confluent.io/rpm/5.1/archive.key enabled=1 [Confluent] name=Confluent repository baseurl=https://packages.confluent.io/rpm/5.1 gpgcheck=1 gpgkey=https://packages.confluent.io/rpm/5.1/archive.key enabled=1Install
librdkafka:yum install librdkafka-devel
Install the Node.js library
Set the OpenSSL header and library paths for the native build:
# Replace with your actual OpenSSL paths. # Run `whereis openssl` to find these paths. export CPPFLAGS=-I/usr/local/opt/openssl/include export LDFLAGS=-L/usr/local/opt/openssl/libInstall
node-rdkafka:npm install i --unsafe-perm node-rdkafka
Configure the connection
Create a setting.js file with your connection parameters. Choose the configuration that matches your endpoint type.
Connection parameters
| Parameter | Description | Where to find |
|---|---|---|
bootstrap_servers | Endpoint of your ApsaraMQ for Kafka instance | Endpoint Information section on the Instance Details page in the ApsaraMQ for Kafka console |
topic_name | Name of the topic to produce to or consume from | Topics page in the ApsaraMQ for Kafka console |
consumer_id | ID of the consumer group | Groups page in the ApsaraMQ for Kafka console |
sasl_plain_username | SASL username (SSL endpoint only) | Username in the Configuration Information section on the Instance Details page |
sasl_plain_password | SASL password (SSL endpoint only) | Password in the Configuration Information section on the Instance Details page |
If Access Control List (ACL) is not enabled on your instance, get the Simple Authentication and Security Layer (SASL) username and password from the Username and Password fields in the Configuration Information section on the Instance Details page in the ApsaraMQ for Kafka console. If ACL is enabled, make sure the SASL user is authorized to produce and consume messages. For details, see Grant permissions to SASL users.
Default endpoint
module.exports = {
'bootstrap_servers': ["<bootstrap-servers>"],
'topic_name': '<topic-name>',
'consumer_id': '<consumer-group-id>'
}SSL endpoint
module.exports = {
'sasl_plain_username': '<sasl-username>',
'sasl_plain_password': '<sasl-password>',
'bootstrap_servers': ["<bootstrap-servers>"],
'topic_name': '<topic-name>',
'consumer_id': '<consumer-group-id>'
}After you configure setting.js, upload it along with all files in the corresponding demo folder to your server. If you use the SSL endpoint, include the SSL root certificate file (ca-cert.pem) in the same directory.
Tip: Download the complete demo project from the aliware-kafka-demos repository. The Node.js examples are in the kafka-nodejs-demo folder, organized by endpoint type.Produce messages
Save the following code as producer.js. This example sends a message every second and logs delivery reports.
Run the producer:
node producer.jsDefault endpoint
const Kafka = require('node-rdkafka');
const config = require('./setting');
console.log("features:" + Kafka.features);
console.log(Kafka.librdkafkaVersion);
var producer = new Kafka.Producer({
/*'debug': 'all', */
'api.version.request': 'true', // Negotiate API version with the broker automatically
'bootstrap.servers': config['bootstrap_servers'],
'dr_cb': true, // Enable delivery report callback
'dr_msg_cb': true // Include message payload in delivery reports
});
var connected = false
producer.setPollInterval(100); // Poll for delivery reports every 100 ms
producer.connect();
producer.on('ready', function() {
connected = true
console.log("connect ok")
});
producer.on("disconnected", function() {
connected = false;
producer.connect();
})
producer.on('event.log', function(event) {
console.log("event.log", event);
});
producer.on("error", function(error) {
console.log("error:" + error);
});
function produce() {
try {
producer.produce(
config['topic_name'],
null, // Partition (null = auto-assign)
new Buffer('Hello Ali Kafka'), // Message payload
null, // Message key
Date.now() // Timestamp
);
} catch (err) {
console.error('A problem occurred when sending our message');
console.error(err);
}
}
producer.on('delivery-report', function(err, report) {
console.log("delivery-report: producer ok");
});
producer.on('event.error', function(err) {
console.error('event.error:' + err);
})
setInterval(produce, 1000, "Interval");SSL endpoint
const Kafka = require('node-rdkafka');
const config = require('./setting');
console.log("features:" + Kafka.features);
console.log(Kafka.librdkafkaVersion);
var producer = new Kafka.Producer({
/*'debug': 'all', */
'api.version.request': 'true',
'bootstrap.servers': config['bootstrap_servers'],
'dr_cb': true,
'dr_msg_cb': true,
// SASL/SSL authentication
'security.protocol': 'sasl_ssl',
'ssl.ca.location': './ca-cert.pem', // Path to the SSL root certificate
'sasl.mechanisms': 'PLAIN',
'ssl.endpoint.identification.algorithm': 'none', // Disable hostname verification
'sasl.username': config['sasl_plain_username'],
'sasl.password': config['sasl_plain_password']
});
var connected = false
producer.setPollInterval(100);
producer.connect();
producer.on('ready', function() {
connected = true
console.log("connect ok")
});
function produce() {
try {
producer.produce(
config['topic_name'],
new Buffer('Hello Ali Kafka'),
null,
Date.now()
);
} catch (err) {
console.error('A problem occurred when sending our message');
console.error(err);
}
}
producer.on("disconnected", function() {
connected = false;
producer.connect();
})
producer.on('event.log', function(event) {
console.log("event.log", event);
});
producer.on("error", function(error) {
console.log("error:" + error);
});
producer.on('delivery-report', function(err, report) {
console.log("delivery-report: producer ok");
});
// Any errors we encounter, including connection errors
producer.on('event.error', function(err) {
console.error('event.error:' + err);
})
setInterval(produce, 1000, "Interval");Consume messages
Save the following code as consumer.js. This example subscribes to a topic and prints each message to the console.
Run the consumer:
node consumer.jsDefault endpoint
const Kafka = require('node-rdkafka');
const config = require('./setting');
console.log(Kafka.features);
console.log(Kafka.librdkafkaVersion);
console.log(config)
var consumer = new Kafka.KafkaConsumer({
/*'debug': 'all',*/
'api.version.request': 'true',
'bootstrap.servers': config['bootstrap_servers'],
'group.id': config['consumer_id']
});
consumer.connect();
consumer.on('ready', function() {
console.log("connect ok");
consumer.subscribe([config['topic_name']]);
consumer.consume();
})
consumer.on('data', function(data) {
console.log(data);
});
consumer.on('event.log', function(event) {
console.log("event.log", event);
});
consumer.on('error', function(error) {
console.log("error:" + error);
});
consumer.on('event', function(event) {
console.log("event:" + event);
});SSL endpoint
const Kafka = require('node-rdkafka');
const config = require('./setting');
console.log(Kafka.features);
console.log(Kafka.librdkafkaVersion);
console.log(config)
var consumer = new Kafka.KafkaConsumer({
/*'debug': 'all',*/
'api.version.request': 'true',
'bootstrap.servers': config['bootstrap_servers'],
// SASL/SSL authentication
'security.protocol': 'sasl_ssl',
'ssl.endpoint.identification.algorithm': 'none',
'ssl.ca.location': './ca-cert.pem',
'sasl.mechanisms': 'PLAIN',
// Message size limits
'message.max.bytes': 32000,
'fetch.max.bytes': 32000,
'fetch.message.max.bytes': 32000,
'max.partition.fetch.bytes': 32000,
'sasl.username': config['sasl_plain_username'],
'sasl.password': config['sasl_plain_password'],
'group.id': config['consumer_id']
});
consumer.connect();
consumer.on('ready', function() {
console.log("connect ok");
consumer.subscribe([config['topic_name']]);
consumer.consume();
})
consumer.on('data', function(data) {
console.log(data);
});
consumer.on('event.log', function(event) {
console.log("event.log", event);
});
consumer.on('error', function(error) {
console.log("error:" + error);
});
consumer.on('event', function(event) {
console.log("event:" + event);
});