All Products
Search
Document Center

ApsaraMQ for Kafka: Send and receive messages with the Node.js SDK

Last Updated: Mar 11, 2026

Use the node-rdkafka library to connect a Node.js application to ApsaraMQ for Kafka and produce and consume messages.

Prerequisites

Before you begin, make sure that you have:

  • GCC installed

  • Node.js 4.0.0 or later installed

  • OpenSSL installed

  • An ApsaraMQ for Kafka instance with a topic and consumer group created

  • (SSL only) The SSL root certificate downloaded and saved as ca-cert.pem

Install the C++ library

The node-rdkafka package depends on librdkafka, a native C++ library. Install it through the Confluent repository.

  1. Switch to the yum repository directory:

       # Change to the yum repository directory, where the repository file is created next.
       cd /etc/yum.repos.d/
  2. Create a repository configuration file named confluent.repo with the following content:

       # Contents of confluent.repo: the Confluent repositories from which librdkafka is installed.
       [Confluent.dist]
       name=Confluent repository (dist)
       baseurl=https://packages.confluent.io/rpm/5.1/7
       gpgcheck=1
       gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
       enabled=1
    
       [Confluent]
       name=Confluent repository
       baseurl=https://packages.confluent.io/rpm/5.1
       gpgcheck=1
       gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
       enabled=1
  3. Install librdkafka:

       # librdkafka is the native C++ library that the node-rdkafka package depends on.
       yum install librdkafka-devel

Install the Node.js library

  1. Set the OpenSSL header and library paths for the native build:

       # Replace with your actual OpenSSL paths.
       # Run `whereis openssl` to find these paths.
       # CPPFLAGS carries the OpenSSL header (include) directory and LDFLAGS the
       # OpenSSL library directory; both are used by the native build of node-rdkafka.
       export CPPFLAGS=-I/usr/local/opt/openssl/include
       export LDFLAGS=-L/usr/local/opt/openssl/lib
  2. Install node-rdkafka:

       # Install node-rdkafka. Only the package name follows the options: an extra
       # word after `install` is treated by npm as another package to install.
       npm install --unsafe-perm node-rdkafka

Configure the connection

Create a setting.js file with your connection parameters. Choose the configuration that matches your endpoint type.

Connection parameters

| Parameter | Description | Where to find |
| --- | --- | --- |
| bootstrap_servers | Endpoint of your ApsaraMQ for Kafka instance | Endpoint Information section on the Instance Details page in the ApsaraMQ for Kafka console |
| topic_name | Name of the topic to produce to or consume from | Topics page in the ApsaraMQ for Kafka console |
| consumer_id | ID of the consumer group | Groups page in the ApsaraMQ for Kafka console |
| sasl_plain_username | SASL username (SSL endpoint only) | Username in the Configuration Information section on the Instance Details page |
| sasl_plain_password | SASL password (SSL endpoint only) | Password in the Configuration Information section on the Instance Details page |

If Access Control List (ACL) is not enabled on your instance, get the Simple Authentication and Security Layer (SASL) username and password from the Username and Password fields in the Configuration Information section on the Instance Details page in the ApsaraMQ for Kafka console. If ACL is enabled, make sure the SASL user is authorized to produce and consume messages. For details, see Grant permissions to SASL users.

Default endpoint

module.exports = {
    'bootstrap_servers': ["<bootstrap-servers>"],
    'topic_name': '<topic-name>',
    'consumer_id': '<consumer-group-id>'
}

SSL endpoint

module.exports = {
    'sasl_plain_username': '<sasl-username>',
    'sasl_plain_password': '<sasl-password>',
    'bootstrap_servers': ["<bootstrap-servers>"],
    'topic_name': '<topic-name>',
    'consumer_id': '<consumer-group-id>'
}

After you configure setting.js, upload it along with all files in the corresponding demo folder to your server. If you use the SSL endpoint, include the SSL root certificate file (ca-cert.pem) in the same directory.

Tip: Download the complete demo project from the aliware-kafka-demos repository. The Node.js examples are in the kafka-nodejs-demo folder, organized by endpoint type.

Produce messages

