すべてのプロダクト
Search
ドキュメントセンター

EventBridge:Debezium MySQL Source コネクタを使用した MySQL から ApsaraMQ for Kafka へのデータ同期 (非推奨)

最終更新日:Mar 12, 2026
重要

この機能は非推奨です。

アプリケーションが MySQL のデータ変更をリアルタイムで可視化する必要がある場合、手動でデータベースをポーリングすると、レイテンシーとオーバーヘッドが発生します。Debezium MySQL Source コネクタは、ApsaraDB RDS for MySQL からのすべての INSERTUPDATEDELETE 操作を、構造化された変更データキャプチャ (CDC) イベントとして ApsaraMQ for Kafka にストリーミングします。これにより、ダウンストリームのコンシューマーはほぼリアルタイムで変更を受け取ることができます。

仕組み

Debezium MySQL コネクタは MySQL のバイナリログ (binlog) を読み取り、各行レベルの変更を構造化されたイベントに変換します。Serverless App Engine (SAE) 上でマネージドサービスとして実行される Kafka Connect は、これらのイベントを ApsaraMQ for Kafka の送信先トピックに発行します。

データは 3 つのステージを通過します:

  1. MySQL は、コミットされたすべてのトランザクションを binlog に記録します。

  2. Debezium コネクタは binlog を読み取り、変更イベントを生成します。

  3. Kafka Connect は、各イベントを ApsaraMQ for Kafka の送信先トピックに発行します。

初回起動時、コネクタは既存データの初期スナップショットを取得し、その後、継続的な変更のために binlog の読み取りに切り替わります。

トピックの命名

コネクタは、2 種類の送信先トピックにイベントを発行します。コネクタを起動する前に、両方のトピックを作成してください。

トピックタイプ命名フォーマット
データベースレベル (スキーマ変更){database.server.name}fulfillment
テーブルレベル (行の変更){database.server.name}.{databaseName}.{tableName}fulfillment.mydb.orders

database.history.kafka.topic で指定される 3 番目のトピックは、内部スキーマ履歴を保存します。コネクタを起動する前に、このトピックを作成してください。

制限事項

  • Debezium MySQL コネクタは、1 つのタスク (tasks.max = 1) のみをサポートします。複数のワーカーでフォールトトレランスを提供することはできますが、タスク自体を並列化することはできません。

  • コネクタのバージョンは Java 8 と互換性がある必要があります。

前提条件

開始する前に、以下のサービスとリソースが準備できていることを確認してください:

ステップ 1:MySQL ソーステーブルの作成

ApsaraDB RDS for MySQL インスタンスをセットアップし、コネクタが監視するテーブルを作成します。

  1. ApsaraDB RDS コンソールにログインし、ApsaraDB RDS for MySQL インスタンスを作成します

    重要

    ApsaraMQ for Kafka インスタンスと同じ VPC を選択し、VPC CIDR ブロックをインスタンスのホワイトリストに追加してください。

    VPC selection when creating an RDS instance

  2. [インスタンス] ページで、作成したインスタンスをクリックします。インスタンスの詳細ページで、以下の操作を完了します。

    1. 左側のナビゲーションウィンドウで、[アカウント] をクリックし、「アカウントを作成」をご参照ください。また、既存のアカウントを使用することもできます。

    2. 左側のナビゲーションウィンドウで、[データベース] をクリックし、データベースを作成することができます。既存のデータベースを使用することもできます。

    3. 左側のナビゲーションウィンドウで、[データベース接続] をクリックし、内部エンドポイントポート番号 をメモします。コネクタを設定する際に、これらの値が必要になります。

    Internal endpoint and port on the Database Connection page

  3. 左側のナビゲーションウィンドウで、[基本情報] をクリックし、[データベースにログオン] をクリックして Data Management (DMS) コンソールを開きます。

    1. DMS の左側のナビゲーションウィンドウで、データベース名をダブルクリックして切り替えます。

      Select a database in the DMS console

    2. [SQLコンソール] タブで、以下の SQL ステートメントを実行してサンプルテーブルを作成します:

      CREATE TABLE sql_table(id INT, number INT);

ステップ 2:Kafka Connect コネクタのセットアップ

Debezium プラグインをダウンロードし、OSS にアップロードして、ApsaraMQ for Kafka コンソールでコネクタタスクを設定します。

コネクタプラグインのアップロード

  1. Debezium MySQL CDC Source Connector の ZIP ファイルをダウンロードします。

    重要

    Java 8 と互換性のあるバージョンを選択してください。

  2. 前提条件で作成した OSS バケットに ZIP ファイルをアップロードします

コネクタタスクの作成

  1. ApsaraMQ for Kafka コンソールにログオンします。 [概要] ページの [リソース分布] セクションで、Kafka インスタンスのリージョンを選択します。

  2. 左側のナビゲーションウィンドウで、[コネクタエコシステム統合] > [統合概要] に移動します。

  3. メッセージ流入」セクションで、「カスタム Kafka Connect」カードを見つけ、「作成」をクリックします。

  4. タスク」ページで、[タスクの作成] をクリックし、各ステップを完了します:

