すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:AnalyticDB シンクコネクタの作成

最終更新日:Mar 11, 2026

AnalyticDB シンクコネクタは、ApsaraMQ for Kafka の Topic からメッセージを読み取り、それらを AnalyticDB for MySQL または AnalyticDB for PostgreSQL データベースに書き込みます。このコネクタは、Function Compute を使用して、同じリージョン内のサービス間でデータを転送します。

仕組み

データは 3 つのコンポーネントを介して流れます。

  1. ApsaraMQ for Kafka は、データソース Topic からメッセージを読み取ります。

  2. Function Compute は、メッセージを受信し、それらをターゲットデータベースに書き込みます。

  3. AnalyticDB は、指定されたテーブルにデータを保存します。

内部的には、コネクタは 5 つの Topic (オフセット、構成、ステータス、デッドレターキュー、およびエラーデータ用) と 1 つのコンシューマーグループを使用します。これらのリソースは、自動または手動で作成できます。

前提条件

開始する前に、以下を確認してください。

コネクタの構成のために、以下の情報を収集してください。

情報説明
データソース Topic 名データのエクスポート元となる Kafka Topicadb-test-input
AnalyticDB インスタンス ID宛先インスタンス IDam-bp139yqk8u1ik****
データベース名ターゲットデータベースadb_demo
テーブル名エクスポートされたデータの宛先テーブルuser
データベースユーザー名データベースログインユーザー名adbmysql
データベースパスワードデータベースログインパスワード********

注意事項

  • データエクスポートは同じリージョンに限定されます。リージョン間エクスポートはサポートされていません。詳細については、「使用制限」をご参照ください。

  • Function Compute は無料リソースクォータを提供します。無料クォータを超える使用量については、「Function Compute の料金」に従って課金されます。

  • ApsaraMQ for Kafka は、メッセージを UTF-8 エンコードされた文字列としてシリアル化します。バイナリデータはサポートされていません。

  • ターゲットデータベースがプライベートエンドポイントを使用している場合、Function Compute サービスに同じ VPC と vSwitch を構成してください。そうしないと、Function Compute はデータベースに到達できません。詳細については、「サービスの更新」をご参照ください。

  • ApsaraMQ for Kafka は、コネクタを作成する際に、既に存在しない場合、自動的にサービスリンクロールを作成します。

  • 関数実行の問題をトラブルシューティングするには、Function Compute のログ記録を使用します。詳細については、「ログ記録の設定」をご参照ください。

コネクタのセットアップ

コネクタをセットアップするには:

  1. (オプション) 必要な Topic とコンシューマーグループの作成

  2. コネクタの作成とデプロイ

  3. Function Compute ネットワークの構成

  4. AnalyticDB のホワイトリストへの VPC CIDR ブロックの追加

  5. データエクスポートの検証

ステップ 1: (オプション) 必要な Topic とコンシューマーグループの作成

ApsaraMQ for Kafka がこれらのリソースを自動的に作成するには、この手順をスキップし、[リソース作成方法][自動] に設定します(ステップ 2 で)。
重要

ご利用の ApsaraMQ for Kafka インスタンスがメジャーバージョン 0.10.2 を実行している場合、ローカルストレージエンジンを必要とする Topic は自動的に作成する必要があります。このバージョンでは、ローカルストレージ Topic の手動作成はサポートされていません。

コネクタには 5 つの内部 Topic と 1 つのコンシューマーグループが必要です。特定の構成が必要な場合にのみ、手動で作成してください。

必要な Topic

Topic名前プレフィックスパーティションストレージエンジンログクリーンアップポリシー
タスクオフセットconnect-offset1 より大きいローカルストレージコンパクト
タスク構成connect-config1ローカルストレージコンパクト
タスクステータスconnect-status6 (推奨)ローカルストレージコンパクト
デッドレターキューconnect-error6 (推奨)ローカルストレージ または クラウドストレージ任意
エラーデータconnect-error6 (推奨)ローカルストレージ または クラウドストレージ任意
ヒント: デッドレターキュー Topic とエラーデータ Topic は、リソースを節約するために同じ Topic を共有できます。

