This topic describes how to use the SDK for Node.js to connect to an endpoint of a Message Queue for Apache Kafka instance and send and subscribe to messages.

Environment requirements

  • GNU Compiler Collection (GCC) is installed. For more information, see Installing GCC.
  • Node.js is installed. For more information, see Downloads.
    Notice The version of Node.js must be V4.0.0 or later.
  • OpenSSL is installed. For more information, see Downloads.

Install the C++ dependency library

  1. Run the following command to go to the /etc/yum.repos.d/ directory in which the yum repository is installed:
    cd /etc/yum.repos.d/
  2. Create a yum repository configuration file named confluent.repo.
    [Confluent.dist]
    name=Confluent repository (dist)
    baseurl=https://packages.confluent.io/rpm/5.1/7
    gpgcheck=1
    gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
    enabled=1
    
    [Confluent]
    name=Confluent repository
    baseurl=https://packages.confluent.io/rpm/5.1
    gpgcheck=1
    gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
    enabled=1
  3. Run the following command to install the C++ dependency library:
    yum install librdkafka-devel

Install the Node.js dependency library

  1. Run the following command to specify the path of the OpenSSL header file for the preprocessor:
    # Specify the path of the OpenSSL header file that is installed on your on-premises machine. 
    export CPPFLAGS=-I/usr/local/opt/openssl/include
  2. Run the following command to specify the path of the OpenSSL library for the linker:
    # Specify the path of the OpenSSL library that is installed on your on-premises machine. 
    export LDFLAGS=-L/usr/local/opt/openssl/lib
  3. Run the following command to install the Node.js dependency library:
    npm install --unsafe-perm node-rdkafka
Note Run the whereis openssl command to obtain the path of the OpenSSL header file and the path of the OpenSSL library.

Prepare a configuration file

  1. Optional: Download the SSL root certificate. If you use the SSL endpoint to connect to your Message Queue for Apache Kafka instance, you must download this certificate.
  2. In the decompressed package, go to the kafka-nodejs-demo folder. Then, open the corresponding folder based on the endpoint that you want to use, and configure the setting.js file in the folder.
    // Connection settings loaded by the sample scripts through require('./setting').
    // Replace every XXX placeholder with the values of your own instance.
    const settings = {
      // SASL credentials; only the SSL-endpoint samples read these two values.
      sasl_plain_username: 'XXX',
      sasl_plain_password: 'XXX',
      // Broker endpoint list that is passed to 'bootstrap.servers'.
      bootstrap_servers: ["XXX"],
      // Topic that the producer writes to and the consumer subscribes to.
      topic_name: 'XXX',
      // Consumer group ID that is passed to 'group.id'.
      consumer_id: 'XXX'
    };

    module.exports = settings;
    Parameter Description
    sasl_plain_username The username of the Simple Authentication and Security Layer (SASL) user. If you use the default endpoint to connect to the Message Queue for Apache Kafka instance, this parameter is excluded.
    Note
    • If the ACL feature is not enabled for your Message Queue for Apache Kafka instance, you can obtain the username and password of the SASL user from the Username and Password parameters in the Configuration Information section of the Instance Details page in the Message Queue for Apache Kafka console.
    • If the ACL feature is enabled for your Message Queue for Apache Kafka instance, make sure that the SASL user is authorized to send and consume messages by using the instance. For more information, see Grant permissions to SASL users.
    sasl_plain_password The password of the SASL user. If you use the default endpoint to connect to the Message Queue for Apache Kafka instance, this parameter is excluded.
    bootstrap_servers The endpoint of the Message Queue for Apache Kafka instance: the default endpoint or the SSL endpoint, depending on which sample code you use. You can obtain the endpoint in the Endpoint Information section of the Instance Details page in the Message Queue for Apache Kafka console.
    topic_name The name of the topic. You can obtain the name of the topic on the Topics page in the Message Queue for Apache Kafka console.
    consumer_id The ID of the consumer group. You can obtain the ID of the consumer group on the Groups page in the Message Queue for Apache Kafka console.
  3. After the required parameters are configured, upload all files in the folder in which the configuration file is located to the installation directory of the Node.js dependency library on your server. The folder that corresponds to the SSL endpoint contains the SSL root certificate file.

Send messages

Run the following command to run producer.js to send messages:

node producer.js