ソース

[データプロバイダー]Apache Kafka Connect に設定し、[次へ] をクリックします。

コネクタ

以下のパラメーターを設定し、[次のステップ] をクリックします。

Kafka Connect プラグイン

パラメーター説明
バケットDebezium コネクタの ZIP ファイルが含まれている OSS バケットを選択します。
ファイルアップロードした Debezium MySQL CDC Source Connector ファイルを選択します。

ApsaraMQ For Kafka リソース

パラメーター説明
ApsaraMQ for Kafka パラメーター[Source Connect] を選択します。
ApsaraMQ for Kafka インスタンス前提条件で準備した Kafka インスタンスを選択します。
VPCKafka インスタンスから自動入力されます。編集はできません。
vSwitchKafka インスタンスから自動入力されます。編集はできません。
セキュリティグループセキュリティグループを選択します。

Kafka Connect プロパティ

[.properties ファイルを ZIP パッケージから解析] の下にある ZIP パッケージから .properties ファイルを選択します(パス: /etc/xxx.properties)。コードエディタで以下のフィールドを更新します:

プロパティ説明
tasks.maxタスクの最大数。1 に設定します。
database.hostnameApsaraDB RDS for MySQL インスタンスの内部エンドポイント (ステップ 1 から)。
database.portApsaraDB RDS for MySQL インスタンスのポート番号 (ステップ 1 から)。
database.userApsaraDB RDS for MySQL データベースにアクセスするためのアカウント。
database.passwordApsaraDB RDS for MySQL データベースにアクセスするためのパスワード。
database.server.nameMySQL サーバーの論理名。使用可能な文字は、英字、数字、アンダースコア (_) です。この値は送信先トピック名を決定します (トピックの命名をご参照ください)。
database.include.listソースデータベース名。カンマで区切ります。
table.include.listソーステーブル名。{databaseName}.{tableName} 形式で、カンマで区切ります。
database.history.kafka.bootstrap.serversスキーマ履歴を保存する ApsaraMQ for Kafka インスタンスのエンドポイント。コネクタ設定時に選択した Kafka インスタンスを使用するか、別のインスタンスを作成できます。このインスタンスは RDS インスタンスと同じ VPC 内にある必要があり、動的グループ作成が有効になっている必要があります。
database.history.kafka.topicスキーマ変更履歴を保存するトピック。コネクタを起動する前にこのトピックを作成してください。
include.schema.changestrue に設定すると、スキーマ変更が {database.server.name} トピックに書き込まれます。
重要

コネクタを起動する前に、すべての送信先トピック ({database.server.name} および {database.server.name}.{databaseName}.{tableName}) を作成してください。これらのトピックが存在しない場合、コネクタは TimeoutException で失敗します。

Debezium MySQL コネクタのプロパティの完全なリストについては、Debezium ドキュメントの「Connector properties」をご参照ください。

インスタンス

ワーカーのスケーリングと内部依存関係を設定し、次に、[次のステップ] をクリックします。

ワーカーの種類

パラメーター説明
ワーカーの種類ワーカータイプを選択します。
最小ワーカー数ワーカーの最小数。1 以上である必要があります。
最大ワーカー数ワーカーの最大数。50 を超えることはできません。この制限を増やすには、チケットを送信してください。

ワーカーの構成

Apache Kafka コネクターワーカーの依存関係を自動的に作成 を選択して、システムが必要な内部トピックとコンシューマーグループを作成できるようにします。システムは、以下のリソースを生成します:

リソース命名フォーマット
オフセットトピックconnect-eb-offset-<Task name>
設定トピックconnect-eb-config-<Task name>
ステータストピックconnect-eb-status-<Task name>
Kafka Connect コンシューマーグループconnect-eb-cluster-<Task name>
ソースコネクタコンシューマーグループconnector-eb-cluster-<task name>-<connector name> (シンクコネクタにのみ有効)

実行設定

  1. [ログ配信][Log Service へのデータ配信] または [ApsaraMQ For Kafka へのデータ配信] に設定します。

  2. ロール承認]セクションで、[ロール]ドロップダウンリストからロールを選択します。

    重要

    AliyunSAEFullAccess ポリシーがアタッチされたロールを選択してください。そうしないと、タスクが失敗する可能性があります。

  3. [保存] をクリックします。

タスクのステータスが [実行中] に変更されると、コネクタは動作しています。

コネクタの検証