Topic の作成

  1. ApsaraMQ for Kafka コンソールにログインします。

  2. [概要] ページの [リソース分布] セクションで、インスタンスのリージョンを選択します。

重要

Elastic Compute Service (ECS) インスタンスがデプロイされているリージョンに Topic を作成する必要があります。Topic はリージョンをまたいで使用できません。たとえば、メッセージのプロデューサーとコンシューマーが中国 (北京) リージョンにデプロイされた ECS インスタンスで実行されている場合、Topic も中国 (北京) リージョンに作成する必要があります。

  1. [インスタンス] ページで、インスタンス名をクリックします。

  2. 左側のナビゲーションウィンドウで、[トピック] をクリックします。

  3. [トピック] ページで、[トピックの作成] をクリックします。

  4. [トピックの作成] パネルで、以下のパラメーターを設定し、[OK] をクリックします。

パラメーター説明
Nameトピック名。上記の表にある命名プレフィックスを使用します(例:connect-offset-kafka-adb-sink)。
Partitionsパーティション数。上記の表で必要な値を確認してください。
Storage Engineストレージエンジンのタイプ。Professional Edition インスタンスでのみ利用可能です。Standard Edition インスタンスでは、デフォルトで Cloud Storage が使用されます。オプションは以下のとおりです。Cloud Storage(Alibaba Cloud ディスクによる低遅延・高信頼性の 3 レプリカ分散ストレージ)または Local Storage(Apache Kafka ISR アルゴリズムによる 3 レプリカ分散ストレージ)。Standard (High Write) インスタンスでは、Cloud Storage のみがサポートされています。
Message Typeメッセージの順序保証レベル。Normal MessageCloud Storage のデフォルト):ブローカー障害時にパーティション内の順序が保たれない場合があります。Partitionally Ordered MessageLocal Storage のデフォルト):ブローカー障害時にも順序が保たれますが、影響を受けたパーティションは復旧されるまで利用できません。
Log Cleanup PolicyStorage EngineLocal Storage の場合に必須です。Delete(デフォルト):保存期間に基づいてメッセージを保持し、ストレージ使用率が 85 % を超えると最も古いメッセージを削除します。Compact:キーごとに最新の値のみを保持します。Kafka Connect を使用するコネクタ内部トピックにはこの設定が必要です。
重要

ログ圧縮済みトピックは、Kafka Connect や Confluent Schema Registry などの特定のクラウドネイティブコンポーネントでのみ使用できます。詳細については、「aliware-kafka-demos」をご参照ください。

Description任意の説明です。
Tag任意のタグです。
  1. 必要な 5 つの Topic をすべて作成するために繰り返します。

コンシューマーグループの作成

コネクタには、connect-<connector-name> (例: connect-kafka-adb-sink) という名前のコンシューマーグループが必要です。

  1. ApsaraMQ for Kafka コンソールにログインします。

  2. [リソース配布]」セクションで、インスタンスのリージョンを選択します。

  3. [インスタンス] ページで、インスタンス名をクリックします。

  4. 左側のナビゲーションウィンドウで、[グループ] をクリックします。

  5. [グループ] ページで、[グループの作成] をクリックします。

  6. [グループの作成]」パネルで、[グループ ID] フィールドにグループ名を入力し、任意の説明とタグを追加してから、[OK] をクリックします。

ステップ 2: コネクタの作成とデプロイ

  1. ApsaraMQ for Kafka コンソールにログインします。

  2. [概要] ページの [リソース分布] セクションで、インスタンスのリージョンを選択します。

  3. [インスタンス] ページで、インスタンス名をクリックします。

  4. 左側のナビゲーションウィンドウで、[コネクタ] をクリックします。

  5. コネクタ」ページで、「コネクタの作成」をクリックします。

  6. [コネクタの作成] ウィザードを完了します。

