This topic describes how to use the SDK for Ruby to connect to the default endpoint
of a Message Queue for Apache Kafka instance and send and consume messages in a virtual private cloud (VPC).
Prerequisites
Ruby is installed. For more information, see Download Ruby.
Install the Ruby library
Run the following command to install the Ruby library: gem install ruby-kafka -v 0.6.8
Send messages
- Create a producer program named producer.rb.
# frozen_string_literal: true
$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))
require "kafka"
logger = Logger.new($stdout)
#logger.level = Logger::DEBUG
logger.level = Logger::INFO
brokers = "xxx:xx,xxx:xx"
topic = "xxx"
kafka = Kafka.new(
seed_brokers: brokers,
client_id: "simple-producer",
logger: logger,
)
producer = kafka.producer
begin
$stdin.each_with_index do |line, index|
producer.produce(line, topic: topic)
producer.deliver_messages
end
ensure
producer.deliver_messages
producer.shutdown
end
- Run the following command to run producer.rb to send messages:
Consume messages
- Create a consumer program named consumer.rb.
# frozen_string_literal: true
$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))
require "kafka"
logger = Logger.new(STDOUT)
#logger.level = Logger::DEBUG
logger.level = Logger::INFO
brokers = "XXX:XX,XXX:XX"
topic = "XXX"
consumerGroup = "XXX"
kafka = Kafka.new(
seed_brokers: brokers,
client_id: "test",
socket_timeout: 20,
logger: logger,
)
consumer = kafka.consumer(group_id: consumerGroup)
consumer.subscribe(topic, start_from_beginning: false)
trap("TERM") { consumer.stop }
trap("INT") { consumer.stop }
begin
consumer.each_message(max_bytes: 64 * 1024) do |message|
logger.info("Get message: #{message.value}")
end
rescue Kafka::ProcessingError => e
warn "Got error: #{e.cause}"
consumer.pause(e.topic, e.partition, timeout: 20)
retry
end
- Run the following command to run consumer.rb to consume messages: