この機能は非推奨です。
アプリケーションが MySQL のデータ変更をリアルタイムで可視化する必要がある場合、手動でデータベースをポーリングすると、レイテンシーとオーバーヘッドが発生します。Debezium MySQL Source コネクタは、ApsaraDB RDS for MySQL からのすべての INSERT、UPDATE、DELETE 操作を、構造化された変更データキャプチャ (CDC) イベントとして ApsaraMQ for Kafka にストリーミングします。これにより、ダウンストリームのコンシューマーはほぼリアルタイムで変更を受け取ることができます。
仕組み
Debezium MySQL コネクタは MySQL のバイナリログ (binlog) を読み取り、各行レベルの変更を構造化されたイベントに変換します。Serverless App Engine (SAE) 上でマネージドサービスとして実行される Kafka Connect は、これらのイベントを ApsaraMQ for Kafka の送信先トピックに発行します。
データは 3 つのステージを通過します:
MySQL は、コミットされたすべてのトランザクションを binlog に記録します。
Debezium コネクタは binlog を読み取り、変更イベントを生成します。
Kafka Connect は、各イベントを ApsaraMQ for Kafka の送信先トピックに発行します。
初回起動時、コネクタは既存データの初期スナップショットを取得し、その後、継続的な変更のために binlog の読み取りに切り替わります。
トピックの命名
コネクタは、2 種類の送信先トピックにイベントを発行します。コネクタを起動する前に、両方のトピックを作成してください。
| トピックタイプ | 命名フォーマット | 例 |
|---|---|---|
| データベースレベル (スキーマ変更) | {database.server.name} | fulfillment |
| テーブルレベル (行の変更) | {database.server.name}.{databaseName}.{tableName} | fulfillment.mydb.orders |
database.history.kafka.topic で指定される 3 番目のトピックは、内部スキーマ履歴を保存します。コネクタを起動する前に、このトピックを作成してください。
制限事項
Debezium MySQL コネクタは、1 つのタスク (
tasks.max = 1) のみをサポートします。複数のワーカーでフォールトトレランスを提供することはできますが、タスク自体を並列化することはできません。コネクタのバージョンは Java 8 と互換性がある必要があります。
前提条件
開始する前に、以下のサービスとリソースが準備できていることを確認してください:
ステップ 1:MySQL ソーステーブルの作成
ApsaraDB RDS for MySQL インスタンスをセットアップし、コネクタが監視するテーブルを作成します。
ApsaraDB RDS コンソールにログインし、ApsaraDB RDS for MySQL インスタンスを作成します。
重要ApsaraMQ for Kafka インスタンスと同じ VPC を選択し、VPC CIDR ブロックをインスタンスのホワイトリストに追加してください。

[インスタンス] ページで、作成したインスタンスをクリックします。インスタンスの詳細ページで、以下の操作を完了します。
左側のナビゲーションウィンドウで、[アカウント] をクリックし、「アカウントを作成」をご参照ください。また、既存のアカウントを使用することもできます。
左側のナビゲーションウィンドウで、[データベース] をクリックし、データベースを作成することができます。既存のデータベースを使用することもできます。
左側のナビゲーションウィンドウで、[データベース接続] をクリックし、内部エンドポイント と ポート番号 をメモします。コネクタを設定する際に、これらの値が必要になります。

左側のナビゲーションウィンドウで、[基本情報] をクリックし、[データベースにログオン] をクリックして Data Management (DMS) コンソールを開きます。
DMS の左側のナビゲーションウィンドウで、データベース名をダブルクリックして切り替えます。