基本情報の構成

パラメーター説明
名前コネクタ名。1~48 文字: 数字、小文字、ハイフン (-)。ハイフンで開始することはできません。インスタンス内で一意である必要があります。kafka-adb-sink
インスタンスApsaraMQ for Kafka インスタンス。インスタンス名と ID を表示します。demo alikafka_post-cn-st21p8vj****

[次へ] をクリックします。

ソースサービスの構成

[Message Queue For Apache Kafka] をソースサービスとして選択し、以下のパラメーターを設定します。

パラメーター説明
[データソースのトピック]データをエクスポートする元のトピックです。adb-test-input
[コンシューマースレッドの同時実行数]同時実行コンシューマースレッドの数です。デフォルト: 6。有効な値: 1、2、3、6、12。6
[コンシューマーオフセット]消費を開始する場所です。 [最も古いオフセット]:最初から開始します。 [最新のオフセット]:最新のメッセージから開始します。[最も古いオフセット]

[ランタイム環境の設定]」をクリックして、詳細設定を展開します。

パラメーター説明
VPC IDエクスポートタスク用の VPC。ApsaraMQ for Kafka インスタンスの VPC にデフォルト設定されます。vpc-bp1xpdnd3l***
vSwitch IDエクスポートタスク用の vSwitch。インスタンスと同じ VPC 内にある必要があります。vsw-bp1d2jgg81***
障害処理ポリシーメッセージ送信失敗時のアクション。[継続サブスクリプション]: メッセージの消費を継続し、エラーをログに記録します。[停止サブスクリプション]: メッセージの消費を停止し、エラーをログに記録します。ログの詳細については、「コネクタの管理」をご参照ください。エラーコードについては、「エラーコード」をご参照ください。サブスクリプションを続行
リソース作成方法必要な内部トピックとコンシューマーグループの作成方法。 自動: ApsaraMQ for Kafka が自動的に作成します。 手動: ステップ 1 で作成されたリソースを使用します。自動
コネクタコンシューマーグループコネクタ用のコンシューマーグループ。フォーマット: connect-<connector-name>connect-kafka-adb-sink
タスクオフセットトピックコンシューマオフセットを保存します。名前プレフィックス: connect-offset。パーティション: 1 より大きい。ストレージエンジン: ローカルストレージ。クリーンアップポリシー: コンパクトconnect-offset-kafka-adb-sink
タスク構成 Topicタスク構成を保存します。名前プレフィックス: connect-config。パーティション: 1。ストレージエンジン: ローカルストレージ。クリーンアップポリシー: コンパクトconnect-config-kafka-adb-sink
タスクステータス Topicタスクステータスを保存します。名前プレフィックス: connect-status。パーティション: 6 (推奨)。ストレージエンジン: ローカルストレージ。クリーンアップポリシー: コンパクトconnect-status-kafka-adb-sink
デッドレターキュー トピックKafka Connect フレームワークのエラーデータを保存します。名前プレフィックス: connect-error。パーティション: 6 (推奨)。ストレージエンジン: ローカルストレージ または クラウドストレージ。エラーデータ Topic と Topic を共有できます。connect-error-kafka-adb-sink
エラーデータ Topicシンクコネクタのエラーデータを保存します。名前プレフィックス: connect-error。パーティション: 6 (推奨)。ストレージエンジン: ローカルストレージ または クラウドストレージ。デッドレターキュー Topic と Topic を共有できます。connect-error-kafka-adb-sink

[Next] をクリックします。

宛先サービスの構成

宛先サービスとして AnalyticDB を選択し、以下のパラメーターを構成します。

