すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:AnalyticDB シンクコネクタを作成する

最終更新日:Feb 18, 2025

このトピックでは、ApsaraMQ for Kafka インスタンスのソース Topic から AnalyticDB のテーブルにデータを同期するために、AnalyticDB シンクコネクタを作成する方法について説明します。

前提条件

前提条件については、「前提条件」をご参照ください。

ステップ 1:AnalyticDB リソースを作成する

AnalyticDB for MySQL または AnalyticDB for PostgreSQL リソースを作成します。

このトピックでは、adb_sink_database という名前の AnalyticDB for MySQL データベースと adb_sink_table という名前のデータテーブルが作成されます。

ステップ 2:AnalyticDB シンクコネクタを作成して起動する

  1. ApsaraMQ for Kafka コンソールにログオンします。リソースの分布 セクションの 概要 ページで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。

  2. 左側のナビゲーションウィンドウで、Connector エコシステムの統合 > [タスク] を選択します。

  3. [タスク] ページで、[タスクの作成] をクリックします。

    • タスクの作成

      1. Source (ソース) ステップで、データプロバイダー パラメーターを [apsaramq For Kafka] に設定し、画面の指示に従って他のパラメーターを設定します。次に、[次のステップ] をクリックします。次の表にパラメーターを示します。

        パラメーター

        説明

        [リージョン]

        ApsaraMQ for Kafka インスタンスが存在するリージョン。

        中国 (北京)

        [apsaramq For Kafka インスタンス]

        ルーティングするメッセージが生成される ApsaraMQ for Kafka インスタンス。

        MQ_INST_115964845466****_ByBeUp3p

        [トピック]

        ルーティングするメッセージが生成される ApsaraMQ for Kafka インスタンスの Topic。

        topic

        [グループ ID]

        ApsaraMQ for Kafka インスタンスのコンシューマーグループの名前。メッセージルーティングソースを作成するには、別のコンシューマーグループを使用する必要があります。使用中のコンシューマーグループは使用しないでください。そうしないと、既存のメッセージの送受信に失敗する可能性があります。

        GID_http_1

        [コンシューマーオフセット]

        メッセージが消費されるオフセット。

        最新のオフセット

        [ネットワーク設定]

        メッセージをルーティングするネットワークのタイプ。

        ベーシックネットワーク

        [VPC]

        ApsaraMQ for Kafka インスタンスがデプロイされている仮想プライベートクラウド (VPC) の ID。[ネットワーク設定] パラメーターを セルフマネージドインターネット に設定した場合にのみ、このパラメーターが必要です。

        vpc-bp17fapfdj0dwzjkd****

        [vswitch]

        ApsaraMQ for Kafka インスタンスが関連付けられている vSwitch の ID。[ネットワーク設定] パラメーターを セルフマネージドインターネット に設定した場合にのみ、このパラメーターが必要です。

        vsw-bp1gbjhj53hdjdkg****

        [セキュリティグループ]

        ApsaraMQ for Kafka インスタンスが属するセキュリティグループ。[ネットワーク設定] パラメーターを セルフマネージドインターネット に設定した場合にのみ、このパラメーターが必要です。

        alikafka_pre-cn-7mz2****

        一括プッシュの件数

        関数呼び出しごとに送信できるメッセージの最大数。リクエストは、バックログ内のメッセージ数が指定された値に達した場合にのみ送信されます。有効な値:1 ~ 10000。

        100

        バッチプッシュ間隔 (単位:秒)

        関数が呼び出される時間間隔。システムは、指定された時間間隔で集約されたメッセージを Function Compute に送信します。有効な値:0 ~ 15。単位:秒。値 0 は、集約後すぐにメッセージが送信されることを指定します。

        3

      2. Filtering (フィルタリング) ステップで、パターン内容 コードエディタでデータパターンを定義して、リクエストをフィルタリングします。詳細については、「イベントパターン」をご参照ください。

      3. Transform (変換) ステップで、分割、マッピング、エンリッチメント、動的ルーティングなどのデータ処理機能を実装するためのデータクレンジング方法を指定します。詳細については、「データクレンジング」をご参照ください。

      4. Sink (ターゲット) ステップで、サービスタイプ パラメーターを [analyticdb] に設定し、画面の指示に従って他のパラメーターを設定します。次に、[保存] をクリックします。次の表にパラメーターを示します。

        パラメーター

        説明

        インスタンスタイプ

        作成したインスタンスタイプ。この例では、AnalyticDB for MySQL が選択されています。有効な値:

        • [analyticdb For Mysql]

        • [analyticdb For Postgresql]

        AnalyticDB for MySQL

        AnalyticDB インスタンス ID

        作成した AnalyticDB for MySQL インスタンスの ID。

        gp-bp10uo5n536wd****

        データベース名

        作成したデータベースの名前。

        adb_sink_database

        テーブル名

        作成したテーブルの名前。

        adb_sink_table

        データマッピング

        ApsaraMQ for Kafka から AnalyticDB に転送されるデータの形式。JSONPath ルールを使用して、データベーステーブルの値抽出ルールを指定できます。データ形式 パラメーターを Source (ソース) ステップで [json] に設定した場合、ApsaraMQ for Kafka から転送されるデータの形式は、次のコードのようになります。

        {
            "data": {
                "topic": "demo-topic",
                "partition": 0,
                "offset": 2,
                "timestamp": 1739756629123,
                "headers": {
                    "headers": [],
                    "isReadOnly": false
                },
                "key":"adb-sink-k1",
                "value": {
                    "userid":"xiaoming",
                    "source":"shanghai"
                }
            },
            "id": "7702ca16-f944-4b08-***-***-0-2",
            "source": "acs:alikafka",
            "specversion": "1.0",
            "type": "alikafka:Topic:Message",
            "datacontenttype": "application/json; charset=utf-8",
            "time": "2025-02-17T01:43:49.123Z",
            "subject": "acs:alikafka:alikafka_serverless-cn-lf6418u6701:topic:demo-topic",
            "aliyunaccountid": "1******6789"
        }

        テーブルの列名に基づいて JSONPath ルールを指定します。たとえば、テーブルの列名が userid の場合、値抽出ルールとして $.data.value.userid を指定します。

        データベースユーザー名

        データベースアカウントへのアクセスに使用するユーザー名。

        user

        データベースパスワード

        データベースアカウントへのアクセスに使用するパスワード。

        ******

        ネットワーク設定

        • [VPC]: ApsaraMQ for Kafka のメッセージは、仮想プライベートクラウド (VPC) 内の AnalyticDB に配信されます。

        • [インターネット]: ApsaraMQ for Kafka のメッセージは、インターネット経由で AnalyticDB に配信されます。

        VPC

        VPC

        VPC の ID。[ネットワーク設定] パラメーターを VPC に設定した場合にのみ、このパラメーターが必要です。

        vpc-bp17fapfdj0dwzjkd****

        vSwitch

        vSwitch の ID。[ネットワーク設定] パラメーターを VPC に設定した場合にのみ、このパラメーターが必要です。

        重要

        vSwitch を選択した後、vSwitch が属する CIDR ブロックを AnalyticDB for MySQL インスタンスの IP アドレスホワイトリストに追加する必要があります。詳細については、「IP アドレスホワイトリスト」をご参照ください。

        vsw-bp1gbjhj53hdjdkg****

        セキュリティグループ

        セキュリティグループの ID。[ネットワーク設定] パラメーターを VPC に設定した場合にのみ、このパラメーターが必要です。

        test_group

  4. タスクリスト ページに戻り、作成した OSS シンクコネクタを見つけ、操作する 列の 有効化する をクリックします。

  5. ヒント メッセージで、OK をクリックします。

    コネクタが有効になるまで 30 ~ 60 秒かかります。タスクリスト ページの Status 列で進行状況を確認できます。

ステップ 3:AnalyticDB シンクコネクタをテストする

  1. [タスク] ページで、作成した AnalyticDB シンクコネクタを見つけ、[イベントソース] 列のソース Topic 名をクリックします。

  2. トピック詳細ページで、[メッセージの送信] をクリックします。
  3. [メッセージの送受信を開始] パネルで、次の図に基づいてパラメーターを設定し、[OK] をクリックします。

    説明

    この例では、メッセージコンテンツは、作成されたデータテーブルのすべての列を含む JSON 文字列です。システムは、データテーブルの列と同じ名前のフィールドの値を対応する列に書き込みます。

    image

  4. [タスク] ページで、作成した AnalyticDB シンクコネクタを見つけ、[イベントターゲット] 列の宛先インスタンス名をクリックします。

  5. [基本情報] ページの右上隅にある [データベースにログオン] をクリックします。

  6. データ管理 (DMS) コンソールで、次のステートメントを実行して、テーブル内のすべてのデータをクエリします。

    SELECT * FROM  adb_sink_table;

    次の図は、クエリ結果を示しています。image