すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:Ruby SDK を使用してメッセージを送受信する

最終更新日:Mar 19, 2025

このトピックでは、Ruby SDK を使用して ApsaraMQ for Kafka インスタンスのエンドポイントに接続し、メッセージを送受信する方法について説明します。

環境要件

Ruby がインストールされていること。詳細については、「Ruby」をご参照ください。

Ruby ライブラリのインストール

Ruby ライブラリをインストールするには、次のコマンドを実行します。

gem install ruby-kafka -v 0.6.8

構成ファイルの準備

  1. オプション:SSL ルート証明書をダウンロードします。SSL エンドポイントを使用して Message Queue for Apache Kafka インスタンスに接続する場合は、この証明書をダウンロードする必要があります。

  2. aliware-kafka-demos ページに移動し、download をクリックしてデモプロジェクトをオンプレミス マシンにダウンロードし、デモプロジェクトのパッケージを解凍します。

  3. 解凍したパッケージで、kafka-ruby-demo フォルダーに移動します。次に、使用するエンドポイントに基づいて対応するフォルダーを開き、フォルダー内の producer.ruby ファイルと consumer.ruby ファイルを構成します。

    表 1. パラメーター

    パラメーター

    説明

    brokers

    Message Queue for Apache Kafka インスタンスの SSL エンドポイント。アクセスポイント情報インスタンスの詳細ApsaraMQ for Kafka コンソール の ページの セクションで SSL エンドポイントを取得できます。

    topic

    トピックの名前。 トピック管理 ページ (ApsaraMQ for Kafka コンソール) でトピックの名前を取得できます。

    username

    Simple Authentication and Security Layer (SASL) ユーザーのユーザー名。デフォルトのエンドポイントを使用して Message Queue for Apache Kafka インスタンスに接続する場合は、このパラメーターは除外されます。

    説明
    • ApsaraMQ for Kafka インスタンスで ACL 機能が有効になっていない場合は、ユーザー名パスワード設定情報インスタンスの詳細ApsaraMQ for Kafka コンソール の ページの セクションにある パラメーターと パラメーターから、SASL ユーザーのユーザー名とパスワードを取得できます。

    • ApsaraMQ for Kafka インスタンスで ACL 機能が有効になっている場合は、インスタンスを使用してメッセージを送受信する権限が SASL ユーザーに付与されていることを確認してください。詳細については、「SASL ユーザーに権限を付与する」をご参照ください。

    password

    SASL ユーザーのパスワード。デフォルトのエンドポイントを使用して Message Queue for Apache Kafka インスタンスに接続する場合は、このパラメーターは除外されます。

    consumerGroup

    コンシューマー グループの ID。Group の管理ApsaraMQ for Kafka コンソール の [グループ] ページでコンシューマー グループの ID を取得できます。

  4. 必要なパラメーターを構成した後、構成ファイルが配置されているフォルダー内のすべてのファイルを、サーバー上の Ruby インストール ディレクトリにアップロードします。SSL エンドポイントに対応するフォルダーには、SSL ルート証明書ファイルが含まれています。

メッセージの送信

producer.ruby を実行してメッセージを送信するには、次のコマンドを実行します。

ruby producer.ruby

コード内のパラメーターの詳細については、「パラメーター」をご参照ください。

次のサンプル コードは、producer.ruby の例を示しています。

説明

サンプル コードでは、SSL エンドポイントが使用されています。Message Queue for Apache Kafka インスタンスへの接続に使用するエンドポイントに基づいて、パラメーターに関連するコードを削除または変更し、太字でフォーマットされたコメントに基づいて他のコードを変更します。

# frozen_string_literal: true

$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))

require "kafka"

logger = Logger.new($stdout)
#logger.level = Logger::DEBUG  # ロガーレベルをデバッグに設定
logger.level = Logger::INFO # ロガーレベルを情報に設定


brokers = "xxx:xx,xxx:xx" # ブローカーアドレス。カンマで区切って複数のブローカーを指定できます。
topic = "xxx" # トピック名
username = "xxx" # SASL ユーザー名
password = "xxx" # SASL パスワード

kafka = Kafka.new(
    seed_brokers: brokers,
    client_id: "sasl-producer", # デフォルトエンドポイントを使用する場合は、この値を simple-producer に変更します。
    logger: logger,
    # put "./cert.pem" to anywhere this can read # cert.pem を読み取り可能な場所に配置します。
    # デフォルトエンドポイントを使用する場合は、次の 3 行を削除します。
    ssl_ca_cert: File.read('./cert.pem'), # SSL 証明書ファイルのパス    
    sasl_plain_username: username, # SASL ユーザー名
    sasl_plain_password: password, # SASL パスワード
    )

producer = kafka.producer

begin
    $stdin.each_with_index do |line, index| # 標準入力からメッセージを読み込み

    producer.produce(line, topic: topic) # メッセージを生成

    producer.deliver_messages # メッセージを送信
end

ensure

    producer.deliver_messages # バッファリングされたメッセージを送信

    producer.shutdown # プロデューサーをシャットダウン
end

メッセージの受信

consumer.ruby を実行してメッセージを受信するには、次のコマンドを実行します。

ruby consumer.ruby

次のサンプル コードは、consumer.ruby の例を示しています。

コード内のパラメーターの詳細については、「パラメーター」をご参照ください。

説明

サンプル コードでは、SSL エンドポイントが使用されています。Message Queue for Apache Kafka インスタンスへの接続に使用するエンドポイントに基づいて、パラメーターに関連するコードを削除または変更し、太字でフォーマットされたコメントに基づいて他のコードを変更します。

# frozen_string_literal: true

$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))

require "kafka"

logger = Logger.new(STDOUT)
#logger.level = Logger::DEBUG # ロガーレベルをデバッグに設定
logger.level = Logger::INFO # ロガーレベルを情報に設定

brokers = "xxx:xx,xxx:xx" # ブローカーアドレス。カンマで区切って複数のブローカーを指定できます。
topic = "xxx" # トピック名
username = "xxx" # SASL ユーザー名
password = "xxx" # SASL パスワード
consumerGroup = "xxx" # コンシューマーグループ ID

kafka = Kafka.new(
        seed_brokers: brokers,
        client_id: "sasl-consumer", # デフォルトエンドポイントを使用する場合は、この値を test に変更します。
        socket_timeout: 20, # ソケットタイムアウト時間(秒)
        logger: logger,
        # put "./cert.pem" to anywhere this can read # cert.pem を読み取り可能な場所に配置します。
        # デフォルトエンドポイントを使用する場合は、次の 3 行を削除します。
        ssl_ca_cert: File.read('./cert.pem'), # SSL 証明書ファイルのパス
        sasl_plain_username: username, # SASL ユーザー名
        sasl_plain_password: password, # SASL パスワード
        )

consumer = kafka.consumer(group_id: consumerGroup) # コンシューマーを作成
consumer.subscribe(topic, start_from_beginning: false) # トピックを購読。start_from_beginning: true の場合、最初からメッセージを読み込みます。

trap("TERM") { consumer.stop } # TERM シグナルを受信したらコンシューマーを停止
trap("INT") { consumer.stop } # INT シグナルを受信したらコンシューマーを停止

begin
    consumer.each_message(max_bytes: 64 * 1024) do |message| # メッセージを消費
    logger.info("Get message: #{message.value}") # メッセージの内容を出力
    end
rescue Kafka::ProcessingError => e # エラー処理
    warn "Got error: #{e.cause}" # エラーメッセージを出力
    consumer.pause(e.topic, e.partition, timeout: 20) # パーティションを一時停止

    retry # 再試行
end