The following sample code provides examples of producer.js:

  • If you use the default endpoint, use the following sample code:
    // Sample producer for the default endpoint: connects to the brokers listed in
    // setting.js and sends one test message per second while the connection is up.
    const Kafka = require('node-rdkafka');
    const config = require('./setting');
    console.log("features:" + Kafka.features);
    console.log(Kafka.librdkafkaVersion);

    const producer = new Kafka.Producer({
      /*'debug': 'all', */
      'api.version.request': 'true',
      'bootstrap.servers': config['bootstrap_servers'],
      // Delivery reports must be enabled for the 'delivery-report' event to fire.
      'dr_cb': true,
      'dr_msg_cb': true
    });

    // True between the 'ready' and 'disconnected' events; produce() checks it before sending.
    let connected = false;

    // Poll every 100 ms so that queued delivery reports are dispatched.
    producer.setPollInterval(100);

    producer.connect();

    producer.on('ready', function() {
      connected = true;
      console.log("connect ok");
    });

    // Reconnect automatically when the connection is lost.
    producer.on("disconnected", function() {
      connected = false;
      producer.connect();
    });

    producer.on('event.log', function(event) {
      console.log("event.log", event);
    });

    producer.on("error", function(error) {
      console.log("error:" + error);
    });

    /**
     * Sends one test message to the configured topic.
     * Does nothing (apart from logging) while the producer is not connected.
     */
    function produce() {
      if (!connected) {
        console.log("producer is not connected yet, skip sending");
        return;
      }
      try {
        producer.produce(
          config['topic_name'],           // topic
          null,                           // partition: null lets the client choose
          // Buffer.from replaces the deprecated new Buffer(); requires Node.js 4.5.0 or later.
          Buffer.from('Hello Ali Kafka'), // message payload
          null,                           // key
          Date.now()                      // timestamp
        );
      } catch (err) {
        console.error('A problem occurred when sending our message');
        console.error(err);
      }
    }

    producer.on('delivery-report', function(err, report) {
      // Only report success when the delivery report carries no error.
      if (err) {
        console.error("delivery-report: producer failed:" + err);
        return;
      }
      console.log("delivery-report: producer ok");
    });

    // Any errors we encounter, including connection errors
    producer.on('event.error', function(err) {
      console.error('event.error:' + err);
    });

    setInterval(produce, 1000);
  • If you use the SSL endpoint, use the following sample code:
    // Sample producer for the SSL endpoint: connects with SASL_SSL/PLAIN by using the
    // credentials in setting.js and sends one test message per second while connected.
    const Kafka = require('node-rdkafka');
    const config = require('./setting');
    console.log("features:" + Kafka.features);
    console.log(Kafka.librdkafkaVersion);

    const producer = new Kafka.Producer({
      /*'debug': 'all', */
      'api.version.request': 'true',
      'bootstrap.servers': config['bootstrap_servers'],
      // Delivery reports must be enabled for the 'delivery-report' event to fire.
      'dr_cb': true,
      'dr_msg_cb': true,
      'security.protocol' : 'sasl_ssl',
      // Path of the SSL root certificate, relative to the working directory.
      'ssl.ca.location' : './ca-cert.pem',
      'sasl.mechanisms' : 'PLAIN',
      'sasl.username' : config['sasl_plain_username'],
      'sasl.password' : config['sasl_plain_password']
    });

    // True between the 'ready' and 'disconnected' events; produce() checks it before sending.
    let connected = false;

    // Poll every 100 ms so that queued delivery reports are dispatched.
    producer.setPollInterval(100);

    producer.connect();

    producer.on('ready', function() {
      connected = true;
      console.log("connect ok");
    });

    /**
     * Sends one test message to the configured topic.
     * Does nothing (apart from logging) while the producer is not connected.
     */
    function produce() {
      if (!connected) {
        console.log("producer is not connected yet, skip sending");
        return;
      }
      try {
        // Argument order is (topic, partition, message, key, timestamp). The partition
        // must be passed explicitly; otherwise the payload is taken as the partition.
        producer.produce(
          config['topic_name'],           // topic
          null,                           // partition: null lets the client choose
          // Buffer.from replaces the deprecated new Buffer(); requires Node.js 4.5.0 or later.
          Buffer.from('Hello Ali Kafka'), // message payload
          null,                           // key
          Date.now()                      // timestamp
        );
      } catch (err) {
        console.error('A problem occurred when sending our message');
        console.error(err);
      }
    }

    // Reconnect automatically when the connection is lost.
    producer.on("disconnected", function() {
      connected = false;
      producer.connect();
    });

    producer.on('event.log', function(event) {
      console.log("event.log", event);
    });

    producer.on("error", function(error) {
      console.log("error:" + error);
    });

    producer.on('delivery-report', function(err, report) {
      // Only report success when the delivery report carries no error.
      if (err) {
        console.error("delivery-report: producer failed:" + err);
        return;
      }
      console.log("delivery-report: producer ok");
    });

    // Any errors we encounter, including connection errors
    producer.on('event.error', function(err) {
      console.error('event.error:' + err);
    });

    setInterval(produce, 1000);

