This topic describes how to use the SDK for Ruby to connect to the default endpoint of a Message Queue for Apache Kafka instance and send and consume messages in a virtual private cloud (VPC).

Prerequisites

Ruby is installed. For more information, see Download Ruby.

Install the Ruby library

Run the following command to install the Ruby library:
gem install ruby-kafka -v 0.6.8

Send messages

  1. Create a producer program named producer.rb.
    # frozen_string_literal: true
    
    $LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))
    
    require "kafka"
    
    logger = Logger.new($stdout)
    
    #logger.level = Logger::DEBUG
    logger.level = Logger::INFO
    
    brokers = "xxx:xx,xxx:xx"
    topic = "xxx"
    
    
    kafka = Kafka.new(
            seed_brokers: brokers,
            client_id: "simple-producer",
            logger: logger,
            )
    
    producer = kafka.producer
    
    begin
        $stdin.each_with_index do |line, index|
    
        producer.produce(line, topic: topic)
    
        producer.deliver_messages
    end
    
    ensure
    
        producer.deliver_messages
    
        producer.shutdown
    end
    Parameter Description
    brokers The default endpoint of the Message Queue for Apache Kafka instance. You can obtain the default endpoint in the Endpoint Information section of the Instance Details page in the Message Queue for Apache Kafka console.
    topic The name of the topic. You can obtain the name of the topic on the Topics page in the Message Queue for Apache Kafka console.
  2. Run the following command to run producer.rb to send messages:
    ruby producer.rb

Consume messages

  1. Create a consumer program named consumer.rb.
    # frozen_string_literal: true
    
    $LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))
    
    require "kafka"
    
    logger = Logger.new(STDOUT)
    #logger.level = Logger::DEBUG
    logger.level = Logger::INFO
    
    brokers = "XXX:XX,XXX:XX"
    topic = "XXX"
    consumerGroup = "XXX"
    
    
    kafka = Kafka.new(
            seed_brokers: brokers,
            client_id: "test",
            socket_timeout: 20,
            logger: logger,
            )
    
    consumer = kafka.consumer(group_id: consumerGroup)
    consumer.subscribe(topic, start_from_beginning: false)
    
    trap("TERM") { consumer.stop }
    trap("INT") { consumer.stop }
    
    begin
        consumer.each_message(max_bytes: 64 * 1024) do |message|
        logger.info("Get message: #{message.value}")
        end
    rescue Kafka::ProcessingError => e
        warn "Got error: #{e.cause}"
        consumer.pause(e.topic, e.partition, timeout: 20)
    
        retry
    end
    Parameter Description
    brokers The default endpoint of the Message Queue for Apache Kafka instance. You can obtain the default endpoint in the Endpoint Information section of the Instance Details page in the Message Queue for Apache Kafka console.
    topic The name of the topic. You can obtain the name of the topic on the Topics page in the Message Queue for Apache Kafka console.
    consumerGroup The name of the consumer group. Group You can obtain the name of the consumer group on the Groups page in the Message Queue for Apache Kafka console.
  2. Run the following command to run consumer.rb to consume messages:
    ruby consumer.rb