Realtime Compute for Apache Flink は、AnalyticDB for MySQL をサブスクライブして、データベースの変更データをリアルタイムでキャプチャおよび処理できます。これにより、効率的なデータ同期とストリームコンピューティングが可能になります。このトピックでは、 Flink を使用して AnalyticDB for MySQL のバイナリログをサブスクライブする方法について説明します。
前提条件
AnalyticDB for MySQL クラスターは、Enterprise Edition、Basic Edition、Data Lakehouse Edition、または Data Warehouse Edition in elastic mode です。
AnalyticDB for MySQL クラスターのマイナーバージョンは 3.2.1.0 以降です。
説明AnalyticDB for MySQL クラスターのマイナーバージョンを表示および更新するには、AnalyticDB for MySQL コンソールにログインし、クラスター情報 ページの 構成情報 セクションに移動します。
Flink リアルタイムコンピューティングエンジンは Ververica Runtime (VVR) 8.0.4 以降です。
AnalyticDB for MySQL クラスターとフルマネージド Flink ワークスペースは同じ VPC 内にあります。
Flink ワークスペースのCIDR ブロックをAnalyticDB for MySQLのホワイトリストに追加しました。
制限事項
XUANWU_V2 テーブルではバイナリログ機能を有効にできません。したがって、バイナリログのサブスクリプションを使用して、AnalyticDB for MySQL クラスター内の XUANWU_V2 テーブルでデータ同期やストリームコンピューティングを実行することはできません。
Flink は、AnalyticDB for MySQL のバイナリログを、基本データ型と複雑な JSON データ型に対してのみ処理できます。
Flink は、パーティションテーブルに対する DDL 操作または自動パーティション削除操作に関連する AnalyticDB for MySQL バイナリログ内のレコードを処理しません。
ステップ 1: バイナリログ機能を有効にする
ソース AnalyticDB for MySQL クラスターのテーブルでバイナリログ機能を有効にします。この例では、source_table という名前のテーブルを使用します。
説明AnalyticDB for MySQL のテーブルでのみバイナリログ機能を有効にできます。
テーブル作成時にバイナリログ機能を有効にする
CREATE TABLE source_table ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) )DISTRIBUTED BY HASH (id) BINLOG=true;既存のテーブルでバイナリログ機能を有効にする
ALTER TABLE source_table BINLOG=true;(オプション) バイナリログの保存期間を変更します。
binlog_ttlパラメーターを変更して、バイナリログの保存期間を変更できます。パラメーターのデフォルト値は 6h です。次のステートメントを実行して、source_table テーブルのバイナリログの保存期間を 1 日に変更します。ALTER TABLE source_table binlog_ttl='1d';binlog_ttlパラメーターは、次のフォーマットの値をサポートしています。ミリ秒: 純粋な数値。たとえば、
60は 60 ミリ秒を指定します。秒: 数値 + s。たとえば、
30sは 30 秒を指定します。時間: 数値 + h。たとえば、
2hは 2 時間を指定します。日: 数値 + d。たとえば、
1dは 1 日を指定します。
説明カーネルバージョンが 3.2.1.9 以降 (バージョン 3.2.1 の場合)、3.2.2.14 以降 (バージョン 3.2.2 の場合)、3.2.3.8 以降 (バージョン 3.2.3 の場合)、3.2.4.4 以降 (バージョン 3.2.4 の場合)、または 3.2.5.1 以降 (バージョン 3.2.5 の場合) のクラスターでは、バイナリログの最大保存期間は 365 日です。前述のバージョンより前のカーネルバージョンのクラスターでは、バイナリログの最大保存期間は 21 日です。
binlog_ttlパラメーターのデフォルト値以上の値にバイナリログの保存期間を設定することをお勧めします。保存期間を小さい値に設定すると、バイナリログが削除され、データ同期が失敗する可能性があります。現在のバイナリログの保存期間をクエリするには、
SHOW CREATE TABLE source_table;ステートメントを実行します。
ステップ 2: AnalyticDB for MySQL コネクタを Flink にアップロードする
コネクタをダウンロードします。
[フルマネージド Flink] タブで、管理するワークスペースを見つけ、[アクション] 列の [コンソール] をクリックします。
左側のナビゲーションウィンドウで、[コネクタ] をクリックします。
[コネクタ] ページで、[カスタムコネクタの作成] をクリックします。
ダウンロードしたコネクタをアップロードし、[次へ] をクリックします。
[完了] をクリックします。カスタムコネクタがコネクタリストに表示されます。
ステップ 3: バイナリログをサブスクライブする
Realtime Compute for Apache Flink コンソールにログインし、SQL ジョブを作成します。
AnalyticDB for MySQL に接続し、指定されたテーブル (source_table) からバイナリログデータを読み取るためのソーステーブルを作成します。
説明Flink DDL ステートメントで定義されたプライマリキーは、キー名を含め、AnalyticDB for MySQL クラスターの物理テーブルのプライマリキーと同一である必要があります。同一でない場合、データの正確性に影響します。
Flink のデータ型は、AnalyticDB for MySQL のデータ型と互換性がある必要があります。詳細については、「型のマッピング」をご参照ください。
CREATE TEMPORARY TABLE adb_source ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'adb-mysql-cdc', 'hostname' = 'amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com', 'username' = 'testUser', 'password' = 'Test12****', 'database-name' = 'binlog', 'table-name' = 'source_table' );次の表に、WITH 句のパラメーターを示します。
パラメーター
必須
デフォルト値
データの型
説明
connector
はい
なし
STRING
使用するコネクタ。
このパラメーターは必須です。値を
adb-mysql-cdcに設定します。hostname
はい
なし
STRING
AnalyticDB for MySQL の VPC エンドポイント。
username
はい
なし
STRING
AnalyticDB for MySQL データベースアカウント。
password
はい
なし
STRING
AnalyticDB for MySQL データベースアカウントのパスワード。
database-name
はい
なし
STRING
AnalyticDB for MySQL データベースの名前。
AnalyticDB for MySQL はテーブルレベルのバイナリログを実装しているため、指定できるデータベースは 1 つだけです。
table-name
はい
なし
STRING
AnalyticDB for MySQL データベース内のテーブルの名前。
AnalyticDB for MySQL はテーブルレベルのバイナリログを実装しているため、指定できるテーブルは 1 つだけです。
port
いいえ
3306
INTEGER
ポート番号。
scan.incremental.snapshot.enabled
いいえ
true
BOOLEAN
増分スナップショット。
この機能はデフォルトで有効になっています。増分スナップショットは、テーブルスナップショットを読み取るための新しいメカニズムです。従来のスナップショットメカニズムと比較して、増分スナップショットメカニズムには次の利点があります。
スナップショットの読み取り中、ソースは同時読み取りをサポートします。
スナップショットの読み取り中、ソースはチャンク粒度でのチェックポイントをサポートします。
スナップショットの読み取り前に、ソースはデータベースロック権限を取得する必要はありません。
scan.incremental.snapshot.chunk.size
いいえ
8096
INTEGER
テーブルスナップショット チャンクのサイズ。チャンクに含まれる行数です。
増分スナップショット読み取りが有効な場合、テーブルは読み取りのために複数のチャンクに分割されます。
scan.snapshot.fetch.size
いいえ
1024
INTEGER
テーブルスナップショットを 1 回読み取るごとに読み取ることができる最大行数。
scan.startup.mode
いいえ
initial
STRING
データ消費の起動モード。
有効な値:
initial (デフォルト): ジョブが初めて開始されると、すべての既存データをスキャンし、最新のバイナリログデータを読み取ります。
earliest-offset: ジョブは既存データをスキャンせず、利用可能な最も古いバイナリログからデータの読み取りを開始します。
specific-offset: 既存の完全データをスキャンせず、指定したバイナリログオフセットから開始します。このオフセットは、
scan.startup.specific-offset.fileとscan.startup.specific-offset.posの両方のパラメーターを構成して、開始バイナリログファイルとオフセットを定義することで指定できます。latest-offset: ジョブが初めて開始されると、既存データをスキャンせず、バイナリログの末尾 (最新のバイナリログ) からデータの読み取りを開始します。これは、ジョブがコネクタの起動後に発生した最新の変更のみを読み取ることを意味します。
timestamp: 既存の完全データをスキャンしません。コネクタは、指定されたタイムスタンプからバイナリログの読み取りを開始します。タイムスタンプは、
scan.startup.timestamp-millisパラメーターによってミリ秒 (ms) 単位で指定されます。
重要earliest-offset、specific-offset、または timestamp 起動モードを使用する場合、指定されたバイナリログ消費位置からジョブが開始されるまでの間、対応するテーブルのスキーマが変更されないようにしてください。これにより、スキーマ進化によるジョブの失敗を防ぐことができます。
scan.startup.specific-offset.file
いいえ
なし
STRING
specific-offset 起動モードでは、このパラメーターは開始オフセットのバイナリログファイルの名前を指定します。
最新のバイナリログファイル名を取得するには、
SHOW MASTER STATUS for table_name;ステートメントを実行します。scan.startup.specific-offset.pos
いいえ
なし
LONG
specific-offset 起動モードでは、このパラメーターは開始オフセットのバイナリログファイル内の位置を指定します。
SHOW MASTER STATUS for table_name;コマンドを実行して、最新のバイナリログ位置を取得できます。scan.startup.specific-offset.skip-events
いいえ
なし
LONG
指定された開始オフセットの後にスキップするイベントの数。
scan.startup.specific-offset.skip-rows
いいえ
なし
LONG
指定された開始オフセットの後にスキップするデータ行の数。
scan.startup.timestamp-millis
いいえ
なし
LONG
指定された時間モードを使用してジョブを開始する場合、このパラメーターは開始オフセットをミリ秒単位で指定します。
この構成を使用する場合、
scan.startup.modeは timestamp に設定する必要があります。タイムスタンプはミリ秒 (ms) 単位です。server-time-zone
いいえ
なし
STRING
データベースサーバーのセッションタイムゾーン。
例: "Asia/Shanghai"。このパラメーターは、AnalyticDB for MySQL で、TIMESTAMP 型が STRING 型にどのように変換されるかを制御します。このパラメーターが設定されていない場合、
ZONELD.SYSTEMDEFAULT()がサーバーのタイムゾーンを決定するために使用されます。debezium.min.row.count.to.stream.result
いいえ
1000
INTEGER
テーブルの行数がこの値より大きい場合、コネクタは結果をストリーミングします。
このパラメーターを
0に設定すると、すべてのテーブルサイズのチェックがスキップされ、スナップショット中にすべての結果が常にストリーミングされます。connect.timeout
いいえ
30s
DURATION
データベースサーバーへの接続がタイムアウトするまで待機する最大時間。この時間を超えると、システムは接続を再試行します。
デフォルトの単位は秒 (s) です。
connect.max-retries
いいえ
3
INTEGER
データベースサービスへの接続が失敗した後の最大再試行回数。
処理されたデータを格納する宛先テーブルを作成します。この例では、AnalyticDB for MySQL を宛先として使用します。Flink がサポートするコネクタの詳細については、「サポートされているコネクタ」をご参照ください。
CREATE TABLE target_table ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) )前のステップで作成した宛先テーブルに接続するためのシンクテーブルを作成します。シンクテーブルは、処理されたデータを AnalyticDB for MySQL の指定されたテーブルに書き込みます。
CREATE TEMPORARY TABLE adb_sink ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'adb3.0', 'url' = 'jdbc:mysql://amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com:3306/flinktest', 'userName' = 'testUser', 'password' = 'Test12****', 'tableName' = 'target_table' );シンクテーブルの WITH パラメーターと型のマッピングの詳細については、「AnalyticDB for MySQL V3.0 コネクタ」をご参照ください。
キャプチャされたソースデータの変更をシンクテーブルに同期します。その後、シンクテーブルはデータを宛先に書き込みます。
INSERT INTO adb_sink SELECT * FROM adb_source;[保存] をクリックします。
[深度チェック]をクリックします。
検証機能は、ジョブの SQL セマンティクス、ネットワーク接続、および使用するテーブルのメタデータをチェックします。結果エリアで [SQL 最適化] をクリックして、SQL のリスクアラートと最適化の提案を表示することもできます。
(オプション) [デバッグ] をクリックします。
ジョブデバッグ機能を使用して、ジョブの実行をシミュレートし、出力結果を確認し、SELECT または INSERT ステートメントのビジネスロジックを検証できます。これにより、開発効率が向上し、データ品質のリスクが軽減されます。
[デプロイ] をクリックします。
ジョブを開発および検証した後、ジョブを本番環境にデプロイできます。ジョブがデプロイされた後、[O&M] ページに移動して ジョブを開始できます。
(オプション) バイナリログに関する情報を表示します。
説明次の SQL ステートメントを実行してバイナリログに関する情報をクエリした後、バイナリログ機能を有効にしただけの場合は 0 が返されます。ログ情報は、バイナリログをサブスクライブした後にのみ表示されます。
最新のバイナリログのファイル名と場所をクエリするには、次のステートメントを実行します。
SHOW MASTER STATUS FOR source_table;クリアされていないすべての既存バイナリログとそのサイズをクエリするには、次のステートメントを実行します。
SHOW BINARY LOGS FOR source_table;
型のマッピング
次の表に、AnalyticDB for MySQL と Flink の間のデータ型のマッピングを示します。
AnalyticDB for MySQL フィールドの型 | Flink フィールドの型 |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(p,s) or NUMERIC(p,s) | DECIMAL(p,s) |
VARCHAR | STRING |
BINARY | BYTES |
DATE | DATE |
TIME | TIME |
DATETIME | TIMESTAMP |
TIMESTAMP | TIMESTAMP |
POINT | STRING |
JSON | STRING |