このトピックでは、EventBridge コンソールでイベントストリームのイベントソースとして ApsaraMQ for Kafka を設定する方法について説明します。
前提条件
ApsaraMQ for Kafka インスタンスを購入してデプロイし、インスタンスが [サービス中] 状態であることを確認します。 詳細については、「インスタンスの購入とデプロイ」をご参照ください。
手順
EventBridge コンソールにログインします。 左側のナビゲーションウィンドウで、[イベントストリーム] をクリックします。
上部のナビゲーションバーでリージョンを選択し、[イベントストリームの作成] をクリックします。
[イベントストリームの作成] パネルで、[タスク名] と [説明] を設定し、次のパラメーターを設定してから、[保存] をクリックします。
タスクの作成
[ソース] 設定ウィザードで、[データプロバイダー] を [ApsaraMQ For Kafka] に設定し、次のパラメーターを設定してから、[次へ] をクリックします。
[フィルタリング]、[変換]、および [シンク] 設定ウィザードで、イベントフィルタリングルール、変換ルール、およびイベントターゲットを設定します。 イベント変換の設定方法の詳細については、「Function Compute を使用したデータクレンジング」をご参照ください。
パラメーター
説明
例
リージョン
ソース ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。
中国 (北京)
Kafka インスタンス
ApsaraMQ for Kafka メッセージを生成するソースインスタンスを選択します。
MQ_INST_115964845466****_ByBeUp3p
Topic
ApsaraMQ for Kafka メッセージを生成する Topic を選択します。
topic
グループ ID
ソースインスタンスのコンシューマーグループの名前を選択します。 専用のコンシューマーグループを使用してイベントソースを作成します。 既存のサービスとコンシューマーグループを共有しないでください。 これにより、既存のメッセージの送受信との干渉を防ぎます。
GID_http_1
コンシューマーオフセット
メッセージの消費を開始するオフセットを選択します。
最新のオフセット
ネットワークタイプ
メッセージルーティングのネットワークタイプを選択します。
デフォルトネットワーク
VPC
VPC ID を選択します。 このパラメーターは、[ネットワークタイプ] を [インターネット] に設定した場合に必要です。
vpc-bp17fapfdj0dwzjkd****
VSwitch
vSwitch ID を選択します。 このパラメーターは、[ネットワークタイプ] を [インターネット] に設定した場合に必要です。
vsw-bp1gbjhj53hdjdkg****
セキュリティグループ
セキュリティグループを選択します。 このパラメーターは、[ネットワークタイプ] を [インターネット] に設定した場合に必要です。
alikafka_pre-cn-7mz2****
バッチプッシュ
バッチプッシュは複数のイベントを集約します。 バッチプッシュは、一括プッシュの件数 または バッチプッシュ間隔 (単位:秒) の値に達したときにトリガーされます。
たとえば、プッシュ数を 100、間隔を 15 秒に設定したとします。 メッセージ数が 10 秒で 100 に達した場合、15 秒の間隔が経過するのを待たずにプッシュがすぐにトリガーされます。
有効化
一括プッシュの件数
1 回の関数呼び出しで送信できるメッセージの最大数。 リクエストは、蓄積されたメッセージの数が指定された値に達した場合にのみ送信されます。 有効な値: [1, 10000]。
100
バッチプッシュ間隔 (単位:秒)
関数が呼び出される間隔。 システムはメッセージを集約し、指定された間隔で Function Compute に送信します。 有効な値: [0, 15]。 単位: 秒。 値 0 は待機時間がないことを示し、メッセージはすぐに配信されます。
3
タスクのプロパティ
イベントストリームのリトライポリシーとデッドレターキューを設定します。 詳細については、「リトライとデッドレターキュー」をご参照ください。
[イベントストリーム] ページに戻ります。 作成したイベントストリームを見つけ、[アクション] 列の [有効化] をクリックします。
イベントストリームを有効にすると、開始までに 30〜60 秒かかる場合があります。 [イベントストリーム] ページの [ステータス] 列で起動の進行状況を確認できます。
イベントの例
{
"specversion": "1.0",
"id": "8e215af8-ca18-4249-8645-f96c1026****",
"source": "acs:alikafka",
"type": "alikafka:Topic:Message",
"subject": "acs:alikafka:alikafka_pre-cn-i7m2msb9****:topic:****",
"datacontenttype": "application/json; charset=utf-8",
"time": "2022-06-23T02:49:51.589Z",
"aliyunaccountid": "182572506381****",
"data": {
"topic": "****",
"partition": 7,
"offset": 25,
"timestamp": 1655952591589,
"headers": {
"headers": [],
"isReadOnly": false
},
"key": "keytest",
"value": "hello kafka msg"
}
}詳細については、「イベントの概要」をご参照ください。
次の表に、data フィールドのパラメーターを示します。
パラメーター | タイプ | 例 | 説明 |
topic | 文字列 | TopicName | Topic の名前。 |
partition | Int | 1 | ApsaraMQ for Kafka のコンシューマーパーティション。 |
offset | Int | 0 | ApsaraMQ for Kafka のメッセージオフセット。 |
timestamp | 文字列 | 1655952591589 | メッセージの消費が開始されるときの UNIX タイムスタンプ。 |
headers.headers | リスト | [header1, header2] | メッセージヘッダー。 |
headers.isReadOnly | ブール値 | false | このフィールドは予約済みであり、実用的な意味はありません。 |
key | 文字列 | dataKey | メッセージキー。 |
value | 文字列 | dataValue | メッセージの値。 コンテンツの形式は、タスクに設定されているデータ形式によって異なります。
|