Save the code for your endpoint type (shown below) as producer.js. This example sends a message every second and logs delivery reports.

After you save the file, run the producer:

node producer.js

Default endpoint

const Kafka = require('node-rdkafka');
const config = require('./setting');

// Log the features and version reported by the node-rdkafka module.
console.log("features:" + Kafka.features);
console.log(Kafka.librdkafkaVersion);

const producer = new Kafka.Producer({
    /*'debug': 'all', */
    'api.version.request': 'true',   // Negotiate API version with the broker automatically
    'bootstrap.servers': config['bootstrap_servers'],
    'dr_cb': true,                   // Enable delivery report callback
    'dr_msg_cb': true                // Include message payload in delivery reports
});

// Set on 'ready' and cleared on 'disconnected'. produce() checks it so that
// no send is attempted while the client is not connected.
let connected = false;

producer.setPollInterval(100);       // Poll for delivery reports every 100 ms

producer.on('ready', function() {
    connected = true;
    console.log("connect ok");
});

// Reconnect whenever the connection is lost.
producer.on('disconnected', function() {
    connected = false;
    producer.connect();
});

producer.on('event.log', function(event) {
    console.log("event.log", event);
});

producer.on('error', function(error) {
    console.error("error:" + error);
});

producer.on('event.error', function(err) {
    console.error('event.error:' + err);
});

// Report the actual outcome of each send instead of always logging success.
producer.on('delivery-report', function(err, report) {
    if (err) {
        console.error('delivery-report: producer failed:' + err);
        return;
    }
    console.log("delivery-report: producer ok");
});

/**
 * Sends one "Hello Ali Kafka" message to the configured topic.
 *
 * Does nothing while the producer is not connected. Errors thrown by
 * producer.produce() are logged rather than rethrown, so the timer that
 * calls this function keeps running.
 */
function produce() {
    if (!connected) {
        return;
    }
    try {
        producer.produce(
            config['topic_name'],
            null,                            // Partition (null = auto-assign)
            Buffer.from('Hello Ali Kafka'),  // Message payload (Buffer.from replaces the deprecated `new Buffer`)
            null,                            // Message key
            Date.now()                       // Timestamp
        );
    } catch (err) {
        console.error('A problem occurred when sending our message');
        console.error(err);
    }
}

// All handlers are attached before connecting so that no event is missed.
producer.connect();

// Try to send a message every second.
setInterval(produce, 1000);

SSL endpoint

const Kafka = require('node-rdkafka');
const config = require('./setting');

// Log the features and version reported by the node-rdkafka module.
console.log("features:" + Kafka.features);
console.log(Kafka.librdkafkaVersion);

const producer = new Kafka.Producer({
    /*'debug': 'all', */
    'api.version.request': 'true',   // Negotiate API version with the broker automatically
    'bootstrap.servers': config['bootstrap_servers'],
    'dr_cb': true,                   // Enable delivery report callback
    'dr_msg_cb': true,               // Include message payload in delivery reports

    // SASL/SSL authentication
    'security.protocol': 'sasl_ssl',
    'ssl.ca.location': './ca-cert.pem',         // Path to the SSL root certificate
    'sasl.mechanisms': 'PLAIN',
    'ssl.endpoint.identification.algorithm': 'none',  // Disable hostname verification
    'sasl.username': config['sasl_plain_username'],
    'sasl.password': config['sasl_plain_password']
});

// Set on 'ready' and cleared on 'disconnected'. produce() checks it so that
// no send is attempted while the client is not connected.
let connected = false;

producer.setPollInterval(100);       // Poll for delivery reports every 100 ms

producer.on('ready', function() {
    connected = true;
    console.log("connect ok");
});

// Reconnect whenever the connection is lost.
producer.on('disconnected', function() {
    connected = false;
    producer.connect();
});

producer.on('event.log', function(event) {
    console.log("event.log", event);
});

producer.on('error', function(error) {
    console.error("error:" + error);
});

// Any errors we encounter, including connection errors
producer.on('event.error', function(err) {
    console.error('event.error:' + err);
});