Subscribe to messages

Run the following command to run consumer.js to subscribe to messages:

node consumer.js

The following sample code provides examples of consumer.js:

  • If you use the default endpoint, use the following sample code:
    // Sample consumer for the default endpoint: joins the consumer group from setting.js,
    // subscribes to the configured topic, and prints every message that it receives.
    const Kafka = require('node-rdkafka');
    const config = require('./setting');
    console.log(Kafka.features);
    console.log(Kafka.librdkafkaVersion);

    // Print the loaded settings, but never write the SASL password to the log.
    const printableConfig = Object.assign({}, config);
    if (printableConfig['sasl_plain_password'] !== undefined) {
      printableConfig['sasl_plain_password'] = '******';
    }
    console.log(printableConfig);

    const consumer = new Kafka.KafkaConsumer({
      /*'debug': 'all',*/
      'api.version.request': 'true',
      'bootstrap.servers': config['bootstrap_servers'],
      'group.id' : config['consumer_id']
    });

    consumer.connect();

    // Subscribe and start consuming only after the connection is established.
    consumer.on('ready', function() {
      console.log("connect ok");
      consumer.subscribe([config['topic_name']]);
      consumer.consume();
    });

    // Called once for each received message.
    consumer.on('data', function(data) {
      console.log(data);
    });

    consumer.on('event.log', function(event) {
      console.log("event.log", event);
    });

    consumer.on('error', function(error) {
      console.log("error:" + error);
    });

    // Any errors we encounter, including connection errors
    consumer.on('event.error', function(err) {
      console.error('event.error:' + err);
    });

    consumer.on('event', function(event) {
      console.log("event:" + event);
    });
  • If you use the SSL endpoint, use the following sample code:
    // Sample consumer for the SSL endpoint: connects with SASL_SSL/PLAIN, joins the consumer
    // group from setting.js, subscribes to the configured topic, and prints every message.
    const Kafka = require('node-rdkafka');
    const config = require('./setting');
    console.log(Kafka.features);
    console.log(Kafka.librdkafkaVersion);

    // Print the loaded settings, but never write the SASL password to the log.
    const printableConfig = Object.assign({}, config);
    if (printableConfig['sasl_plain_password'] !== undefined) {
      printableConfig['sasl_plain_password'] = '******';
    }
    console.log(printableConfig);

    const consumer = new Kafka.KafkaConsumer({
      /*'debug': 'all',*/
      'api.version.request': 'true',
      'bootstrap.servers': config['bootstrap_servers'],
      'security.protocol' : 'sasl_ssl',
      // Path of the SSL root certificate, relative to the working directory.
      'ssl.ca.location' : './ca-cert.pem',
      'sasl.mechanisms' : 'PLAIN',
      // NOTE(review): these four settings cap message and fetch sizes at 32000 bytes;
      // confirm that the limits suit the message sizes of your topic.
      'message.max.bytes': 32000,
      'fetch.max.bytes' : 32000,
      'fetch.message.max.bytes': 32000,
      'max.partition.fetch.bytes': 32000,
      'sasl.username' : config['sasl_plain_username'],
      'sasl.password' : config['sasl_plain_password'],
      'group.id' : config['consumer_id']
    });

    consumer.connect();

    // Subscribe and start consuming only after the connection is established.
    consumer.on('ready', function() {
      console.log("connect ok");
      consumer.subscribe([config['topic_name']]);
      consumer.consume();
    });

    // Called once for each received message.
    consumer.on('data', function(data) {
      console.log(data);
    });

    consumer.on('event.log', function(event) {
      console.log("event.log", event);
    });

    consumer.on('error', function(error) {
      console.log("error:" + error);
    });

    // Any errors we encounter, including connection errors
    consumer.on('event.error', function(err) {
      console.error('event.error:' + err);
    });

    consumer.on('event', function(event) {
      console.log("event:" + event);
    });