This topic describes how to use the SDK for Ruby to connect a Ruby client to the default endpoint of a Message Queue for Apache Kafka instance in a virtual private cloud (VPC), and then send and subscribe to messages.

Prerequisites

Ruby is installed. For more information, see Download Ruby.

Install the Ruby library

Run the following command to install the Ruby library:
    gem install ruby-kafka -v 0.6.8
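
To confirm that the library is available before you run the examples, a quick check along the following lines may help. This is a minimal sketch: `gem_installed?` is a hypothetical helper name, and it relies only on the RubyGems API that ships with Ruby.

```ruby
# Hypothetical helper (not part of ruby-kafka): reports whether a gem
# that satisfies the given version requirement is installed.
def gem_installed?(name, requirement = ">= 0")
  !Gem::Specification.find_all_by_name(name, requirement).empty?
end

if gem_installed?("ruby-kafka", "0.6.8")
  puts "ruby-kafka 0.6.8 is installed"
else
  puts "ruby-kafka 0.6.8 is missing; run: gem install ruby-kafka -v 0.6.8"
end
```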

Send messages

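  1. Create a message-sending program named producer.rb.
    # frozen_string_literal: true

    # Prepends ../lib (relative to this file's directory) to the load path.
    # Not needed when ruby-kafka is installed as a gem.
    $LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))

    require "logger"
    require "kafka"

    logger = Logger.new($stdout)
    # logger.level = Logger::DEBUG
    logger.level = Logger::INFO

    # Comma-separated list of host:port pairs (the default endpoint).
    brokers = "xxx:xx,xxx:xx"
    topic = "xxx"

    kafka = Kafka.new(
      seed_brokers: brokers,
      client_id: "simple-producer",
      logger: logger,
    )

    producer = kafka.producer

    begin
      # Send each line read from standard input as one message.
      $stdin.each_line do |line|
        producer.produce(line, topic: topic)
        producer.deliver_messages
      end
    ensure
      # Deliver any messages still in the buffer, then shut the producer down.
      producer.deliver_messages
      producer.shutdown
    end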

    Parameter  Description
    brokers    The default endpoint of the Message Queue for Apache Kafka instance. You can obtain the default endpoint in the Basic Information section of the Instance Details page in the Message Queue for Apache Kafka console.
    topic      The name of the topic. You can obtain the name of the topic on the Topics page in the Message Queue for Apache Kafka console.
  2. Run the following command to send messages:
    ruby producer.rb
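
Because brokers is a comma-separated list of host:port pairs, a typo in it can be caught before the client tries to connect. The following is only a sketch under that assumption: `parse_brokers` is a hypothetical helper, not part of ruby-kafka, and the addresses and port are placeholders.

```ruby
# Hypothetical helper (not part of ruby-kafka): splits a comma-separated
# endpoint string into "host:port" entries and rejects malformed ones.
def parse_brokers(endpoint)
  entries = endpoint.split(",").map(&:strip).reject(&:empty?)
  raise ArgumentError, "no brokers given" if entries.empty?

  entries.each do |entry|
    host, port = entry.split(":", 2)
    valid_port = port.to_s.match?(/\A\d+\z/) && port.to_i.between?(1, 65_535)
    if host.to_s.empty? || !valid_port
      raise ArgumentError, "invalid broker address: #{entry.inspect}"
    end
  end

  entries
end

# Placeholder addresses; replace them with the default endpoint of your instance.
p parse_brokers("192.0.2.10:9092, 192.0.2.11:9092")
```

The returned array could then be passed as seed_brokers in place of the raw string.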

Subscribe to messages

  1. Create a message-consuming program named consumer.rb.
    # frozen_string_literal: true

    # Prepends ../lib (relative to this file's directory) to the load path.
    # Not needed when ruby-kafka is installed as a gem.
    $LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))

    require "logger"
    require "kafka"

    logger = Logger.new($stdout)
    # logger.level = Logger::DEBUG
    logger.level = Logger::INFO

    brokers = "XXX:XX,XXX:XX"
    topic = "XXX"
    consumerGroup = "XXX"

    kafka = Kafka.new(
      seed_brokers: brokers,
      client_id: "test",
      socket_timeout: 20,
      logger: logger,
    )

    consumer = kafka.consumer(group_id: consumerGroup)
    # If the group has no committed offset, start with new messages
    # instead of the beginning of the topic.
    consumer.subscribe(topic, start_from_beginning: false)

    # Stop the consumer loop cleanly on SIGTERM or SIGINT.
    trap("TERM") { consumer.stop }
    trap("INT") { consumer.stop }

    begin
      consumer.each_message(max_bytes: 64 * 1024) do |message|
        logger.info("Get message: #{message.value}")
      end
    rescue Kafka::ProcessingError => e
      warn "Got error: #{e.cause}"
      # Pause the failing partition for 20 seconds, then resume consuming.
      consumer.pause(e.topic, e.partition, timeout: 20)
      retry
    end

    Parameter      Description
    brokers        The default endpoint of the Message Queue for Apache Kafka instance. You can obtain the default endpoint in the Basic Information section of the Instance Details page in the Message Queue for Apache Kafka console.
    topic          The name of the topic. You can obtain the name of the topic on the Topics page in the Message Queue for Apache Kafka console.
    consumerGroup  The name of the consumer group. You can obtain the name of the consumer group on the Consumer Groups page in the Message Queue for Apache Kafka console.
  2. Run the following command to consume messages:
    ruby consumer.rb
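
Both programs hardcode brokers, topic, and the consumer group. One possible way to avoid editing the scripts for each instance is to read these values from environment variables. This is a sketch only: `load_settings` and the variable names KAFKA_BROKERS, KAFKA_TOPIC, and KAFKA_CONSUMER_GROUP are assumptions for illustration, not names defined by ruby-kafka or the console.

```ruby
# Hypothetical helper: reads connection settings from environment variables
# (or any hash-like object) and fails early if one is missing.
def load_settings(env = ENV)
  {
    brokers: env.fetch("KAFKA_BROKERS"),
    topic: env.fetch("KAFKA_TOPIC"),
    consumer_group: env.fetch("KAFKA_CONSUMER_GROUP"),
  }
rescue KeyError => e
  raise ArgumentError, "missing setting: #{e.message}"
end

# Example with an in-memory hash standing in for ENV; values are placeholders.
settings = load_settings({
  "KAFKA_BROKERS" => "xxx:xx,xxx:xx",
  "KAFKA_TOPIC" => "xxx",
  "KAFKA_CONSUMER_GROUP" => "xxx",
})
puts settings[:topic]
```

The resulting values could replace the hardcoded strings, for example `seed_brokers: settings[:brokers]` in `Kafka.new` and `group_id: settings[:consumer_group]` in `kafka.consumer`.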