すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:RocketMQ ソースコネクタの作成

最終更新日:Nov 09, 2025

このトピックでは、ApsaraMQ for Kafka コンソールで、ApsaraMQ for RocketMQ から ApsaraMQ for Kafka にデータを同期するためのソースコネクタを作成する方法について説明します。

前提条件

ソースコネクタの作成

  1. ApsaraMQ for Kafka コンソールにログインします。概要 ページの リソースの分布 セクションで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。

  2. 左側のナビゲーションウィンドウで、[コネクタエコシステム統合] > [タスク] を選択します。

  3. [タスク] ページで、[タスクの作成] をクリックします。

  4. タスクの作成 パネルで、Task NameDescription を指定します。次に、次のパラメーターを設定し、[保存] をクリックします。

    • タスクの作成

      1. 設定ウィザードの Source (ソース) ステップで、データプロバイダー[ApsaraMQ For RocketMQ] に設定し、次のパラメーターを設定して、[次へ] をクリックします。

        パラメーター

        説明

        リージョン

        ソース ApsaraMQ for RocketMQ インスタンスが配置されているリージョン。

        中国 (杭州)

        バージョン

        ApsaraMQ for RocketMQ インスタンスのバージョン。

        • RocketMQ 4.x: サーバーバージョン 4.x。

        • RocketMQ 5.x: サーバーバージョン 5.x。

        RocketMQ 5.x

        RocketMQ インスタンス

        メッセージを生成するソース ApsaraMQ for RocketMQ インスタンス。

        rmq-cn-****

        Topic

        メッセージを生成するソース Topic。

        topic

        Tag

        ApsaraMQ for RocketMQ でメッセージをフィルターするために使用されるタグ。

        test_tag

        グループ ID

        ApsaraMQ for RocketMQ の使用者グループの名前。

        • クイック作成: 推奨。使用者グループの ID は、GID_EVENTBRIDGE_xxx 形式で自動的に作成されます。

        • 既存のものを使用: 独立したグループ ID を選択します。他のサービスですでに使用されているグループ ID は使用しないでください。これにより、既存のメッセージの送受信との干渉を防ぎます。

        クイック作成

        使用者オフセット

        • 最新のオフセット: 最新のオフセットからメッセージの使用を開始します。

        • 最も早いオフセット: 最も早いオフセットからメッセージの使用を開始します。

        • タイムスタンプを指定: 特定の時点からメッセージの使用を開始します。

        最新のオフセット

        消費日時

        消費チェックポイント。このパラメーターは、[使用者オフセット][タイムスタンプを指定] に設定した場合にのみ必要です。

        2024-06-18 15:28:29

        データ形式

        データ形式は、データソースからのバイナリデータをエンコードする方法を指定します。複数のエンコード形式がサポートされています。特定のエンコーディング要件がない場合は、形式を Json に設定します。

        • Json (デフォルト): UTF-8 に基づいてバイナリデータを JSON 形式にエンコードし、データをペイロードに配置します。

        • テキスト: UTF-8 に基づいてバイナリデータを文字列にエンコードし、文字列をペイロードに配置します。

        • バイナリ: Base64 に基づいてバイナリデータを文字列にエンコードし、文字列をペイロードに配置します。

        Json

        一括プッシュの件数

        [高度な設定] パラメーター。1 回のバッチ呼び出しで関数に送信できるメッセージの最大数。リクエストは、バックログされたメッセージの数が指定された値に達した場合にのみ送信されます。値は [1, 10000] の範囲内である必要があります。

        100

        バッチプッシュ間隔 (単位:秒)

        高度な設定 パラメーター。関数が呼び出される間隔。システムはメッセージを集約し、指定された間隔で Function Compute に送信します。値は [0, 15] の範囲内である必要があります。単位: 秒。値 0 は、待機期間なしでメッセージがすぐに配信されることを示します。

        3

      2. Filtering (フィルタリング) ステップで、イベントをフィルターするために パターン内容 を設定します。詳細については、「イベントパターン」をご参照ください。

      3. Transform (変換) ステップで、分割、マッピング、エンリッチメント、動的ルートなどの複雑なデータ処理を実行するためにデータクリーニングを設定します。詳細については、「Function Compute を使用してメッセージデータをクリーンアップする」をご参照ください。

      4. Sink (ターゲット) ステップで、サービスタイプ[ApsaraMQ For Kafka] に設定し、次のパラメーターを設定します。

        パラメーター

        説明

        インスタンス ID

        作成した ApsaraMQ for Kafka インスタンス。

        test

        Topic

        作成したインスタンス内の Topic。

        test

        確認モード (ACK)

        ApsaraMQ for Kafka インスタンスがデータを受信した後にクライアントに送信する確認信号。

        • なし

        • リーダーのみ

        • すべて

        なし

        メッセージ本文 (値)

        EventBridge は JSONPath を使用してメッセージからデータを抽出し、指定されたメッセージコンテンツをターゲットにルーティングします。

        • 完全なデータ

        • データ抽出

        • 静的フィールド

        • テンプレート

        データ抽出

        $.data.value

        メッセージキー

        EventBridge は JSONPath を使用してメッセージからデータを抽出し、指定されたメッセージコンテンツをターゲットにルーティングします。

        • データ抽出

        • 静的フィールド

        • テンプレート

        データ抽出

        $.data.key
    • タスクプロパティ

      失敗したイベントプッシュのリトライポリシーとエラー処理メソッドを設定します。詳細については、「リトライとデッドレターキュー」をご参照ください。

  5. タスクリスト ページに戻ります。作成したコネクタを見つけ、操作する 列の 有効化する をクリックします。

  6. ヒント ダイアログボックスで、メッセージを読み、OK をクリックします。

    コネクタを有効にすると、30〜60 秒の起動レイテンシが発生する場合があります。タスクリスト ページの Status 列で起動の進捗状況を確認できます。

その他の操作

タスクリスト ページで、ターゲットコネクタを見つけ、操作する 列から追加の操作を実行します。

  • コネクタの詳細の表示: 詳細 をクリックします。[タスク] ページでは、コネクタの基本情報、プロパティ、およびモニタリングメトリックを表示できます。

  • コネクタ設定の編集: 編集する をクリックします。[タスクの編集] パネルで、コネクタの詳細とプロパティを変更できます。

  • コネクタの有効化または無効化: 有効化する または 無効化 をクリックします。ヒント ダイアログボックスで、OK をクリックします。

  • コネクタの削除: 削除する をクリックします。ヒント ダイアログボックスで、OK をクリックします。