本文介绍如何在VPC环境下使用Ruby SDK接入消息队列Kafka版的默认接入点并收发消息。 前提条件 您已安装Ruby。详情请参见安装Ruby。 安装Ruby依赖库 执行以下命令安装Ruby依赖库。gem install ruby-kafka -v 0.6.8 发送消息 创建发送消息程序producer.rb。# frozen_string_literal: true $LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__)) require "kafka" logger = Logger.new($stdout) #logger.level = Logger::DEBUG logger.level = Logger::INFO brokers = "xxx:xx,xxx:xx" topic = "xxx" kafka = Kafka.new( seed_brokers: brokers, client_id: "simple-producer", logger: logger, ) producer = kafka.producer begin $stdin.each_with_index do |line, index| producer.produce(line, topic: topic) producer.deliver_messages end ensure producer.deliver_messages producer.shutdown end 参数 描述 brokers 默认接入点。您可在消息队列Kafka版控制台的实例详情页面的基本信息区域获取。 topic Topic名称。您可在消息队列Kafka版控制台的Topic管理页面获取。 执行以下命令发送消息。ruby producer.rb 订阅消息 创建订阅消息程序consumer.rb。# frozen_string_literal: true $LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__)) require "kafka" logger = Logger.new(STDOUT) #logger.level = Logger::DEBUG logger.level = Logger::INFO brokers = "XXX:XX,XXX:XX" topic = "XXX" consumerGroup = "XXX" kafka = Kafka.new( seed_brokers: brokers, client_id: "test", socket_timeout: 20, logger: logger, ) consumer = kafka.consumer(group_id: consumerGroup) consumer.subscribe(topic, start_from_beginning: false) trap("TERM") { consumer.stop } trap("INT") { consumer.stop } begin consumer.each_message(max_bytes: 64 * 1024) do |message| logger.info("Get message: #{message.value}") end rescue Kafka::ProcessingError => e warn "Got error: #{e.cause}" consumer.pause(e.topic, e.partition, timeout: 20) retry end 参数 描述 brokers 默认接入点。您可在消息队列Kafka版控制台的实例详情页面的基本信息区域获取。 topic Topic名称。您可在消息队列Kafka版控制台的Topic管理页面获取。 consumerGroup Consumer Group名称。您可在消息队列Kafka版控制台的Consumer Group管理页面获取。 执行以下命令消费消息。ruby consumer.rb
本文介绍如何在VPC环境下使用Ruby SDK接入消息队列Kafka版的默认接入点并收发消息。 前提条件 您已安装Ruby。详情请参见安装Ruby。 安装Ruby依赖库 执行以下命令安装Ruby依赖库。gem install ruby-kafka -v 0.6.8 发送消息 创建发送消息程序producer.rb。# frozen_string_literal: true $LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__)) require "kafka" logger = Logger.new($stdout) #logger.level = Logger::DEBUG logger.level = Logger::INFO brokers = "xxx:xx,xxx:xx" topic = "xxx" kafka = Kafka.new( seed_brokers: brokers, client_id: "simple-producer", logger: logger, ) producer = kafka.producer begin $stdin.each_with_index do |line, index| producer.produce(line, topic: topic) producer.deliver_messages end ensure producer.deliver_messages producer.shutdown end 参数 描述 brokers 默认接入点。您可在消息队列Kafka版控制台的实例详情页面的基本信息区域获取。 topic Topic名称。您可在消息队列Kafka版控制台的Topic管理页面获取。 执行以下命令发送消息。ruby producer.rb 订阅消息 创建订阅消息程序consumer.rb。# frozen_string_literal: true $LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__)) require "kafka" logger = Logger.new(STDOUT) #logger.level = Logger::DEBUG logger.level = Logger::INFO brokers = "XXX:XX,XXX:XX" topic = "XXX" consumerGroup = "XXX" kafka = Kafka.new( seed_brokers: brokers, client_id: "test", socket_timeout: 20, logger: logger, ) consumer = kafka.consumer(group_id: consumerGroup) consumer.subscribe(topic, start_from_beginning: false) trap("TERM") { consumer.stop } trap("INT") { consumer.stop } begin consumer.each_message(max_bytes: 64 * 1024) do |message| logger.info("Get message: #{message.value}") end rescue Kafka::ProcessingError => e warn "Got error: #{e.cause}" consumer.pause(e.topic, e.partition, timeout: 20) retry end 参数 描述 brokers 默认接入点。您可在消息队列Kafka版控制台的实例详情页面的基本信息区域获取。 topic Topic名称。您可在消息队列Kafka版控制台的Topic管理页面获取。 consumerGroup Consumer Group名称。您可在消息队列Kafka版控制台的Consumer Group管理页面获取。 执行以下命令消费消息。ruby consumer.rb