All Products
Search
Document Center

ApsaraMQ for Kafka:Use the SDK for Node.js to send and receive messages

Last Updated:Mar 15, 2024

This topic describes how to use the SDK for Node.js to connect to ApsaraMQ for Kafka to send and receive messages.

Environment requirements

  • GNU Compiler Collection (GCC) is installed. For more information, see Installing GCC.

  • Node.js is installed. For more information, visit the download page of Node.js.

    Important

    The version of Node.js must be 4.0.0 or later.

  • OpenSSL is installed. For more information, see Downloads.

Install the C++ library

  1. Run the following command to switch to the /etc/yum.repos.d/ yum repository directory:

    cd /etc/yum.repos.d/
  2. Create a yum repository configuration file named confluent.repo.

    [Confluent.dist]
    name=Confluent repository (dist)
    baseurl=https://packages.confluent.io/rpm/5.1/7
    gpgcheck=1
    gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
    enabled=1
    
    [Confluent]
    name=Confluent repository
    baseurl=https://packages.confluent.io/rpm/5.1
    gpgcheck=1
    gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
    enabled=1
  3. Run the following command to install the C++ library:

    yum install librdkafka-devel

Install the Node.js library

  1. Run the following command to specify the path of the OpenSSL header file for the preprocessor:

    # Set this parameter to the path of the OpenSSL header file that is installed on your on-premises machine. 
    export CPPFLAGS=-I</usr/local/opt/openssl/include>
  2. Run the following command to specify the path of the OpenSSL library for the connector:

    # Set this parameter to the path of the OpenSSL library file that is installed on your on-premises machine. 
    export LDFLAGS=-L</usr/local/opt/openssl/lib>
  3. Run the following command to install the Node.js library:

    npm install i --unsafe-perm node-rdkafka
Note

Run the whereis openssl command to obtain the path of the OpenSSL header file and the path of the OpenSSL library.

Create configuration files

  1. (Optional) Download the Secure Sockets Layer (SSL) root certificate. If you use the SSL endpoint to connect to your ApsaraMQ for Kafka instance, you must install the certificate.

  2. Go to the aliware-kafka-demos page, click download to download the demo project to your on-premises machine and then decompress the package of the demo project.

  3. In the decompressed package, go to the kafka-nodejs-demo folder. Then, open the corresponding folder based on the endpoint that you want to use and configure the setting.js file in the folder.

    module.exports = {
        'sasl_plain_username': 'XXX',
        'sasl_plain_password': 'XXX',
        'bootstrap_servers': ["XXX"],
        'topic_name': 'XXX',
        'consumer_id': 'XXX'
    }

    Parameter

    Description

    sasl_plain_username

    The username of the Simple Authentication and Security Layer (SASL) user. If you use the default endpoint to connect to the ApsaraMQ for Kafka instance, this parameter is not available.

    Note
    • If the ACL feature is not enabled for the ApsaraMQ for Kafka instance, you can obtain the username and password of the SASL user from the Username and Password parameters in the Configuration Information section of the Instance Details page in the ApsaraMQ for Kafka console.

    • If the ACL feature is enabled for the ApsaraMQ for Kafka instance, make sure that the SASL user is authorized to send and receive messages by using the instance. For more information, see Grant permissions to SASL users.

    sasl_plain_password

    The password of the SASL user. If you use the default endpoint to connect to the ApsaraMQ for Kafka instance, this parameter is not available.

    bootstrap_servers

    The SSL endpoint of the ApsaraMQ for Kafka instance. You can obtain the endpoint in the Endpoint Information section of the Instance Details page in the ApsaraMQ for Kafka console.

    topic_name

    The topic name. You can obtain the topic name on the Topics page in the ApsaraMQ for Kafka console.

    consumer_id

    The group ID. You can obtain the group ID on the Groups page in the ApsaraMQ for Kafka console.

  4. After the required parameters are configured, upload all files in the folder in which the configuration file is located to the installation directory of the Node.js dependency library on your server. The folder that corresponds to the SSL endpoint contains the SSL root certificate file.

Send messages

Run the following command to run producer.js to send messages:

node producer.js

