このトピックでは、Ruby SDK を使用して ApsaraMQ for Kafka インスタンスのエンドポイントに接続し、メッセージを送受信する方法について説明します。
環境要件
Ruby がインストールされていること。詳細については、「Ruby」をご参照ください。
Ruby ライブラリのインストール
Ruby ライブラリをインストールするには、次のコマンドを実行します。
gem install ruby-kafka -v 0.6.8構成ファイルの準備
オプション:SSL ルート証明書をダウンロードします。SSL エンドポイントを使用して Message Queue for Apache Kafka インスタンスに接続する場合は、この証明書をダウンロードする必要があります。
aliware-kafka-demos ページに移動し、
をクリックしてデモプロジェクトをオンプレミス マシンにダウンロードし、デモプロジェクトのパッケージを解凍します。解凍したパッケージで、kafka-ruby-demo フォルダーに移動します。次に、使用するエンドポイントに基づいて対応するフォルダーを開き、フォルダー内の producer.ruby ファイルと consumer.ruby ファイルを構成します。
表 1. パラメーター
パラメーター
説明
brokers
Message Queue for Apache Kafka インスタンスの SSL エンドポイント。アクセスポイント情報インスタンスの詳細ApsaraMQ for Kafka コンソール の ページの セクションで SSL エンドポイントを取得できます。
topic
トピックの名前。 トピック管理 ページ (ApsaraMQ for Kafka コンソール) でトピックの名前を取得できます。
username
Simple Authentication and Security Layer (SASL) ユーザーのユーザー名。デフォルトのエンドポイントを使用して Message Queue for Apache Kafka インスタンスに接続する場合は、このパラメーターは除外されます。
説明ApsaraMQ for Kafka インスタンスで ACL 機能が有効になっていない場合は、ユーザー名パスワード設定情報インスタンスの詳細ApsaraMQ for Kafka コンソール の ページの セクションにある パラメーターと パラメーターから、SASL ユーザーのユーザー名とパスワードを取得できます。
ApsaraMQ for Kafka インスタンスで ACL 機能が有効になっている場合は、インスタンスを使用してメッセージを送受信する権限が SASL ユーザーに付与されていることを確認してください。詳細については、「SASL ユーザーに権限を付与する」をご参照ください。
password
SASL ユーザーのパスワード。デフォルトのエンドポイントを使用して Message Queue for Apache Kafka インスタンスに接続する場合は、このパラメーターは除外されます。
consumerGroup
コンシューマー グループの ID。Group の管理ApsaraMQ for Kafka コンソール の [グループ] ページでコンシューマー グループの ID を取得できます。
必要なパラメーターを構成した後、構成ファイルが配置されているフォルダー内のすべてのファイルを、サーバー上の Ruby インストール ディレクトリにアップロードします。SSL エンドポイントに対応するフォルダーには、SSL ルート証明書ファイルが含まれています。
メッセージの送信
producer.ruby を実行してメッセージを送信するには、次のコマンドを実行します。
ruby producer.rubyコード内のパラメーターの詳細については、「パラメーター」をご参照ください。
次のサンプル コードは、producer.ruby の例を示しています。
サンプル コードでは、SSL エンドポイントが使用されています。Message Queue for Apache Kafka インスタンスへの接続に使用するエンドポイントに基づいて、パラメーターに関連するコードを削除または変更し、太字でフォーマットされたコメントに基づいて他のコードを変更します。
# frozen_string_literal: true
$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))
require "kafka"
logger = Logger.new($stdout)
#logger.level = Logger::DEBUG # ロガーレベルをデバッグに設定
logger.level = Logger::INFO # ロガーレベルを情報に設定
brokers = "xxx:xx,xxx:xx" # ブローカーアドレス。カンマで区切って複数のブローカーを指定できます。
topic = "xxx" # トピック名
username = "xxx" # SASL ユーザー名
password = "xxx" # SASL パスワード
kafka = Kafka.new(
seed_brokers: brokers,
client_id: "sasl-producer", # デフォルトエンドポイントを使用する場合は、この値を simple-producer に変更します。
logger: logger,
# put "./cert.pem" to anywhere this can read # cert.pem を読み取り可能な場所に配置します。
# デフォルトエンドポイントを使用する場合は、次の 3 行を削除します。
ssl_ca_cert: File.read('./cert.pem'), # SSL 証明書ファイルのパス
sasl_plain_username: username, # SASL ユーザー名
sasl_plain_password: password, # SASL パスワード
)
producer = kafka.producer
begin
$stdin.each_with_index do |line, index| # 標準入力からメッセージを読み込み
producer.produce(line, topic: topic) # メッセージを生成
producer.deliver_messages # メッセージを送信
end
ensure
producer.deliver_messages # バッファリングされたメッセージを送信
producer.shutdown # プロデューサーをシャットダウン
endメッセージの受信
consumer.ruby を実行してメッセージを受信するには、次のコマンドを実行します。
ruby consumer.ruby次のサンプル コードは、consumer.ruby の例を示しています。
コード内のパラメーターの詳細については、「パラメーター」をご参照ください。
サンプル コードでは、SSL エンドポイントが使用されています。Message Queue for Apache Kafka インスタンスへの接続に使用するエンドポイントに基づいて、パラメーターに関連するコードを削除または変更し、太字でフォーマットされたコメントに基づいて他のコードを変更します。
# frozen_string_literal: true
$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))
require "kafka"
logger = Logger.new(STDOUT)
#logger.level = Logger::DEBUG # ロガーレベルをデバッグに設定
logger.level = Logger::INFO # ロガーレベルを情報に設定
brokers = "xxx:xx,xxx:xx" # ブローカーアドレス。カンマで区切って複数のブローカーを指定できます。
topic = "xxx" # トピック名
username = "xxx" # SASL ユーザー名
password = "xxx" # SASL パスワード
consumerGroup = "xxx" # コンシューマーグループ ID
kafka = Kafka.new(
seed_brokers: brokers,
client_id: "sasl-consumer", # デフォルトエンドポイントを使用する場合は、この値を test に変更します。
socket_timeout: 20, # ソケットタイムアウト時間(秒)
logger: logger,
# put "./cert.pem" to anywhere this can read # cert.pem を読み取り可能な場所に配置します。
# デフォルトエンドポイントを使用する場合は、次の 3 行を削除します。
ssl_ca_cert: File.read('./cert.pem'), # SSL 証明書ファイルのパス
sasl_plain_username: username, # SASL ユーザー名
sasl_plain_password: password, # SASL パスワード
)
consumer = kafka.consumer(group_id: consumerGroup) # コンシューマーを作成
consumer.subscribe(topic, start_from_beginning: false) # トピックを購読。start_from_beginning: true の場合、最初からメッセージを読み込みます。
trap("TERM") { consumer.stop } # TERM シグナルを受信したらコンシューマーを停止
trap("INT") { consumer.stop } # INT シグナルを受信したらコンシューマーを停止
begin
consumer.each_message(max_bytes: 64 * 1024) do |message| # メッセージを消費
logger.info("Get message: #{message.value}") # メッセージの内容を出力
end
rescue Kafka::ProcessingError => e # エラー処理
warn "Got error: #{e.cause}" # エラーメッセージを出力
consumer.pause(e.topic, e.partition, timeout: 20) # パーティションを一時停止
retry # 再試行
end