Realtime Compute for Apache Flink は、AnalyticDB for MySQL を含む AnalyticDB for MySQL クラスターをサブスクライブし、データベースの変更をリアルタイムでキャプチャして処理します。これにより、効率的なデータ同期とストリームコンピューティングが可能になります。
前提条件
AnalyticDB for MySQL クラスターは、Enterprise Edition、Basic Edition、Data Lakehouse Edition、または エラスティックモードの Data Warehouse Edition です。
AnalyticDB for MySQL クラスターのマイナーバージョンは 3.2.1.0 以降です。
説明AnalyticDB for MySQL クラスターのマイナーバージョンを表示および更新するには、AnalyticDB for MySQL コンソールにログインし、クラスター情報 ページの 構成情報 セクションに移動します。
Flink ワークスペースは Ververica Runtime (VVR) 8.0.4 以降のバージョンを使用する必要があります。
AnalyticDB for MySQL クラスターとフルマネージド Flink ワークスペースは、同じ VPC 内にある必要があります。
Flink ワークスペースのCIDR ブロックを AnalyticDB for MySQL のホワイトリストに追加します。
制限事項
Flink は、AnalyticDB for MySQL のバイナリログからは、基本データ型とJSON 複合データ型しか処理できません。
Flink は、AnalyticDB for MySQL のバイナリログから、パーティションテーブルの DDL 操作と自動パーティション削除レコードを無視します。
ステップ1:バイナリロギングの有効化
ソース AnalyticDB for MySQL クラスターのテーブルでバイナリログ機能を有効にします。この例では、`source_table` という名前のテーブルを使用します。
説明バイナリログ機能は、AnalyticDB for MySQL のテーブルでのみ有効にできます。
テーブル作成時にバイナリログ機能を有効にする
CREATE TABLE source_table ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) )DISTRIBUTED BY HASH (id) BINLOG=true;既存のテーブルでバイナリログ機能を有効にする
ALTER TABLE source_table BINLOG=true;(オプション) バイナリログの保持期間を変更します。
binlog_ttlパラメーターを変更して、バイナリログの保持期間を変更できます。パラメーターのデフォルト値は 6h です。次の文を実行して、`source_table` テーブルのバイナリログの保持期間を 1 日に変更します。ALTER TABLE source_table binlog_ttl='1d';binlog_ttlパラメーターは、次のフォーマットの値をサポートします。ミリ秒: 純粋な数値。たとえば、
60は 60 ミリ秒を指定します。秒: 数値 + s。たとえば、
30sは 30 秒を指定します。時間: 数値 + h。たとえば、
2hは 2 時間を指定します。日: 数値 + d。たとえば、
1dは 1 日を指定します。
説明カーネルバージョンが 3.2.1.9 以降 (バージョン 3.2.1 の場合)、3.2.2.14 以降 (バージョン 3.2.2 の場合)、3.2.3.8 以降 (バージョン 3.2.3 の場合)、3.2.4.4 以降 (バージョン 3.2.4 の場合)、または 3.2.5.1 以降 (バージョン 3.2.5 の場合) のクラスターでは、バイナリログの最大保持期間は 365 日です。前述のバージョンより前のカーネルバージョンを持つクラスターの場合、バイナリログの最大保持期間は 21 日です。
バイナリログの保持期間は、
binlog_ttlパラメーターのデフォルト値以上に設定することをお勧めします。保持期間を小さい値に設定すると、バイナリログが削除され、データ同期が失敗する可能性があります。バイナリログの現在の保持期間をクエリするには、
SHOW CREATE TABLE source_table;文を実行します。
ステップ 2: AnalyticDB for MySQL コネクターのアップロード
コネクタの JAR ファイルをダウンロードします。flink-sql-connector-adb-mysql-cdc-2.4-20260420.jar
[Flink] タブで、対象のワークスペースを見つけ、操作 列の [Console] をクリックします。
左側のナビゲーションペインで、[Connectors] をクリックします。
[Connectors] ページで、[Create Custom Connector] をクリックします。
ダウンロードしたコネクタをアップロードし、次へ をクリックします。
完了 をクリックします。作成されたカスタムコネクタがコネクタのリストに表示されます。
ステップ3:バイナリログのサブスクライブ
Realtime Compute for Apache Flink コンソールにログインし、SQL ジョブを作成します。
AnalyticDB for MySQL に接続し、特定のテーブル (
source_table) からバイナリログデータを読み取るソーステーブルを作成します。説明Flink DDL で定義されたプライマリキーは、キー列とプライマリキー名の両方において、AnalyticDB for MySQL クラスターの物理テーブルにあるプライマリキーと一致する必要があります。そうでない場合、データが不正確になる可能性があります。
Flink のデータ型は、AnalyticDB for MySQL のデータ型と互換性がある必要があります。データ型のマッピングについては、「型マッピング」をご参照ください。
CREATE TEMPORARY TABLE adb_source ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'adb-mysql-cdc', 'hostname' = 'amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com', 'username' = 'testUser', 'password' = 'Test12****', 'database-name' = 'binlog', 'table-name' = 'source_table' );次の表に、WITH パラメーターを示します。
パラメーター
必須
デフォルト
型
説明
connector
はい
なし
STRING
使用するコネクタ。
これはカスタムコネクタです。このパラメーターを
adb-mysql-cdcに設定します。hostname
はい
なし
STRING
AnalyticDB for MySQL クラスターのエンドポイント。
username
はい
なし
STRING
AnalyticDB for MySQL クラスターのデータベースアカウント。
password
はい
なし
STRING
AnalyticDB for MySQL データベースアカウントのパスワード。
database-name
はい
なし
STRING
AnalyticDB for MySQL データベースの名前。
AnalyticDB for MySQL はテーブルレベルのバイナリロギングを実装しているため、データベースは 1 つしか指定できません。
table-name
はい
なし
STRING
AnalyticDB for MySQL データベースのテーブル名。
AnalyticDB for MySQL はテーブルレベルのバイナリロギングを実装しているため、指定できるテーブルは 1 つだけです。
port
いいえ
3306
INTEGER
ポート番号。
scan.incremental.snapshot.enabled
いいえ
true
BOOLEAN
増分スナップショット読み取りメカニズムを有効にするかどうかを指定します。
この機能はデフォルトで有効になっています。増分スナップショットは、テーブルスナップショットを読み取るための新しいメカニズムです。以前のスナップショットメカニズムと比較して、増分スナップショットには次の利点があります。
ソースはスナップショット読み取り中に同時読み取りをサポートします。
ソースはスナップショット読み取り中にチャンクレベルのチェックポイントをサポートします。
ソースはスナップショットを読み取る前にデータベースのロック権限を取得する必要がありません。
scan.incremental.snapshot.chunk.size
いいえ
8096
INTEGER
テーブルスナップショットのチャンクあたりの行数。
scan.snapshot.fetch.size
いいえ
1024
INTEGER
テーブルスナップショットの読み取り時に一度にフェッチする最大行数。
scan.startup.mode
いいえ
initial
STRING
データ消費の起動モード。
有効な値:
initial (デフォルト):テーブルの初期スナップショットを実行し、最新のバイナリログを読み取ります。
earliest-offset:スナップショットフェーズをスキップし、利用可能な最も古いバイナリログから読み取りを開始します。
specific-offset: スナップショットフェーズをスキップし、指定されたバイナリログの位置から開始します。
scan.startup.specific-offset.fileとscan.startup.specific-offset.posの両方のパラメーターを設定して、バイナリログのファイル名とオフセットを指定します。latest-offset:スナップショットフェーズをスキップし、コネクタの起動後に発生した変更のみを読み取ります。
タイムスタンプ: スナップショットフェーズをスキップし、指定されたタイムスタンプから読み取りを開始します。
scan.startup.timestamp-millisパラメーターを使用して、タイムスタンプをミリ秒 (ms) 単位で設定します。
重要earliest-offset、specific-offset、または timestamp 起動モードを使用する場合、指定された開始位置とジョブの起動の間でテーブルスキーマが変更されないようにしてください。変更されると、スキーマの変更が原因でジョブが失敗する可能性があります。
scan.startup.specific-offset.file
いいえ
なし
STRING
specific-offset 起動モードでの、開始位置のバイナリログファイル名。
最新のバイナリログファイル名を取得するには、
SHOW MASTER STATUS for table_name;ステートメントを実行します。scan.startup.specific-offset.pos
いいえ
なし
LONG
specific-offset 起動モードでの、開始位置のバイナリログファイル内の位置。
最新のバイナリログの位置を取得するには、
SHOW MASTER STATUS for table_name;ステートメントを実行します。scan.startup.specific-offset.skip-events
いいえ
なし
LONG
指定された開始位置の後にスキップするイベントの数。
scan.startup.specific-offset.skip-rows
いいえ
なし
LONG
指定された開始位置の後にスキップするデータ行の数。
scan.startup.timestamp-millis
いいえ
なし
LONG
timestamp 起動モードを使用する場合の、開始位置のミリ秒単位のタイムスタンプ。
このパラメーターを使用する場合、
scan.startup.modeを timestamp に設定する必要があります。タイムスタンプはミリ秒 (ms) です。server-time-zone
いいえ
システムデフォルト
STRING
データベースサーバーのセッションタイムゾーン。 例:「Asia/Shanghai」。
これは、AnalyticDB for MySQLAnalyticDB for MySQL が
STRINGデータ型に変換される方法を制御します。このパラメーターが設定されていない場合、ZONELD.SYSTEMDEFAULT()を使用してサーバーのタイムゾーンが決定されます。debezium.min.row.count.to.stream.result
いいえ
1000
INTEGER
テーブルの行数がこの値より大きい場合、コネクタは結果をストリーミングします。
このパラメーターを
0に設定すると、すべてのテーブルサイズのチェックがスキップされ、スナップショットフェーズ中にすべての結果がストリーミングされます。connect.timeout
いいえ
30s
DURATION
データベース接続の試行がタイムアウトするまでの最大待機時間。
デフォルトの単位は秒 (s) です。
connect.max-retries
いいえ
3
INTEGER
データベース接続の失敗後の最大再試行回数。
宛先データベースに、処理済みのデータを格納するための物理テーブルを作成します。 このトピックでは、宛先として AnalyticDB for MySQL を使用します。 Flink でサポートされているコネクターについては、「サポートされているコネクター」をご参照ください。
CREATE TABLE target_table ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) )前の手順で作成したテーブルに接続するシンクテーブルを作成します。 シンクテーブルは、処理済みのデータを AnalyticDB for MySQL の指定されたテーブルに書き込みます。
CREATE TEMPORARY TABLE adb_sink ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'adb3.0', 'url' = 'jdbc:mysql://amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com:3306/flinktest', 'userName' = 'testUser', 'password' = 'Test12****', 'tableName' = 'target_table' );シンクテーブルの WITH パラメーターと型マッピングの詳細については、「AnalyticDB for MySQL V3.0 コネクタ」をご参照ください。
INSERT INTO ステートメントを使用して、ソーステーブルからシンクテーブルにデータを送信します。
INSERT INTO adb_sink SELECT * FROM adb_source;保存 をクリックします。
[Validate] をクリックします。
検証機能は、SQL のセマンティクス、ネットワーク接続性、およびジョブで使用されるテーブルのメタデータをチェックします。結果エリアで [SQL Advice] をクリックして、SQL のリスクプロンプトや最適化の提案を表示することもできます。
(オプション) Debug をクリックします。
ジョブデバッグ機能を使用して、ジョブの実行をシミュレートし、出力結果を確認し、SELECT または INSERT ステートメントのビジネスロジックを検証し、開発効率を向上させ、データ品質のリスクを低減できます。
Deploy をクリックします。
ジョブを開発および検証した後、本番環境にデプロイします。その後、[O&M] ページに移動し、ジョブを開始します。
(オプション) バイナリログに関する情報を表示します。
説明次の SQL 文を実行してバイナリログに関する情報をクエリした後、バイナリログ機能を有効にしただけの場合は 0 が返されます。ログ情報は、バイナリログをサブスクライブした後にのみ表示されます。
最新のバイナリログのファイル名と場所をクエリするには、次の文を実行します。
SHOW MASTER STATUS FOR source_table;クリアされていないすべての履歴バイナリログとそのサイズをクエリするには、次の文を実行します。
SHOW BINARY LOGS FOR source_table;
型マッピング
次の表では、AnalyticDB for MySQL のデータ型と、それに対応する Flink のデータ型との対応を示します。
AnalyticDB for MySQL タイプ | Flink タイプ |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(p, s) or NUMERIC(p, s) | DECIMAL(p, s) |
VARCHAR | STRING |
BINARY | BYTES |
DATE | DATE |
TIME | TIME |
DATETIME | TIMESTAMP |
TIMESTAMP | TIMESTAMP |
POINT | STRING |
JSON | STRING |