パラメーター説明
インスタンスタイプターゲットデータベースタイプ: AnalyticDB for MySQL または AnalyticDB for PostgreSQLAnalyticDB for MySQL
AnalyticDB インスタンス ID宛先インスタンス ID。am-bp139yqk8u1ik****
データベース名ターゲットデータベース。adb_demo
テーブル名エクスポートされたデータの宛先テーブル。user
データベースユーザー名データベースログインユーザー名。adbmysql
データベースパスワードデータベースのログインパスワード。パスワードを忘れた場合、AnalyticDB for MySQL では AnalyticDB for MySQL コンソールでリセットします。AnalyticDB for PostgreSQL では、[アカウント管理] に移動し、[パスワードのリセット] をクリックします。********
データベースのユーザー名とパスワードは、ApsaraMQ for Kafka がエクスポートタスクを作成する際に環境変数として Function Compute に渡されます。ApsaraMQ for Kafka は、タスク作成後にこれらの認証情報を保存しません。

[作成] をクリックします。

  1. [コネクタ] ページで、コネクタを見つけ、[アクション] 列の [デプロイ] をクリックします。

ステップ 3: Function Compute ネットワークの構成

デプロイ後、Function Compute はコネクタ用にサービス (名前: kafka-service-<connector_name>-<random_string>) と関数 (名前: fc-adb-<random_string>) を自動的に作成します。

  1. [コネクタ] ページでコネクタを見つけ、[操作] 列の [関数の設定] をクリックします。これにより Function Compute コンソールが開きます。

  2. Function Compute コンソールで、自動的に作成されたサービスを見つけ、ターゲットデータベースと一致するように VPC と vSwitch を構成します。詳細については、「サービスの更新」をご参照ください。

ステップ 4: AnalyticDB のホワイトリストへの VPC CIDR ブロックの追加

Function Compute で設定された VPC の CIDR ブロックを AnalyticDB のホワイトリストに追加します。CIDR ブロックは、[VPC コンソール][vSwitch] ページで確認できます — Function Compute サービスの VPC および vSwitch と一致する行に表示されます。

ステップ 5: データエクスポートの検証

テストメッセージの送信

重要

メッセージコンテンツは有効な JSON である必要があります。各 JSON キーは宛先テーブルのカラム名にマップされ、各値はカラムデータにマップされます。テストメッセージを送信する前に、JSON キーが宛先テーブルのカラム名と一致することを確認してください。

  1. コネクタ」ページで、コネクタを見つけ、「操作」列の「テスト」をクリックします。

  2. [メッセージの送信] パネルで、[送信方法] を選択します:

    • コンソール:

      1. [メッセージキー] フィールドに、キー (例: demo) を入力します。

      2. [メッセージ内容] フィールドに、JSON コンテンツ (例: {"key": "test"}) を入力します。

      3. [指定パーティションへの送信] で、[はい] を選択して [パーティション ID] (例: 0) を入力して特定のパーティションをターゲットにするか、[いいえ] を選択して ApsaraMQ for Kafka にパーティションを割り当てさせます。 パーティション ID の詳細については、「パーティションステータスの表示」をご参照ください。

    • Docker[Docker コンテナを実行してサンプルメッセージを生成する] セクションに表示されている Docker コマンドを実行します。

    • [SDK]: プログラミング言語用の SDK とテストメッセージを送信するためのアクセス方法を選択します。

結果の検証

メッセージ送信後、宛先テーブルをチェックしてデータがエクスポートされたことを確認します。

  1. AnalyticDB for MySQL コンソールまたは AnalyticDB for PostgreSQL コンソールにログインします。

  2. ターゲットデータベースに接続します。

  3. Data Management Service 5.0 コンソール」の [SQL コンソール] で、送信先テーブルを開き、エクスポートされたデータが存在することを確認してください。

次のステップ

  • コネクタの管理:コネクタのステータスを確認、一時停止、再開、または削除します。

  • ログの設定:データエクスポート時の問題をトラブルシューティングするために、Function Compute のログを設定します。