このトピックでは、ApsaraMQ for Kafka コンソールで、ApsaraMQ for RocketMQ から ApsaraMQ for Kafka にデータを同期するためのソースコネクタを作成する方法について説明します。
前提条件
ApsaraMQ for Kafka インスタンスが作成され、サービスを利用できます 状態であること。 詳細については、「ステップ 2: インスタンスの購入とデプロイ」をご参照ください。
ApsaraMQ for RocketMQ インスタンスが作成され、[サービス中] 状態であること。 詳細については、「ステップ 2: リソースの作成」をご参照ください。
ソースコネクタの作成
ApsaraMQ for Kafka コンソールにログインします。概要 ページの リソースの分布 セクションで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。
左側のナビゲーションウィンドウで、 を選択します。
[タスク] ページで、[タスクの作成] をクリックします。
タスクの作成 パネルで、Task Name と Description を指定します。次に、次のパラメーターを設定し、[保存] をクリックします。
タスクの作成
設定ウィザードの Source (ソース) ステップで、データプロバイダー を [ApsaraMQ For RocketMQ] に設定し、次のパラメーターを設定して、[次へ] をクリックします。
パラメーター
説明
例
リージョン
ソース ApsaraMQ for RocketMQ インスタンスが配置されているリージョン。
中国 (杭州)
バージョン
ApsaraMQ for RocketMQ インスタンスのバージョン。
RocketMQ 4.x: サーバーバージョン 4.x。
RocketMQ 5.x: サーバーバージョン 5.x。
RocketMQ 5.x
RocketMQ インスタンス
メッセージを生成するソース ApsaraMQ for RocketMQ インスタンス。
rmq-cn-****
Topic
メッセージを生成するソース Topic。
topic
Tag
ApsaraMQ for RocketMQ でメッセージをフィルターするために使用されるタグ。
test_tag
グループ ID
ApsaraMQ for RocketMQ の使用者グループの名前。
クイック作成: 推奨。使用者グループの ID は、GID_EVENTBRIDGE_xxx 形式で自動的に作成されます。
既存のものを使用: 独立したグループ ID を選択します。他のサービスですでに使用されているグループ ID は使用しないでください。これにより、既存のメッセージの送受信との干渉を防ぎます。
クイック作成
使用者オフセット
最新のオフセット: 最新のオフセットからメッセージの使用を開始します。
最も早いオフセット: 最も早いオフセットからメッセージの使用を開始します。
タイムスタンプを指定: 特定の時点からメッセージの使用を開始します。
最新のオフセット
消費日時
消費チェックポイント。このパラメーターは、[使用者オフセット] を [タイムスタンプを指定] に設定した場合にのみ必要です。
2024-06-18 15:28:29
データ形式
データ形式は、データソースからのバイナリデータをエンコードする方法を指定します。複数のエンコード形式がサポートされています。特定のエンコーディング要件がない場合は、形式を Json に設定します。
Json (デフォルト): UTF-8 に基づいてバイナリデータを JSON 形式にエンコードし、データをペイロードに配置します。
テキスト: UTF-8 に基づいてバイナリデータを文字列にエンコードし、文字列をペイロードに配置します。
バイナリ: Base64 に基づいてバイナリデータを文字列にエンコードし、文字列をペイロードに配置します。
Json
一括プッシュの件数
[高度な設定] パラメーター。1 回のバッチ呼び出しで関数に送信できるメッセージの最大数。リクエストは、バックログされたメッセージの数が指定された値に達した場合にのみ送信されます。値は [1, 10000] の範囲内である必要があります。
100
バッチプッシュ間隔 (単位:秒)
高度な設定 パラメーター。関数が呼び出される間隔。システムはメッセージを集約し、指定された間隔で Function Compute に送信します。値は [0, 15] の範囲内である必要があります。単位: 秒。値 0 は、待機期間なしでメッセージがすぐに配信されることを示します。
3
Filtering (フィルタリング) ステップで、イベントをフィルターするために パターン内容 を設定します。詳細については、「イベントパターン」をご参照ください。
Transform (変換) ステップで、分割、マッピング、エンリッチメント、動的ルートなどの複雑なデータ処理を実行するためにデータクリーニングを設定します。詳細については、「Function Compute を使用してメッセージデータをクリーンアップする」をご参照ください。
Sink (ターゲット) ステップで、サービスタイプ を [ApsaraMQ For Kafka] に設定し、次のパラメーターを設定します。
パラメーター
説明
例
インスタンス ID
作成した ApsaraMQ for Kafka インスタンス。
test
Topic
作成したインスタンス内の Topic。
test
確認モード (ACK)
ApsaraMQ for Kafka インスタンスがデータを受信した後にクライアントに送信する確認信号。
なし
リーダーのみ
すべて
なし
メッセージ本文 (値)
EventBridge は JSONPath を使用してメッセージからデータを抽出し、指定されたメッセージコンテンツをターゲットにルーティングします。
完全なデータ
データ抽出
静的フィールド
テンプレート
データ抽出
$.data.valueメッセージキー
EventBridge は JSONPath を使用してメッセージからデータを抽出し、指定されたメッセージコンテンツをターゲットにルーティングします。
空
データ抽出
静的フィールド
テンプレート
データ抽出
$.data.key
タスクプロパティ
失敗したイベントプッシュのリトライポリシーとエラー処理メソッドを設定します。詳細については、「リトライとデッドレターキュー」をご参照ください。
タスクリスト ページに戻ります。作成したコネクタを見つけ、操作する 列の 有効化する をクリックします。
ヒント ダイアログボックスで、メッセージを読み、OK をクリックします。
コネクタを有効にすると、30〜60 秒の起動レイテンシが発生する場合があります。タスクリスト ページの Status 列で起動の進捗状況を確認できます。
その他の操作
タスクリスト ページで、ターゲットコネクタを見つけ、操作する 列から追加の操作を実行します。
コネクタの詳細の表示: 詳細 をクリックします。[タスク] ページでは、コネクタの基本情報、プロパティ、およびモニタリングメトリックを表示できます。
コネクタ設定の編集: 編集する をクリックします。[タスクの編集] パネルで、コネクタの詳細とプロパティを変更できます。
コネクタの有効化または無効化: 有効化する または 無効化 をクリックします。ヒント ダイアログボックスで、OK をクリックします。
コネクタの削除: 削除する をクリックします。ヒント ダイアログボックスで、OK をクリックします。