All Products
Search
Document Center

ApsaraMQ for Kafka:Send and receive messages with the Ruby SDK

Last Updated:Mar 11, 2026

Connect a Ruby application to ApsaraMQ for Kafka to produce and consume messages using the ruby-kafka gem.

Prerequisites

Before you begin, make sure you have:

  • Ruby installed on your server

  • An ApsaraMQ for Kafka instance with a topic and consumer group created

  • (SSL endpoint only) The SSL root certificate downloaded and saved as cert.pem

Install the ruby-kafka gem

gem install ruby-kafka -v 0.6.8

Gather connection parameters

Get the following values from the ApsaraMQ for Kafka console. Replace the corresponding placeholders in the producer and consumer code.

SSL endpoint

PlaceholderWhere to find it
<your-broker-host1:port,your-broker-host2:port>Instance Details > Endpoint Information -- copy the SSL endpoint
<your-topic-name>Topics page
<your-sasl-username>Instance Details > Configuration Information > Username
<your-sasl-password>Instance Details > Configuration Information > Password
<your-consumer-group-id>Groups page

Default endpoint

PlaceholderWhere to find it
<your-broker-host1:port,your-broker-host2:port>Instance Details > Endpoint Information -- copy the default endpoint
<your-topic-name>Topics page
<your-consumer-group-id>Groups page
Note
  • If the ACL feature is not enabled for your instance, you can obtain the SASL username and password from the Username and Password parameters in the Configuration Information section of the Instance Details page in the ApsaraMQ for Kafka console.

  • If the ACL feature is enabled for your instance, make sure the SASL user has the required permissions. For details, see Grant permissions to SASL users.

Produce messages

Create a file named producer.ruby using one of the following examples based on your endpoint type.

SSL endpoint

# frozen_string_literal: true

$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))

require "kafka"

logger = Logger.new($stdout)
logger.level = Logger::INFO

# Replace the placeholders with your actual values.
brokers = "<your-broker-host1:port,your-broker-host2:port>"
topic = "<your-topic-name>"
username = "<your-sasl-username>"
password = "<your-sasl-password>"

kafka = Kafka.new(
    seed_brokers: brokers,
    client_id: "sasl-producer",
    logger: logger,
    # Path to the SSL root certificate file.
    ssl_ca_cert: File.read('./cert.pem'),
    sasl_plain_username: username,
    sasl_plain_password: password,
    )

producer = kafka.producer

# Read lines from stdin and send each line as a message.
begin
    $stdin.each_with_index do |line, index|

    producer.produce(line, topic: topic)

    producer.deliver_messages
end

ensure
    # Flush any remaining messages and release resources.
    producer.deliver_messages

    producer.shutdown
end

Default endpoint

# frozen_string_literal: true

$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))

require "kafka"

logger = Logger.new($stdout)
logger.level = Logger::INFO

# Replace the placeholders with your actual values.
brokers = "<your-broker-host1:port,your-broker-host2:port>"
topic = "<your-topic-name>"

kafka = Kafka.new(
    seed_brokers: brokers,
    client_id: "simple-producer",
    logger: logger,
    )

producer = kafka.producer

# Read lines from stdin and send each line as a message.
begin
    $stdin.each_with_index do |line, index|

    producer.produce(line, topic: topic)

    producer.deliver_messages
end

ensure
    # Flush any remaining messages and release resources.
    producer.deliver_messages

    producer.shutdown
end

Run the producer:

ruby producer.ruby

Type each message on a new line and press Enter. The producer sends each line to the topic.

Consume messages

Create a file named consumer.ruby using one of the following examples based on your endpoint type.

SSL endpoint

# frozen_string_literal: true

$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))

require "kafka"

logger = Logger.new(STDOUT)
logger.level = Logger::INFO

# Replace the placeholders with your actual values.
brokers = "<your-broker-host1:port,your-broker-host2:port>"
topic = "<your-topic-name>"
username = "<your-sasl-username>"
password = "<your-sasl-password>"
consumerGroup = "<your-consumer-group-id>"

kafka = Kafka.new(
        seed_brokers: brokers,
        client_id: "sasl-consumer",
        socket_timeout: 20,
        logger: logger,
        # Path to the SSL root certificate file.
        ssl_ca_cert: File.read('./cert.pem'),
        sasl_plain_username: username,
        sasl_plain_password: password,
        )

consumer = kafka.consumer(group_id: consumerGroup)
consumer.subscribe(topic, start_from_beginning: false)

# Stop the consumer gracefully on TERM or INT signals.
# To stop the process, run: kill -s TERM <process-id>
trap("TERM") { consumer.stop }
trap("INT") { consumer.stop }

begin
    consumer.each_message(max_bytes: 64 * 1024) do |message|
    logger.info("Get message: #{message.value}")
    end
rescue Kafka::ProcessingError => e
    # If message processing fails, pause the affected partition
    # for 20 seconds before retrying. This prevents tight retry
    # loops that could overwhelm the broker.
    warn "Got error: #{e.cause}"
    consumer.pause(e.topic, e.partition, timeout: 20)

    retry
end

Default endpoint

# frozen_string_literal: true

$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))

require "kafka"

logger = Logger.new(STDOUT)
logger.level = Logger::INFO

# Replace the placeholders with your actual values.
brokers = "<your-broker-host1:port,your-broker-host2:port>"
topic = "<your-topic-name>"
consumerGroup = "<your-consumer-group-id>"

kafka = Kafka.new(
        seed_brokers: brokers,
        client_id: "test",
        socket_timeout: 20,
        logger: logger,
        )

consumer = kafka.consumer(group_id: consumerGroup)
consumer.subscribe(topic, start_from_beginning: false)

# Stop the consumer gracefully on TERM or INT signals.
# To stop the process, run: kill -s TERM <process-id>
trap("TERM") { consumer.stop }
trap("INT") { consumer.stop }

begin
    consumer.each_message(max_bytes: 64 * 1024) do |message|
    logger.info("Get message: #{message.value}")
    end
rescue Kafka::ProcessingError => e
    # If message processing fails, pause the affected partition
    # for 20 seconds before retrying. This prevents tight retry
    # loops that could overwhelm the broker.
    warn "Got error: #{e.cause}"
    consumer.pause(e.topic, e.partition, timeout: 20)

    retry
end

Run the consumer:

ruby consumer.ruby

The consumer joins the specified consumer group and polls for new messages. Each message is logged to stdout. Press Ctrl+C to stop the consumer gracefully.

Use the demo project

Pre-built demo files are available in the aliware-kafka-demos repository:

  1. Download and extract the repository.

  2. Navigate to the kafka-ruby-demo folder and open the subfolder that matches your endpoint type (SSL or default).

  3. Edit producer.ruby and consumer.ruby with your connection parameters.

  4. Upload the files to your server. If you use the SSL endpoint, include the cert.pem file in the same directory.

Related topics