このトピックでは、ApsaraMQ for Kafka コンソールでソースコネクタを作成し、ApsaraMQ for RocketMQ から ApsaraMQ for Kafka にデータを同期する方法について説明します。
前提条件
ApsaraMQ for Kafka インスタンスが購入され、デプロイされていること。サービスを利用できます 状態であることを確認してください。詳細については、「ステップ 2: インスタンスを購入してデプロイする」をご参照ください。
ApsaraMQ for RocketMQ インスタンスが購入され、デプロイされていること。[実行中] 状態であることを確認してください。詳細については、「ステップ 2: リソースを作成する」をご参照ください。
EventBridge がアクティブ化されており、必要な権限が RAM (Resource Access Management) ユーザーに付与されていること。詳細については、「EventBridge をアクティブ化し、RAM ユーザーに権限を付与する」をご参照ください。
ソースコネクタを作成する
ApsaraMQ for Kafka コンソール にログオンします。リソースの分布 セクションの 概要 ページで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。
左側のナビゲーションウィンドウで、 を選択します。
[タスク] ページで、[タスクの作成] をクリックします。
タスクの作成 ページで、Task Name パラメーターと Description パラメーターを構成し、画面の指示に従って他のパラメーターを構成します。次に、[保存] をクリックします。次のセクションでは、パラメーターについて説明します。
タスクの作成
Source (ソース) ステップで、データプロバイダー パラメーターを [apsaramq For Rocketmq] に設定し、画面の指示に従って他のパラメーターを構成します。次に、[次のステップ] をクリックします。次の表にパラメーターを示します。
パラメーター
説明
例
リージョン
ApsaraMQ for RocketMQ インスタンスが存在するリージョン。
中国 (杭州)
バージョン
ApsaraMQ for RocketMQ インスタンスのバージョン。このインスタンスにメッセージをルーティングします。有効な値:
RocketMQ 4.x: ApsaraMQ for RocketMQ 4.x。
RocketMQ 5.x: ApsaraMQ for RocketMQ 5.x。
RocketMQ 5.x
RocketMQ インスタンス
ルーティングするメッセージが生成される ApsaraMQ for RocketMQ インスタンス。
rmq-cn-****
Topic
ルーティングするメッセージが生成される ApsaraMQ for RocketMQ インスタンス上の Topic。
topic
Tag
ApsaraMQ for RocketMQ インスタンス内のメッセージをフィルタリングするために使用されるタグ。
test_tag
グループ ID
ApsaraMQ for RocketMQ インスタンス上のコンシューマーグループの名前。有効な値:
クイック作成: システムは GID_EVENTBRIDGE_xxx 形式で名前が付けられたコンシューマーグループを自動的に作成します。この値を選択することをお勧めします。
既存のグループを使用: 使用されていない既存のグループの ID を選択します。使用中の既存のグループを選択すると、既存のメッセージの送受信に影響します。
クイック作成
コンシューマーオフセット
最新のオフセット: メッセージは最新のオフセットから消費されます。
最も古いオフセット: メッセージは最も古いオフセットから消費されます。
タイムスタンプ: メッセージは特定のタイムスタンプから消費されます。
最新のオフセット
消費日時
メッセージの消費を開始する日時。コンシューマーオフセット パラメーターを タイムスタンプ に設定した場合にのみ、このパラメーターが必要です。
2024-06-18 15:28:29
データ形式
データ形式機能は、ソースから配信されたバイナリデータを特定のデータ形式にエンコードするために使用されます。複数のデータ形式がサポートされています。エンコーディングに特別な要件がない場合は、値として Json を指定します。
Json: バイナリデータは UTF-8 エンコーディングに基づいて JSON 形式のデータにエンコードされ、ペイロードに配置されます。これはデフォルト値です。
テキスト: バイナリデータは UTF-8 エンコーディングに基づいて文字列にエンコードされ、ペイロードに配置されます。
バイナリ: バイナリデータは Base64 エンコーディングに基づいて文字列にエンコードされ、ペイロードに配置されます。
Json
一括プッシュの件数
各関数呼び出しで送信できるメッセージの最大数。リクエストは、バックログ内のメッセージ数が指定された値に達した場合にのみ送信されます。有効な値: 1 ~ 10000。
100
バッチプッシュ間隔 (単位:秒)
関数を呼び出す時間間隔。システムは、指定された時間間隔で集約されたメッセージを Function Compute に送信します。有効な値: 0 ~ 15。単位: 秒。値 0 は、集約後すぐにメッセージが送信されることを指定します。
3
Filtering (フィルタリング) ステップで、パターン内容 コードエディタでデータパターンを定義してデータをフィルタリングします。詳細については、「イベントパターン」をご参照ください。
Transform (変換) ステップで、データ分割、マッピング、エンリッチメント、およびルーティング機能を実装するためのデータクレンジング方法を指定します。詳細については、「Function Compute を使用してメッセージクレンジングを実行する」をご参照ください。
Sink (ターゲット) ステップで、サービスタイプ パラメーターを [apsaramq For Kafka] に設定し、画面の指示に従って他のパラメーターを構成します。次の表にパラメーターを示します。
パラメーター
説明
例
インスタンス ID
作成した ApsaraMQ for Kafka インスタンスの ID。
test
Topic
作成した ApsaraMQ for Kafka インスタンス上の Topic。
test
確認モード (ACK)
メッセージを受信した後、ApsaraMQ for Kafka インスタンスがクライアントに確認応答 (ACK) を送信するモード。有効な値:
なし
リーダーのみ
すべて
なし
メッセージ本文 (値)
EventBridge は JSONPath を使用してメッセージからデータを抽出し、メッセージの指定されたコンテンツをターゲットにルーティングします。
完全なデータ
データ抽出
固定値
テンプレート
データ抽出
$.data.value
メッセージキー
EventBridge は JSONPath を使用してメッセージからデータを抽出し、メッセージの指定されたコンテンツをターゲットにルーティングします。
空
データ抽出
固定値
テンプレート
データ抽出
$.data.key
タスクのプロパティ
イベントのプッシュに失敗した場合に使用するリトライポリシーと、エラーを処理するために使用するメソッドを構成します。詳細については、「リトライポリシーとデッドレターキュー」をご参照ください。
タスクリスト ページに戻り、作成した ApsaraMQ for RocketMQ ソースコネクタを見つけ、操作する 列の 有効化する をクリックします。
ヒント メッセージで、OK をクリックします。
ソースコネクタが有効になるまで 30 ~ 60 秒かかります。タスクリスト ページの Status 列で進捗状況を確認できます。
その他の操作
タスクリスト ページで、管理するソースコネクタを見つけ、操作する 列でその他の操作を実行します。
コネクタの詳細を表示する: [操作] 列の 詳細 をクリックします。[タスクの詳細] ページで、コネクタの基本情報、プロパティ、およびモニタリングメトリックを表示します。
コネクタの構成を変更する: [操作] 列の 編集する をクリックします。[タスクの編集] パネルで、コネクタの詳細とプロパティを変更します。
コネクタを有効または無効にする: [操作] 列の 有効化する または 無効化 をクリックします。ヒント メッセージで、OK をクリックします。
コネクタを削除する: [操作] 列の 削除する をクリックします。ヒント メッセージで、OK をクリックします。