本文介绍如何使用Ruby SDK通过接入点接入消息队列Kafka版并收发消息。
环境准备
您已安装Ruby。更多信息,请参见安装Ruby。
安装Ruby依赖库
执行以下命令安装Ruby依赖库。
gem install ruby-kafka -v 0.6.8
准备配置
发送消息
执行以下命令发送消息。
ruby producer.ruby
关于代码中配置项说明,请参见表 1。
消息程序producer.ruby代码示例如下:
说明 示例代码为SSL接入点的代码。您需要根据实际接入点类型,删除或者修改配置项,其余代码请根据加粗代码注释修改。
# frozen_string_literal: true
$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))
require "kafka"
logger = Logger.new($stdout)
#logger.level = Logger::DEBUG
logger.level = Logger::INFO
brokers = "xxx:xx,xxx:xx"
topic = "xxx"
username = "xxx"
password = "xxx"
kafka = Kafka.new(
seed_brokers: brokers,
client_id: "sasl-producer", #如果是默认接入点,取值需修改为“simple-producer”。
logger: logger,
# put "./cert.pem" to anywhere this can read
#如果是默认接入点,删除以下三行代码。
ssl_ca_cert: File.read('./cert.pem'),
sasl_plain_username: username,
sasl_plain_password: password,
)
producer = kafka.producer
begin
$stdin.each_with_index do |line, index|
producer.produce(line, topic: topic)
producer.deliver_messages
end
ensure
producer.deliver_messages
producer.shutdown
end
订阅消息
执行以下命令消费消息。
ruby consumer.ruby
消息程序consumer.ruby示例代码如下:
关于代码中配置项说明,请参见表 1。
说明 示例代码为SSL接入点的代码。您需要根据实际接入点类型,删除或者修改配置项,其余代码请根据加粗代码注释修改。
# frozen_string_literal: true
$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))
require "kafka"
logger = Logger.new(STDOUT)
#logger.level = Logger::DEBUG
logger.level = Logger::INFO
brokers = "xxx:xx,xxx:xx"
topic = "xxx"
username = "xxx"
password = "xxx"
consumerGroup = "xxx"
kafka = Kafka.new(
seed_brokers: brokers,
client_id: "sasl-consumer", #如果是默认接入点,取值需修改为“test”
socket_timeout: 20,
logger: logger,
# put "./cert.pem" to anywhere this can read
#如果是默认接入点,删除以下三行代码。
ssl_ca_cert: File.read('./cert.pem'),
sasl_plain_username: username,
sasl_plain_password: password,
)
consumer = kafka.consumer(group_id: consumerGroup)
consumer.subscribe(topic, start_from_beginning: false)
trap("TERM") { consumer.stop }
trap("INT") { consumer.stop }
begin
consumer.each_message(max_bytes: 64 * 1024) do |message|
logger.info("Get message: #{message.value}")
end
rescue Kafka::ProcessingError => e
warn "Got error: #{e.cause}"
consumer.pause(e.topic, e.partition, timeout: 20)
retry
end