このトピックでは、PHP 用 SDK を使用して ApsaraMQ for Kafka に接続し、メッセージを送受信する方法について説明します。
環境要件
GNU Compiler Collection(GCC)がインストールされていること。詳細については、「Installing GCC」をご参照ください。
PHP がインストールされていること。詳細については、「Downloads」をご参照ください。
PHP Extension Community Library(PECL)がインストールされていること。詳細については、「Downloading PECL extensions」をご参照ください。
C++ ライブラリのインストール
次のコマンドを実行して、/etc/yum.repos.d/ yum リポジトリディレクトリに切り替えます。
cd /etc/yum.repos.d/confluent.repo という名前の yum リポジトリ構成ファイルを作成します。
[Confluent.dist] name=Confluent repository (dist) baseurl=https://packages.confluent.io/rpm/5.1/7 gpgcheck=1 gpgkey=https://packages.confluent.io/rpm/5.1/archive.key enabled=1 [Confluent] name=Confluent repository baseurl=https://packages.confluent.io/rpm/5.1 gpgcheck=1 gpgkey=https://packages.confluent.io/rpm/5.1/archive.key enabled=1次のコマンドを実行して、C++ ライブラリをインストールします。
yum install librdkafka-devel
PHP ライブラリのインストール
次のコマンドを実行して、PHP ライブラリをインストールします。
pecl install rdkafkaPHP 初期化ファイル php.ini で、次の行を追加して Kafka 拡張機能を有効にします。
extension=rdkafka.so
構成ファイルの準備
(オプション) Secure Sockets Layer(SSL)ルート証明書をダウンロードします。SSL エンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、証明書をインストールする必要があります。
aliware-kafka-demos ページに移動し、
をクリックしてデモプロジェクトをオンプレミス マシンにダウンロードし、デモプロジェクトのパッケージを解凍します。解凍したパッケージで、kafka-php-demo フォルダーに移動します。次に、使用するエンドポイントに基づいて対応するフォルダーを開き、フォルダー内の setting.php ファイルを構成します。
<?php return [ 'sasl_plain_username' => 'xxx', 'sasl_plain_password' => 'xxx', 'bootstrap_servers' => "xxx:xx,xxx:xx", 'topic_name' => 'xxx', 'consumer_id' => 'xxx' ];パラメーター
説明
sasl_plain_username
Simple Authentication and Security Layer(SASL)ユーザーのユーザー名。デフォルトエンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合、このパラメーターは使用できません。
説明ApsaraMQ for Kafka インスタンスで ACL 機能が有効になっていない場合は、ユーザー名パスワード設定情報インスタンスの詳細ApsaraMQ for Kafka コンソール の ページの セクションにある パラメーターと パラメーターから、SASL ユーザーのユーザー名とパスワードを取得できます。
ApsaraMQ for Kafka インスタンスで ACL 機能が有効になっている場合は、インスタンスを使用してメッセージを送受信する権限が SASL ユーザーに付与されていることを確認してください。詳細については、「SASL ユーザーに権限を付与する」をご参照ください。
sasl_plain_password
SASL ユーザーのパスワード。デフォルトエンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合、このパラメーターは使用できません。
bootstrap_servers
ApsaraMQ for KafkaインスタンスのSSLエンドポイント。アクセスポイント情報 セクションの インスタンスの詳細 ページ (ApsaraMQ for Kafka コンソール) でエンドポイントを取得できます。
topic_name
トピック名。 トピック管理 ページ (ApsaraMQ for Kafka コンソール) でトピック名を取得できます。
consumer_id
グループ ID。 Group の管理 ページ (ApsaraMQ for Kafka コンソール) でグループ ID を取得できます。
必要なパラメーターを構成した後、構成ファイルが配置されているフォルダー内のすべてのファイルを、サーバー上の PHP インストールディレクトリにアップロードします。SSL エンドポイントに対応するフォルダーには、SSL ルート証明書ファイルが含まれています。
メッセージの送信
次のコマンドを実行して kafka-producer.php を実行し、メッセージを送信します。
php kafka-producer.php次のサンプルコードは、kafka-producer.php の例を示しています。
サンプルコードでは、SSL エンドポイントが使用されています。デフォルトエンドポイントを使用する場合は、SASL 関連のコードは必要ありません。sasl. または ssl. を含む行をサンプルコードから削除してください。
<?php
$setting = require __DIR__ . '/setting.php';
$conf = new RdKafka\Conf();
$conf->set('sasl.mechanisms', 'PLAIN');
$conf->set('api.version.request', 'true');
$conf->set('sasl.username', $setting['sasl_plain_username']);
$conf->set('sasl.password', $setting['sasl_plain_password']);
$conf->set('security.protocol', 'SASL_SSL');
$conf->set('ssl.ca.location', __DIR__ . '/ca-cert.pem');
$conf->set('ssl.endpoint.identification.algorithm', 'none');
$conf->set('message.send.max.retries', 5);
$rk = new RdKafka\Producer($conf);
# デバッグを行う場合は、ログレベルを LOG_DEBUG に設定します
$rk->setLogLevel(LOG_INFO);
$rk->addBrokers($setting['bootstrap_servers']);
$topic = $rk->newTopic($setting['topic_name']);
$a = $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message hello kafka");
$rk->poll(0);
while ($rk->getOutQLen() > 0) {
$rk->poll(50);
}
echo "send succ" . PHP_EOL;サンプルコードの詳細については、「php-rdkafka」をご参照ください。
メッセージの購読
次のコマンドを実行して kafka-consumer.php を実行し、メッセージを購読します。
php kafka-consumer.php次のサンプルコードは、kafka-consumer.php の例を示しています。
サンプルコードでは、SSL エンドポイントが使用されています。デフォルトエンドポイントを使用する場合は、SASL 関連のコードは必要ありません。sasl. または ssl. を含む行をサンプルコードから削除してください。
<?php
$setting = require __DIR__ . '/setting.php';
$conf = new RdKafka\Conf();
$conf->set('sasl.mechanisms', 'PLAIN');
$conf->set('api.version.request', 'true');
$conf->set('sasl.username', $setting['sasl_plain_username']);
$conf->set('sasl.password', $setting['sasl_plain_password']);
$conf->set('security.protocol', 'SASL_SSL');
$conf->set('ssl.ca.location', __DIR__ . '/ca-cert.pem');
$conf->set('session.timeout.ms', 10000);
$conf->set('request.timeout.ms', 305000);
$conf->set('group.id', $setting['consumer_id']);
$conf->set('ssl.endpoint.identification.algorithm', 'none');
$conf->set('metadata.broker.list', $setting['bootstrap_servers']);
$topicConf = new RdKafka\TopicConf();
$conf->setDefaultTopicConf($topicConf);
$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->subscribe([$setting['topic_name']]);
echo "パーティション割り当てを待機しています... (終了後に\n";
echo "すぐにグループに再参加する場合、時間がかかる場合があります。)\n";
while (true) {
$message = $consumer->consume(30 * 1000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
var_dump($message);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "これ以上のメッセージはありません。さらに待機します\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "タイムアウトしました\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
break;
}
}
?>サンプルコードの詳細については、「php-rdkafka」をご参照ください。