このトピックでは、C++ SDK を使用して ApsaraMQ for Kafka に接続し、メッセージを送受信する方法について説明します。
環境要件
GNU Compiler Collection(GCC)がインストールされていること。詳細については、Installing GCC をご参照ください。
C++ ライブラリのインストール
次のコマンドを実行して、/etc/yum.repos.d/ yum リポジトリディレクトリに切り替えます。
cd /etc/yum.repos.d/confluent.repo という名前の yum リポジトリ構成ファイルを作成します。
[Confluent.dist] name=Confluent repository (dist) baseurl=https://packages.confluent.io/rpm/5.1/7 gpgcheck=1 gpgkey=https://packages.confluent.io/rpm/5.1/archive.key enabled=1 [Confluent] name=Confluent repository baseurl=https://packages.confluent.io/rpm/5.1 gpgcheck=1 gpgkey=https://packages.confluent.io/rpm/5.1/archive.key enabled=1次のコマンドを実行して、C++ ライブラリをインストールします。
yum install librdkafka-devel
構成ファイルの準備
(オプション) Secure Sockets Layer(SSL)エンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、次の操作を実行する必要があります。
次のコマンドを実行して、SSLライブラリをインストールします。
yum install openssl openssl-devel次のコマンドを実行して、Simple Authentication and Security Layer(SASL)ライブラリをインストールします。
yum install cyrus-sasl{,-plain}
aliware-kafka-demos ページに移動します。
アイコンをクリックし、[ZIP をダウンロード] を選択してデモプロジェクトをダウンロードします。次に、デモプロジェクトのパッケージを解凍します。解凍したパッケージで、kafka-cpp-demo フォルダに移動します。次に、使用するエンドポイントに基づいて対応するフォルダを開き、フォルダ内のすべてのファイルをサーバーにアップロードします。SSLエンドポイントに対応するフォルダには、SSLルート証明書ファイルが含まれています。
メッセージを送信する
kafka_producer.c をコンパイルしてメッセージを送信するには、次の操作を実行します。
次のコマンドを実行して、kafka_producer.c をコンパイルします。
gcc -lrdkafka ./kafka_producer.c -o kafka_producer次のいずれかのコマンドを実行して、メッセージを送信します。
デフォルトエンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、次のコマンドを実行します。
./kafka_producer <bootstrap_servers> <topic>SSLエンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、次のコマンドを実行します。
./kafka_producer <bootstrap_servers> <topic> <username> <password>
パラメータ
説明
bootstrap_servers
インスタンスエンドポイント。アクセスポイント情報 セクションの インスタンスの詳細 ページ (ApsaraMQ for Kafka コンソール) でエンドポイントを取得できます。
topic
トピック名。 トピック管理 ページ (ApsaraMQ for Kafka コンソール内) でトピック名を取得できます。
username
SASLユーザーのユーザー名。SSLエンドポイントを使用してインスタンスに接続する場合は、このパラメータが必要です。
説明ApsaraMQ for Kafka インスタンスで ACL 機能が有効になっていない場合は、ユーザー名パスワード設定情報インスタンスの詳細ApsaraMQ for Kafka コンソール の ページの セクションにある パラメータと パラメータから、SASLユーザーのユーザー名とパスワードを取得できます。
ApsaraMQ for Kafka インスタンスで ACL 機能が有効になっている場合は、インスタンスを使用して SASL ユーザーがメッセージを送受信する権限を持っていることを確認してください。詳細については、SASL ユーザーに権限を付与する をご参照ください。
password
SASLユーザーのパスワード。SSLエンドポイントを使用してインスタンスに接続する場合は、このパラメータが必要です。
次のサンプルコードは、kafka_producer.c の例を示しています。
サンプルコードを使用する前に、使用するエンドポイントとコード内の説明に基づいて、対応するコードを選択、コメントアウト、または削除してください。
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2017, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/*
* シンプルな Apache Kafka プロデューサー
* librdkafka の Kafka ドライバーを使用
* (https://github.com/edenhill/librdkafka)
*/
#include <stdio.h>
#include <signal.h>
#include <string.h>
/* 通常のインクルードパスは <librdkafka/rdkafka.h> ですが、このプログラムは
* librdkafka ソースツリー内から組み込まれているため、異なります。 */
#include "librdkafka/rdkafka.h"
static int run = 1;
/*
* プログラムの終了シグナル
*/
static void stop (int sig) {
run = 0;
fclose(stdin); /* fgets() を中止 */
}
/*
* メッセージ配信レポートコールバック。
*
* このコールバックは、メッセージごとに 1 回だけ呼び出され、
* メッセージが正常に配信されたかどうかを示します。
* (rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR) または永続的に
* 配信に失敗しました (rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR)。
*
* コールバックは rd_kafka_poll() からトリガーされ、
* アプリケーションのスレッドで実行されます。
*/
static void dr_msg_cb (rd_kafka_t *rk,
const rd_kafka_message_t *rkmessage, void *opaque) {
if (rkmessage->err)
fprintf(stderr, "%% メッセージ配信に失敗しました: %s\n",
rd_kafka_err2str(rkmessage->err));
else
fprintf(stderr,
"%% メッセージが配信されました (%zd バイト, "
"パーティション %"PRId32")\n",
rkmessage->len, rkmessage->partition);
/* rkmessage は librdkafka によって自動的に破棄されます */
}
int main (int argc, char **argv) {
rd_kafka_t *rk; /* プロデューサーインスタンスハンドル */
rd_kafka_topic_t *rkt; /* トピックオブジェクト */
rd_kafka_conf_t *conf; /* 一時構成オブジェクト */
char errstr[512]; /* librdkafka API エラーレポートバッファ */
char buf[512]; /* メッセージ値一時バッファ */
const char *brokers; /* 引数: ブローカーリスト */
const char *topic; /* 引数: 生成するトピック */
const char *username; /* 引数: 生成するトピック */
const char *password; /* 引数: 生成するトピック */
/*
* 引数の検証
*/
// ApsaraMQ for Kafka インスタンスへの接続に使用するエンドポイントのタイプに基づいて、IF ステートメントを選択します。
// SSL エンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、次のコードを使用します。
if (argc != 5) {
fprintf(stderr, "%% 使用方法: %s <broker> <topic> <username> <password>\n", argv[0]);
return 1;
}
// デフォルトエンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、次のコードを使用します。
if (argc != 3) {
fprintf(stderr, "%% 使用方法: %s <broker> <topic>\n", argv[0]);
return 1;
}
brokers = argv[1];
topic = argv[2];
username = argv[3]; // デフォルトエンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、この行をコメントアウトまたは削除します。
password = argv[4]; // デフォルトエンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、この行をコメントアウトまたは削除します。
/*
* Kafka クライアント構成プレースホルダーの作成
*/
conf = rd_kafka_conf_new();
/* ブートストラップブローカーを、ホストまたはホスト:ポート (デフォルトポート 9092) のカンマ区切りリストとして設定します。
* librdkafka は、ブートストラップブローカーを使用して、クラスターからブローカーの完全なセットを取得します。 */
if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
return 1;
}
// デフォルトエンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、次の IF ステートメントをコメントアウトまたは削除します。
if (rd_kafka_conf_set(conf, "ssl.ca.location", "ca-cert.pem", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK
|| rd_kafka_conf_set(conf, "security.protocol", "sasl_ssl", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK
|| rd_kafka_conf_set(conf, "ssl.endpoint.identification.algorithm", "None", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK
|| rd_kafka_conf_set(conf, "sasl.mechanism", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK
|| rd_kafka_conf_set(conf, "sasl.username", username, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK
|| rd_kafka_conf_set(conf, "sasl.password", password, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK
) {
fprintf(stderr, "%s\n", errstr);
return -1;
}
/* 配信レポートコールバックを設定します。
* このコールバックは、メッセージごとに 1 回呼び出され、
* 配信が成功したか失敗したかをアプリケーションに通知します。
* 上記の dr_msg_cb() を参照してください。 */
rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
/*
* プロデューサーインスタンスを作成します。
*
* 注: rd_kafka_new() は conf オブジェクトの所有権を取得します。
* アプリケーションはこの呼び出しの後で再度参照してはなりません。
*/
rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (!rk) {
fprintf(stderr,
"%% 新しいプロデューサーの作成に失敗しました: %s\n", errstr);
return 1;
}
/* 生成される各メッセージで再利用されるトピックオブジェクトを作成します。
*
* プロデューサーインスタンス (rd_kafka_t) とトピックオブジェクト (topic_t) の両方
* は、できるだけ再利用する必要がある長寿命オブジェクトです。
*/
rkt = rd_kafka_topic_new(rk, topic, NULL);
if (!rkt) {
fprintf(stderr, "%% トピックオブジェクトの作成に失敗しました: %s\n",
rd_kafka_err2str(rd_kafka_last_error()));
rd_kafka_destroy(rk);
return 1;
}
/* クリーンシャットダウンのシグナルハンドラ */
signal(SIGINT, stop);
fprintf(stderr,
"%% テキストを入力して Enter キーを押してメッセージを生成します\n"
"%% または、Enter キーを押すだけで配信レポートを提供します\n"
"%% Ctrl-C または Ctrl-D を押して終了します\n");
while (run && fgets(buf, sizeof(buf), stdin)) {
size_t len = strlen(buf);
if (buf[len-1] == '\n') /* 改行を削除 */
buf[--len] = '\0';
if (len == 0) {
/* 空行: 配信レポートのみを提供 */
rd_kafka_poll(rk, 0/*非ブロッキング */);
continue;
}
/*
* メッセージを送信/生成します。
* これは非同期呼び出しであり、成功すると、メッセージが内部プロデューサーキューにエンキューされるだけです。
* ブローカーへの実際の配信試行は、バックグラウンドスレッドによって処理されます。
* 以前に登録された配信レポートコールバック
* (dr_msg_cb) は、メッセージが配信された (または失敗した) ときにアプリケーションにシグナルを送り返すために使用されます。
*/
retry:
if (rd_kafka_produce(
/* トピックオブジェクト */
rkt,
/* 組み込みパーティショナーを使用してパーティションを選択します */
RD_KAFKA_PARTITION_UA,
/* ペイロードのコピーを作成します。 */
RD_KAFKA_MSG_F_COPY,
/* メッセージペイロード (値) と長さ */
buf, len,
/* オプションのキーとその長さ */
NULL, 0,
/* メッセージの不透明度。
* 配信レポートコールバックで
* msg_opaque として提供されます。 */
NULL) == -1) {
/*
* 生成するメッセージの *エンキュー* に失敗しました。
*/
fprintf(stderr,
"%% トピック %s への生成に失敗しました: %s\n",
rd_kafka_topic_name(rkt),
rd_kafka_err2str(rd_kafka_last_error()));
/* 配信レポートを処理するためにポーリングします */
if (rd_kafka_last_error() ==
RD_KAFKA_RESP_ERR__QUEUE_FULL) {
/* 内部キューがいっぱいの場合は、
* メッセージが配信されるまで待ってから再試行します。
* 内部キューは、送信されるメッセージと、
* 送信済みまたは失敗したメッセージの両方を表し、
* 配信レポートコールバックが呼び出されるのを待っています。
*
* 内部キューは、構成プロパティ
* queue.buffering.max.messages によって制限されます */
rd_kafka_poll(rk, 1000/*最大 1000ms ブロック*/);
goto retry;
}
} else {
fprintf(stderr, "%% トピック %s のメッセージ (%zd バイト) "
"がエンキューされました\n",
len, rd_kafka_topic_name(rkt));
}
/* プロデューサーアプリケーションは、rd_kafka_poll() を呼び出すことによって、
* 配信レポートキューを頻繁に提供する必要があります。
* poll 呼び出しをメインループまたは専用スレッドに配置するか、
* rd_kafka_produce() 呼び出しのたびに呼び出します。
* メッセージを生成していない期間でも rd_kafka_poll() が呼び出されるようにしてください。
* 以前に生成されたメッセージの配信レポートコールバックが提供されていることを確認するため (および登録したその他のコールバック)。 */
rd_kafka_poll(rk, 0/*非ブロッキング*/);
}
/* 最後のメッセージが配信されるか失敗するのを待ちます。
* rd_kafka_flush() は rd_kafka_poll() の抽象化であり、
* すべてのメッセージが配信されるまで待ちます。 */
fprintf(stderr, "%% 最後のメッセージをフラッシュしています..\n");
rd_kafka_flush(rk, 10*1000 /* 最大 10 秒待機 */);
/* トピックオブジェクトを破棄 */
rd_kafka_topic_destroy(rkt);
/* プロデューサーインスタンスを破棄 */
rd_kafka_destroy(rk);
return 0;
}メッセージを購読する
kafka_consumer.c をコンパイルしてメッセージを購読するには、次の操作を実行します。
次のコマンドを実行して、kafka_consumer.c をコンパイルします。
gcc -lrdkafka ./kafka_consumer.c -o kafka_consumerエンドポイントタイプに基づいて、次のいずれかのコマンドを実行してメッセージを購読します。
デフォルトエンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、次のコマンドを実行します。
./kafka_consumer -g <group> -b <bootstrap_servers> <topic>SSL エンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、次のコマンドを実行します。
./kafka_consumer -g <group> -b <bootstrap_servers> -u <username> -p <password> <topic>パラメータ
説明
group
グループ ID。 Group の管理 ページ(ApsaraMQ for Kafka コンソール内)でグループ ID を取得できます。
bootstrap_servers
ApsaraMQ for KafkaインスタンスのSSLエンドポイント。アクセスポイント情報 セクションの インスタンスの詳細 ページ (ApsaraMQ for Kafka コンソール) でエンドポイントを取得できます。
username
SASL ユーザーのユーザー名。SSL エンドポイントを使用してインスタンスに接続する場合は、このパラメータが必要です。
説明ApsaraMQ for Kafka インスタンスで ACL 機能が有効になっていない場合は、ユーザー名パスワード設定情報インスタンスの詳細ApsaraMQ for Kafka コンソール の ページの セクションにある パラメータと パラメータから、SASL ユーザーのユーザー名とパスワードを取得できます。
ApsaraMQ for Kafka インスタンスで ACL 機能が有効になっている場合は、インスタンスを使用して SASL ユーザーがメッセージを送受信する権限を持っていることを確認してください。詳細については、SASL ユーザーに権限を付与する をご参照ください。
password
SASL ユーザーのパスワード。SSL エンドポイントを使用してインスタンスに接続する場合は、このパラメータが必要です。
topic
トピック名。 トピック管理 ページ (ApsaraMQ for Kafka コンソール内) でトピック名を取得できます。
次のサンプルコードは、kafka_consumer.c の例を示しています。
サンプルコードを使用する前に、使用するエンドポイントとコード内の説明に基づいて、対応するコードを選択、コメントアウト、または削除してください。
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2015, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/*
* Apache Kafka 高レベルコンシューマーのサンプルプログラム
* librdkafka の Kafka ドライバーを使用
* (https://github.com/edenhill/librdkafka)
*/
#include <ctype.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <syslog.h>
#include <sys/time.h>
#include <errno.h>
#include <getopt.h>
/* 通常のインクルードパスは <librdkafka/rdkafka.h> ですが、このプログラムは
* librdkafka ソースツリー内から組み込まれているため、異なります。 */
#include "librdkafka/rdkafka.h" /* Kafka ドライバー用 */
static int run = 1;
static rd_kafka_t *rk;
static int exit_eof = 0;
static int wait_eof = 0; /* EOF を待っているパーティションの数 */
static int quiet = 0;
static enum {
OUTPUT_HEXDUMP,
OUTPUT_RAW,
} output = OUTPUT_HEXDUMP;
static void stop (int sig) {
if (!run)
exit(1);
run = 0;
fclose(stdin); /* fgets() を中止 */
}
static void hexdump (FILE *fp, const char *name, const void *ptr, size_t len) {
const char *p = (const char *)ptr;
unsigned int of = 0;
if (name)
fprintf(fp, "%s hexdump (%zd バイト):\n", name, len);
for (of = 0 ; of < len ; of += 16) {
char hexen[16*3+1];
char charen[16+1];
int hof = 0;
int cof = 0;
int i;
for (i = of ; i < (int)of + 16 && i < (int)len ; i++) {
hof += sprintf(hexen+hof, "%02x ", p[i] & 0xff);
cof += sprintf(charen+cof, "%c",
isprint((int)p[i]) ? p[i] : '.');
}
fprintf(fp, "%08x: %-48s %-16s\n",
of, hexen, charen);
}
}
/*
* Kafka ロガーコールバック (オプション)
*/
static void logger (const rd_kafka_t *rk, int level,
const char *fac, const char *buf) {
struct timeval tv;
gettimeofday(&tv, NULL);
fprintf(stdout, "%u.%03u RDKAFKA-%i-%s: %s: %s\n",
(int)tv.tv_sec, (int)(tv.tv_usec / 1000),
level, fac, rd_kafka_name(rk), buf);
}
/*
* コンシュームされたメッセージを処理して出力します。
* 内部的に作成されたメッセージは、librdkafka からアプリケーションに状態を伝達するためにも使用されます。アプリケーションは、
* この目的のために `rkmessage->err` フィールドを確認する必要があります。
*/
static void msg_consume (rd_kafka_message_t *rkmessage) {
if (rkmessage->err) {
if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
fprintf(stderr,
"%% コンシューマーは %s [%"PRId32"] "
"メッセージキューの末尾に到達しました (オフセット %"PRId64")\n",
rd_kafka_topic_name(rkmessage->rkt),
rkmessage->partition, rkmessage->offset);
if (exit_eof && --wait_eof == 0) {
fprintf(stderr,
"%% すべてのパーティションが EOF に到達しました: "
"終了しています\n");
run = 0;
}
return;
}
if (rkmessage->rkt)
fprintf(stderr, "%% トピック \"%s\" [%"PRId32"] "
"オフセット %"PRId64" のコンシュームエラー: %s\n",
rd_kafka_topic_name(rkmessage->rkt),
rkmessage->partition,
rkmessage->offset,
rd_kafka_message_errstr(rkmessage));
else
fprintf(stderr, "%% コンシューマーエラー: %s: %s\n",
rd_kafka_err2str(rkmessage->err),
rd_kafka_message_errstr(rkmessage));
if (rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION ||
rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
run = 0;
return;
}
if (!quiet)
fprintf(stdout, "%% メッセージ (トピック %s [%"PRId32"], "
"オフセット %"PRId64", %zd バイト):\n",
rd_kafka_topic_name(rkmessage->rkt),
rkmessage->partition,
rkmessage->offset, rkmessage->len);
if (rkmessage->key_len) {
if (output == OUTPUT_HEXDUMP)
hexdump(stdout, "メッセージキー",
rkmessage->key, rkmessage->key_len);
else
printf("キー: %.*s\n",
(int)rkmessage->key_len, (char *)rkmessage->key);
}
if (output == OUTPUT_HEXDUMP)
hexdump(stdout, "メッセージペイロード",
rkmessage->payload, rkmessage->len);
else
printf("%.*s\n",
(int)rkmessage->len, (char *)rkmessage->payload);
}
static void print_partition_list (FILE *fp,
const rd_kafka_topic_partition_list_t
*partitions) {
int i;
for (i = 0 ; i < partitions->cnt ; i++) {
fprintf(stderr, "%s %s [%"PRId32"] オフセット %"PRId64,
i > 0 ? ",":"",
partitions->elems[i].topic,
partitions->elems[i].partition,
partitions->elems[i].offset);
}
fprintf(stderr, "\n");
}
static void rebalance_cb (rd_kafka_t *rk,
rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t *partitions,
void *opaque) {
fprintf(stderr, "%% コンシューマーグループが再調整されました: ");
switch (err)
{
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
fprintf(stderr, "割り当て済み:\n");
print_partition_list(stderr, partitions);
rd_kafka_assign(rk, partitions);
wait_eof += partitions->cnt;
break;
case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
fprintf(stderr, "取り消し済み:\n");
print_partition_list(stderr, partitions);
rd_kafka_assign(rk, NULL);
wait_eof = 0;
break;
default:
fprintf(stderr, "失敗しました: %s\n",
rd_kafka_err2str(err));
rd_kafka_assign(rk, NULL);
break;
}
}
static int describe_groups (rd_kafka_t *rk, const char *group) {
rd_kafka_resp_err_t err;
const struct rd_kafka_group_list *grplist;
int i;
err = rd_kafka_list_groups(rk, group, &grplist, 10000);
if (err) {
fprintf(stderr, "%% グループリストの取得に失敗しました: %s\n",
rd_kafka_err2str(err));
return -1;
}
for (i = 0 ; i < grplist->group_cnt ; i++) {
const struct rd_kafka_group_info *gi = &grplist->groups[i];
int j;
printf("グループ \"%s\" は、ブローカー %d (%s:%d) で状態 %s です\n",
gi->group, gi->state,
gi->broker.id, gi->broker.host, gi->broker.port);
if (gi->err)
printf(" エラー: %s\n", rd_kafka_err2str(gi->err));
printf(" プロトコルタイプ \"%s\"、プロトコル \"%s\"、"
"%d メンバーで:\n",
gi->protocol_type, gi->protocol, gi->member_cnt);
for (j = 0 ; j < gi->member_cnt ; j++) {
const struct rd_kafka_group_member_info *mi;
mi = &gi->members[j];
printf(" \"%s\"、クライアント ID \"%s\" (ホスト %s)\n",
mi->member_id, mi->client_id, mi->client_host);
printf(" メタデータ: %d バイト\n",
mi->member_metadata_size);
printf(" 割り当て: %d バイト\n",
mi->member_assignment_size);
}
printf("\n");
}
if (group && !grplist->group_cnt)
fprintf(stderr, "%% 一致するグループがありません (%s)\n", group);
rd_kafka_group_list_destroy(grplist);
return 0;
}
static void sig_usr1 (int sig) {
rd_kafka_dump(stdout, rk);
}
int main (int argc, char **argv) {
char mode = 'C';
char *brokers = "localhost:9092";
char *username = "123";
char *password = "123";
int opt;
rd_kafka_conf_t *conf;
rd_kafka_topic_conf_t *topic_conf;
char errstr[512];
const char *debug = NULL;
int do_conf_dump = 0;
char tmp[16];
rd_kafka_resp_err_t err;
char *group = NULL;
rd_kafka_topic_partition_list_t *topics;
int is_subscription;
int i;
quiet = !isatty(STDIN_FILENO);
/* Kafka 構成 */
conf = rd_kafka_conf_new();
/* ロガーを設定 */
rd_kafka_conf_set_log_cb(conf, logger);
/* 迅速な終了 */
snprintf(tmp, sizeof(tmp), "%i", SIGIO);
rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);
/* トピック構成 */
topic_conf = rd_kafka_topic_conf_new();
while ((opt = getopt(argc, argv, "g:b:qd:eX:ADO")) != -1) { // デフォルトエンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、このステートメントを使用します。
while ((opt = getopt(argc, argv, "g:b:u:p:qd:eX:ADO")) != -1) { // SSL エンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、このステートメントを使用します。
switch (opt) {
case 'b':
brokers = optarg;
break;
case 'g':
group = optarg;
break;
// デフォルトエンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、ユーザー名とパスワードを指定するために使用される次の 6 行をコメントアウトまたは削除します。
case 'u':
username = optarg;
break;
case 'p':
password = optarg;
break;
case 'e':
exit_eof = 1;
break;
case 'd':
debug = optarg;
break;
case 'q':
quiet = 1;
break;
case 'A':
output = OUTPUT_RAW;
break;
case 'X':
{
char *name, *val;
rd_kafka_conf_res_t res;
if (!strcmp(optarg, "list") ||
!strcmp(optarg, "help")) {
rd_kafka_conf_properties_show(stdout);
exit(0);
}
if (!strcmp(optarg, "dump")) {
do_conf_dump = 1;
continue;
}
name = optarg;
if (!(val = strchr(name, '='))) {
fprintf(stderr, "%% -X property=value が想定されていましたが、%s です\n", name);
exit(1);
}
*val = '\0';
val++;
res = RD_KAFKA_CONF_UNKNOWN;
/* まずトピック設定で「topic.」という接頭辞が付いたプロパティを試してから、
* トピック設定プロパティと一致しなかった場合はグローバルにフォールスルーします。 */
if (!strncmp(name, "topic.", strlen("topic.")))
res = rd_kafka_topic_conf_set(topic_conf,
name+
strlen("topic."),
val,
errstr,
sizeof(errstr));
if (res == RD_KAFKA_CONF_UNKNOWN)
res = rd_kafka_conf_set(conf, name, val,
errstr, sizeof(errstr));
if (res != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%% %s\n", errstr);
exit(1);
}
}
break;
case 'D':
case 'O':
mode = opt;
break;
default:
goto usage;
}
}
if (do_conf_dump) {
const char **arr;
size_t cnt;
int pass;
for (pass = 0 ; pass < 2 ; pass++) {
if (pass == 0) {
arr = rd_kafka_conf_dump(conf, &cnt);
printf("# グローバル設定\n");
} else {
printf("# トピック設定\n");
arr = rd_kafka_topic_conf_dump(topic_conf,
&cnt);
}
for (i = 0 ; i < (int)cnt ; i += 2)
printf("%s = %s\n",
arr[i], arr[i+1]);
printf("\n");
rd_kafka_conf_dump_free(arr, cnt);
}
exit(0);
}
if (strchr("OC", mode) && optind == argc) {
usage:
fprintf(stderr,
"使用方法: %s [オプション] <topic[:part]> <topic[:part]>..\n"
"\n"
"librdkafka バージョン %s (0x%08x)\n"
"\n"
" オプション:\n"
" -g <group> コンシューマーグループ (%s)\n"
" -b <brokers> ブローカーアドレス (%s)\n"
" -u <username> sasl プレーンユーザー名 (%s)\n" // デフォルトエンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、この行をコメントアウトまたは削除します。
" -p <password> sasl プレーンパスワード (%s)\n" // デフォルトエンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、この行をコメントアウトまたは削除します。
" -e パーティションの最後のメッセージが\n"
" 受信されたときにコンシューマーを終了します。\n"
" -D グループを記述します。\n"
" -O コミットされたオフセットを取得します。\n"
" -d [facs..] デバッグコンテキストを有効にします:\n"
" %s\n"
" -q サイレントにします\n"
" -A 生のペイロード出力 (コンシューマー)\n"
" -X <prop=name> 任意の librdkafka "
"構成プロパティを設定します\n"
" \"topic.\" という接頭辞が付いたプロパティは、"
"トピックオブジェクトに設定されます。\n"
" サポートされているプロパティの完全なリストを"
"表示するには、'-X list' を使用します。\n"
"\n"
"調整されたコンシューマーグループの場合は、「topic1 topic2..」"
"形式を使用し、静的割り当ての場合は、"
"「topic1:part1 topic1:part2 topic2:part1..」を使用します。\n"
"\n",
argv[0],
rd_kafka_version_str(), rd_kafka_version(),
group, brokers, username, password, // デフォルトエンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、この行の「username, password,」を削除します。
RD_KAFKA_DEBUG_CONTEXTS);
exit(1);
}
signal(SIGINT, stop);
signal(SIGUSR1, sig_usr1);
if (debug &&
rd_kafka_conf_set(conf, "debug", debug, errstr, sizeof(errstr)) !=
RD_KAFKA_CONF_OK) {
fprintf(stderr, "%% デバッグ構成に失敗しました: %s: %s\n",
errstr, debug);
exit(1);
}
/*
* クライアント/コンシューマーグループ
*/
if (strchr("CO", mode)) {
/* コンシューマーグループにはグループ ID が必要です */
if (!group)
group = "rdkafka_consumer_example";
if (rd_kafka_conf_set(conf, "group.id", group,
errstr, sizeof(errstr)) !=
RD_KAFKA_CONF_OK) {
fprintf(stderr, "%% %s\n", errstr);
exit(1);
}
/* コンシューマーグループは常にブローカーベースのオフセットストレージを使用します */
if (rd_kafka_topic_conf_set(topic_conf, "offset.store.method",
"broker",
errstr, sizeof(errstr)) !=
RD_KAFKA_CONF_OK) {
fprintf(stderr, "%% %s\n", errstr);
exit(1);
}
/* パターン一致トピックのデフォルトトピック設定を設定します。 */
rd_kafka_conf_set_default_topic_conf(conf, topic_conf);
/* パーティション割り当ての変更時に呼び出されるコールバック */
rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);
rd_kafka_conf_set(conf, "enable.partition.eof", "true",
NULL, 0);
}
// デフォルトエンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、次の IF ステートメントをコメントアウトまたは削除します。
if (rd_kafka_conf_set(conf, "ssl.ca.location", "ca-cert.pem", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK
|| rd_kafka_conf_set(conf, "security.protocol", "sasl_ssl", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK
|| rd_kafka_conf_set(conf, "sasl.mechanism", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK
|| rd_kafka_conf_set(conf, "ssl.endpoint.identification.algorithm", "None", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK
|| rd_kafka_conf_set(conf, "sasl.username", username, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK
|| rd_kafka_conf_set(conf, "sasl.password", password, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK
) {
fprintf(stderr, "%s\n", errstr);
return -1;
}
/* Kafka ハンドルを作成します */
if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
errstr, sizeof(errstr)))) {
fprintf(stderr,
"%% 新しいコンシューマーの作成に失敗しました: %s\n",
errstr);
exit(1);
}
/* ブローカーを追加します */
if (rd_kafka_brokers_add(rk, brokers) == 0) {
fprintf(stderr, "%% 有効なブローカーが指定されていません\n");
exit(1);
}
if (mode == 'D') {
int r;
/* グループを記述します */
r = describe_groups(rk, group);
rd_kafka_destroy(rk);
exit(r == -1 ? 1 : 0);
}
/* rd_kafka_poll() を consumer_poll() にリダイレクトします */
rd_kafka_poll_set_consumer(rk);
topics = rd_kafka_topic_partition_list_new(argc - optind);
is_subscription = 1;
for (i = optind ; i < argc ; i++) {
/* "topic[:part]" を解析します */
char *topic = argv[i];
char *t;
int32_t partition = -1;
if ((t = strstr(topic, ":"))) {
*t = '\0';
partition = atoi(t+1);
is_subscription = 0; /* 割り当てです */
wait_eof++;
}
rd_kafka_topic_partition_list_add(topics, topic, partition);
}
if (mode == 'O') {
/* オフセットクエリ */
err = rd_kafka_committed(rk, topics, 5000);
if (err) {
fprintf(stderr, "%% オフセットの取得に失敗しました: %s\n",
rd_kafka_err2str(err));
exit(1);
}
for (i = 0 ; i < topics->cnt ; i++) {
rd_kafka_topic_partition_t *p = &topics->elems[i];
printf("トピック \"%s\" パーティション %"PRId32,
p->topic, p->partition);
if (p->err)
printf(" エラー %s",
rd_kafka_err2str(p->err));
else {
printf(" オフセット %"PRId64"",
p->offset);
if (p->metadata_size)
printf(" (%d バイトのメタデータ)",
(int)p->metadata_size);
}
printf("\n");
}
goto done;
}
if (is_subscription) {
fprintf(stderr, "%% %d 個のトピックを購読しています\n", topics->cnt);
if ((err = rd_kafka_subscribe(rk, topics))) {
fprintf(stderr,
"%% トピックのコンシュームを開始できませんでした: %s\n",
rd_kafka_err2str(err));
exit(1);
}
} else {
fprintf(stderr, "%% %d 個のパーティションを割り当てています\n", topics->cnt);
if ((err = rd_kafka_assign(rk, topics))) {
fprintf(stderr,
"%% パーティションの割り当てに失敗しました: %s\n",
rd_kafka_err2str(err));
}
}
while (run) {
rd_kafka_message_t *rkmessage;
rkmessage = rd_kafka_consumer_poll(rk, 1000);
if (rkmessage) {
msg_consume(rkmessage);
rd_kafka_message_destroy(rkmessage);
}
}
done:
err = rd_kafka_consumer_close(rk);
if (err)
fprintf(stderr, "%% コンシューマーを閉じることができませんでした: %s\n",
rd_kafka_err2str(err));
else
fprintf(stderr, "%% コンシューマーが閉じられました\n");
rd_kafka_topic_partition_list_destroy(topics);
/* ハンドルを破棄します */
rd_kafka_destroy(rk);
/* バックグラウンドスレッドがクリーンアップして正常に終了するまで待ちます。 */
run = 5;
while (run-- > 0 && rd_kafka_wait_destroyed(1000) == -1)
printf("librdkafka の使用停止を待機しています\n");
if (run <= 0)
rd_kafka_dump(stdout, rk);
return 0;
}