すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:PHP 用 SDK を使用してメッセージを送受信する

最終更新日:Mar 19, 2025

このトピックでは、PHP 用 SDK を使用して ApsaraMQ for Kafka に接続し、メッセージを送受信する方法について説明します。

環境要件

  • GNU Compiler Collection(GCC)がインストールされていること。詳細については、「Installing GCC」をご参照ください。

  • PHP がインストールされていること。詳細については、「Downloads」をご参照ください。

  • PHP Extension Community Library(PECL)がインストールされていること。詳細については、「Downloading PECL extensions」をご参照ください。

C++ ライブラリのインストール

  1. 次のコマンドを実行して、/etc/yum.repos.d/ yum リポジトリディレクトリに切り替えます。

    cd /etc/yum.repos.d/
  2. confluent.repo という名前の yum リポジトリ構成ファイルを作成します。

    [Confluent.dist]
    name=Confluent repository (dist)
    baseurl=https://packages.confluent.io/rpm/5.1/7
    gpgcheck=1
    gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
    enabled=1
    
    [Confluent]
    name=Confluent repository
    baseurl=https://packages.confluent.io/rpm/5.1
    gpgcheck=1
    gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
    enabled=1
  3. 次のコマンドを実行して、C++ ライブラリをインストールします。

    yum install librdkafka-devel

PHP ライブラリのインストール

  1. 次のコマンドを実行して、PHP ライブラリをインストールします。

    pecl install rdkafka
  2. PHP 初期化ファイル php.ini で、次の行を追加して Kafka 拡張機能を有効にします。

    extension=rdkafka.so

構成ファイルの準備

  1. (オプション) Secure Sockets Layer(SSL)ルート証明書をダウンロードします。SSL エンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、証明書をインストールする必要があります。

  2. aliware-kafka-demos ページに移動し、download をクリックしてデモプロジェクトをオンプレミス マシンにダウンロードし、デモプロジェクトのパッケージを解凍します。

  3. 解凍したパッケージで、kafka-php-demo フォルダーに移動します。次に、使用するエンドポイントに基づいて対応するフォルダーを開き、フォルダー内の setting.php ファイルを構成します。

    <?php
    
    return [
        'sasl_plain_username' => 'xxx',
        'sasl_plain_password' => 'xxx',
        'bootstrap_servers' => "xxx:xx,xxx:xx",
        'topic_name' => 'xxx',
        'consumer_id' => 'xxx'
    ];

    パラメーター

    説明

    sasl_plain_username

    Simple Authentication and Security Layer(SASL)ユーザーのユーザー名。デフォルトエンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合、このパラメーターは使用できません。

    説明
    • ApsaraMQ for Kafka インスタンスで ACL 機能が有効になっていない場合は、ユーザー名パスワード設定情報インスタンスの詳細ApsaraMQ for Kafka コンソール の ページの セクションにある パラメーターと パラメーターから、SASL ユーザーのユーザー名とパスワードを取得できます。

    • ApsaraMQ for Kafka インスタンスで ACL 機能が有効になっている場合は、インスタンスを使用してメッセージを送受信する権限が SASL ユーザーに付与されていることを確認してください。詳細については、「SASL ユーザーに権限を付与する」をご参照ください。

    sasl_plain_password

    SASL ユーザーのパスワード。デフォルトエンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合、このパラメーターは使用できません。

    bootstrap_servers

    ApsaraMQ for KafkaインスタンスのSSLエンドポイント。アクセスポイント情報 セクションの インスタンスの詳細 ページ (ApsaraMQ for Kafka コンソール) でエンドポイントを取得できます。

    topic_name

    トピック名。 トピック管理 ページ (ApsaraMQ for Kafka コンソール) でトピック名を取得できます。

    consumer_id

    グループ ID。 Group の管理 ページ (ApsaraMQ for Kafka コンソール) でグループ ID を取得できます。

  4. 必要なパラメーターを構成した後、構成ファイルが配置されているフォルダー内のすべてのファイルを、サーバー上の PHP インストールディレクトリにアップロードします。SSL エンドポイントに対応するフォルダーには、SSL ルート証明書ファイルが含まれています。

メッセージの送信

次のコマンドを実行して kafka-producer.php を実行し、メッセージを送信します。

php kafka-producer.php

次のサンプルコードは、kafka-producer.php の例を示しています。

説明

サンプルコードでは、SSL エンドポイントが使用されています。デフォルトエンドポイントを使用する場合は、SASL 関連のコードは必要ありません。sasl. または ssl. を含む行をサンプルコードから削除してください。

<?php

$setting = require __DIR__ . '/setting.php';

$conf = new RdKafka\Conf();
$conf->set('sasl.mechanisms', 'PLAIN');
$conf->set('api.version.request', 'true');
$conf->set('sasl.username', $setting['sasl_plain_username']);
$conf->set('sasl.password', $setting['sasl_plain_password']);
$conf->set('security.protocol', 'SASL_SSL');
$conf->set('ssl.ca.location', __DIR__ . '/ca-cert.pem');
$conf->set('ssl.endpoint.identification.algorithm', 'none');
$conf->set('message.send.max.retries', 5);
$rk = new RdKafka\Producer($conf);
# デバッグを行う場合は、ログレベルを LOG_DEBUG に設定します
$rk->setLogLevel(LOG_INFO);
$rk->addBrokers($setting['bootstrap_servers']);
$topic = $rk->newTopic($setting['topic_name']);
$a = $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message hello kafka");
$rk->poll(0);
while ($rk->getOutQLen() > 0) {
    $rk->poll(50);
}
echo "send succ" . PHP_EOL;

サンプルコードの詳細については、「php-rdkafka」をご参照ください。

メッセージの購読

次のコマンドを実行して kafka-consumer.php を実行し、メッセージを購読します。

php kafka-consumer.php

次のサンプルコードは、kafka-consumer.php の例を示しています。

説明

サンプルコードでは、SSL エンドポイントが使用されています。デフォルトエンドポイントを使用する場合は、SASL 関連のコードは必要ありません。sasl. または ssl. を含む行をサンプルコードから削除してください。

<?php
$setting = require __DIR__ . '/setting.php';
$conf = new RdKafka\Conf();
$conf->set('sasl.mechanisms', 'PLAIN');
$conf->set('api.version.request', 'true');
$conf->set('sasl.username', $setting['sasl_plain_username']);
$conf->set('sasl.password', $setting['sasl_plain_password']);
$conf->set('security.protocol', 'SASL_SSL');
$conf->set('ssl.ca.location', __DIR__ . '/ca-cert.pem');

$conf->set('session.timeout.ms', 10000);

$conf->set('request.timeout.ms', 305000);

$conf->set('group.id', $setting['consumer_id']);

$conf->set('ssl.endpoint.identification.algorithm', 'none');

$conf->set('metadata.broker.list', $setting['bootstrap_servers']);

$topicConf = new RdKafka\TopicConf();

$conf->setDefaultTopicConf($topicConf);

$consumer = new RdKafka\KafkaConsumer($conf);

$consumer->subscribe([$setting['topic_name']]);

echo "パーティション割り当てを待機しています... (終了後に\n";
echo "すぐにグループに再参加する場合、時間がかかる場合があります。)\n";

while (true) {
    $message = $consumer->consume(30 * 1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "これ以上のメッセージはありません。さらに待機します\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "タイムアウトしました\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
}

?>

サンプルコードの詳細については、「php-rdkafka」をご参照ください。