このトピックでは、MySQL コネクタの使用方法について説明します。
背景情報
MySQL コネクタは、MySQL プロトコルと互換性のあるすべてのデータベース(ApsaraDB RDS for MySQL、PolarDB for MySQL、OceanBase(MySQL モード)、および自己管理 MySQL データベース)をサポートしています。
OceanBase から MySQL コネクタでデータを読み取る場合、OceanBase でバイナリログが有効化され、適切に設定されていることを確認してください。詳細については、「バイナリロギング操作」をご参照ください。本機能はパブリックプレビュー版です。本番環境への導入前に、その適合性を評価し、慎重にご利用ください。
MySQL コネクタは以下の機能をサポートしています。
カテゴリ | 詳細 |
対応タイプ | ソーステーブル、ディメンションテーブル、および結果テーブル。また、データソースからのデータ取り込みもサポートします。 |
実行モード | ストリーミングモードのみ |
データ形式 | 該当なし |
カスタム監視メトリクス | |
API 種別 | DataStream、SQL、およびデータインジェスト YAML |
結果テーブルのデータ更新または削除のサポート | はい |
主な機能
MySQL CDC ソーステーブルは、まずデータベースから履歴の完全データを読み取り、その後シームレスにバイナリログの読み取りに切り替えます。これにより、データの欠落や重複が発生しません。障害が発生した場合でも、データ処理は 1 回限りのセマンティクスを保証します。MySQL CDC ソーステーブルは、完全データの並列読み取りをサポートし、ロックレス操作と再開可能な転送を実現するための増分スナップショットアルゴリズムを使用します。詳細については、「MySQL CDC ソーステーブルについて」をご参照ください。
ストリーミング処理とバッチ処理の統合。2 つの別個のワークフローを維持することなく、完全データと増分データの両方を読み取ります。
水平方向の拡張性を実現するための完全データの並列読み取り。
完全データ読み取りから増分読み取りへのシームレスな切り替え。計算リソースを節約するために自動的にスケールインします。
安定性向上のための完全データ読み取り中の再開可能な転送。
オンラインビジネスへの影響を回避するための完全データのロックレス読み取り。
ApsaraDB RDS for MySQL のバックアップログの読み取りをサポート。
レイテンシー低減のためのバイナリログファイルの並列解析。
前提条件
MySQL CDC ソーステーブルを使用する前に、「MySQL の設定」の手順を完了してください。これらの手順により、MySQL CDC ソーステーブルの使用に必要な前提条件が満たされます。
ApsaraDB RDS for MySQL
ApsaraDB RDS for MySQL と Realtime Compute for Apache Flink 間のネットワーク接続テストを実行し、ネットワーク接続を確認します。
MySQL のバージョン要件:5.6、5.7、または 8.0.x。
バイナリログの有効化。(デフォルトで有効化されています。)
バイナリログ形式を ROW に設定。(ROW がデフォルト形式です。)
binlog_row_image を FULL に設定。(FULL がデフォルト設定です。)
バイナリログトランザクション圧縮を無効化。(MySQL 8.0.20 以降で導入。デフォルトで無効化されています。)
SELECT、SHOW DATABASES、REPLICATION SLAVE、および REPLICATION CLIENT 権限を持つ MySQL ユーザーを作成済みである。
MySQL データベースとテーブルを作成します。詳細については、「ApsaraDB RDS for MySQL のデータベースとアカウントを作成する」をご参照ください。権限不足による失敗を防ぐため、特権アカウントを使用してデータベースを作成してください。
IP アドレスホワイトリストを設定します。詳細については、「ApsaraDB RDS for MySQL の IP アドレスホワイトリストを設定する」をご参照ください。
PolarDB for MySQL
Realtime Compute for Apache Flink に対して接続テストを実行し、ネットワーク接続を確認します。
MySQL のバージョン要件:5.6、5.7、または 8.0.x。
バイナリログの有効化。(デフォルトでは無効化されています。)
バイナリログ形式を ROW に設定。(ROW がデフォルト形式です。)
binlog_row_image を FULL に設定。(FULL がデフォルト設定です。)
バイナリログトランザクション圧縮を無効化。(MySQL 8.0.20 以降で導入。デフォルトで無効化されています。)
SELECT、SHOW DATABASES、REPLICATION SLAVE、および REPLICATION CLIENT 権限を持つ MySQL ユーザーを作成済みである。
MySQL データベースとテーブルを作成します。詳細については、「PolarDB for MySQL のデータベースおよびアカウントの作成」をご参照ください。権限不足による失敗を防ぐため、特権アカウントを使用してデータベースを作成します。
IP アドレスホワイトリストを設定します。詳細については、「PolarDB for MySQL の IP アドレスホワイトリストを設定する」をご参照ください。
自己管理 MySQL
Realtime Compute for Apache Flink に対して接続テストを実行し、ネットワーク接続を確認します。
MySQL のバージョン要件:5.6、5.7、または 8.0.x。
バイナリログの有効化。(デフォルトでは無効化されています。)
バイナリログ形式を ROW に設定。(STATEMENT がデフォルト形式です。)
binlog_row_image を FULL に設定。(FULL がデフォルト設定です。)
バイナリログトランザクション圧縮を無効化。(MySQL 8.0.20 以降で導入。デフォルトで無効化されています。)
MySQL ユーザーを作成し、SELECT、SHOW DATABASES、REPLICATION SLAVE、および REPLICATION CLIENT 権限を付与済みである。
MySQL データベースとテーブルを作成します。詳細については、「自己管理 MySQL データベース用にデータベースとアカウントを作成する」をご参照ください。権限不足による失敗を防ぐため、特権アカウントを使用してデータベースを作成してください。
IP アドレスホワイトリストを設定します。詳細については、「自己管理 MySQL データベースの IP アドレスホワイトリストを設定する」をご参照ください。
使用制限
一般的な制限事項
MySQL CDC ソーステーブルは、現在 Watermark の定義をサポートしていません。
CTAS および CDAS ジョブでは、MySQL CDC ソーステーブルが一部のスキーマ変更を同期できます。サポートされる変更タイプの詳細については、「スキーマ進化同期ポリシー」をご参照ください。
MySQL CDC コネクタは、バイナリログトランザクション圧縮をサポートしていません。増分データを消費する際は、この機能が無効化されていることを確認してください。そうでない場合、増分データの読み込みに失敗する可能性があります。
ApsaraDB RDS for MySQL の制限事項
ApsaraDB RDS for MySQL のスタンバイインスタンスまたは読み取り専用インスタンスからデータを読み取らないでください。これらのインスタンスのバイナリログは、デフォルトで短期間のみ保持されます。バイナリログが期限切れになってクリーンアップされると、ジョブがそれらを消費できなくなる可能性があります。
ApsaraDB RDS for MySQL では、パラレルレプリケーションがデフォルトで有効化されていますが、プライマリインスタンスとセカンダリインスタンス間のトランザクション順序の一貫性は保証されません。プライマリ/セカンダリ スイッチオーバー後、チェックポイント復旧時に一部のデータがスキップされる可能性があります。これを回避するには、ApsaraDB RDS for MySQL で slave_preserve_commit_order オプションを有効化してください。
PolarDB for MySQL の制限事項
MySQL CDC ソーステーブルは、PolarDB for MySQL バージョン 1.0.19 以前のマルチマスターコンフィグレーションクラスターからバイナリログを読み取ることをサポートしていません。(マルチマスターコンフィグレーションとは?)これらのクラスターのバイナリログには、重複するテーブル ID が含まれている場合があり、スキーママッピングエラーおよび解析失敗を引き起こす可能性があります。
オープンソース MySQL の制限事項
デフォルトでは、MySQL はプライマリレプリカのバイナリログレプリケーション中にトランザクション順序を維持します。MySQL レプリカでパラレルレプリケーションが有効化されている場合(slave_parallel_workers > 1)、slave_preserve_commit_order = ON が有効化されていないと、トランザクションコミット順序がプライマリデータベースと一致しない可能性があります。この順序外のコミット動作により、Flink CDC がチェックポイントから再開する際にデータ損失が発生する可能性があります。この問題を防止するには、MySQL レプリカで slave_preserve_commit_order = ON を設定することを推奨します。あるいは、slave_parallel_workers = 1 と設定することもできますが、これはレプリケーション性能を犠牲にします。
注意事項
シンクテーブル
自動インクリメント主キーは DDL で宣言されません。MySQL がデータ書き込み時に自動的に埋めます。
少なくとも 1 つの非主キーフィールドを宣言する必要があります。そうでない場合、エラーが発生します。
DDL 内の NOT ENFORCED は、Flink が主キー検証を強制しないことを意味します。主キーの正確性および整合性を保証する必要があります。詳細については、「妥当性チェック」をご参照ください。
ディメンションテーブル
インデックスを使用したクエリの高速化のため、JOIN 条件のフィールドはインデックス定義の順序(左端プレフィックスルール)と一致している必要があります。たとえば、インデックスが (a, b, c) の場合、JOIN 条件は
ON t.a = x AND t.b = yである必要があります。Flink が生成する SQL は、オプティマイザーによって書き換えられる可能性があり、インデックスが使用されないことがあります。インデックスが実際に使用されているかどうかを確認するには、MySQL の実行計画(EXPLAIN)またはスロークエリログを確認してください。
SQL
MySQL コネクタは、SQL ジョブでソーステーブル、ディメンションテーブル、または結果テーブルとして使用できます。
構文
CREATE TEMPORARY TABLE mysqlcdc_source (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql',
'hostname' = '<yourHostname>',
'port' = '3306',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'database-name' = '<yourDatabaseName>',
'table-name' = '<yourTableName>'
);コネクタが結果テーブルに書き込む方法:各受信レコードは INSERT 文に変換され、実行されます。正確な SQL はシナリオによって異なります:
主キーがない結果テーブルの場合、コネクタは
INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...);を実行します。主キーがある結果テーブルの場合、コネクタは
INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...;を実行します。注:物理テーブルに主キー以外の一意なインデックス制約がある場合、異なる主キーだが同一の一意なインデックス値を持つ 2 つのレコードを挿入すると、競合によりデータ損失が発生する可能性があります。
MySQL データベースで自動インクリメント主キーが定義されている場合、Flink DDL ではそれを宣言しないでください。MySQL がデータ書き込み時にこのフィールドを自動的に埋めます。コネクタは自動インクリメントフィールドを含むデータの書き込みおよび削除をサポートしますが、更新はサポートしません。
WITH パラメーター
共通パラメーター
パラメーター
説明
必須
データ型
デフォルト値
備考
connector
テーブル種別。
はい
STRING
該当なし
ソーステーブルとして使用する場合、
mysql-cdcまたはmysqlを指定できます。どちらも同等です。ディメンションテーブルまたは結果テーブルとして使用する場合、値は固定でmysqlとなります。hostname
MySQL データベースの IP アドレスまたはホスト名。
はい
STRING
該当なし
VPC エンドポイントを指定することを推奨します。
説明MySQL と Realtime Compute for Apache Flink が同じ VPC 内にない場合、クロス VPC 接続を確立するか、パブリックエンドポイントを使用してください。詳細については、「ストレージ管理および運用」および「完全管理 Flink クラスターからインターネットへのアクセス方法」をご参照ください。
username
MySQL データベースサービスのユーザー名。
はい
STRING
該当なし
該当なし。
password
MySQL データベースサービスのパスワード。
はい
STRING
該当なし
該当なし。
database-name
MySQL データベースの名前。
はい
STRING
該当なし
ソーステーブルとして使用する場合、データベース名には正規表現を使用して複数のデータベースからデータを読み取ることが可能です。
正規表現を使用する場合、文字列の先頭および末尾をマッチさせるために ^ および $ 文字は使用しないでください。table-name の備考を参照してください。
table-name
MySQL テーブルの名前。
はい
STRING
該当なし
ソーステーブルとして使用する場合、テーブル名には正規表現を使用して複数のテーブルからデータを読み取ることが可能です。
複数の MySQL テーブルを読み取る場合、複数の CTAS ステートメントを単一のジョブとして送信することで、複数のバイナリログリスナーを起動せずに済みます。これにより、パフォーマンスと効率が向上します。詳細については、「複数の CTAS ステートメント:単一のジョブとして送信」をご参照ください。
正規表現を使用する場合、文字列の先頭および末尾をマッチさせるために ^ および $ 文字は使用しないでください。詳細については、以下の備考を参照してください。
説明正規表現でテーブル名をマッチさせる場合、MySQL CDC ソーステーブルは database-name および table-name の値を文字列 \\.(VVR 8.0.1 以前の場合は .)で連結し、完全パスの正規表現を作成します。その後、この正規表現を MySQL の完全修飾テーブル名に対してマッチさせます。
たとえば、'database-name'='db_.*' および 'table-name'='tb_.+' を設定した場合、コネクタは正規表現 db_.*\\.tb_.+(VVR 8.0.1 以前の場合は db_.*.tb_.+)を使用して、読み取るテーブルを決定します。
port
MySQL データベースサービスのポート番号。
いいえ
INTEGER
3306
該当なし。
ソーステーブル専用
パラメーター
説明
必須
データ型
デフォルト値
備考
server-id
データベースクライアントの数値 ID。
いいえ
STRING
デフォルトでは、5400 ~ 6400 の間のランダムな値が生成されます。
この ID は MySQL クラスター全体で一意である必要があります。同じデータベースにアクセスする各ジョブには、異なる ID を割り当てることを推奨します。
このパラメーターは範囲形式(例:5400-5408)もサポートします。増分読み取りが有効化されている場合、複数の同時リーダーがサポートされます。この場合、各リーダーが一意の ID を使用できるように、範囲を指定する必要があります。詳細については、「server ID の使用」をご参照ください。
scan.incremental.snapshot.enabled
増分スナップショットを有効化するかどうか。
いいえ
BOOLEAN
true
増分スナップショットはデフォルトで有効化されています。増分スナップショットは、完全データスナップショットを読み取るための新しいメカニズムです。従来のスナップショットと比較して、以下のような利点があります:
完全データの同時読み取り。
完全データ読み取り中のチャンクレベルでのチェックポイント。
完全データ読み取り中にグローバル読み取りロック(FLUSH TABLES WITH READ LOCK)は不要です。
ソースが同時読み取りをサポートするようにするには、各リーダーに一意の server ID が必要です。したがって、server-id は 5400-6400 のような範囲である必要があり、範囲サイズは並列処理の次数以上である必要があります。
説明この構成は VVR 11.1 以降で削除されました。
scan.incremental.snapshot.chunk.size
チャンクあたりの行数。
いいえ
INTEGER
8096
増分スナップショットが有効化されている場合、テーブルは読み取り用にチャンクに分割されます。各チャンクのデータは、チャンクが完全に読み取られるまでメモリにキャッシュされます。
チャンクサイズを小さくすると、チャンクの総数が増えます。これにより回復の粒度は低下しますが、メモリ不足(OOM)エラーが発生したり、全体的なスループットが低下したりする可能性があります。チャンクサイズを選択する際は、これらのトレードオフを考慮する必要があります。
scan.snapshot.fetch.size
完全テーブルデータをスキャンする際、1 回の読み取りで取得する最大レコード数。
いいえ
INTEGER
1024
該当なし。
scan.startup.mode
データ消費の起動モード。
いいえ
STRING
initial
有効な値:
initial(デフォルト):初回起動時に、履歴の完全データをスキャンし、その後最新のバイナリログデータを読み取ります。
latest-offset:初回起動時に、完全データのスキャンをスキップし、バイナリログの末尾(最も新しい位置)から読み取りを開始します。コネクタの起動後に発生した変更のみを読み取ります。
earliest-offset:完全データのスキャンをスキップし、利用可能な最も古いバイナリログ位置から読み取りを開始します。
specific-offset:完全データのスキャンをスキップし、指定されたバイナリログオフセットから読み取りを開始します。オフセットは scan.startup.specific-offset.file および scan.startup.specific-offset.pos の両方を指定するか、scan.startup.specific-offset.gtid-set を使用して GTID セットから開始します。
timestamp:完全データのスキャンをスキップし、指定されたタイムスタンプから読み取りを開始します。scan.startup.timestamp-millis を使用して、ミリ秒単位でタイムスタンプを指定します。
重要earliest-offset、specific-offset、または timestamp を使用する場合、指定されたバイナリログ位置とジョブ起動時の間にテーブルスキーマが変更されていないことを確認してください。スキーマ変更によりエラーが発生する可能性があります。
scan.startup.specific-offset.file
指定された起動オフセットのバイナリログファイル名。
いいえ
STRING
該当なし
この構成を使用する場合、scan.startup.mode を specific-offset に設定する必要があります。ファイル名の例:
mysql-bin.000003。scan.startup.specific-offset.pos
指定されたバイナリログファイル内の起動位置のオフセット。
いいえ
INTEGER
該当なし
この構成を使用する場合、scan.startup.mode を specific-offset に設定する必要があります。
scan.startup.specific-offset.gtid-set
起動位置の GTID セット。
いいえ
STRING
該当なし
この構成を使用する場合、scan.startup.mode を specific-offset に設定する必要があります。GTID セットの例:
24DA167-0C0C-11E8-8442-00059A3C7B00:1-19。scan.startup.timestamp-millis
ミリ秒単位の起動タイムスタンプ。
いいえ
LONG
該当なし
この構成を使用する場合、scan.startup.mode を timestamp に設定する必要があります。タイムスタンプの単位はミリ秒です。
重要タイムスタンプを使用する場合、MySQL CDC は各バイナリログファイルの初期イベントを読み取ってそのタイムスタンプを判定し、対応するバイナリログファイルを特定しようと試みます。指定されたタイムスタンプに対応するバイナリログファイルがデータベースに存在し、読み取り可能であることを確認してください。
server-time-zone
データベースで使用されるセッションタイムゾーン。
いいえ
STRING
このパラメーターを指定しない場合、システムは Flink ジョブの実行環境(選択したゾーンのタイムゾーン)のタイムゾーンをデータベースサーバータイムゾーンとして使用します。
例:Asia/Shanghai。このパラメーターは、MySQL の TIMESTAMP 型を STRING 型に変換する方法を制御します。詳細については、「Debezium temporal values」をご参照ください。
debezium.min.row.count.to.stream.results
テーブルの行数がこの値を超える場合、バッチ読み取りモードを使用します。
いいえ
INTEGER
1000
Flink は MySQL ソーステーブルから次のようにデータを読み取ります:
完全読み取り:テーブル全体をメモリに読み込みます。高速ですが、メモリ使用量が多くなります。大規模なテーブルではメモリ不足(OOM)エラーが発生するリスクがあります。
バッチ読み取り:データをバッチ単位で読み込みます。メモリ効率は良いですが、大規模なテーブルでは遅くなります。
connect.timeout
MySQL データベースサーバーへの接続タイムアウト後に再試行するまでの最大待機時間。
いいえ
DURATION
30 秒
該当なし。
connect.max-retries
MySQL データベースサービスへの接続失敗後の最大再試行回数。
いいえ
INTEGER
3
該当なし。
connection.pool.size
データベース接続プールのサイズ。
いいえ
INTEGER
20
データベース接続プールは、データベース接続数を削減するために接続を再利用します。
jdbc.properties.*
JDBC URL のカスタム接続パラメーター。
いいえ
STRING
該当なし
カスタム接続パラメーターを渡すことができます。たとえば、SSL を無効化するには、'jdbc.properties.useSSL' = 'false' を設定します。
サポートされる接続パラメーターについては、「MySQL Configuration Properties」をご参照ください。
debezium.*
バイナリログを読み取るためのカスタム Debezium パラメーター。
いいえ
STRING
該当なし
カスタム Debezium パラメーターを渡すことができます。たとえば、解析エラーの処理方法を定義するには、'debezium.event.deserialization.failure.handling.mode'='ignore' を使用します。
heartbeat.interval
ソースでハートビートイベントを使用してバイナリログオフセットを進める間隔。
いいえ
DURATION
30 秒
ハートビートイベントは、ソースでバイナリログオフセットを進めるために使用されます。これは、更新頻度が低いテーブルに役立ちます。ハートビートがない場合、バイナリログオフセットが停止し、期限切れとなってジョブが失敗する可能性があります。ハートビートにより、この問題を防止できます。
scan.incremental.snapshot.chunk.key-column
スナップショットフェーズ中にチャンク分割に使用するカラムを指定します。
備考を参照。
STRING
該当なし
主キーがないテーブルには必須です。選択したカラムは NULL 不可(NOT NULL)である必要があります。
主キーがあるテーブルには任意です。主キーのカラムのみがサポートされます。
rds.region-id
Alibaba Cloud ApsaraDB RDS for MySQL インスタンスのリージョン ID。
OSS からアーカイブログを読み取る場合に必須です。
STRING
該当なし
リージョン ID の詳細については、「リージョンおよびゾーン」をご参照ください。
rds.access-key-id
Alibaba Cloud ApsaraDB RDS for MySQL アカウントの AccessKey ID。
OSS からアーカイブログを読み取る場合に必須です。
STRING
該当なし
詳細については、「AccessKey ID および AccessKey Secret の表示方法」をご参照ください。
重要AccessKey 情報の漏洩を防ぐため、AccessKey ID の設定にはキー管理を使用することを推奨します。詳細については、「変数管理」をご参照ください。
rds.access-key-secret
Alibaba Cloud ApsaraDB RDS for MySQL アカウントの AccessKey Secret。
OSS からアーカイブログを読み取る場合に必須です。
STRING
該当なし
詳細については、「AccessKey ID および AccessKey Secret の表示方法」をご参照ください。
重要AccessKey 情報の漏洩を防ぐため、AccessKey Secret の設定にはキー管理を使用することを推奨します。詳細については、「変数管理」をご参照ください。
rds.db-instance-id
Alibaba Cloud ApsaraDB RDS for MySQL インスタンスのインスタンス ID。
OSS からアーカイブログを読み取る場合に必須です。
STRING
該当なし
該当なし。
rds.main-db-id
Alibaba Cloud ApsaraDB RDS for MySQL インスタンスのプライマリデータベース ID。
いいえ
STRING
該当なし
プライマリデータベース ID の取得手順については、「ApsaraDB RDS for MySQL ログバックアップ」をご参照ください。
VVR 8.0.7 以降でのみサポートされます。
rds.download.timeout
OSS から単一のアーカイブログをダウンロードする際のタイムアウト。
いいえ
DURATION
60 秒
該当なし。
rds.endpoint
OSS バイナリログ情報を取得するためのエンドポイント。
いいえ
STRING
該当なし
有効な値については、「サービスエンドポイント」をご参照ください。
VVR 8.0.8 以降でのみサポートされます。
scan.incremental.close-idle-reader.enabled
スナップショット完了後にアイドルリーダーを閉じるかどうか。
いいえ
BOOLEAN
false
VVR 8.0.1 以降でのみサポートされます。
この設定では、execution.checkpointing.checkpoints-after-tasks-finish.enabled が true である必要があります。
scan.read-changelog-as-append-only.enabled
変更ログデータストリームを追加専用データストリームに変換するかどうか。
いいえ
BOOLEAN
false
有効な値:
true:INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER のすべてのメッセージタイプが INSERT メッセージに変換されます。上流テーブルの削除メッセージを保持するなどの特殊なケースでのみ使用してください。
false(デフォルト):すべてのメッセージタイプがそのまま通過します。
説明VVR 8.0.8 以降でのみサポートされます。
scan.only.deserialize.captured.tables.changelog.enabled
増分フェーズ中に、指定されたテーブルの変更イベントのみを逆シリアル化するかどうか。
いいえ
BOOLEAN
VVR 8.x ではデフォルトは false です。
VVR 11.1 以降ではデフォルトは true です。
有効な値:
true:ターゲットテーブルの変更データのみを逆シリアル化して、バイナリログの読み取りを高速化します。
false(デフォルト):すべてのテーブルの変更データを逆シリアル化します。
説明VVR 8.0.7 以降でのみサポートされます。
VVR 8.0.8 以前では、debezium.scan.only.deserialize.captured.tables.changelog.enable を使用します。
scan.parse.online.schema.changes.enabled
増分フェーズ中に、RDS ロックレス DDL 変更イベントの解析を試行するかどうか。
いいえ
BOOLEAN
false
有効な値:
true:RDS ロックレス DDL 変更イベントを解析します。
false(デフォルト):RDS ロックレス DDL 変更イベントを解析しません。
実験的な機能です。オンラインロックレス変更を実行する前に、回復のために Flink ジョブのスナップショットを取得してください。
説明VVR 11.1 以降でのみサポートされます。
scan.incremental.snapshot.backfill.skip
スナップショット読み取り中にバックフィルをスキップするかどうか。
いいえ
BOOLEAN
false
有効な値:
true:スナップショット読み取り中にバックフィルをスキップします。
false(デフォルト):スナップショット読み取り中にバックフィルをスキップしません。
バックフィルをスキップすると、スナップショットフェーズ中に発生した変更が、スナップショットにマージされるのではなく、後続の増分フェーズで読み取られます。
重要バックフィルをスキップすると、スナップショットフェーズ中に発生した変更が再再生されるため、データの不整合が発生する可能性があります。これは at-least-once セマンティクスのみを保証します。
説明VVR 11.1 以降でのみサポートされます。
scan.incremental.snapshot.unbounded-chunk-first.enabled
スナップショット読み取り中に、上限なしのチャンクを最初に配布するかどうか。
いいえ
BOOELEAN
false
有効な値:
true:スナップショット読み取り中に、上限なしのチャンクを最初に配布します。
false(デフォルト):スナップショット読み取り中に、上限なしのチャンクを最初に配布しません。
実験的な機能です。これを有効化すると、スナップショットフェーズの最終チャンク同期時に TaskManager でメモリ不足(OOM)エラーが発生するリスクが低減されます。初回ジョブ起動前に有効化することを推奨します。
説明VVR 11.1 以降でのみサポートされます。
binlog.session.network.timeout
バイナリログ接続のネットワークタイムアウト。
いいえ
DURATION
10 分
これを 0 秒に設定すると、MySQL サーバーのデフォルトタイムアウトが使用されます。
説明VVR 11.5 以降でのみサポートされます。
scan.rate-limit.records-per-second
ソースが 1 秒間に送信するレコードの最大数を制限します。
いいえ
LONG
該当なし
データ消費を制限する場合に便利です。完全フェーズおよび増分フェーズの両方に適用されます。
numRecordsOutPerSecondメトリクスは、データストリーム全体が 1 秒間に送信するレコード数を反映します。このメトリクスに基づいて、このパラメーターを調整してください。完全読み取り中は、この制限を補完するためにバッチサイズを小さくする必要があります。
scan.incremental.snapshot.chunk.sizeの値を小さくしてください。説明VVR 11.5 以降でのみサポートされます。
scan.binlog.tolerate.gtid-holes
このパラメーターを有効化すると、GTID シーケンスのギャップが無視され、不連続なイベントをスキップしてジョブを継続実行できます。
いいえ
BOOLEAN
false
このパラメーターを有効化する前に、ジョブの起動オフセットが期限切れになっていないことを確認してください。ジョブがクリアされたか期限切れになった GTID オフセットから起動した場合、エンジンは欠落したログを静かにスキップし、データ損失を引き起こします。
説明VVR 11.6 以降でのみサポートされます。
ディメンションテーブル専用パラメーター
パラメーター
説明
必須
データ型
デフォルト値
備考
url
MySQL JDBC URL。
いいえ
STRING
該当なし
URL 形式は:
jdbc:mysql://<endpoint>:<port>/<database-name>です。lookup.max-retries
読み取り操作失敗後の最大再試行回数。
いいえ
INTEGER
3
VVR 6.0.7 以降でのみサポートされます。
lookup.cache.strategy
キャッシュポリシー。
いいえ
STRING
該当なし
サポートされるポリシー:None、LRU、ALL。説明については、「背景情報」をご参照ください。
説明LRU キャッシュポリシーを使用する場合、lookup.cache.max-rows パラメーターも設定する必要があります。
lookup.cache.max-rows
キャッシュされる最大行数。
いいえ
INTEGER
100000
最近使用されていないキャッシュポリシーを選択した場合、キャッシュサイズを設定する必要があります。
ALL キャッシュポリシーを使用する場合、任意です。
lookup.cache.ttl
キャッシュの生存時間(TTL)。
いいえ
DURATION
10 秒
lookup.cache.ttl の設定は lookup.cache.strategy に依存します:
lookup.cache.strategy が None の場合、lookup.cache.ttl は任意であり、TTL がないことを示します。
lookup.cache.strategy が LRU の場合、lookup.cache.ttl はキャッシュ TTL です。デフォルトでは期限切れになりません。
lookup.cache.strategy が ALL の場合、lookup.cache.ttl はキャッシュ再読み込み間隔です。デフォルトでは再読み込みされません。
1 分や 10 秒などの形式で時間を指定します。
lookup.max-join-rows
メインテーブルの各行とディメンションテーブルを結合する際に返される結果の最大数。
いいえ
INTEGER
1024
該当なし。
lookup.filter-push-down.enabled
ディメンションテーブルのフィルターのプッシュダウンを有効化するかどうか。
いいえ
BOOLEAN
false
有効な値:
true:フィルターのプッシュダウンを有効化します。ディメンションテーブルは、SQL ジョブで定義された条件に基づいて早期にデータをフィルターします。
false(デフォルト):フィルターのプッシュダウンを無効化します。ディメンションテーブルはすべてのデータを読み込みます。
説明VVR 8.0.7 以降でのみサポートされます。
重要フィルターのプッシュダウンは、Flink テーブルがディメンションテーブルとして使用される場合にのみ有効化してください。MySQL ソーステーブルはフィルターのプッシュダウンをサポートしていません。Flink テーブルがソーステーブルおよびディメンションテーブルの両方として使用される場合、ディメンションテーブルでフィルターのプッシュダウンを有効化する場合は、SQL ヒント を使用して、ソーステーブルとして使用する際にこの構成を明示的に false に設定してください。そうしないと、ジョブの実行が失敗する可能性があります。
結果テーブル専用
パラメーター
説明
必須
データ型
デフォルト値
備考
url
MySQL JDBC URL。
いいえ
STRING
該当なし
URL 形式は:
jdbc:mysql://<endpoint>:<port>/<database-name>です。sink.max-retries
書き込み操作失敗後の最大再試行回数。
いいえ
INTEGER
3
該当なし。
sink.buffer-flush.batch-size
1 回のバッチで書き込まれるレコード数。
いいえ
INTEGER
4096
該当なし。
sink.buffer-flush.max-rows
メモリにキャッシュされる行数。
いいえ
INTEGER
10000
このパラメーターは、主キーが指定されている場合にのみ有効です。
sink.buffer-flush.interval
バッファーをフラッシュする間隔。バッファリングされたデータがこの間隔より長く待機し、フラッシュ条件を満たさない場合、システムはすべてのバッファリングされたデータを自動的にフラッシュします。
いいえ
DURATION
1 秒
該当なし。
sink.ignore-delete
DELETE 操作を無視するかどうか。
いいえ
BOOLEAN
false
Flink SQL ストリームに DELETE または UPDATE_BEFORE レコードが含まれている場合、複数のタスクによる同一テーブルの異なるフィールドへの同時更新により、データの不整合が発生する可能性があります。
たとえば、レコードが削除され、別のタスクが一部のフィールドのみを更新した場合、更新されなかったフィールドは null またはデフォルト値になります。これにより、データエラーが発生します。
sink.ignore-delete を true に設定すると、上流の DELETE および UPDATE_BEFORE 操作が無視され、この問題が回避されます。
説明UPDATE_BEFORE は Flink のリトラクト機構の一部であり、更新時に「古い値」をリトラクトするために使用されます。
ignoreDelete = true の場合、すべての DELETE および UPDATE_BEFORE レコードがスキップされます。INSERT および UPDATE_AFTER レコードのみが処理されます。
sink.ignore-null-when-update
データを更新する際に、入力値が null の場合にフィールドを null に更新するか、更新をスキップするか。
いいえ
BOOLEAN
false
有効な値:
true:フィールドの更新をスキップします。Flink テーブルに主キーがある場合にのみサポートされます。true に設定した場合:
VVR 8.0.6 以前では、結果テーブルのバッチ書き込みはサポートされません。
VVR 8.0.7 以降では、結果テーブルのバッチ書き込みがサポートされます。
バッチ書き込みは書き込み効率およびスループットを向上させますが、レイテンシーおよびメモリ不足(OOM)リスクを引き起こす可能性があります。ビジネス要件に基づいて、これらのトレードオフを考慮する必要があります。
false:フィールドを null に更新します。
説明VVR 8.0.5 以降でのみサポートされます。
データ型マッピング
CDC ソーステーブル
MySQL CDC フィールド型
Flink フィールド型
TINYINT
TINYINT
SMALLINT
SMALLINT
TINYINT UNSIGNED
TINYINT UNSIGNED ZEROFILL
INT
INT
MEDIUMINT
SMALLINT UNSIGNED
SMALLINT UNSIGNED ZEROFILL
BIGINT
BIGINT
INT UNSIGNED
INT UNSIGNED ZEROFILL
MEDIUMINT UNSIGNED
MEDIUMINT UNSIGNED ZEROFILL
BIGINT UNSIGNED
DECIMAL(20, 0)
BIGINT UNSIGNED ZEROFILL
SERIAL
FLOAT [UNSIGNED] [ZEROFILL]
FLOAT
DOUBLE [UNSIGNED] [ZEROFILL]
DOUBLE
DOUBLE PRECISION [UNSIGNED] [ZEROFILL]
REAL [UNSIGNED] [ZEROFILL]
NUMERIC(p, s) [UNSIGNED] [ZEROFILL]
DECIMAL(p, s)
DECIMAL(p, s) [UNSIGNED] [ZEROFILL]
BOOLEAN
BOOLEAN
TINYINT(1)
DATE
DATE
TIME [(p)]
TIME [(p)] [WITHOUT TIME ZONE]
DATETIME [(p)]
TIMESTAMP [(p)] [WITHOUT TIME ZONE]
TIMESTAMP [(p)]
TIMESTAMP [(p)]
TIMESTAMP [(p)] WITH LOCAL TIME ZONE
CHAR(n)
STRING
VARCHAR(n)
TEXT
BINARY
BYTES
VARBINARY
BLOB
重要MySQL で TINYINT(1) 型を使用して 0 および 1 以外の値を格納しないでください。property-version が 0 に設定されている場合、MySQL CDC ソーステーブルはデフォルトで TINYINT(1) を Flink BOOLEAN 型にマッピングします。これにより、データが不正確になる可能性があります。TINYINT(1) 型を使用して 0 および 1 以外の値を格納する場合は、「catalog.table.treat-tinyint1-as-boolean」構成パラメーターを参照してください。
ディメンションテーブルおよび結果テーブル
MySQL フィールド型
Flink フィールド型
TINYINT
TINYINT
SMALLINT
SMALLINT
TINYINT UNSIGNED
INT
INT
MEDIUMINT
SMALLINT UNSIGNED
BIGINT
BIGINT
INT UNSIGNED
BIGINT UNSIGNED
DECIMAL(20, 0)
FLOAT
FLOAT
DOUBLE
DOUBLE
DOUBLE PRECISION
NUMERIC(p, s)
DECIMAL(p, s)
説明p <= 38 の場合。
DECIMAL(p, s)
BOOLEAN
BOOLEAN
TINYINT(1)
DATE
DATE
TIME [(p)]
TIME [(p)] [WITHOUT TIME ZONE]
DATETIME [(p)]
TIMESTAMP [(p)] [WITHOUT TIME ZONE]
TIMESTAMP [(p)]
CHAR(n)
CHAR(n)
VARCHAR(n)
VARCHAR(n)
BIT(n)
BINARY(⌈n/8⌉)
BINARY(n)
BINARY(n)
VARBINARY(N)
VARBINARY(N)
TINYTEXT
STRING
TEXT
MEDIUMTEXT
LONGTEXT
TINYBLOB
BYTES
重要Flink は MySQL BLOB レコードを最大 2,147,483,647 バイト(2^31 − 1)までサポートしています。
BLOB
MEDIUMBLOB
LONGBLOB
データインジェスト
MySQL コネクタは、YAML データインジェストジョブのデータソースとして使用できます。
構文
source:
type: mysql
name: MySQL ソース
hostname: localhost
port: 3306
username: <username>
password: <password>
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5401-5404
sink:
type: xxx構成
パラメーター | 説明 | 必須 | データ型 | デフォルト値 | 備考 |
type | データソースの種別。 | はい | STRING | 該当なし | 値は固定で mysql です。 |
name | データソースの名前。 | いいえ | STRING | 該当なし | 該当なし。 |
hostname | MySQL データベースの IP アドレスまたはホスト名。 | はい | STRING | 該当なし | VPC アドレスを指定してください。 説明 MySQL データベースとリアルタイム Flink サービスが同じ VPC 内にない場合、まずクロス VPC ネットワーク接続を確立するか、パブリックネットワーク経由でデータベースにアクセスする必要があります。詳細については、「ストレージ管理および運用」および「完全管理 Flink クラスターがパブリックネットワークにアクセスする方法」をご参照ください。 |
username | MySQL データベースサービスのユーザー名。 | はい | STRING | 該当なし | 該当なし。 |
password | MySQL データベースサービスのパスワード。 | はい | STRING | 該当なし | 該当なし。 |
tables | 同期する MySQL テーブル。 | はい | STRING | 該当なし |
説明
|
tables.exclude | 同期対象から除外するテーブル。 | いいえ | STRING | 該当なし |
説明 ドット(.)はデータベース名とテーブル名を分離します。ドットを任意の文字にマッチさせるには、バックスラッシュ(\)でエスケープします。例:db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*。 |
port | MySQL データベースサービスのポート番号。 | いいえ | INTEGER | 3306 | 該当なし。 |
schema-change.enabled | スキーマ進化イベントを送信するかどうかを指定します。 | いいえ | BOOLEAN | true | 該当なし。 |
server-id | 同期に使用されるデータベースクライアントの数値 ID または ID の範囲。 | いいえ | STRING | 5400 ~ 6400 の間のランダムな値が生成されます。 | この ID は MySQL クラスター内でグローバルに一意である必要があります。同じデータベースに接続する各ジョブには、異なる ID を設定してください。 このパラメーターは ID の範囲(例:5400-5408)もサポートします。増分読み取りが有効化されている場合、複数の同時読み取りがサポートされます。この場合、各同時読み取りが異なる ID を使用できるように、ID の範囲を設定してください。 |
jdbc.properties.* | Java Database Connectivity(JDBC)URL のカスタム接続パラメーター。 | いいえ | STRING | 該当なし | カスタム接続パラメーターを渡すことができます。たとえば、SSL プロトコルを無効化するには、'jdbc.properties.useSSL' = 'false' を設定します。 サポートされる接続パラメーターの詳細については、「MySQL Configuration Properties」をご参照ください。 |
debezium.* | バイナリログ(binlog)データを読み取るための Debezium のカスタムパラメーター。 | いいえ | STRING | 該当なし | カスタム Debezium パラメーターを渡すことができます。たとえば、逆シリアル化エラーの処理方法を指定するには、'debezium.event.deserialization.failure.handling.mode'='ignore' を使用します。 |
scan.incremental.snapshot.chunk.size | チャンクあたりの行サイズ。 | いいえ | INTEGER | 8096 | MySQL テーブルは読み取り用に複数のチャンクに分割されます。チャンクのデータは、完全に読み取られる前にメモリにキャッシュされます。 チャンクあたりの行数を減らすと、テーブル内のチャンク総数が増えます。これは障害復旧の粒度を低下させますが、メモリ不足(OOM)エラーを引き起こしたり、全体的なスループットを低下させたりする可能性があります。そのため、これらの要素を考慮して適切なチャンクサイズを設定する必要があります。 |
scan.snapshot.fetch.size | テーブルの完全データを読み取る際に、1 回に取得する最大レコード数。 | いいえ | INTEGER | 1024 | 該当なし。 |
scan.startup.mode | データ消費の起動モード。 | いいえ | STRING | initial | 有効な値:
重要 earliest-offset、specific-offset、および timestamp の起動モードでは、ジョブ起動時のテーブルスキーマが指定された開始オフセット時のスキーマと異なる場合、ジョブが失敗します。つまり、これらの 3 つの起動モードを使用する場合、指定された binlog 消費位置とジョブ起動時との間に、対応するテーブルのスキーマが変更されていないことを確認してください。 |
scan.startup.specific-offset.file | 起動モードが specific-offset に設定されている場合の、開始オフセットの binlog ファイル名。 | いいえ | STRING | 該当なし | このパラメーターは、scan.startup.mode が specific-offset に設定されている場合にのみ使用されます。形式の例: |
scan.startup.specific-offset.pos | 起動モードが specific-offset に設定されている場合の、指定された binlog ファイル内の開始オフセット。 | いいえ | INTEGER | 該当なし | このパラメーターは、scan.startup.mode が specific-offset に設定されている場合にのみ使用されます。 |
scan.startup.specific-offset.gtid-set | 起動モードが specific-offset に設定されている場合の、開始する GTID セット。 | いいえ | STRING | 該当なし | このパラメーターは、scan.startup.mode が specific-offset に設定されている場合にのみ使用されます。GTID セットの形式の例: |
scan.startup.timestamp-millis | 起動モードが timestamp に設定されている場合の、開始するタイムスタンプ(ミリ秒単位)。 | いいえ | LONG | 該当なし | このパラメーターは、scan.startup.mode が timestamp に設定されている場合にのみ使用されます。タイムスタンプはミリ秒単位です。 重要 タイムスタンプを指定すると、MySQL CDC は各 binlog ファイルの初期イベントを読み取ってそのタイムスタンプを判定し、指定されたタイムスタンプに対応する binlog ファイルを特定しようと試みます。指定されたタイムスタンプに対応する binlog ファイルがデータベースからクリアされておらず、読み取り可能であることを確認してください。 |
server-time-zone | データベースで使用されるセッションタイムゾーン。 | いいえ | STRING | このパラメーターを指定しない場合、システムは Flink ジョブの実行環境のタイムゾーン(選択したゾーンのタイムゾーン)をデータベースサーバータイムゾーンとして使用します。 | 例:Asia/Shanghai。このパラメーターは、MySQL TIMESTAMP 型を STRING 型に変換する方法を制御します。詳細については、「Debezium temporal types」をご参照ください。 |
scan.startup.specific-offset.skip-events | 特定のオフセットから読み取る際にスキップする binlog イベントの数。 | いいえ | INTEGER | 該当なし | この構成を使用する場合、scan.startup.mode を specific-offset に設定する必要があります。 |
scan.startup.specific-offset.skip-rows | 特定のオフセットから読み取る際にスキップする行変更の数。1 つの binlog イベントは複数の行変更に対応する場合があります。 | いいえ | INTEGER | 該当なし | この構成を使用する場合、scan.startup.mode を specific-offset に設定する必要があります。 |
connect.timeout | MySQL データベースサーバーへの接続タイムアウト後に再試行するまでの最大待機時間。 | いいえ | DURATION | 30 秒 | 該当なし。 |
connect.max-retries | MySQL データベースサービスへの接続失敗後の最大再試行回数。 | いいえ | INTEGER | 3 | 該当なし。 |
connection.pool.size | データベース接続プールのサイズ。 | いいえ | INTEGER | 20 | データベース接続プールは、データベース接続数を削減するために接続を再利用するために使用されます。 |
heartbeat.interval | ソースがハートビートイベントを使用して binlog オフセットを進める間隔。 | いいえ | DURATION | 30 秒 | ハートビートイベントは、ソースで binlog オフセットを進めるために使用されます。これは、更新頻度が低い MySQL のテーブルに役立ちます。このようなテーブルでは、binlog オフセットは自動的に進みません。ハートビートイベントにより、binlog オフセットを前方に進めることができ、オフセットの期限切れを防止できます。期限切れになった binlog オフセットは、ジョブの失敗および回復不能を引き起こします。ジョブは状態なしで再起動する必要があります。 |
scan.incremental.snapshot.chunk.key-column | スナップショットフェーズ中にシャーディングに使用するカラムを指定します。 | いいえ。 | STRING | 該当なし | 主キーからのみカラムを選択できます。 |
rds.region-id | Alibaba Cloud RDS for MySQL インスタンスのリージョン ID。 | OSS からアーカイブログを読み取る場合に必須です。 | STRING | 該当なし | リージョン ID の詳細については、「リージョンおよびゾーン」をご参照ください。 |
rds.access-key-id | Alibaba Cloud アカウントの RDS for MySQL インスタンスの AccessKey ID。 | OSS からアーカイブログを読み取る場合に必須です。 | STRING | 該当なし | 詳細については、「AccessKey ID および AccessKey Secret の表示方法」をご参照ください。 重要 AccessKey 情報の漏洩を防ぐため、AccessKey ID の指定にはシークレット管理を使用することを推奨します。詳細については、「変数管理」をご参照ください。 |
rds.access-key-secret | Alibaba Cloud アカウントの RDS for MySQL インスタンスの AccessKey Secret。 | OSS からアーカイブログを読み取る場合に必須です。 | STRING | 該当なし | 詳細については、「AccessKey ID および AccessKey Secret の表示方法」をご参照ください。 重要 AccessKey 情報の漏洩を防ぐため、AccessKey Secret の指定にはシークレット管理を使用することを推奨します。詳細については、「変数管理」をご参照ください。 |
rds.db-instance-id | Alibaba Cloud RDS for MySQL インスタンスのインスタンス ID。 | OSS からアーカイブログを読み取る場合に必須です。 | STRING | 該当なし | 該当なし。 |
rds.main-db-id | Alibaba Cloud RDS for MySQL インスタンスのプライマリデータベース ID。 | いいえ | STRING | 該当なし | プライマリデータベース ID の取得方法については、「RDS for MySQL ログバックアップ」をご参照ください。 |
rds.download.timeout | OSS から単一のアーカイブログをダウンロードする際のタイムアウト期間。 | いいえ | DURATION | 60s | 該当なし。 |
rds.endpoint | OSS binlog 情報を取得するためのサービスエンドポイント。 | いいえ | STRING | 該当なし | 有効な値については、「エンドポイント」をご参照ください。 |
rds.binlog-directory-prefix | binlog ファイルの保存先ディレクトリのプレフィックス。 | いいえ | STRING | rds-binlog- | 該当なし。 |
rds.use-intranet-link | 内部ネットワークリンクを使用して binlog ファイルをダウンロードするかどうかを指定します。 | いいえ | BOOLEAN | true | 該当なし。 |
rds.binlog-directories-parent-path | binlog ファイルが保存されている親ディレクトリの絶対パス。 | いいえ | STRING | 該当なし | 該当なし。 |
chunk-meta.group.size | チャンクメタデータのサイズ。 | いいえ | INTEGER | 1000 | メタデータがこの値を超える場合、複数の部分に分けて渡されます。 |
chunk-key.even-distribution.factor.lower-bound | 均等シャーディングのためのチャンク分布係数の下限。 | いいえ | DOUBLE | 0.05 | 分布係数がこの値より小さい場合、非均等シャーディングが使用されます。 チャンク分布係数 = (MAX(chunk-key) - MIN(chunk-key) + 1) / データ行の合計数。 |
chunk-key.even-distribution.factor.upper-bound | 均等シャーディングのためのチャンク分布係数の上限。 | いいえ | DOUBLE | 1000.0 | 分布係数がこの値より大きい場合、非均等シャーディングが使用されます。 チャンク分布係数 = (MAX(chunk-key) - MIN(chunk-key) + 1) / データ行の合計数。 |
scan.incremental.close-idle-reader.enabled | スナップショット完了後にアイドルリーダーを閉じるかどうかを指定します。 | いいえ | BOOLEAN | false | この構成を有効にするには、 |
scan.only.deserialize.captured.tables.changelog.enabled | 増分フェーズ中に、指定されたテーブルの変更イベントのみを逆シリアル化するかどうかを指定します。 | いいえ | BOOLEAN |
| 有効な値:
|
scan.parallel-deserialize-changelog.enabled | 増分フェーズ中に、変更イベントの解析にマルチスレッドを使用するかどうかを指定します。 | いいえ | BOOLEAN | false | 有効な値:
説明 この機能は Flink コンピュートエンジン VVR 8.0.11 以降でのみサポートされます。 |
scan.parallel-deserialize-changelog.handler.size | 変更イベントの解析にマルチスレッドを使用する場合のイベントハンドラーの数。 | いいえ | INTEGER | 2 | 説明 この機能は Flink コンピュートエンジン VVR 8.0.11 以降でのみサポートされます。 |
metadata-column.include-list | 下流の結果テーブルに渡すメタデータカラム。 | いいえ | STRING | 該当なし | 利用可能なメタデータ列には、 説明 MySQL CDC YAML コネクタでは、データベース名、テーブル名、または 重要
|
scan.newly-added-table.enabled | 前回の起動時に一致しなかった新しく追加されたテーブルを同期するか、チェックポイントから再起動時に現在一致していないテーブルを状態から削除するかを指定します。 | いいえ | BOOLEAN | false | これは、チェックポイントまたはセーブポイントから再起動するときに有効になります。 |
scan.binlog.newly-added-table.enabled | 増分フェーズ中に、新たに一致したテーブルのデータを送信するかどうかを指定します。 | いいえ | BOOLEAN | false |
|
scan.incremental.snapshot.chunk.key-column | 特定のテーブルのスナップショットフェーズ中にシャーディングに使用するカラムを指定します。 | いいえ | STRING | 該当なし |
|
scan.parse.online.schema.changes.enabled | 増分フェーズ中に、RDS ロックレス変更 DDL イベントの解析を試行するかどうかを指定します。 | いいえ | BOOLEAN | false | 有効な値:
これは実験的な機能です。オンラインロックレス変更を実行する前に、回復のために Flink ジョブのスナップショットを作成してください。 説明 この機能は Flink コンピュートエンジン VVR 11.0 以降でのみサポートされます。 |
scan.incremental.snapshot.backfill.skip | スナップショット読み取りフェーズ中にバックフィルをスキップするかどうかを指定します。 | いいえ | BOOLEAN | false | 有効な値:
バックフィルをスキップすると、スナップショットフェーズ中にテーブルに対して行われた変更が、後続の増分フェーズで読み取られ、スナップショットにマージされなくなります。 重要 バックフィルをスキップすると、スナップショットフェーズ中に発生した変更が再生されるため、データの不整合が発生する可能性があります。保証されるのは at-least-once セマンティクスのみです。 説明 この機能は Flink コンピュートエンジン VVR 11.1 以降でのみサポートされます。 |
treat-tinyint1-as-boolean.enabled | TINYINT(1) 型を BOOLEAN 型として扱うかどうかを指定します。 | いいえ | BOOLEAN | true | 有効な値:
|
treat-timestamp-as-datetime-enabled | TIMESTAMP 型を DATETIME 型として扱うかどうかを指定します。 | いいえ | BOOLEAN | false | 有効な値:
MySQL TIMESTAMP 型は UTC 時間を格納し、タイムゾーンの影響を受けます。MySQL DATETIME 型はリテラル時間を格納し、タイムゾーンの影響を受けません。 これを有効化すると、MySQL TIMESTAMP 型のデータは server-time-zone に基づいて DATETIME 型に変換されます。 |
include-comments.enabled | テーブルおよびカラムのコメントを同期するかどうかを指定します。 | いいえ | BOOELEAN | false | 有効な値:
これを有効化すると、ジョブのメモリ使用量が増加します。 |
scan.incremental.snapshot.unbounded-chunk-first.enabled | スナップショット読み取りフェーズ中に、上限なしのチャンクを最初に配布するかどうかを指定します。 | いいえ | BOOELEAN | false | 有効な値:
これは実験的な機能です。これを有効化すると、スナップショットフェーズ中に TaskManager で最後のチャンクを同期する際のメモリ不足(OOM)エラーのリスクが低減されます。ジョブの初回起動前にこのパラメーターを追加してください。 説明 この機能は Flink コンピュートエンジン VVR 11.1 以降でのみサポートされます。 |
binlog.session.network.timeout | binlog 接続のネットワークタイムアウト。 | いいえ | DURATION | 10 分 | これを 0 秒に設定すると、MySQL サーバーのデフォルトタイムアウトが使用されます。 説明 この機能は Flink コンピュートエンジン VVR 11.5 以降でのみサポートされます。 |
scan.rate-limit.records-per-second | ソースが 1 秒間に送信できるレコードの最大数を制限します。 | いいえ | LONG | 該当なし | これは、データ読み取りを制限する必要があるシナリオに適用されます。この制限は、完全フェーズと増分フェーズの両方で有効です。 ソースの 完全読み取りフェーズでは、通常、各バッチで読み取るデータエントリの数を減らす必要があります。 説明 この機能は Flink コンピュートエンジン VVR 11.5 以降でのみサポートされます。 |
include-binlog-meta.enable | 元の MySQL binlog 情報(GTID や binlog オフセットなど)をメッセージに含めるかどうかを指定します。 | いいえ | Boolean | false | これは、既存の canal 同期リンクの置き換えなど、元の binlog 同期シナリオに適しています。 説明 この機能は Flink コンピュートエンジン VVR 11.6 以降でのみサポートされます。 |
scan.binlog.tolerate.gtid-holes | このパラメーターを有効化すると、GTID シーケンスのギャップが無視され、ジョブが不連続なイベントをバイパスして継続実行できるようになります。 | いいえ | Boolean | false | このパラメーターを有効にする前に、ジョブの開始オフセットが期限切れになっていないことを確認する必要があります。ジョブがクリアされた、または期限切れになった GTID オフセットから開始した場合、エンジンは欠落したログを静かにスキップし、データ損失を引き起こします。 説明 このパラメーターは Flink コンピュートエンジン VVR 11.6 以降でのみサポートされます。 |
型マッピング
以下の表は、データインジェストのデータ型マッピングを示しています。
MySQL CDC フィールド型 | CDC フィールド型 |
TINYINT(n) | TINYINT |
SMALLINT | SMALLINT |
TINYINT UNSIGNED | |
TINYINT UNSIGNED ZEROFILL | |
YEAR | |
INT | INT |
MEDIUMINT | |
MEDIUMINT UNSIGNED | |
MEDIUMINT UNSIGNED ZEROFILL | |
SMALLINT UNSIGNED | |
SMALLINT UNSIGNED ZEROFILL | |
BIGINT | BIGINT |
INT UNSIGNED | |
INT UNSIGNED ZEROFILL | |
BIGINT UNSIGNED | DECIMAL(20, 0) |
BIGINT UNSIGNED ZEROFILL | |
SERIAL | |
FLOAT [UNSIGNED] [ZEROFILL] | FLOAT |
DOUBLE [UNSIGNED] [ZEROFILL] | DOUBLE |
DOUBLE PRECISION [UNSIGNED] [ZEROFILL] | |
REAL [UNSIGNED] [ZEROFILL] | |
NUMERIC(p, s) [UNSIGNED] [ZEROFILL] where p <= 38 | DECIMAL(p, s) |
DECIMAL(p, s) [UNSIGNED] [ZEROFILL] where p <= 38 | |
FIXED(p, s) [UNSIGNED] [ZEROFILL] where p <= 38 | |
BOOLEAN | BOOLEAN |
BIT(1) | |
TINYINT(1) | |
DATE | DATE |
TIME [(p)] | TIME [(p)] |
DATETIME [(p)] | TIMESTAMP [(p)] |
TIMESTAMP [(p)] | マッピングは
|
CHAR(n) | CHAR(n) |
VARCHAR(n) | VARCHAR(n) |
BIT(n) | BINARY(⌈(n + 7) / 8⌉) |
BINARY(n) | BINARY(n) |
VARBINARY(N) | VARBINARY(N) |
NUMERIC(p, s) [UNSIGNED] [ZEROFILL] where 38 < p <= 65 | STRING 説明 MySQL では、DECIMAL データ型の精度は最大 65 です。Flink では、精度は 38 に制限されています。したがって、38 を超える精度で DECIMAL カラムを定義する場合、精度の損失を避けるために STRING にマッピングする必要があります。 |
DECIMAL(p, s) [UNSIGNED] [ZEROFILL] where 38 < p <= 65 | |
FIXED(p, s) [UNSIGNED] [ZEROFILL] where 38 < p <= 65 | |
TINYTEXT | STRING |
TEXT | |
MEDIUMTEXT | |
LONGTEXT | |
ENUM | |
JSON | STRING 説明 JSON データ型は、Flink で JSON 形式の文字列に変換されます。 |
GEOMETRY | STRING 説明 MySQL の空間データ型は、固定 JSON 形式の文字列に変換されます。詳細については、「MySQL 空間データ型マッピング」をご参照ください。 |
POINT | |
LINESTRING | |
POLYGON | |
MULTIPOINT | |
MULTILINESTRING | |
MULTIPOLYGON | |
GEOMETRYCOLLECTION | |
TINYBLOB | BYTES 説明 MySQL の BLOB データ型の場合、長さが 2,147,483,647(2^31 - 1)以下の BLOB のみがサポートされます。 |
BLOB | |
MEDIUMBLOB | |
LONGBLOB |
使用例
CDC ソーステーブル
CREATE TEMPORARY TABLE mysqlcdc_source ( order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); CREATE TEMPORARY TABLE blackhole_sink( order_id INT, customer_name STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT order_id, customer_name FROM mysqlcdc_source;ディメンションテーブル
CREATE TEMPORARY TABLE datagen_source( a INT, b BIGINT, c STRING, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE mysql_dim ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); CREATE TEMPORARY TABLE blackhole_sink( a INT, b STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.a, H.b FROM datagen_source AS T JOIN mysql_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a = H.a;結果テーブル
CREATE TEMPORARY TABLE datagen_source ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE mysql_sink ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); INSERT INTO mysql_sink SELECT * FROM datagen_source;データインジェストソース
source: type: mysql name: MySQL Source hostname: ${mysql.hostname} port: ${mysql.port} username: ${mysql.username} password: ${mysql.password} tables: ${mysql.source.table} server-id: 7601-7604 sink: type: values name: Values Sink print.enabled: true sink.print.logger: true
MySQL CDC ソーステーブルについて
実装原則
MySQL CDC ソーステーブルが起動すると、テーブル全体をスキャンし、主キーに基づいて複数のチャンクに分割し、バイナリログオフセットを記録します。その後、増分スナップショットアルゴリズムを使用して SELECT 文で各チャンクからデータを順次読み取ります。ジョブは定期的にチェックポイントを実行し、完了したチャンクを記録します。フェールオーバーが発生した場合、未完了のチャンクから読み取りを再開します。すべてのチャンクが読み取られた後、以前に記録されたバイナリログオフセットから増分変更レコードを読み取ります。Flink ジョブは定期的にチェックポイントを実行し続け、バイナリログオフセットを記録することで、1 回限りのセマンティクスを保証します。
増分スナップショットアルゴリズムの詳細については、「MySQL CDC コネクタ」をご参照ください。
メタデータ
メタデータは、複数のデータベースおよびテーブルからデータをマージおよび同期するシャーディングシナリオで役立ちます。このような場合、各レコードのソースデータベースおよびテーブルを区別する必要があることがよくあります。メタデータカラムは、ソーステーブルのデータベース名およびテーブル名情報へのアクセスを提供し、複数のシャーディングされたテーブルを単一の送信先テーブルに簡単にマージできるようにします。
MySQL CDC ソースはメタデータカラム構文をサポートしており、以下のメタデータにアクセスできます。
メタデータキー
メタデータ型
説明
database_name
STRING NOT NULL
レコードを含むデータベース名。
table_name
STRING NOT NULL
レコードを含むテーブル名。
op_ts
TIMESTAMP_LTZ(3) NOT NULL
データベース内のレコードの変更時間。レコードがバイナリログではなく履歴データからのものである場合、この値は常に 0 です。
op_type
STRING NOT NULL
レコードの変更タイプ。
+I: INSERT メッセージ
-D: DELETE メッセージ
-U: UPDATE_BEFORE メッセージ
+U: UPDATE_AFTER メッセージ
説明VVR 8.0.7 以降でのみサポートされます。
query_log
STRING NOT NULL
行に対応する MySQL クエリログレコード。
説明MySQL で binlog_rows_query_log_events パラメーターが有効化されている必要があります。
以下のコード例は、MySQL インスタンス内の異なるデータベースからの複数のシャーディングされた注文テーブルを、Hologres の単一の holo_orders テーブルにマージします。
CREATE TEMPORARY TABLE mysql_orders ( db_name STRING METADATA FROM 'database_name' VIRTUAL, -- データベース名を読み取ります。 table_name STRING METADATA FROM 'table_name' VIRTUAL, -- テーブル名を読み取ります。 operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, -- 変更時間を読み取ります。 op_type STRING METADATA FROM 'op_type' VIRTUAL, -- 変更タイプを読み取ります。 order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'flinkuser', 'password' = 'flinkpw', 'database-name' = 'mydb_.*', -- 複数のシャーディングされたデータベースを正規表現でマッチさせます。 'table-name' = 'orders_.*' -- 複数のシャーディングされたテーブルを正規表現でマッチさせます。 ); INSERT INTO holo_orders SELECT * FROM mysql_orders;上記のコードに基づき、WITH 句で scan.read-changelog-as-append-only.enabled パラメーターを true に設定した場合、結果テーブルの主キー構成によって出力が異なります。
結果テーブルの主キーが order_id の場合、出力にはソーステーブルからの各主キーの最後の変更のみが含まれます。最後の変更が削除操作であった主キーの場合、結果テーブルには同じ主キーと op_type が -D のレコードが表示されます。
結果テーブルの主キーが order_id、operation_ts、および op_type の場合、出力にはソーステーブルからの各主キーの完全な変更履歴が含まれます。
正規表現のサポート
MySQL CDC ソーステーブルは、テーブル名またはデータベース名で正規表現を使用して、複数のテーブルまたはデータベースをマッチさせることをサポートしています。以下のコード例は、正規表現を使用して複数のテーブルを指定する方法を示しています。
CREATE TABLE products ( db_name STRING METADATA FROM 'database_name' VIRTUAL, table_name STRING METADATA FROM 'table_name' VIRTUAL, operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = '123456', 'database-name' = '(^(test).*|^(tpc).*|txc|.*[p$]|t{2})', -- 複数のデータベースを正規表現でマッチさせます。 'table-name' = '(t[5-8]|tt)' -- 複数のテーブルを正規表現でマッチさせます。 );例の正規表現の説明:
^(test).* はプレフィックスマッチングの例です。この式は、test1 や test2 のように "test" で始まるデータベース名にマッチします。
.*[p$] はサフィックスマッチングの例です。この式は、cdcp や edcp のように "p" で終わるデータベース名にマッチします。
txc は完全一致です。これはデータベース名 "txc" にマッチします。
完全パスのテーブル名をマッチさせる場合、MySQL CDC はデータベース名とテーブル名、つまり database-name.table-name をマッチングパターンとして使用してテーブルを一意に識別します。たとえば、パターン (^(test).*|^(tpc).*|txc|.*[p$]|t{2}).(t[5-8]|tt) は、データベース内のテーブル txc.tt および test2.test5 にマッチできます。
重要SQL ジョブ構成では、table-name および database-name は、複数のテーブルまたはデータベースを指定するためのカンマ区切りのリストをサポートしていません。
複数のテーブルにマッチさせる場合、または複数の正規表現を使用する場合は、縦棒(|)で連結し、括弧で囲んでください。たとえば、テーブル 'user' および 'product' を読み取るには、table-name を
(user|product)に設定します。正規表現にカンマが含まれる場合は、縦棒(|)演算子を使用して書き換えてください。たとえば、
mytable_\d{1, 2}を同等の(mytable_\d{1}|mytable_\d{2})に書き換えて、カンマの使用を避けてください。
同時実行制御
MySQL コネクタは、完全データの同時読み取りをサポートし、データ読み込み効率を向上させます。Flink Realtime Compute コンソールの Autopilot 自動チューニング機能と組み合わせることで、同時完全データ読み取り後に増分フェーズで自動的にスケールインし、計算リソースを節約します。
Realtime Compute 開発コンソールでは、基本モードまたはエキスパートモードでジョブの並列処理の次数を設定できます。同時実行数の設定の違いは以下のとおりです。
基本モードで設定された同時実行数は、ジョブ全体に適用されます。

エキスパートモードでは、特定の頂点の並列処理の次数を設定できます。

リソース構成の詳細については、「ジョブデプロイメント情報の設定」をご参照ください。
重要モードに関係なく、同時実行数を設定する場合、テーブルで宣言された server-id の範囲は、ジョブの並列処理の次数以上である必要があります。たとえば、server-id の範囲が 5404-5412 の場合、8 つの一意の server ID が存在するため、ジョブは最大 8 つの同時タスクを持つことができます。同じ MySQL インスタンスにアクセスする異なるジョブは、重複しない server-id の範囲を持つ必要があり、各ジョブは一意の server-id を明示的に構成する必要があります。
Autopilot 自動スケーリング
完全データフェーズでは、大量の履歴データが蓄積されます。読み取り効率を向上させるために、通常は同時読み取りが使用されます。バイナリログ増分フェーズでは、バイナリログデータ量が少なく、グローバルな順序を保証するため、通常はシングルスレッド読み取りで十分です。完全フェーズと増分フェーズで異なるリソース要件は、自動チューニング機能によって自動的にバランスされます。
自動チューニングは、MySQL CDC ソースの各タスクのトラフィックを監視します。ジョブがバイナリログフェーズに入ると、1 つのタスクのみがバイナリログ読み取りを担当し、他のタスクがアイドル状態の場合、自動チューニングはソースの CU 数と同時実行数を自動的に削減します。自動チューニングを有効にするには、ジョブ操作ページで自動チューニングモードを Active に設定します。
説明同時実行数を削減するための最小トリガー間隔は、デフォルトで 24 時間です。自動チューニングパラメーターおよび詳細については、「自動チューニングの設定」をご参照ください。
起動モード
scan.startup.mode 構成項目を使用して、MySQL CDC ソーステーブルの起動モードを指定します。有効な値:
initial(デフォルト):初回起動時に、データベーステーブルの完全読み取りを実行し、その後増分バイナリログ読み取りに切り替えます。
earliest-offset:スナップショットフェーズをスキップし、利用可能な最も古いバイナリログオフセットから読み取りを開始します。
latest-offset:スナップショットフェーズをスキップし、バイナリログの末尾から読み取りを開始します。このモードでは、ソーステーブルはジョブ開始後に発生したデータ変更のみを読み取ります。
specific-offset:スナップショットフェーズをスキップし、指定されたバイナリログオフセットから読み取りを開始します。オフセットは、バイナリログファイル名と位置、または GTID セットで指定できます。
timestamp:スナップショットフェーズをスキップし、指定されたタイムスタンプからバイナリログイベントの読み取りを開始します。
使用例:
CREATE TABLE mysql_source (...) WITH ( 'connector' = 'mysql-cdc', 'scan.startup.mode' = 'earliest-offset', -- 最も古いオフセットから開始します。 'scan.startup.mode' = 'latest-offset', -- 最新のオフセットから開始します。 'scan.startup.mode' = 'specific-offset', -- 特定のオフセットから開始します。 'scan.startup.mode' = 'timestamp', -- 特定のタイムスタンプから開始します。 'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- specific-offset モードのバイナリログファイル名を指定します。 'scan.startup.specific-offset.pos' = '4', -- specific-offset モードのバイナリログ位置を指定します。 'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- specific-offset モードの GTID セットを指定します。 'scan.startup.timestamp-millis' = '1667232000000' -- timestamp モードの起動タイムスタンプを指定します。 ... )重要MySQL ソースは、チェックポイント時に INFO レベルのログで現在のオフセットを出力します。ログプレフィックスは
Binlog offset on checkpoint {checkpoint-id}です。このログは、特定のチェックポイントオフセットからジョブを開始するのに役立ちます。読み取られるテーブルがスキーマ変更を受けている場合、earliest-offset、specific-offset、または timestamp モードから開始するとエラーが発生する可能性があります。これは、Debezium リーダーが内部的に最新のテーブルスキーマを維持しており、スキーマが一致しない以前のデータが正しく解析できないためです。
キーレス CDC ソーステーブルについて
キーレステーブルを使用するには、scan.incremental.snapshot.chunk.key-column を設定する必要があり、選択されたカラムは NULL 不可である必要があります。
キーレス CDC ソーステーブルの処理セマンティクスは、scan.incremental.snapshot.chunk.key-column で指定されたカラムの動作によって決定されます。
指定されたカラムが更新されない場合、1 回限りのセマンティクスが保証されます。
指定されたカラムが更新される場合、at-least-once セマンティクスのみが保証されます。ただし、下流システムで主キーを指定し、べき等操作を使用することでデータの正確性を保証できます。
Alibaba Cloud ApsaraDB RDS for MySQL バックアップログの読み取り
MySQL CDC ソーステーブルは、Alibaba Cloud ApsaraDB RDS for MySQL からバックアップログを読み取ることをサポートしています。これは、完全データフェーズに時間がかかり、ローカルのバイナリログファイルが自動的にクリーンアップされるが、自動または手動でアップロードされたバックアップファイルがまだ存在する場合に役立ちます。
使用例:
CREATE TABLE mysql_source (...) WITH ( 'connector' = 'mysql-cdc', 'rds.region-id' = 'cn-beijing', 'rds.access-key-id' = 'xxxxxxxxx', 'rds.access-key-secret' = 'xxxxxxxxx', 'rds.db-instance-id' = 'rm-xxxxxxxxxxxxxxxxx', 'rds.main-db-id' = '12345678', 'rds.download.timeout' = '60s' ... )CDC ソースの再利用の有効化
単一のジョブで複数の MySQL CDC ソーステーブルが複数の Binlog クライアントを起動すると、すべてのソーステーブルが同じインスタンスにある場合、データベースへの負荷が増加します。詳細については、「MySQL CDC よくある質問」をご参照ください。
ソリューション
VVR 8.0.7 以降は MySQL CDC ソースの再利用をサポートしています。再利用により、結合可能な MySQL CDC ソーステーブルがマージされます。マージは、データベース名、テーブル名、および
server-idを除いて、ソーステーブルの構成が同一である場合に発生します。エンジンは、同じジョブ内の MySQL CDC ソースを自動的にマージします。手順
SQL ジョブで
SETコマンドを使用します。SET 'table.optimizer.source-merge.enabled' = 'true'; # (VVR 8.0.8 および 8.0.9) さらに以下を設定します。 SET 'sql-gateway.exec-plan.enabled' = 'false';VVR 11.1 以降では、再利用がデフォルトで有効化されています。
ジョブをステートレスで起動します。ソースの再利用構成を変更すると、ジョブのトポロジーが変更されます。ジョブはステートレスで起動する必要があります。そうしないと、ジョブの起動に失敗したり、データが失われたりする可能性があります。ソースがマージされると、
MergetableSourceScanノードが表示されます。
重要再利用を有効化した後、演算子チェーンを無効にしないことを推奨します。
pipeline.operator-chainingをfalseに設定すると、データシリアル化および逆シリアル化のオーバーヘッドが増加します。マージされるソースが多いほど、オーバーヘッドは大きくなります。VVR 8.0.7 では、演算子チェーンを無効にするとシリアル化の問題が発生します。
バイナリログ読み取りの高速化
MySQL コネクタがソーステーブルまたはデータインジェストソースとして使用される場合、増分フェーズ中にバイナリログファイルを解析してさまざまな変更メッセージを生成します。バイナリログファイルは、すべてのテーブル変更をバイナリ形式で記録します。以下の方法でバイナリログファイルの解析を高速化します。
解析フィルター構成の有効化
構成項目
scan.only.deserialize.captured.tables.changelog.enabledを使用します。指定されたテーブルの変更イベントのみを解析します。
Debezium パラメーターの最適化
debezium.max.queue.size: 162580 debezium.max.batch.size: 40960 debezium.poll.interval.ms: 50debezium.max.queue.size:ブロッキングキューが保持できるレコードの最大数。Debezium がデータベースからイベントストリームを読み取る際、イベントを下流に書き込む前にブロッキングキューに配置します。デフォルト値は 8192 です。debezium.max.batch.size:コネクタが各反復で処理するイベントの最大数。デフォルト値は 2048 です。debezium.poll.interval.ms:コネクタが新しい変更イベントを要求する前に待機するミリ秒数。デフォルト値は 1000 ミリ秒(1 秒)です。
使用例:
CREATE TABLE mysql_source (...) WITH (
'connector' = 'mysql-cdc',
-- Debezium 構成
'debezium.max.queue.size' = '162580',
'debezium.max.batch.size' = '40960',
'debezium.poll.interval.ms' = '50',
-- 解析フィルターを有効化
'scan.only.deserialize.captured.tables.changelog.enabled' = 'true', -- 指定されたテーブルの変更イベントのみを解析します。
...
)source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: ${mysql.source.table}
server-id: 7601-7604
# Debezium 構成
debezium.max.queue.size: 162580
debezium.max.batch.size: 40960
debezium.poll.interval.ms: 50
# 解析フィルターを有効化
scan.only.deserialize.captured.tables.changelog.enabled: trueMySQL CDC のエンタープライズ版は、バイナリログ消費能力が 85 MB/秒であり、これはオープンソースコミュニティ版の約 2 倍です。バイナリログ生成レートが 85 MB/秒(つまり、6 秒ごとに 512 MB のファイル)を超えると、Flink ジョブのレイテンシーは継続的に増加します。バイナリログ生成レートが低下すると、レイテンシーは徐々に減少します。バイナリログファイルに大規模なトランザクションが含まれている場合、処理レイテンシーは一時的に増加し、トランザクションのログが処理された後に減少する可能性があります。
MySQL CDC DataStream API
DataStream を使用してデータを読み書きするには、対応する DataStream コネクタを Flink に接続してください。DataStream コネクタの設定手順については、「DataStream コネクタの使用方法」をご参照ください。
DataStream API プログラムを作成し、MySqlSource を使用します。以下にコードと pom 依存関係の例を示します。
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
public class MySqlSourceExample {
public static void main(String[] args) throws Exception {
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("yourHostname")
.port(yourPort)
.databaseList("yourDatabaseName") // set captured database
.tableList("yourDatabaseName.yourTableName") // set captured table
.username("yourUsername")
.password("yourPassword")
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enable checkpoint
env.enableCheckpointing(3000);
env
.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
// set 4 parallel source tasks
.setParallelism(4)
.print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
env.execute("Print MySQL Snapshot + Binlog");
}
}<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mysql</artifactId>
<version>${vvr.version}</version>
</dependency>MySqlSource を構築する際、以下のパラメーターを指定する必要があります。
パラメーター | 説明 |
hostname | MySQL データベースの IP アドレスまたはホスト名。 |
port | MySQL データベースサービスのポート番号。 |
databaseList | MySQL データベースの名前。 説明 データベース名には正規表現を使用して複数のデータベースからデータを読み取ることができます。 |
username | MySQL データベースサービスのユーザー名。 |
password | MySQL データベースサービスのパスワード。 |
deserializer | 逆シリアル化器は、SourceRecord 型のレコードを指定された型に変換します。有効な値:
|
pom 依存関係では、以下のパラメーターを指定する必要があります。
${vvr.version} | Alibaba Cloud Realtime Compute for Apache Flink のエンジンバージョン。例: 説明 Maven に表示されるバージョンを参照してください。ホットフィックスバージョンは、他の通知なしに定期的にリリースされる場合があります。 |
${flink.version} | Apache Flink のバージョン。例: 重要 ジョブ実行中の非互換性の問題を避けるため、Alibaba Cloud Realtime Compute for Apache Flink のエンジンバージョンに対応する Apache Flink バージョンを使用してください。バージョンマッピングについては、「エンジン」をご参照ください。 |
よくある質問
CDC ソーステーブルの使用時に発生する一般的な問題については、「CDC の問題」をご参照ください。