The following sample code provides examples of consumer.js:

  • If you use the default endpoint to connect to the ApsaraMQ for Kafka instance, use the following sample code:

    const Kafka = require('node-rdkafka');
    const config = require('./setting');
    console.log("features:" + Kafka.features);
    console.log(Kafka.librdkafkaVersion);
    
    var producer = new Kafka.Producer({
        /*'debug': 'all', */
        'api.version.request': 'true',
        'bootstrap.servers': config['bootstrap_servers'],
        'dr_cb': true,
        'dr_msg_cb': true
    });
    
    var connected = false
    
    producer.setPollInterval(100);
    
    producer.connect();
    
    
    producer.on('ready', function() {
      connected = true
      console.log("connect ok")
    });
    
    producer.on("disconnected", function() {
      connected = false;
      producer.connect();
    })
    
    producer.on('event.log', function(event) {
          console.log("event.log", event);
    });
    
    producer.on("error", function(error) {
        console.log("error:" + error);
    });
    
    function produce() {
      try {
        producer.produce(
          config['topic_name'],   
          null,      
          new Buffer('Hello Ali Kafka'),      
          null,   
          Date.now()
        );
      } catch (err) {
        console.error('A problem occurred when sending our message');
        console.error(err);
      }
    
    }
    
    producer.on('delivery-report', function(err, report) {
        console.log("delivery-report: producer ok");
    });
    
    producer.on('event.error', function(err) {
        console.error('event.error:' + err);
    })
    
    setInterval(produce,1000,"Interval");
  • If you use the SSL endpoint to connect to the ApsaraMQ for Kafka instance, use the following sample code:

    const Kafka = require('node-rdkafka');
    const config = require('./setting');
    console.log("features:" + Kafka.features);
    console.log(Kafka.librdkafkaVersion);
    
    var producer = new Kafka.Producer({
        /*'debug': 'all', */
        'api.version.request': 'true',
        'bootstrap.servers': config['bootstrap_servers'],
        'dr_cb': true,
        'dr_msg_cb': true,
        'security.protocol' : 'sasl_ssl',
        'ssl.ca.location' : './ca-cert.pem',
        'sasl.mechanisms' : 'PLAIN',
        'ssl.endpoint.identification.algorithm':'none',
        'sasl.username' : config['sasl_plain_username'],
        'sasl.password' : config['sasl_plain_password']
    });
    
    var connected = false
    
    producer.setPollInterval(100);
    
    producer.connect();
    
    producer.on('ready', function() {
      connected = true
      console.log("connect ok")
    
      });
    
    function produce() {
      try {
        producer.produce(
          config['topic_name'],
          new Buffer('Hello Ali Kafka'),
          null,
          Date.now()
        );
      } catch (err) {
        console.error('A problem occurred when sending our message');
        console.error(err);
      }
    }
    
    producer.on("disconnected", function() {
      connected = false;
      producer.connect();
    })
    
    producer.on('event.log', function(event) {
          console.log("event.log", event);
    });
    
    producer.on("error", function(error) {
        console.log("error:" + error);
    });
    
    producer.on('delivery-report', function(err, report) {
        console.log("delivery-report: producer ok");
    });
    // Any errors we encounter, including connection errors
    producer.on('event.error', function(err) {
        console.error('event.error:' + err);
    })
    
    setInterval(produce,1000,"Interval");

Receive messages

Run the following command to run consumer.js to receive messages:

node consumer.js

The following sample code provides examples of consumer.js:

  • If you use the default endpoint to connect to the ApsaraMQ for Kafka instance, use the following sample code:

    const Kafka = require('node-rdkafka');
    const config = require('./setting');
    console.log(Kafka.features);
    console.log(Kafka.librdkafkaVersion);
    console.log(config)
    
    var consumer = new Kafka.KafkaConsumer({
        /*'debug': 'all',*/
        'api.version.request': 'true',
        'bootstrap.servers': config['bootstrap_servers'],
        'group.id' : config['consumer_id']
    });
    
    consumer.connect();
    
    consumer.on('ready', function() {
      console.log("connect ok");
      consumer.subscribe([config['topic_name']]);
      consumer.consume();
    })
    
    consumer.on('data', function(data) {
      console.log(data);
    });
    
    
    consumer.on('event.log', function(event) {
          console.log("event.log", event);
    });
    
    consumer.on('error', function(error) {
        console.log("error:" + error);
    });
    
    consumer.on('event', function(event) {
            console.log("event:" + event);
    });
  • If you use the SSL endpoint to connect to the ApsaraMQ for Kafka instance, use the following sample code:

    const Kafka = require('node-rdkafka');
    const config = require('./setting');
    console.log(Kafka.features);
    console.log(Kafka.librdkafkaVersion);
    console.log(config)
    
    var consumer = new Kafka.KafkaConsumer({
        /*'debug': 'all',*/
        'api.version.request': 'true',
        'bootstrap.servers': config['bootstrap_servers'],
        'security.protocol' : 'sasl_ssl',
        'ssl.endpoint.identification.algorithm':'none',
        'ssl.ca.location' : './ca-cert.pem',
        'sasl.mechanisms' : 'PLAIN',
        'message.max.bytes': 32000,
        'fetch.max.bytes' : 32000,
        'fetch.message.max.bytes': 32000,
        'max.partition.fetch.bytes': 32000,
        'sasl.username' : config['sasl_plain_username'],
        'sasl.password' : config['sasl_plain_password'],
        'group.id' : config['consumer_id']
    });
    
    consumer.connect();
    
    consumer.on('ready', function() {
      console.log("connect ok");
      consumer.subscribe([config['topic_name']]);
      consumer.consume();
    })
    
    consumer.on('data', function(data) {
      console.log(data);
    });
    
    
    consumer.on('event.log', function(event) {
          console.log("event.log", event);
    });
    
    consumer.on('error', function(error) {
        console.log("error:" + error);
    });
    
    consumer.on('event', function(event) {
            console.log("event:" + event);
    });