すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:Create ApsaraMQ for Kafka sink connectors

最終更新日:Feb 18, 2025

このトピックでは、ApsaraMQ for Kafka コンソールでシンクコネクタを作成し、ある ApsaraMQ for Kafka インスタンスから別の ApsaraMQ for Kafka インスタンスにデータを同期する方法について説明します。

Prerequisites

Create a sink connector

  1. ApsaraMQ for Kafka コンソール にログインします。リソースの分布 セクションの 概要 ページで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。

  2. 左側のナビゲーションウィンドウで、[コネクタエコシステム統合] > [タスク] を選択します。

  3. [タスク] ページで、[タスクの作成] をクリックします。

  4. タスクの作成 ページで、タスク名 パラメーターと 説明 パラメーターを構成し、画面の指示に従って他のパラメーターを構成します。次に、[保存] をクリックします。次のセクションでは、パラメーターについて説明します。

    • タスクの作成

      1. Source (ソース) ステップで、データプロバイダー パラメーターを [apsaramq For Kafka] に設定し、画面の指示に従って他のパラメーターを構成します。次に、[次のステップ] をクリックします。次の表にパラメーターを示します。

        Parameter

        説明

        [リージョン]

        ソースの ApsaraMQ for Kafka インスタンスが存在するリージョン。

        中国 (北京)

        [apsaramq For Kafka インスタンス]

        ルーティングするメッセージが生成される ApsaraMQ for Kafka インスタンス。

        MQ_INST_115964845466****_ByBeUp3p

        [トピック]

        ルーティングするメッセージが生成される ApsaraMQ for Kafka インスタンスのトピック。

        topic

        [グループ ID]

        ソース ApsaraMQ for Kafka インスタンスのコンシューマーグループの名前。メッセージルーティングソースを作成するには、別のコンシューマーグループを使用する必要があります。使用中のコンシューマーグループは使用しないでください。そうしないと、既存のメッセージの送受信に失敗する可能性があります。

        GID_http_1

        [コンシューマーオフセット]

        メッセージが消費されるオフセット。

        最新のオフセット

        [ネットワーク構成]

        メッセージをルーティングするネットワークのタイプ。

        ベーシックネットワーク

        [VPC]

        ApsaraMQ for Kafka インスタンスがデプロイされている仮想プライベートクラウド (VPC) の ID。[ネットワーク構成] パラメーターを セルフマネージドインターネット に設定した場合にのみ、このパラメーターが必要です。

        vpc-bp17fapfdj0dwzjkd****

        [vswitch]

        ApsaraMQ for Kafka インスタンスが関連付けられている vSwitch の ID。[ネットワーク構成] パラメーターを セルフマネージドインターネット に設定した場合にのみ、このパラメーターが必要です。

        vsw-bp1gbjhj53hdjdkg****

        [セキュリティグループ]

        ApsaraMQ for Kafka インスタンスが属するセキュリティグループ。[ネットワーク構成] パラメーターを セルフマネージドインターネット に設定した場合にのみ、このパラメーターが必要です。

        alikafka_pre-cn-7mz2****

        一括プッシュの件数

        関数呼び出しごとに送信できるメッセージの最大数。リクエストは、バックログ内のメッセージ数が指定された値に達した場合にのみ送信されます。有効な値: 1 ~ 10000。

        100

        バッチプッシュ間隔 (単位:秒)

        関数が呼び出される時間間隔。システムは、指定された時間間隔で集約されたメッセージを Function Compute に送信します。有効な値: 0 ~ 15。単位: 秒。値 0 は、集約後すぐにメッセージが送信されることを指定します。

        3

      2. Filtering (フィルタリング) ステップで、パターン内容 コードエディターでデータパターンを定義して、リクエストをフィルタリングします。詳細については、「イベントパターン」をご参照ください。

      3. Transform (変換) ステップで、データクレンジング方法を指定して、分割、マッピング、エンリッチメント、動的ルーティングなどのデータ処理機能を実装します。詳細については、「データクレンジング」をご参照ください。

      4. Sink (ターゲット) ステップで、サービスタイプ パラメーターを [apsaramq For Kafka] に設定し、画面の指示に従って他のパラメーターを構成します。次の表にパラメーターを示します。

        Parameter

        説明

        インスタンス ID

        作成した宛先の ApsaraMQ for Kafka インスタンス。

        test

        Topic

        作成した宛先 ApsaraMQ for Kafka インスタンスのトピック。

        test

        確認モード (ACK)

        ApsaraMQ for Kafka が受信データを確認するために使用するモード。

        • [なし]

        • [leaderonly]

        • [すべて]

        なし

        メッセージ本文 (値)

        EventBridge は JSONPath を使用してメッセージからデータを抽出し、メッセージの指定されたコンテンツをイベントターゲットにルーティングします。

        • [完全なデータ]

        • [データ抽出]

        • [固定値]

        • [テンプレート]

        データ抽出

        $.data.value

        メッセージキー

        EventBridge は JSONPath を使用してメッセージからデータを抽出し、メッセージの指定されたコンテンツをイベントターゲットにルーティングします。

        • [空]

        • [データ抽出]

        • [固定値]

        • [テンプレート]

        データ抽出

        $.data.key
    • タスクのプロパティ

      タスクのリトライポリシーと配信不能キューを構成します。詳細については、「リトライポリシーと配信不能キュー」をご参照ください。

  5. タスクリスト ページに戻り、作成した ApsaraMQ for Kafka シンクコネクタを見つけ、操作する 列の 有効化する をクリックします。

  6. ヒント メッセージで、OK をクリックします。

    コネクタが有効になるまで 30 ~ 60 秒かかります。タスクリスト ページの Status 列で進行状況を確認できます。

Other operations

タスクリスト ページで、管理するメッセージアウトフロータスクを見つけ、操作する 列で他の操作を実行します。次の項目では、実行できる操作について説明します。

  • タスクの詳細を表示する: 操作列の 詳細 をクリックします。表示されるページで、タスクの基本情報、プロパティ、および監視メトリックを表示します。

  • タスクの構成を変更する: 操作列の 編集する をクリックします。[タスクの編集] パネルで、タスクの構成を変更します。

  • タスクを有効または無効にする: 操作列の 有効化する または 無効化 をクリックします。ヒント メッセージで、OK をクリックします。

  • タスクを削除する: 操作列の 削除する をクリックします。ヒント メッセージで、OK をクリックします。