// Report the actual outcome of each send instead of always logging success.
producer.on('delivery-report', function(err, report) {
    if (err) {
        console.error('delivery-report: producer failed:' + err);
        return;
    }
    console.log("delivery-report: producer ok");
});

/**
 * Sends one "Hello Ali Kafka" message to the configured topic.
 *
 * Does nothing while the producer is not connected. Errors thrown by
 * producer.produce() are logged rather than rethrown, so the timer that
 * calls this function keeps running.
 */
function produce() {
    if (!connected) {
        return;
    }
    try {
        // Argument order is topic, partition, payload, key, timestamp. The
        // partition must be passed explicitly; leaving it out shifts the
        // payload into the partition position.
        producer.produce(
            config['topic_name'],
            null,                            // Partition (null = auto-assign)
            Buffer.from('Hello Ali Kafka'),  // Message payload (Buffer.from replaces the deprecated `new Buffer`)
            null,                            // Message key
            Date.now()                       // Timestamp
        );
    } catch (err) {
        console.error('A problem occurred when sending our message');
        console.error(err);
    }
}

// All handlers are attached before connecting so that no event is missed.
producer.connect();

// Try to send a message every second.
setInterval(produce, 1000);

Consume messages

Save the code for your endpoint type (shown below) as consumer.js. This example subscribes to a topic and prints each message to the console.

After you save the file, run the consumer:

node consumer.js

Default endpoint

const Kafka = require('node-rdkafka');
const config = require('./setting');

// Print the module's features and version, then the loaded settings.
console.log(Kafka.features);
console.log(Kafka.librdkafkaVersion);
console.log(config);

// Client options for the default endpoint.
const consumerOptions = {
    /*'debug': 'all',*/
    'api.version.request': 'true',
    'bootstrap.servers': config.bootstrap_servers,
    'group.id': config.consumer_id,
};

const consumer = new Kafka.KafkaConsumer(consumerOptions);

consumer.connect();

// Once connected, subscribe to the configured topic and start consuming.
const onReady = () => {
    console.log("connect ok");
    consumer.subscribe([config.topic_name]);
    consumer.consume();
};

// Print every received message as-is.
const onData = (data) => {
    console.log(data);
};

consumer.on('ready', onReady);
consumer.on('data', onData);

consumer.on('event.log', (event) => {
    console.log("event.log", event);
});

consumer.on('error', (error) => {
    console.log("error:" + error);
});

consumer.on('event', (event) => {
    console.log("event:" + event);
});

SSL endpoint

const Kafka = require('node-rdkafka');
const config = require('./setting');

console.log(Kafka.features);
console.log(Kafka.librdkafkaVersion);
// Print the settings with the SASL password masked so that the credential
// does not end up in console output or captured logs.
console.log(Object.assign({}, config, { 'sasl_plain_password': '******' }));

const consumer = new Kafka.KafkaConsumer({
    /*'debug': 'all',*/
    'api.version.request': 'true',
    'bootstrap.servers': config['bootstrap_servers'],

    // SASL/SSL authentication
    'security.protocol': 'sasl_ssl',
    'ssl.endpoint.identification.algorithm': 'none',  // Disable hostname verification
    'ssl.ca.location': './ca-cert.pem',               // Path to the SSL root certificate
    'sasl.mechanisms': 'PLAIN',

    // Message size limits
    'message.max.bytes': 32000,
    'fetch.max.bytes': 32000,
    'fetch.message.max.bytes': 32000,
    'max.partition.fetch.bytes': 32000,

    'sasl.username': config['sasl_plain_username'],
    'sasl.password': config['sasl_plain_password'],
    'group.id': config['consumer_id']
});

consumer.connect();

// Once connected, subscribe to the configured topic and start consuming.
consumer.on('ready', function() {
    console.log("connect ok");
    consumer.subscribe([config['topic_name']]);
    consumer.consume();
});

// Print every received message as-is.
consumer.on('data', function(data) {
    console.log(data);
});

consumer.on('event.log', function(event) {
    console.log("event.log", event);
});

// Errors go to stderr so they are not mixed with the consumed messages on stdout.
consumer.on('error', function(error) {
    console.error("error:" + error);
});

consumer.on('event', function(event) {
    console.log("event:" + event);
});