[SQLコンソール] タブで、以下の SQL ステートメントを実行してサンプルテーブルを作成します:
CREATE TABLE sql_table(id INT, number INT);
ステップ 2:Kafka Connect コネクタのセットアップ
Debezium プラグインをダウンロードし、OSS にアップロードして、ApsaraMQ for Kafka コンソールでコネクタタスクを設定します。
コネクタプラグインのアップロード
Debezium MySQL CDC Source Connector の ZIP ファイルをダウンロードします。
重要Java 8 と互換性のあるバージョンを選択してください。
前提条件で作成した OSS バケットに ZIP ファイルをアップロードします。
コネクタタスクの作成
ApsaraMQ for Kafka コンソールにログオンします。 [概要] ページの [リソース分布] セクションで、Kafka インスタンスのリージョンを選択します。
左側のナビゲーションウィンドウで、[コネクタエコシステム統合] > [統合概要] に移動します。
「メッセージ流入」セクションで、「カスタム Kafka Connect」カードを見つけ、「作成」をクリックします。
「タスク」ページで、[タスクの作成] をクリックし、各ステップを完了します:
ソース
[データプロバイダー] を Apache Kafka Connect に設定し、[次へ] をクリックします。
コネクタ
以下のパラメーターを設定し、[次のステップ] をクリックします。
Kafka Connect プラグイン
| パラメーター | 説明 |
|---|---|
| バケット | Debezium コネクタの ZIP ファイルが含まれている OSS バケットを選択します。 |
| ファイル | アップロードした Debezium MySQL CDC Source Connector ファイルを選択します。 |
ApsaraMQ For Kafka リソース
| パラメーター | 説明 |
|---|---|
| ApsaraMQ for Kafka パラメーター | [Source Connect] を選択します。 |
| ApsaraMQ for Kafka インスタンス | 前提条件で準備した Kafka インスタンスを選択します。 |
| VPC | Kafka インスタンスから自動入力されます。編集はできません。 |
| vSwitch | Kafka インスタンスから自動入力されます。編集はできません。 |
| セキュリティグループ | セキュリティグループを選択します。 |
Kafka Connect プロパティ
[.properties ファイルを ZIP パッケージから解析] の下にある ZIP パッケージから .properties ファイルを選択します(パス: /etc/xxx.properties)。コードエディタで以下のフィールドを更新します:
| プロパティ | 説明 |
|---|---|
tasks.max | タスクの最大数。1 に設定します。 |
database.hostname | ApsaraDB RDS for MySQL インスタンスの内部エンドポイント (ステップ 1 から)。 |
database.port | ApsaraDB RDS for MySQL インスタンスのポート番号 (ステップ 1 から)。 |
database.user | ApsaraDB RDS for MySQL データベースにアクセスするためのアカウント。 |
database.password | ApsaraDB RDS for MySQL データベースにアクセスするためのパスワード。 |
database.server.name | MySQL サーバーの論理名。使用可能な文字は、英字、数字、アンダースコア (_) です。この値は送信先トピック名を決定します (トピックの命名をご参照ください)。 |
database.include.list | ソースデータベース名。カンマで区切ります。 |
table.include.list | ソーステーブル名。{databaseName}.{tableName} 形式で、カンマで区切ります。 |
database.history.kafka.bootstrap.servers | スキーマ履歴を保存する ApsaraMQ for Kafka インスタンスのエンドポイント。コネクタ設定時に選択した Kafka インスタンスを使用するか、別のインスタンスを作成できます。このインスタンスは RDS インスタンスと同じ VPC 内にある必要があり、動的グループ作成が有効になっている必要があります。 |
database.history.kafka.topic | スキーマ変更履歴を保存するトピック。コネクタを起動する前にこのトピックを作成してください。 |
include.schema.changes | true に設定すると、スキーマ変更が {database.server.name} トピックに書き込まれます。 |
コネクタを起動する前に、すべての送信先トピック ({database.server.name} および {database.server.name}.{databaseName}.{tableName}) を作成してください。これらのトピックが存在しない場合、コネクタは TimeoutException で失敗します。
Debezium MySQL コネクタのプロパティの完全なリストについては、Debezium ドキュメントの「Connector properties」をご参照ください。
インスタンス
ワーカーのスケーリングと内部依存関係を設定し、次に、[次のステップ] をクリックします。
ワーカーの種類
| パラメーター | 説明 |
|---|---|
| ワーカーの種類 | ワーカータイプを選択します。 |
| 最小ワーカー数 | ワーカーの最小数。1 以上である必要があります。 |
| 最大ワーカー数 | ワーカーの最大数。50 を超えることはできません。この制限を増やすには、チケットを送信してください。 |
ワーカーの構成
Apache Kafka コネクターワーカーの依存関係を自動的に作成 を選択して、システムが必要な内部トピックとコンシューマーグループを作成できるようにします。システムは、以下のリソースを生成します:
| リソース | 命名フォーマット |
|---|---|
| オフセットトピック | connect-eb-offset-<Task name> |
| 設定トピック | connect-eb-config-<Task name> |
| ステータストピック | connect-eb-status-<Task name> |
| Kafka Connect コンシューマーグループ | connect-eb-cluster-<Task name> |
| ソースコネクタコンシューマーグループ | connector-eb-cluster-<task name>-<connector name> (シンクコネクタにのみ有効) |
実行設定
[ログ配信] を [Log Service へのデータ配信] または [ApsaraMQ For Kafka へのデータ配信] に設定します。
[ロール承認]セクションで、[ロール]ドロップダウンリストからロールを選択します。
重要AliyunSAEFullAccess ポリシーがアタッチされたロールを選択してください。そうしないと、タスクが失敗する可能性があります。
[保存] をクリックします。
タスクのステータスが [実行中] に変更されると、コネクタは動作しています。
コネクタの検証
テストレコードを挿入し、変更イベントが Kafka に表示されるか確認します。
DMS コンソールで、ステップ 1 で作成したテーブルにレコードを挿入します:
INSERT INTO sql_table(id, number) VALUES(123, 20000);「ApsaraMQ for Kafka コンソール」にログインします。[インスタンス] ページで、使用中のインスタンスをクリックします。
左側のナビゲーションウィンドウで、[トピック] をクリックし、目的のトピックを選択して、[メッセージクエリ] タブを開きます。
各トピックのレコードと期待されるメッセージフォーマットを確認します:
データベースレベルのトピック (
{database.server.name}) -- スキーマ変更イベントが含まれます:{ "source": { "version": "1.5.0.Final", "connector": "mysql", "name": "fulfillment", "ts_ms": 1686283675404, "snapshot": "true", "db": "eb-ceshi", "table": "sql_table", "server_id": 0, "file": "mysql-bin.000006", "pos": 188032, "row": 0 }, "databaseName": "eb-cesh", "ddl": "DROP TABLE IF EXISTS sql_table", "tableChanges": [] }テーブルレベルのトピック (
{database.server.name}.{databaseName}.{tableName}) -- 変更前後のデータを含む行変更イベントが含まれます:{ "before": null, "after": { "id": 123, "number": 20000 }, "source": { "version": "1.5.0.Final", "connector": "mysql", "name": "fulfillment", "ts_ms": 1686283675675, "snapshot": "last", "db": "eb-cesh", "table": "sql_table", "server_id": 0, "file": "mysql-bin.000006", "pos": 188032, "row": 0 }, "op": "r", "ts_ms": 1686283675675, "transaction": null }スキーマ履歴トピック (
database.history.kafka.topic) -- DDL 変更レコードが含まれます:{ "source": { "server": "fulfillment" }, "position": { "ts_sec": 1686283675, "file": "mysql-bin.000006", "pos": 188032, "snapshot": true }, "databaseName": "eb-cesh", "ddl": "CREATE DATABASE `wbdb` CHARSET utf8 COLLATE utf8_general_ci", "tableChanges": [] }
トラブルシューティング
「すべてのタスクが失敗しました」
エラーメッセージ:
All tasks under connector mongo-source failed, please check the error trace of the task.解決策: タスク詳細ページの [基本情報] セクションで [診断] をクリックして Connector Monitoring ページを開き、タスクの失敗詳細を確認します。
「Kafka connect が終了しました」
エラーメッセージ:
Kafka Connect が終了しました!SAE アプリケーションのエラーログ /opt/kafka/logs/connect.log を確認して、Kafka Connect が終了した原因を特定し、有効な引数でイベントストリームを更新して問題を解決してください。解決方法: Kafka Connect のステータス更新が遅延することがあります。まず、ページを更新してください。エラーが継続する場合は、ログを確認してください:
タスクの 概要 ページで、ワーカー情報 セクション内の SAE アプリケーション の横にあるアイコンをクリックして、SAE コンソールを開きます。

