このトピックでは、ApsaraMQ for Kafka インスタンスのソース Topic から AnalyticDB のテーブルにデータを同期するために、AnalyticDB シンクコネクタを作成する方法について説明します。
前提条件
前提条件については、「前提条件」をご参照ください。
ステップ 1:AnalyticDB リソースを作成する
AnalyticDB for MySQL または AnalyticDB for PostgreSQL リソースを作成します。
AnalyticDB for MySQL テーブルにデータをエクスポートする場合、クラスタ、データベースアカウント、データベースを作成し、AnalyticDB for MySQL コンソールでクラスタに接続する必要があります。詳細については、「クラスタを作成する」、「データベースアカウントを作成する」、「AnalyticDB for MySQL クラスタに接続する」、および「データベースを作成する」をご参照ください。
AnalyticDB for PostgreSQL テーブルにデータをエクスポートする場合、インスタンスとデータベースアカウントを作成し、AnalyticDB for PostgreSQL コンソールでデータベースに接続する必要があります。詳細については、「インスタンスを作成する」、「データベースアカウントを作成および管理する」、および「クライアント接続」をご参照ください。
このトピックでは、adb_sink_database という名前の AnalyticDB for MySQL データベースと adb_sink_table という名前のデータテーブルが作成されます。
ステップ 2:AnalyticDB シンクコネクタを作成して起動する
ApsaraMQ for Kafka コンソールにログオンします。リソースの分布 セクションの 概要 ページで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。
左側のナビゲーションウィンドウで、 を選択します。
[タスク] ページで、[タスクの作成] をクリックします。
タスクの作成
Source (ソース) ステップで、データプロバイダー パラメーターを [apsaramq For Kafka] に設定し、画面の指示に従って他のパラメーターを設定します。次に、[次のステップ] をクリックします。次の表にパラメーターを示します。
パラメーター
説明
例
[リージョン]
ApsaraMQ for Kafka インスタンスが存在するリージョン。
中国 (北京)
[apsaramq For Kafka インスタンス]
ルーティングするメッセージが生成される ApsaraMQ for Kafka インスタンス。
MQ_INST_115964845466****_ByBeUp3p
[トピック]
ルーティングするメッセージが生成される ApsaraMQ for Kafka インスタンスの Topic。
topic
[グループ ID]
ApsaraMQ for Kafka インスタンスのコンシューマーグループの名前。メッセージルーティングソースを作成するには、別のコンシューマーグループを使用する必要があります。使用中のコンシューマーグループは使用しないでください。そうしないと、既存のメッセージの送受信に失敗する可能性があります。
GID_http_1
[コンシューマーオフセット]
メッセージが消費されるオフセット。
最新のオフセット
[ネットワーク設定]
メッセージをルーティングするネットワークのタイプ。
ベーシックネットワーク
[VPC]
ApsaraMQ for Kafka インスタンスがデプロイされている仮想プライベートクラウド (VPC) の ID。[ネットワーク設定] パラメーターを セルフマネージドインターネット に設定した場合にのみ、このパラメーターが必要です。
vpc-bp17fapfdj0dwzjkd****
[vswitch]
ApsaraMQ for Kafka インスタンスが関連付けられている vSwitch の ID。[ネットワーク設定] パラメーターを セルフマネージドインターネット に設定した場合にのみ、このパラメーターが必要です。
vsw-bp1gbjhj53hdjdkg****
[セキュリティグループ]
ApsaraMQ for Kafka インスタンスが属するセキュリティグループ。[ネットワーク設定] パラメーターを セルフマネージドインターネット に設定した場合にのみ、このパラメーターが必要です。
alikafka_pre-cn-7mz2****
一括プッシュの件数
関数呼び出しごとに送信できるメッセージの最大数。リクエストは、バックログ内のメッセージ数が指定された値に達した場合にのみ送信されます。有効な値:1 ~ 10000。
100
バッチプッシュ間隔 (単位:秒)
関数が呼び出される時間間隔。システムは、指定された時間間隔で集約されたメッセージを Function Compute に送信します。有効な値:0 ~ 15。単位:秒。値 0 は、集約後すぐにメッセージが送信されることを指定します。
3
Filtering (フィルタリング) ステップで、パターン内容 コードエディタでデータパターンを定義して、リクエストをフィルタリングします。詳細については、「イベントパターン」をご参照ください。
Transform (変換) ステップで、分割、マッピング、エンリッチメント、動的ルーティングなどのデータ処理機能を実装するためのデータクレンジング方法を指定します。詳細については、「データクレンジング」をご参照ください。
Sink (ターゲット) ステップで、サービスタイプ パラメーターを [analyticdb] に設定し、画面の指示に従って他のパラメーターを設定します。次に、[保存] をクリックします。次の表にパラメーターを示します。
パラメーター
説明
例
インスタンスタイプ
作成したインスタンスタイプ。この例では、AnalyticDB for MySQL が選択されています。有効な値:
[analyticdb For Mysql]
[analyticdb For Postgresql]
AnalyticDB for MySQL
AnalyticDB インスタンス ID
作成した AnalyticDB for MySQL インスタンスの ID。
gp-bp10uo5n536wd****
データベース名
作成したデータベースの名前。
adb_sink_database
テーブル名
作成したテーブルの名前。
adb_sink_table
データマッピング
ApsaraMQ for Kafka から AnalyticDB に転送されるデータの形式。JSONPath ルールを使用して、データベーステーブルの値抽出ルールを指定できます。データ形式 パラメーターを Source (ソース) ステップで [json] に設定した場合、ApsaraMQ for Kafka から転送されるデータの形式は、次のコードのようになります。
{ "data": { "topic": "demo-topic", "partition": 0, "offset": 2, "timestamp": 1739756629123, "headers": { "headers": [], "isReadOnly": false }, "key":"adb-sink-k1", "value": { "userid":"xiaoming", "source":"shanghai" } }, "id": "7702ca16-f944-4b08-***-***-0-2", "source": "acs:alikafka", "specversion": "1.0", "type": "alikafka:Topic:Message", "datacontenttype": "application/json; charset=utf-8", "time": "2025-02-17T01:43:49.123Z", "subject": "acs:alikafka:alikafka_serverless-cn-lf6418u6701:topic:demo-topic", "aliyunaccountid": "1******6789" }テーブルの列名に基づいて JSONPath ルールを指定します。たとえば、テーブルの列名が userid の場合、値抽出ルールとして
$.data.value.useridを指定します。データベースユーザー名
データベースアカウントへのアクセスに使用するユーザー名。
user
データベースパスワード
データベースアカウントへのアクセスに使用するパスワード。
******
ネットワーク設定
[VPC]: ApsaraMQ for Kafka のメッセージは、仮想プライベートクラウド (VPC) 内の AnalyticDB に配信されます。
[インターネット]: ApsaraMQ for Kafka のメッセージは、インターネット経由で AnalyticDB に配信されます。
VPC
VPC
VPC の ID。[ネットワーク設定] パラメーターを VPC に設定した場合にのみ、このパラメーターが必要です。
vpc-bp17fapfdj0dwzjkd****
vSwitch
vSwitch の ID。[ネットワーク設定] パラメーターを VPC に設定した場合にのみ、このパラメーターが必要です。
重要vSwitch を選択した後、vSwitch が属する CIDR ブロックを AnalyticDB for MySQL インスタンスの IP アドレスホワイトリストに追加する必要があります。詳細については、「IP アドレスホワイトリスト」をご参照ください。
vsw-bp1gbjhj53hdjdkg****
セキュリティグループ
セキュリティグループの ID。[ネットワーク設定] パラメーターを VPC に設定した場合にのみ、このパラメーターが必要です。
test_group
タスクリスト ページに戻り、作成した OSS シンクコネクタを見つけ、操作する 列の 有効化する をクリックします。
ヒント メッセージで、OK をクリックします。
コネクタが有効になるまで 30 ~ 60 秒かかります。タスクリスト ページの Status 列で進行状況を確認できます。
ステップ 3:AnalyticDB シンクコネクタをテストする
[タスク] ページで、作成した AnalyticDB シンクコネクタを見つけ、[イベントソース] 列のソース Topic 名をクリックします。
- トピック詳細ページで、[メッセージの送信] をクリックします。
[メッセージの送受信を開始] パネルで、次の図に基づいてパラメーターを設定し、[OK] をクリックします。
説明この例では、メッセージコンテンツは、作成されたデータテーブルのすべての列を含む JSON 文字列です。システムは、データテーブルの列と同じ名前のフィールドの値を対応する列に書き込みます。

[タスク] ページで、作成した AnalyticDB シンクコネクタを見つけ、[イベントターゲット] 列の宛先インスタンス名をクリックします。
[基本情報] ページの右上隅にある [データベースにログオン] をクリックします。
データ管理 (DMS) コンソールで、次のステートメントを実行して、テーブル内のすべてのデータをクエリします。
SELECT * FROM adb_sink_table;次の図は、クエリ結果を示しています。
