Connect a Ruby application to ApsaraMQ for Kafka to produce and consume messages using the ruby-kafka gem.
Prerequisites
Before you begin, make sure you have:
- Ruby installed on your server
- An ApsaraMQ for Kafka instance with a topic and consumer group created
- (SSL endpoint only) The SSL root certificate downloaded and saved as `cert.pem`
Install the ruby-kafka gem
```shell
gem install ruby-kafka -v 0.6.8
```
Gather connection parameters
Get the following values from the ApsaraMQ for Kafka console. Replace the corresponding placeholders in the producer and consumer code.
SSL endpoint
| Placeholder | Where to find it |
|---|---|
| `<your-broker-host1:port,your-broker-host2:port>` | Instance Details > Endpoint Information -- copy the SSL endpoint |
| `<your-topic-name>` | Topics page |
| `<your-sasl-username>` | Instance Details > Configuration Information > Username |
| `<your-sasl-password>` | Instance Details > Configuration Information > Password |
| `<your-consumer-group-id>` | Groups page |
Default endpoint
| Placeholder | Where to find it |
|---|---|
| `<your-broker-host1:port,your-broker-host2:port>` | Instance Details > Endpoint Information -- copy the default endpoint |
| `<your-topic-name>` | Topics page |
| `<your-consumer-group-id>` | Groups page |
If the ACL feature is not enabled for your instance, you can obtain the SASL username and password from the Username and Password parameters in the Configuration Information section of the Instance Details page in the ApsaraMQ for Kafka console.
If the ACL feature is enabled for your instance, make sure the SASL user has the required permissions. For details, see Grant permissions to SASL users.
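The endpoint you copy from the console is a single comma-separated string. The examples below pass it to `seed_brokers` as-is; if your ruby-kafka version expects an array of `host:port` strings instead, you can split it first (a sketch using placeholder addresses from the documentation range):

```ruby
# Placeholder addresses; substitute your real endpoint string.
brokers = "192.0.2.10:9093,192.0.2.11:9093,192.0.2.12:9093"

# Split the console's comma-separated endpoint into an array of host:port strings.
broker_list = brokers.split(",").map(&:strip)
raise "broker list must not be empty" if broker_list.empty?
```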
Produce messages
Create a file named producer.ruby using one of the following examples based on your endpoint type.
SSL endpoint
```ruby
# frozen_string_literal: true
require "kafka"
require "logger"

logger = Logger.new($stdout)
logger.level = Logger::INFO

# Replace the placeholders with your actual values.
brokers = "<your-broker-host1:port,your-broker-host2:port>"
topic = "<your-topic-name>"
username = "<your-sasl-username>"
password = "<your-sasl-password>"

kafka = Kafka.new(
  seed_brokers: brokers,
  client_id: "sasl-producer",
  logger: logger,
  # Contents of the SSL root certificate file.
  ssl_ca_cert: File.read("./cert.pem"),
  sasl_plain_username: username,
  sasl_plain_password: password,
)

producer = kafka.producer

# Read lines from stdin and send each line as a message.
begin
  $stdin.each do |line|
    producer.produce(line, topic: topic)
    producer.deliver_messages
  end
ensure
  # Flush any remaining messages and release resources.
  producer.deliver_messages
  producer.shutdown
end
```
Default endpoint
```ruby
# frozen_string_literal: true
require "kafka"
require "logger"

logger = Logger.new($stdout)
logger.level = Logger::INFO

# Replace the placeholders with your actual values.
brokers = "<your-broker-host1:port,your-broker-host2:port>"
topic = "<your-topic-name>"

kafka = Kafka.new(
  seed_brokers: brokers,
  client_id: "simple-producer",
  logger: logger,
)

producer = kafka.producer

# Read lines from stdin and send each line as a message.
begin
  $stdin.each do |line|
    producer.produce(line, topic: topic)
    producer.deliver_messages
  end
ensure
  # Flush any remaining messages and release resources.
  producer.deliver_messages
  producer.shutdown
end
```
Run the producer:
```shell
ruby producer.ruby
```
Type each message on a new line and press Enter. The producer sends each line to the topic.
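The producer treats each message as an opaque string. If you need structured payloads, one common pattern is to JSON-encode before producing. The sketch below uses a hypothetical event shape; the `produce` call itself is shown only in a comment because it needs a live connection:

```ruby
require "json"

# Hypothetical event payload; ruby-kafka delivers whatever string you hand it.
event = { user_id: 42, action: "login", at: 1_700_000_000 }
payload = JSON.generate(event)

# With a producer connected as in the examples above, a keyed send would be:
#   producer.produce(payload, topic: topic, key: event[:user_id].to_s)
# Messages with the same key are routed to the same partition, which
# preserves per-key ordering.

# The consumer side reverses the encoding.
decoded = JSON.parse(payload)
```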
Consume messages
Create a file named consumer.ruby using one of the following examples based on your endpoint type.
SSL endpoint
```ruby
# frozen_string_literal: true
require "kafka"
require "logger"

logger = Logger.new($stdout)
logger.level = Logger::INFO

# Replace the placeholders with your actual values.
brokers = "<your-broker-host1:port,your-broker-host2:port>"
topic = "<your-topic-name>"
username = "<your-sasl-username>"
password = "<your-sasl-password>"
consumer_group = "<your-consumer-group-id>"

kafka = Kafka.new(
  seed_brokers: brokers,
  client_id: "sasl-consumer",
  socket_timeout: 20,
  logger: logger,
  # Contents of the SSL root certificate file.
  ssl_ca_cert: File.read("./cert.pem"),
  sasl_plain_username: username,
  sasl_plain_password: password,
)

consumer = kafka.consumer(group_id: consumer_group)
consumer.subscribe(topic, start_from_beginning: false)

# Stop the consumer gracefully on TERM or INT signals.
# To stop the process, run: kill -s TERM <process-id>
trap("TERM") { consumer.stop }
trap("INT") { consumer.stop }

begin
  consumer.each_message(max_bytes: 64 * 1024) do |message|
    logger.info("Got message: #{message.value}")
  end
rescue Kafka::ProcessingError => e
  # If message processing fails, pause the affected partition
  # for 20 seconds before retrying. This prevents tight retry
  # loops that could overwhelm the broker.
  warn "Got error: #{e.cause}"
  consumer.pause(e.topic, e.partition, timeout: 20)
  retry
end
```
Default endpoint
```ruby
# frozen_string_literal: true
require "kafka"
require "logger"

logger = Logger.new($stdout)
logger.level = Logger::INFO

# Replace the placeholders with your actual values.
brokers = "<your-broker-host1:port,your-broker-host2:port>"
topic = "<your-topic-name>"
consumer_group = "<your-consumer-group-id>"

kafka = Kafka.new(
  seed_brokers: brokers,
  client_id: "simple-consumer",
  socket_timeout: 20,
  logger: logger,
)

consumer = kafka.consumer(group_id: consumer_group)
consumer.subscribe(topic, start_from_beginning: false)

# Stop the consumer gracefully on TERM or INT signals.
# To stop the process, run: kill -s TERM <process-id>
trap("TERM") { consumer.stop }
trap("INT") { consumer.stop }

begin
  consumer.each_message(max_bytes: 64 * 1024) do |message|
    logger.info("Got message: #{message.value}")
  end
rescue Kafka::ProcessingError => e
  # If message processing fails, pause the affected partition
  # for 20 seconds before retrying. This prevents tight retry
  # loops that could overwhelm the broker.
  warn "Got error: #{e.cause}"
  consumer.pause(e.topic, e.partition, timeout: 20)
  retry
end
```
Run the consumer:
```shell
ruby consumer.ruby
```
The consumer joins the specified consumer group and polls for new messages. Each message is logged to stdout. Press Ctrl+C to stop the consumer gracefully.
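Kafka delivers messages at least once: after a rebalance, or after the pause-and-retry in the error handler, a message may be handled twice. If your processing is not naturally idempotent, one guard is to deduplicate by partition and offset inside the handler. This is a sketch with hypothetical names, not part of ruby-kafka; a production version would bound or persist the set:

```ruby
require "set"

# Hypothetical in-memory guard that remembers which (partition, offset)
# pairs have already been processed in this run.
class SeenOffsets
  def initialize
    @seen = Set.new
  end

  # Returns true only the first time a given partition/offset is seen.
  # Set#add? returns nil when the element was already present.
  def first_time?(partition, offset)
    !@seen.add?([partition, offset]).nil?
  end
end

seen = SeenOffsets.new
seen.first_time?(0, 100) # => true
seen.first_time?(0, 100) # => false (duplicate delivery)
```

Inside `each_message`, the handler would then skip work when `first_time?(message.partition, message.offset)` returns false.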
Use the demo project
Pre-built demo files are available in the aliware-kafka-demos repository:
1. Download and extract the repository.
2. Navigate to the `kafka-ruby-demo` folder and open the subfolder that matches your endpoint type (SSL or default).
3. Edit `producer.ruby` and `consumer.ruby` with your connection parameters.
4. Upload the files to your server. If you use the SSL endpoint, include the `cert.pem` file in the same directory.
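Before starting the SSL clients, it can help to confirm that `cert.pem` actually parses as a PEM certificate. The sketch below uses Ruby's bundled OpenSSL library; the helper name is ours, not part of ruby-kafka:

```ruby
require "openssl"

# Parses a PEM certificate file and returns its expiry time.
# Raises OpenSSL::X509::CertificateError if the file is not valid PEM.
def check_root_cert(path)
  cert = OpenSSL::X509::Certificate.new(File.read(path))
  remaining = cert.not_after - Time.now
  warn "warning: #{path} expires within 30 days" if remaining < 30 * 24 * 3600
  cert.not_after
end

puts check_root_cert("./cert.pem") if File.exist?("./cert.pem")
```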