基本情報 ページで、インスタンス タブをクリックします。ターゲットインスタンスを特定し、操作 列の Webshell をクリックします。
起動ログを確認します:
vi /home/admin/connector-bootstrap.logERRORまたはWARNエントリを含むランタイムログを確認します:vi /opt/kafka/logs/connect.log
問題を特定して修正した後、タスクを再開してください。
「コネクタの設定が無効です」
エラーメッセージ:
Start or update connector xxx failed. Error code=400. Error message=Connector configuration is invalid and contains the following 1 error(s):
Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`ソリューション: エラーメッセージから無効なパラメーターを見つけ、その値を修正します。エラーメッセージでパラメーターが明確に特定できない場合は、Kafka Connect の実行環境にログインし (「Kafka connect が終了しました」をご参照ください)、以下を実行します:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" \
-d @$CONNECTOR_PROPERTIES_MAPPING \
http://localhost:8083/connector-plugins/io.confluent.connect.jdbc.JdbcSinkConnector/config/validate応答には、各プロパティの検証結果が含まれます。無効な値を持つプロパティには、空でない errors フィールドがあります:
{
"value": {
"name": "snapshot.mode",
"value": null,
"recommended_values": [
"never",
"initial_only",
"when_needed",
"initial",
"schema_only",
"schema_only_recovery"
],
"errors": [
"Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery"
],
"visible": true
}
}「トピックメタデータのフェッチ中にタイムアウトしました」
エラーメッセージ:
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadataソリューション: このエラーは、必要な送信先トピックが存在しない場合に発生します。コネクタを停止し、不足しているトピック ({database.server.name} および {database.server.name}.{databaseName}.{tableName}) を作成してから、コネクタを再起動してください。