テストレコードを挿入し、変更イベントが Kafka に表示されるか確認します。

  1. DMS コンソールで、ステップ 1 で作成したテーブルにレコードを挿入します:

    INSERT INTO sql_table(id, number) VALUES(123, 20000);
  2. ApsaraMQ for Kafka コンソール」にログインします。[インスタンス] ページで、使用中のインスタンスをクリックします。

  3. 左側のナビゲーションウィンドウで、[トピック] をクリックし、目的のトピックを選択して、[メッセージクエリ] タブを開きます。

  4. 各トピックのレコードと期待されるメッセージフォーマットを確認します:

    データベースレベルのトピック ({database.server.name}) -- スキーマ変更イベントが含まれます:

    {
        "source": {
            "version": "1.5.0.Final",
            "connector": "mysql",
            "name": "fulfillment",
            "ts_ms": 1686283675404,
            "snapshot": "true",
            "db": "eb-ceshi",
            "table": "sql_table",
            "server_id": 0,
            "file": "mysql-bin.000006",
            "pos": 188032,
            "row": 0
        },
        "databaseName": "eb-cesh",
        "ddl": "DROP TABLE IF EXISTS sql_table",
        "tableChanges": []
    }

    テーブルレベルのトピック ({database.server.name}.{databaseName}.{tableName}) -- 変更前後のデータを含む行変更イベントが含まれます:

    {
        "before": null,
        "after": {
            "id": 123,
            "number": 20000
        },
        "source": {
            "version": "1.5.0.Final",
            "connector": "mysql",
            "name": "fulfillment",
            "ts_ms": 1686283675675,
            "snapshot": "last",
            "db": "eb-cesh",
            "table": "sql_table",
            "server_id": 0,
            "file": "mysql-bin.000006",
            "pos": 188032,
            "row": 0
        },
        "op": "r",
        "ts_ms": 1686283675675,
        "transaction": null
    }

    スキーマ履歴トピック (database.history.kafka.topic) -- DDL 変更レコードが含まれます:

    {
        "source": {
            "server": "fulfillment"
        },
        "position": {
            "ts_sec": 1686283675,
            "file": "mysql-bin.000006",
            "pos": 188032,
            "snapshot": true
        },
        "databaseName": "eb-cesh",
        "ddl": "CREATE DATABASE `wbdb` CHARSET utf8 COLLATE utf8_general_ci",
        "tableChanges": []
    }

トラブルシューティング

「すべてのタスクが失敗しました」

エラーメッセージ:

All tasks under connector mongo-source failed, please check the error trace of the task.

解決策: タスク詳細ページの [基本情報] セクションで [診断] をクリックして Connector Monitoring ページを開き、タスクの失敗詳細を確認します。

「Kafka connect が終了しました」

エラーメッセージ:

Kafka Connect が終了しました!SAE アプリケーションのエラーログ /opt/kafka/logs/connect.log を確認して、Kafka Connect が終了した原因を特定し、有効な引数でイベントストリームを更新して問題を解決してください。

解決方法: Kafka Connect のステータス更新が遅延することがあります。まず、ページを更新してください。エラーが継続する場合は、ログを確認してください:

  1. タスクの 概要 ページで、ワーカー情報 セクション内の SAE アプリケーション の横にあるアイコンをクリックして、SAE コンソールを開きます。

    Worker Information section with SAE Application link

  2. 基本情報 ページで、インスタンス タブをクリックします。ターゲットインスタンスを特定し、操作 列の Webshell をクリックします。

  3. 起動ログを確認します:

    vi /home/admin/connector-bootstrap.log
  4. ERROR または WARN エントリを含むランタイムログを確認します:

    vi /opt/kafka/logs/connect.log

問題を特定して修正した後、タスクを再開してください。

「コネクタの設定が無効です」

エラーメッセージ:

Start or update connector xxx failed. Error code=400. Error message=Connector configuration is invalid and contains the following 1 error(s):
Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`

ソリューション: エラーメッセージから無効なパラメーターを見つけ、その値を修正します。エラーメッセージでパラメーターが明確に特定できない場合は、Kafka Connect の実行環境にログインし (「Kafka connect が終了しました」をご参照ください)、以下を実行します:

curl -i -X PUT -H "Accept:application/json" \
  -H "Content-Type:application/json" \
  -d @$CONNECTOR_PROPERTIES_MAPPING \
  http://localhost:8083/connector-plugins/io.confluent.connect.jdbc.JdbcSinkConnector/config/validate

応答には、各プロパティの検証結果が含まれます。無効な値を持つプロパティには、空でない errors フィールドがあります:

{
    "value": {
        "name": "snapshot.mode",
        "value": null,
        "recommended_values": [
            "never",
            "initial_only",
            "when_needed",
            "initial",
            "schema_only",
            "schema_only_recovery"
        ],
        "errors": [
            "Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery"
        ],
        "visible": true
    }
}

「トピックメタデータのフェッチ中にタイムアウトしました」

エラーメッセージ:

org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

ソリューション: このエラーは、必要な送信先トピックが存在しない場合に発生します。コネクタを停止し、不足しているトピック ({database.server.name} および {database.server.name}.{databaseName}.{tableName}) を作成してから、コネクタを再起動してください。

参考文献