This topic shows you how to use the SDK for Ruby to connect to the SSL endpoint of a Message Queue for Apache Kafka instance and use the PLAIN mechanism to send and consume messages over the Internet.

Prerequisites

Ruby is installed. For more information, see Download Ruby.

Install the Ruby library

Run the following command to install the Ruby library:
gem install ruby-kafka -v 0.6.8
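
To confirm that the pinned version is available before you write the programs, a check along the following lines may help. This is a minimal sketch that uses only the RubyGems API bundled with Ruby; the helper name `gem_installed?` is hypothetical and not part of ruby-kafka.

```ruby
require "rubygems"

# Hypothetical helper: returns true if an installed gem matches the
# given name and version requirement, false otherwise.
def gem_installed?(name, requirement)
  !Gem::Specification.find_all_by_name(name, requirement).empty?
end

if gem_installed?("ruby-kafka", "0.6.8")
  puts "ruby-kafka 0.6.8 is installed"
else
  puts "ruby-kafka 0.6.8 not found; run: gem install ruby-kafka -v 0.6.8"
end
```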

Preparations

Send messages

  1. Create a producer program named producer.rb that contains the following code:
    # frozen_string_literal: true
    
    $LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))
    
    require "logger"
    require "kafka"
    
    logger = Logger.new($stdout)
    #logger.level = Logger::DEBUG
    logger.level = Logger::INFO
    
    brokers = "xxx:xx,xxx:xx"
    topic = "xxx"
    username = "xxx"
    password = "xxx"
    
    kafka = Kafka.new(
        seed_brokers: brokers,
        client_id: "sasl-producer",
        logger: logger,
        # Place cert.pem in a location that this script can read, and adjust the path if needed.
        ssl_ca_cert: File.read('./cert.pem'),
        sasl_plain_username: username,
        sasl_plain_password: password,
        )
    
    producer = kafka.producer
    
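    begin
        # Send each line read from standard input as one message.
        $stdin.each_line do |line|
            producer.produce(line, topic: topic)
            producer.deliver_messages
        end
    ensure
        # Deliver any messages that are still buffered, then shut down the producer.
        producer.deliver_messages
        producer.shutdown
    end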
    Parameter   Description
    brokers     The SSL endpoint of the Message Queue for Apache Kafka instance. You can obtain the SSL endpoint in the Endpoint Information section of the Instance Details page in the Message Queue for Apache Kafka console.
    topic       The name of the topic. You can obtain the name of the topic on the Topics page in the Message Queue for Apache Kafka console.
    username    The username of the Simple Authentication and Security Layer (SASL) user.
                Note
                • If the ACL feature is not enabled for your Message Queue for Apache Kafka instance, you can obtain the username and password of the SASL user from the Username and Password parameters in the Configuration Information section of the Instance Details page in the Message Queue for Apache Kafka console.
                • If the ACL feature is enabled for your Message Queue for Apache Kafka instance, make sure that the SASL user is authorized to send and consume messages by using the instance. For more information, see Grant permissions to SASL users.
    password    The password of the SASL user.
  2. Run the following command to run producer.rb to send messages:
    ruby producer.rb
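
The placeholder values in producer.rb (`brokers`, `topic`, `username`, `password`) can also be supplied at run time instead of being hard-coded. The following is a minimal sketch under stated assumptions: the environment variable names (`KAFKA_BROKERS` and so on) and the helpers `parse_brokers` and `load_settings` are hypothetical and not part of ruby-kafka. It uses only the Ruby standard library and checks that the broker list has the `host:port,host:port` shape before a connection is attempted.

```ruby
# Hypothetical helper: split "host:port,host:port" into [host, port] pairs
# and raise ArgumentError if any entry is malformed.
def parse_brokers(list)
  entries = list.to_s.split(",").map(&:strip).reject(&:empty?)
  raise ArgumentError, "broker list is empty" if entries.empty?

  entries.map do |entry|
    # Split on the last colon so the port is always the final component.
    host, _, port = entry.rpartition(":")
    valid = !host.empty? && port.match?(/\A\d+\z/) && (1..65_535).cover?(port.to_i)
    raise ArgumentError, "invalid broker address: #{entry.inspect}" unless valid

    [host, port.to_i]
  end
end

# Hypothetical helper: read the four settings from environment variables
# (the variable names are assumptions) and fail early if one is missing.
def load_settings(env = ENV)
  names = {
    brokers: "KAFKA_BROKERS",
    topic: "KAFKA_TOPIC",
    username: "KAFKA_USERNAME",
    password: "KAFKA_PASSWORD"
  }
  settings = names.transform_values { |var| env[var].to_s }
  missing = names.select { |key, _| settings[key].empty? }.values
  unless missing.empty?
    raise ArgumentError, "missing environment variables: #{missing.join(', ')}"
  end

  parse_brokers(settings[:brokers]) # validate only; the string itself is kept
  settings
end
```

In producer.rb, you could then call `settings = load_settings` and assign `brokers = settings[:brokers]`, `topic = settings[:topic]`, and so on, which keeps the SASL password out of the source file.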

Consume messages

  1. Create a consumer program named consumer.rb that contains the following code:
    # frozen_string_literal: true
    
    $LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))
    
    require "logger"
    require "kafka"
    
    logger = Logger.new(STDOUT)
    #logger.level = Logger::DEBUG
    logger.level = Logger::INFO
    
    brokers = "xxx:xx,xxx:xx"
    topic = "xxx"
    username = "xxx"
    password = "xxx"
    consumerGroup = "xxx"
    
    kafka = Kafka.new(
            seed_brokers: brokers,
            client_id: "sasl-consumer",
            socket_timeout: 20,
            logger: logger,
            # Place cert.pem in a location that this script can read, and adjust the path if needed.
            ssl_ca_cert: File.read('./cert.pem'),
            sasl_plain_username: username,
            sasl_plain_password: password,
            )
    
    consumer = kafka.consumer(group_id: consumerGroup)
    consumer.subscribe(topic, start_from_beginning: false)
    
    trap("TERM") { consumer.stop }
    trap("INT") { consumer.stop }
    
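    begin
        # Fetch messages in batches of up to 64 KB and log each message value.
        consumer.each_message(max_bytes: 64 * 1024) do |message|
            logger.info("Get message: #{message.value}")
        end
    rescue Kafka::ProcessingError => e
        warn "Got error: #{e.cause}"
        # Pause the failing partition for 20 seconds, then resume consuming.
        consumer.pause(e.topic, e.partition, timeout: 20)

        retry
    end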
    Parameter       Description
    brokers         The SSL endpoint of the Message Queue for Apache Kafka instance. You can obtain the SSL endpoint in the Endpoint Information section of the Instance Details page in the Message Queue for Apache Kafka console.
    topic           The name of the topic. You can obtain the name of the topic on the Topics page in the Message Queue for Apache Kafka console.
    username        The username of the SASL user.
                    Note
                    • If the ACL feature is not enabled for your Message Queue for Apache Kafka instance, you can obtain the username and password of the SASL user from the Username and Password parameters in the Configuration Information section of the Instance Details page in the Message Queue for Apache Kafka console.
                    • If the ACL feature is enabled for your Message Queue for Apache Kafka instance, make sure that the SASL user is authorized to send and consume messages by using the instance. For more information, see Grant permissions to SASL users.
    password        The password of the SASL user.
    consumerGroup   The name of the consumer group. You can obtain the name on the Groups page in the Message Queue for Apache Kafka console.
  2. Run the following command to run consumer.rb to consume messages:
    ruby consumer.rb
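
Both programs pass the contents of `./cert.pem` to `ssl_ca_cert`, so a missing or damaged file only surfaces as an error when the script starts or connects. The following is a minimal sketch of a pre-flight check that uses Ruby's bundled `openssl` library; the helper name `load_ca_cert` is hypothetical, and the check only parses the first certificate in the file.

```ruby
require "openssl"

# Hypothetical helper: read a PEM file and confirm that it parses as an
# X.509 certificate. Returns the PEM text so it can be passed on unchanged.
def load_ca_cert(path)
  pem = File.read(path)
  # Raises OpenSSL::X509::CertificateError if the content is not a certificate.
  OpenSSL::X509::Certificate.new(pem)
  pem
rescue Errno::ENOENT
  raise ArgumentError, "CA certificate not found: #{path}"
rescue OpenSSL::X509::CertificateError => e
  raise ArgumentError, "#{path} is not a valid PEM certificate: #{e.message}"
end
```

With this helper in place, the client option could be written as `ssl_ca_cert: load_ca_cert("./cert.pem")`, so that a problem with the certificate file is reported with a clear message before any connection is made.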