このトピックでは、MySQL コネクタの使用方法について説明します。
背景情報
MySQL コネクタは、RDS MySQL、PolarDB for MySQL、OceanBase(MySQL モード)、および自己管理型 MySQL など、MySQL プロトコルと互換性のあるすべてのデータベースをサポートしています。
MySQL コネクタを使用して OceanBase から読み取る場合、OceanBase のバイナリログ(binlog)が有効化され、正しく構成されていることを確認してください。詳細については、「バイナリログ操作」をご参照ください。本機能はパブリックプレビュー版です。十分な評価を行い、慎重にご利用ください。
以下の表に、MySQL コネクタの対応状況を示します。
カテゴリ | 詳細 |
対応タイプ | ソーステーブル、ディメンションテーブル、結果テーブル、およびデータインジェストソース |
実行モード | ストリーミングモードのみ |
データ形式 | 該当なし |
特定の監視メトリック | |
API タイプ | DataStream、SQL、およびデータインジェスト YAML |
結果テーブルのデータ更新または削除の対応 | はい |
特徴
MySQL Change Data Capture(CDC)ソーステーブルは、まずデータベースの完全な履歴データを読み取り、その後シームレスにバイナリログ(binlog)イベントの読み取りに切り替えます。このプロセスにより、障害発生時でもデータの欠落や重複が発生せず、1 回限りのセマンティクスが保証されます。MySQL CDC ソーステーブルは、完全データの同時読み取りをサポートし、増分スナップショットアルゴリズムを用いてロックフリーの読み取りと再開可能なデータ転送を実現します。詳細については、「MySQL CDC ソーステーブルについて」をご参照ください。
バッチ処理とストリーミング処理の統合:別々のパイプラインを維持する必要なく、完全データおよび増分データの両方を読み取ることができます。
完全データの同時読み取り:水平方向にスケールアウトしてパフォーマンスを向上させます。
完全読み取りから増分読み取りへのシームレスな切り替え:自動的にスケールインしてコンピューティングリソースを節約します。
再開可能なデータ転送:完全データ読み取り中の再開可能データ転送をサポートし、安定性を向上させます。
ロックフリー読み取り:オンラインビジネス運用に影響を与えることなく完全データを読み取れます。
バックアップログの読み取り:RDS MySQL のバックアップログの読み取りをサポートします。
並列 binlog 解析:binlog ファイルを並列で解析することで、読み取り遅延を低減します。
前提条件
MySQL CDC ソーステーブルを使用する前に、MySQL の設定に記載されている通り、MySQL データベースを設定する必要があります。以下の設定が必要です。
RDS MySQL
接続テストを Realtime Compute for Apache Flink と実行し、ネットワーク接続を確認します。
対応している MySQL バージョン:5.6、5.7、および 8.0.x
バイナリログ(binlog)を有効化します。デフォルトで有効化されています。
binlog 形式を ROW に設定します。デフォルトの形式です。
binlog_row_image を FULL に設定します。デフォルト設定です。
バイナリログトランザクション圧縮を無効化します。この機能は MySQL 8.0.20 で導入され、デフォルトで無効化されています。
SELECT、SHOW DATABASES、REPLICATION SLAVE、および REPLICATION CLIENT 権限を持つ MySQL ユーザーが作成されました。
MySQL データベースおよびテーブルを作成します。詳細については、「RDS MySQL のデータベースおよびアカウントの作成」をご参照ください。権限不足による運用失敗を回避するため、MySQL データベースの作成には特権アカウントの使用を推奨します。
IP ホワイトリストを設定します。詳細については、「RDS MySQL のホワイトリストの設定」をご参照ください。
PolarDB for MySQL
接続テストを Realtime Compute for Apache Flink と実行し、ネットワーク接続を確認します。
対応している MySQL バージョン:5.6、5.7、および 8.0.x
バイナリログ(binlog)を有効化します。デフォルトでは無効化されています。
binlog 形式を ROW に設定します。デフォルトの形式です。
binlog_row_image を FULL に設定します。デフォルト設定です。
バイナリログトランザクション圧縮を無効化します。この機能は MySQL 8.0.20 で導入され、デフォルトで無効化されています。
MySQL ユーザーを作成し、SELECT、SHOW DATABASES、REPLICATION SLAVE、および REPLICATION CLIENT 権限を付与しました。
MySQL データベースおよびテーブルを作成します。詳細については、「PolarDB for MySQL のデータベースおよびアカウントの作成」をご参照ください。権限不足による運用失敗を回避するため、MySQL データベースの作成には特権アカウントの使用を推奨します。
IP ホワイトリストを設定します。詳細については、「PolarDB for MySQL のホワイトリストの設定」をご参照ください。
自己管理型 MySQL
接続テストを Realtime Compute for Apache Flink と実行し、ネットワーク接続を確認します。
対応している MySQL バージョン:5.6、5.7、および 8.0.x
バイナリログ(binlog)を有効化します。デフォルトでは無効化されています。
binlog 形式を ROW に設定します。デフォルトの形式は STATEMENT です。
binlog_row_image を FULL に設定します。デフォルト設定です。
バイナリログトランザクション圧縮を無効化します。この機能は MySQL 8.0.20 で導入され、デフォルトで無効化されています。
MySQL ユーザーを作成し、SELECT、SHOW DATABASES、REPLICATION SLAVE、および REPLICATION CLIENT 権限を付与済みである必要があります。
MySQL データベースおよびテーブルを作成します。詳細については、「自己管理型 MySQL インスタンスのデータベースおよびアカウントの作成」をご参照ください。権限不足による運用失敗を回避するため、MySQL データベースの作成には特権アカウントの使用を推奨します。
IP ホワイトリストを設定します。詳細については、「自己管理型 MySQL インスタンスのホワイトリストの設定」をご参照ください。
制限事項
一般的な制限事項
MySQL CDC ソーステーブルは、ウォーターマークの定義をサポートしていません。
CREATE TABLE AS SELECT(CTAS)および CREATE DATABASE AS SELECT(CDAS)ジョブにおいて、MySQL CDC ソーステーブルは一部のスキーマ変更を同期できます。対応する変更タイプの詳細については、「スキーマ進化同期ポリシー」をご参照ください。
MySQL CDC コネクタは、バイナリログトランザクション圧縮をサポートしていません。したがって、MySQL CDC コネクタを使用して増分データを消費する場合、この機能が無効化されていることを確認する必要があります。そうでない場合、増分データの取得に失敗する可能性があります。
RDS MySQL の制限事項
RDS MySQL のセカンダリデータベースまたは読み取り専用レプリカからのデータ読み取りは推奨しません。これらのインスタンスの binlog 保持期間は通常短く、binlog が期限切れになってクリアされると、ジョブが binlog データを消費できずエラーが発生する可能性があります。
RDS MySQL では、プライマリデータベースとセカンダリデータベース間の並列同期がデフォルトで有効化されていますが、トランザクション順序の一貫性は保証されません。これにより、プライマリ/セカンダリのスイッチオーバーおよびチェックポイント回復時に一部のデータが欠落する可能性があります。この問題を解決するには、RDS MySQL で手動で slave_preserve_commit_order オプションを有効化できます。
PolarDB for MySQL の制限事項
MySQL CDC ソーステーブルは、PolarDB for MySQL バージョン 1.0.19 以前のマルチマスタクラスターからの読み取りをサポートしていません。詳細については、「マルチマスタクラスターとは?」をご参照ください。これらのクラスターによって生成された binlog には、重複するテーブル ID が含まれている可能性があり、CDC ソーステーブルでスキーママッピングエラーが発生し、binlog 解析エラーにつながる可能性があります。
オープンソース MySQL の制限事項
MySQL では、プライマリ/レプリカのバイナリログレプリケーション中にデフォルトでトランザクション順序が維持されます。ただし、MySQL レプリカで並列レプリケーション(slave_parallel_workers > 1)が有効化され、かつ slave_preserve_commit_order=ON が設定されていない場合、トランザクションのコミット順序がプライマリデータベースと異なる可能性があります。Flink CDC がチェックポイントから回復する際に、この順序の不整合によりデータが欠落する可能性があります。そのため、MySQL レプリカで slave_preserve_commit_order = ON を設定するか、slave_parallel_workers = 1 に設定することを推奨します。なお、slave_parallel_workers を 1 に設定すると、レプリケーションパフォーマンスが低下する可能性があります。
注意事項
シンクテーブル
DDL ステートメントで自動インクリメントのプライマリキーを宣言しないでください。MySQL は、データ書き込み時にこのフィールドを自動的に埋めます。
少なくとも 1 つの非プライマリキーのフィールドを宣言する必要があります。そうでない場合、エラーが発生します。
DDL ステートメント内の NOT ENFORCED は、Flink がプライマリキーの妥当性チェックを実施しないことを意味します。プライマリキーの正確性および整合性は、ユーザーが保証する必要があります。詳細については、「妥当性チェック」をご参照ください。
ディメンションテーブル
クエリの高速化のためにインデックスを使用する場合、JOIN 句のフィールド順序は、インデックス定義の順序と一致させる必要があります。これは「左端プレフィックスルール」と呼ばれます。たとえば、インデックスが
(a, b, c)上にある場合、JOIN 条件はON t.a = x AND t.b = yである必要があります。Flink によって生成された SQL は、オプティマイザーによって再書き換えられる可能性があり、実際のデータベースクエリでインデックスが使用されないことがあります。インデックスが実際に使用されているかどうかを確認するには、MySQL の実行計画(EXPLAIN)またはスロークエリログを確認し、実行された実際の SELECT ステートメントを確認してください。
SQL
MySQL コネクタは、SQL ジョブでソーステーブル、ディメンションテーブル、または結果テーブルとして使用できます。
構文
CREATE TEMPORARY TABLE mysqlcdc_source (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql',
'hostname' = '<yourHostname>',
'port' = '3306',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'database-name' = '<yourDatabaseName>',
'table-name' = '<yourTableName>'
);コネクタが結果テーブルに書き込む方法:受信した各レコードに対して、コネクタは単一の SQL ステートメントを構築して実行します。正確なステートメントは、テーブル構造に応じて異なります:
プライマリキーのない結果テーブルの場合、システムは次の SQL ステートメントを構築して実行します:
INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...);プライマリキーを持つ結果テーブルの場合、システムは次の SQL ステートメントを実行します:
INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...;注:物理テーブルにプライマリキーに加えて一意なインデックス制約がある場合、異なるプライマリキーだが一意なインデックスでカバーされるカラムの値が同一である 2 つのレコードを挿入すると、一意なインデックスの競合が発生します。この競合によりデータが上書きされ、出力データでデータ損失が発生します。
MySQL データベースで自動インクリメントのプライマリキーを定義している場合、Flink DDL ステートメントで自動インクリメントフィールドを宣言しないでください。データ挿入時にデータベースがこのフィールドを自動的に埋めます。コネクタは、自動インクリメントフィールドを含むデータの書き込みおよび削除をサポートしますが、更新はサポートしません。
WITH パラメーター
一般
パラメーター
説明
必須
データ型
デフォルト値
備考
connector
テーブルのタイプ。
はい
STRING
なし
ソーステーブルとして使用する場合、このオプションを
mysql-cdcまたはmysqlに設定します。これらは同等です。ディメンションテーブルまたは結果テーブルとして使用する場合、このオプションをmysqlに設定します。hostname
MySQL データベースの IP アドレスまたはホスト名。
はい
STRING
なし
VPC アドレスの入力を推奨します。
説明MySQL データベースと Realtime Compute for Apache Flink が同じ VPC 内にない場合、クロス VPC ネットワーク接続を確立するか、インターネット経由でアクセスしてください。詳細については、「ワークスペースの管理および運用」および「フルマネージド Flink クラスターがインターネットにアクセスする方法」をご参照ください。
username
MySQL データベースサービスのユーザー名。
はい
STRING
なし
なし。
password
MySQL データベースサービスのパスワード。
はい
STRING
なし
なし。
database-name
MySQL データベースの名前。
はい
STRING
なし
ソーステーブルとして使用する場合、このオプションは複数のデータベースからデータを読み取るために正規表現をサポートします。
正規表現を使用する場合、文字列の先頭および末尾をマッチさせるために ^ および $ を使用しないでください。「table-name」の備考欄をご覧ください。
table-name
MySQL テーブルの名前。
はい
STRING
なし
ソーステーブルとして使用する場合、このオプションは複数のテーブルからデータを読み取るために正規表現をサポートします。
複数の MySQL テーブルからデータを読み取る場合、複数の CTAS ステートメントを 1 つのジョブとして送信します。これにより、複数の binlog リスナーを有効化することを避け、パフォーマンスおよび効率を向上させます。詳細については、「複数の CTAS ステートメント:1 つのジョブとして送信」をご参照ください。
正規表現を使用する場合、文字列の先頭および末尾をマッチさせるために ^ および $ を使用しないでください。詳細については、以下の備考をご覧ください。
説明MySQL CDC ソーステーブルがテーブル名をマッチさせる場合、指定した database-name および table-name を文字列 \\.(VVR バージョン 8.0.1 より前のバージョンでは文字 .)を使用して完全パスの正規表現に結合します。その後、この正規表現を使用して、MySQL データベース内のテーブルの完全修飾名をマッチさせ、読み取るテーブルを決定します。
たとえば、'database-name'='db_.*' および 'table-name'='tb_.+' を設定した場合、コネクタは正規表現 db_.*\\.tb_.+(または 8.0.1 より前のバージョンでは db_.*.tb_.+)を使用して完全修飾テーブル名をマッチさせ、読み取るテーブルを決定します。
port
MySQL データベースサービスのポート番号。
いいえ
INTEGER
3306
なし。
ソース固有
パラメーター
説明
必須
データ型
デフォルト値
備考
server-id
データベースクライアントの数値 ID。
いいえ
STRING
5400 ~ 6400 の間のランダム値が生成されます。
この ID は、MySQL クラスター内でグローバルに一意である必要があります。同じデータベースに接続する各ジョブに対して、異なる ID を割り当てることを推奨します。
このオプションは、ID の範囲(例:5400-5408)もサポートします。増分読み取りが有効化されている場合、同時読み取りがサポートされます。この場合、各同時リーダーが異なる ID を使用できるよう、ID の範囲を指定することを推奨します。詳細については、「server ID の使用」をご参照ください。
scan.incremental.snapshot.enabled
増分スナップショットを有効化するかどうかを指定します。
いいえ
BOOLEAN
true
増分スナップショットはデフォルトで有効化されています。増分スナップショットは、完全データスナップショットを読み取るための新しいメカニズムです。従来のスナップショット方式と比較して、以下のような利点があります:
完全データの読み取りを並列で実行できます。
完全データの読み取りをチャンク単位でチェックポイントできます。
完全データの読み取りにグローバル読み取りロック(FLUSH TABLES WITH READ LOCK)を取得する必要はありません。
ソースが同時読み取りをサポートするようにする場合、各同時リーダーには一意の server ID が必要です。したがって、server-id は 5400-6400 のような範囲である必要があり、その範囲は並列処理の次数以上である必要があります。
説明この設定項目は、Ververica Runtime(VVR)11.1 以降で削除されました。
scan.incremental.snapshot.chunk.size
各チャンクのサイズ(行数)。
いいえ
INTEGER
8096
増分スナップショット読み取りが有効化されている場合、テーブルは読み取り用に複数のチャンクに分割されます。チャンクデータは、完全に読み取られるまでメモリ内にバッファーされます。
各チャンクに含まれる行数が少ないほど、テーブル内のチャンクの総数が多くなります。これは、障害回復の粒度を小さくしますが、Out Of Memory(OOM)エラーが発生したり、全体的なスループットが低下したりする可能性があります。したがって、適切なチャンクサイズを設定する必要があります。
scan.snapshot.fetch.size
完全テーブルデータを読み取る際に、1 回に取得する最大レコード数。
いいえ
INTEGER
1024
なし。
scan.startup.mode
データ消費の起動モード。
いいえ
STRING
initial
有効な値:
initial(デフォルト):最初の起動時に、まず完全な履歴データをスキャンし、その後最新の binlog データを読み取ります。
latest-offset:最初の起動時に履歴データをスキャンしません。binlog の末尾から読み取りを開始し、コネクタの起動後に発生した最新の変更のみを読み取ります。
earliest-offset:履歴データをスキャンしません。利用可能な最も古い binlog から読み取りを開始します。
specific-offset:履歴データをスキャンしません。指定した特定の binlog オフセットから読み取りを開始します。オフセットは、scan.startup.specific-offset.file および scan.startup.specific-offset.pos の両方を設定するか、scan.startup.specific-offset.gtid-set のみを設定して、特定の GTID セットから開始できます。
timestamp:履歴データをスキャンしません。指定したタイムスタンプから binlog イベントの読み取りを開始します。scan.startup.timestamp-millis を使用して、ミリ秒単位でタイムスタンプを指定します。
重要earliest-offset、specific-offset、または timestamp を使用する場合、指定した binlog 消費位置とジョブ起動時刻の間にテーブルスキーマが変更されていないことを確認してください。そうしないと、スキーマの不一致によるエラーが発生する可能性があります。
scan.startup.specific-offset.file
特定のオフセット起動モードで使用する開始オフセットの binlog ファイル名。
いいえ
STRING
なし
この設定を使用する場合、scan.startup.mode を specific-offset に設定します。ファイル名の例:
mysql-bin.000003。scan.startup.specific-offset.pos
特定のオフセット起動モードで使用する指定された binlog ファイル内の開始オフセット。
いいえ
INTEGER
なし
この設定を使用する場合、scan.startup.mode を specific-offset に設定します。
scan.startup.specific-offset.gtid-set
特定のオフセット起動モードで使用する開始オフセットの GTID セット。
いいえ
STRING
なし
この設定を使用する場合、scan.startup.mode を specific-offset に設定します。GTID セットの例:
24DA167-0C0C-11E8-8442-00059A3C7B00:1-19。scan.startup.timestamp-millis
タイムスタンプ起動モードで使用する開始オフセットのミリ秒単位のタイムスタンプ。
いいえ
LONG
なし
この設定を使用する場合、scan.startup.mode を timestamp に設定します。タイムスタンプはミリ秒単位です。
重要タイムスタンプを使用する場合、MySQL CDC は各 binlog ファイルの初期イベントを読み取ってそのタイムスタンプを特定し、対応する binlog ファイルを検索しようとします。指定されたタイムスタンプの binlog ファイルがデータベースからクリアされておらず、読み取り可能であることを確認してください。
server-time-zone
データベースで使用されるセッションタイムゾーン。
いいえ
STRING
このオプションを指定しない場合、システムは Flink ジョブの実行環境のタイムゾーンをデータベースサーバータイムゾーンとして使用します(選択したゾーンのタイムゾーン)。
例:Asia/Shanghai。このオプションは、MySQL TIMESTAMP 型が STRING 型に変換される方法を制御します。詳細については、「Debezium 時間型値」をご参照ください。
debezium.min.row.count.to.stream.results
テーブルの行数がこの値を超える場合、バッチ読み取りモードを使用します。
いいえ
INTEGER
1000
Flink は、MySQL ソーステーブルからデータを次のように読み取ります:
完全読み取り:テーブルの全データを直接メモリに読み込みます。これは高速ですが、データ量に比例してメモリを消費します。ソーステーブルが非常に大きい場合、OOM 問題が発生する可能性があります。
バッチ読み取り:固定数の行を 1 バッチとして読み取り、すべてのデータが読み取られるまで繰り返します。これは、大規模なテーブルに対する OOM リスクを回避しますが、速度は遅くなります。
connect.timeout
MySQL データベースサーバーへの接続がタイムアウトするまでの最大待機時間(再試行する前)。
いいえ
DURATION
30s
なし。
connect.max-retries
MySQL データベースサービスへの接続失敗後の最大再試行回数。
いいえ
INTEGER
3
なし。
connection.pool.size
データベース接続プールのサイズ。
いいえ
INTEGER
20
データベース接続プールは、データベース接続数を削減するために接続を再利用します。
jdbc.properties.*
JDBC URL のカスタム接続オプション。
いいえ
STRING
なし
カスタム接続オプションを渡すことができます。たとえば、SSL を無効化するには、'jdbc.properties.useSSL' = 'false' を設定します。
サポートされている接続オプションの詳細については、「MySQL 設定プロパティ」をご参照ください。
debezium.*
binlog を読み取るためのカスタム Debezium オプション。
いいえ
STRING
なし
カスタム Debezium オプションを渡すことができます。たとえば、解析エラーの処理方法を指定するには、'debezium.event.deserialization.failure.handling.mode'='ignore' を使用します。
heartbeat.interval
ソースがハートビートイベントを使用して binlog オフセットを進める間隔。
いいえ
DURATION
30s
ハートビートイベントは、ソースで binlog オフセットを進めるのに役立ちます。これは、MySQL で更新が遅いテーブルに特に有用です。このようなテーブルでは、binlog オフセットが自動的に進まないため、ハートビートイベントによって binlog オフセットを前方に進めることで、期限切れになった binlog オフセットが原因でジョブが失敗し、ステートレスな再起動が必要になるという問題を防ぎます。
scan.incremental.snapshot.chunk.key-column
スナップショットフェーズ中にチャンクを分割するために使用するカラム。
備考を参照してください。
STRING
なし
プライマリキーのないテーブルに必須です。選択したカラムは NULL 不可(NOT NULL)である必要があります。
プライマリキーのあるテーブルでは任意です。プライマリキーから 1 つのカラムのみを選択できます。
rds.region-id
Alibaba Cloud RDS MySQL インスタンスのリージョン ID。
OSS からアーカイブログを読み取る場合に必須です。
STRING
なし
リージョン ID の詳細については、「リージョンとゾーン」をご参照ください。
rds.access-key-id
Alibaba Cloud RDS MySQL アカウントの AccessKey ID。
OSS からアーカイブログを読み取る場合に必須です。
STRING
なし
詳細については、「AccessKey ID および AccessKey Secret の表示方法」をご参照ください。
重要AccessKey 情報の漏洩を防ぐため、AccessKey ID はシークレット管理を使用して管理してください。詳細については、「変数管理」をご参照ください。
rds.access-key-secret
Alibaba Cloud RDS MySQL アカウントの AccessKey Secret。
OSS からアーカイブログを読み取る場合に必須です。
STRING
なし
詳細については、「AccessKey ID および AccessKey Secret の表示方法」をご参照ください。
重要AccessKey 情報の漏洩を防ぐため、AccessKey Secret はシークレット管理を使用して管理してください。詳細については、「変数管理」をご参照ください。
rds.db-instance-id
Alibaba Cloud RDS MySQL インスタンスのインスタンス ID。
OSS からアーカイブログを読み取る場合に必須です。
STRING
なし
なし。
rds.main-db-id
Alibaba Cloud RDS MySQL インスタンスのプライマリデータベース ID。
いいえ
STRING
なし
プライマリデータベース ID の取得方法については、「RDS MySQL ログバックアップ」をご参照ください。
VVR 8.0.7 以降でのみサポートされます。
rds.download.timeout
OSS から単一のアーカイブログをダウンロードする際のタイムアウト。
いいえ
DURATION
60s
なし。
rds.endpoint
OSS binlog 情報を取得するためのサービスエンドポイント。
いいえ
STRING
なし
有効な値については、「サービスエンドポイント」をご参照ください。
VVR 8.0.8 以降でのみサポートされます。
scan.incremental.close-idle-reader.enabled
スナップショットフェーズ終了後にアイドルリーダーを閉じるかどうかを指定します。
いいえ
BOOLEAN
false
VVR 8.0.1 以降でのみサポートされます。
この設定は、execution.checkpointing.checkpoints-after-tasks-finish.enabled が true に設定されている場合にのみ有効です。
scan.read-changelog-as-append-only.enabled
チャネルログストリームを追加専用ストリームに変換するかどうかを指定します。
いいえ
BOOLEAN
false
有効な値:
true:INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER を含むすべてのメッセージタイプを INSERT メッセージに変換します。上流テーブルの削除メッセージを保持するなどの特殊なケースでのみ有効化してください。
false(デフォルト):すべてのメッセージタイプがそのまま通過します。
説明VVR 8.0.8 以降でのみサポートされます。
scan.only.deserialize.captured.tables.changelog.enabled
増分フェーズ中に、指定されたテーブルの変更イベントのみを逆シリアル化するかどうかを指定します。
いいえ
BOOLEAN
VVR 8.x ではデフォルト値は false です。
VVR 11.1 以降ではデフォルト値は true です。
有効な値:
true:ターゲットテーブルの変更データのみを逆シリアル化して、binlog 読み取りを高速化します。
false(デフォルト):すべてのテーブルの変更データを逆シリアル化します。
説明VVR 8.0.7 以降でのみサポートされます。
VVR 8.0.8 以前では、このパラメーターを debezium.scan.only.deserialize.captured.tables.changelog.enable に名前を変更します。
scan.parse.online.schema.changes.enabled
増分フェーズ中に、RDS ロックレス DDL イベントの解析を試行するかどうかを指定します。
いいえ
BOOLEAN
false
有効な値:
true:RDS ロックレス DDL イベントを解析します。
false(デフォルト):RDS ロックレス DDL イベントを解析しません。
これは実験的な機能です。オンラインロックレス変更を実行する前に、Flink ジョブの回復用にスナップショットを取得してください。
説明VVR 11.1 以降でのみサポートされます。
scan.incremental.snapshot.backfill.skip
スナップショット読み取りフェーズ中にバックフィルをスキップするかどうかを指定します。
いいえ
BOOLEAN
false
有効な値:
true:スナップショット読み取りフェーズ中にバックフィルをスキップします。
false(デフォルト):スナップショット読み取りフェーズ中にバックフィルをスキップしません。
バックフィルがスキップされた場合、スナップショットフェーズ中のテーブルの変更は、後続の増分フェーズで読み取られ、スナップショットにマージされません。
重要バックフィルをスキップすると、スナップショットフェーズ中の変更が再再生される可能性があるため、データの不整合が発生する可能性があります。保証されるのは、最低限 1 回のセマンティクスのみです。
説明VVR 11.1 以降でのみサポートされます。
scan.incremental.snapshot.unbounded-chunk-first.enabled
スナップショット読み取りフェーズ中に、上限なしのチャンクを最初に配布するかどうかを指定します。
いいえ
BOOELEAN
false
有効な値:
true:スナップショット読み取りフェーズ中に、上限なしのチャンクを最初に配布します。
false(デフォルト):スナップショット読み取りフェーズ中に、上限なしのチャンクを最初に配布しません。
これは実験的な機能です。これを有効化すると、TaskManager がスナップショットフェーズ中に最後のチャンクを同期する際の OOM エラーのリスクが低減します。ジョブの初回起動前にこれを追加することを推奨します。
説明VVR 11.1 以降でのみサポートされます。
binlog.session.network.timeout
binlog 接続の読み取り/書き込み操作のネットワークタイムアウト。
いいえ
DURATION
10m
これを 0s に設定すると、MySQL サーバーのデフォルトタイムアウトが使用されます。
説明VVR 11.5 以降でのみサポートされます。
scan.rate-limit.records-per-second
ソースによって 1 秒あたりに出力される最大レコード数を制限します。
いいえ
LONG
なし
データ読み取りを制限するのに役立ちます。この制限は、完全フェーズおよび増分フェーズの両方に適用されます。
numRecordsOutPerSecondメトリックは、データフロー全体で 1 秒あたりに出力されるレコード数を反映します。このメトリックに基づいて、このパラメーターを調整してください。完全読み取り中は、
scan.incremental.snapshot.chunk.sizeの値を下げることで、1 バッチあたりの行数を減らしてください。説明VVR 11.5 以降でのみサポートされます。
ディメンションテーブル固有
パラメーター
説明
必須
データ型
デフォルト値
備考
url
MySQL JDBC URL。
いいえ
STRING
なし
URL 形式は
jdbc:mysql://<endpoint>:<port>/<database_name>です。lookup.max-retries
データ読み取り失敗後の最大再試行回数。
いいえ
INTEGER
3
VVR 6.0.7 以降でのみサポートされます。
lookup.cache.strategy
キャッシュポリシー。
いいえ
STRING
なし
3つのキャッシュポリシーをサポートしています:なし、LRU、およびすべて。詳細については、「背景情報」をご参照ください。
説明LRU キャッシュポリシーを使用する場合、lookup.cache.max-rows オプションも設定する必要があります。
lookup.cache.max-rows
キャッシュされる最大行数。
いいえ
INTEGER
100000
LRU キャッシュポリシーを選択する場合、キャッシュサイズを指定する必要があります。
ALL キャッシュポリシーを選択する場合、任意です。
lookup.cache.ttl
キャッシュの生存時間(TTL)。
いいえ
DURATION
10 s
lookup.cache.ttl の設定は、lookup.cache.strategy に依存します:
lookup.cache.strategy が None に設定されている場合、lookup.cache.ttl は任意であり、キャッシュは期限切れになりません。
lookup.cache.strategy が LRU に設定されている場合、lookup.cache.ttl はキャッシュの TTL です。デフォルトでは期限切れになりません。
lookup.cache.strategy が ALL に設定されている場合、lookup.cache.ttl はキャッシュの再読み込み時間です。デフォルトでは再読み込みされません。
1min や 10s などの形式で時間を指定してください。
lookup.max-join-rows
プライマリテーブルの各行に対して、ディメンションテーブルからクエリする際に返される結果の最大数。
いいえ
INTEGER
1024
なし。
lookup.filter-push-down.enabled
ディメンションテーブルのフィルターのプッシュダウンを有効化するかどうかを指定します。
いいえ
BOOLEAN
false
有効な値:
true:ディメンションテーブルのフィルターのプッシュダウンを有効化します。MySQL データベーステーブルからデータを読み込む際、ディメンションテーブルは SQL ジョブで設定された条件に基づいて事前にデータをフィルターします。
false(デフォルト):ディメンションテーブルのフィルターのプッシュダウンを無効化します。MySQL データベーステーブルからデータを読み込む際、ディメンションテーブルはすべてのデータを読み込みます。
説明VVR 8.0.7 以降でのみサポートされます。
重要フィルターのプッシュダウンは、Flink テーブルがディメンションテーブルとして使用される場合にのみ有効化すべきです。MySQL ソーステーブルは、フィルターのプッシュダウンを有効化できません。Flink テーブルがソーステーブルおよびディメンションテーブルの両方として使用される場合、ディメンションテーブルでフィルターのプッシュダウンを有効化するときは、SQL ヒント を使用して、ソーステーブルでこのオプションを明示的に false に設定してください。そうしないと、ジョブが異常終了する可能性があります。
シンク固有の
パラメーター
説明
必須
データ型
デフォルト値
備考
url
MySQL JDBC URL。
いいえ
STRING
なし
URL 形式は
jdbc:mysql://<endpoint>:<port>/<database_name>です。sink.max-retries
データ書き込み失敗後の最大再試行回数。
いいえ
INTEGER
3
なし。
sink.buffer-flush.batch-size
単一のバッチで書き込まれるレコード数。
いいえ
INTEGER
4096
なし。
sink.buffer-flush.max-rows
メモリ内にバッファーされるデータレコード数。
いいえ
INTEGER
10000
このオプションは、プライマリキーが指定されている場合にのみ有効です。
sink.buffer-flush.interval
バッファーをフラッシュする時間間隔。指定された時間待機した後も出力条件を満たさない場合、システムは自動的にすべてのバッファーデータを出力します。
いいえ
DURATION
1s
なし。
sink.ignore-delete
DELETE 操作を無視するかどうかを指定します。
いいえ
BOOLEAN
false
Flink SQL によって生成されるストリームに delete または update-before レコードが含まれている場合、複数の出力タスクによる同一テーブルの異なるフィールドへの同時更新により、データの不整合が発生する可能性があります。
たとえば、レコードが削除された後に、別のタスクが一部のフィールドのみを更新すると、更新されていないフィールドが null またはデフォルト値になるため、データエラーが発生します。
このような問題を回避するには、sink.ignore-delete を true に設定して、上流の DELETE および UPDATE_BEFORE 操作を無視してください。
説明UPDATE_BEFORE は Flink のリトラクション機構の一部であり、更新操作で「古い値」を「取り消す」ために使用されます。
ignoreDelete = true の場合、すべての DELETE および UPDATE_BEFORE レコードがスキップされ、INSERT および UPDATE_AFTER レコードのみが処理されます。
sink.ignore-null-when-update
データを更新する際に、入力フィールド値が null の場合に、対応するフィールドを null に設定するか、更新をスキップするかを指定します。
いいえ
BOOLEAN
false
有効な値:
true:フィールドの更新をスキップします。Flink テーブルにプライマリキーがある場合にのみサポートされます。true に設定した場合:
VVR 8.0.6 以前では、結果テーブルへのデータ書き込みのバッチ実行はサポートされません。
VVR 8.0.7 以降では、結果テーブルへのデータ書き込みのバッチ実行がサポートされます。
バッチ書き込みは書き込み効率および全体的なスループットを向上させますが、データ遅延および OOM リスクを引き起こす可能性があります。ビジネスシナリオに応じて、これらのトレードオフを調整してください。
false:フィールドを null に設定します。
説明VVR 8.0.5 以降でのみサポートされます。
型マッピング
CDC ソーステーブル
MySQL CDC フィールド型
Flink フィールド型
TINYINT
TINYINT
SMALLINT
SMALLINT
TINYINT UNSIGNED
TINYINT UNSIGNED ZEROFILL
INT
INT
MEDIUMINT
SMALLINT UNSIGNED
SMALLINT UNSIGNED ZEROFILL
BIGINT
BIGINT
INT UNSIGNED
INT UNSIGNED ZEROFILL
MEDIUMINT UNSIGNED
MEDIUMINT UNSIGNED ZEROFILL
BIGINT UNSIGNED
DECIMAL(20, 0)
BIGINT UNSIGNED ZEROFILL
SERIAL
FLOAT [UNSIGNED] [ZEROFILL]
FLOAT
DOUBLE [UNSIGNED] [ZEROFILL]
DOUBLE
DOUBLE PRECISION [UNSIGNED] [ZEROFILL]
REAL [UNSIGNED] [ZEROFILL]
NUMERIC(p, s) [UNSIGNED] [ZEROFILL]
DECIMAL(p, s)
DECIMAL(p, s) [UNSIGNED] [ZEROFILL]
BOOLEAN
BOOLEAN
TINYINT(1)
DATE
DATE
TIME [(p)]
TIME [(p)] [WITHOUT TIME ZONE]
DATETIME [(p)]
TIMESTAMP [(p)] [WITHOUT TIME ZONE]
TIMESTAMP [(p)]
TIMESTAMP [(p)]
TIMESTAMP [(p)] WITH LOCAL TIME ZONE
CHAR(n)
STRING
VARCHAR(n)
TEXT
BINARY
BYTES
VARBINARY
BLOB
重要TINYINT(1) 型を MySQL で 0 および 1 以外の値を格納するために使用しないことを推奨します。property-version=0 の場合、MySQL CDC ソーステーブルはデフォルトで TINYINT(1) を Flink の BOOLEAN 型にマッピングします。これにより、データの不正確さが発生する可能性があります。TINYINT(1) を 0 および 1 以外の値を格納するために使用する場合は、「catalog.table.treat-tinyint1-as-boolean」の設定オプションをご参照ください。
次元テーブルと結果テーブル
MySQL フィールド型
Flink フィールド型
TINYINT
TINYINT
SMALLINT
SMALLINT
TINYINT UNSIGNED
INT
INT
MEDIUMINT
SMALLINT UNSIGNED
BIGINT
BIGINT
INT UNSIGNED
BIGINT UNSIGNED
DECIMAL(20, 0)
FLOAT
FLOAT
DOUBLE
DOUBLE
DOUBLE PRECISION
NUMERIC(p, s)
DECIMAL(p, s)
説明p は ≤ 38 である必要があります。
DECIMAL(p, s)
BOOLEAN
BOOLEAN
TINYINT(1)
DATE
DATE
TIME [(p)]
TIME [(p)] [WITHOUT TIME ZONE]
DATETIME [(p)]
TIMESTAMP [(p)] [WITHOUT TIME ZONE]
TIMESTAMP [(p)]
CHAR(n)
CHAR(n)
VARCHAR(n)
VARCHAR(n)
BIT(n)
BINARY(⌈n/8⌉)
BINARY(n)
BINARY(n)
VARBINARY(N)
VARBINARY(N)
TINYTEXT
STRING
TEXT
MEDIUMTEXT
LONGTEXT
TINYBLOB
BYTES
重要Flink は、最大サイズが 2,147,483,647(2^31 - 1)の MySQL BLOB レコードをサポートしています。
BLOB
MEDIUMBLOB
LONGBLOB
データインジェスト
MySQL コネクタは、データインジェスト YAML ジョブのデータソースとして使用できます。
構文
source:
type: mysql
name: MySQL ソース
hostname: localhost
port: 3306
username: <username>
password: <password>
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5401-5404
sink:
type: xxx設定項目
パラメーター | 説明 | 必須 | データ型 | デフォルト値 | 備考 |
type | データソースのタイプ。 | はい | STRING | なし | このオプションを mysql に設定します。 |
name | データソースの名前。 | いいえ | STRING | なし | なし。 |
hostname | MySQL データベースの IP アドレスまたはホスト名。 | はい | STRING | なし | Virtual Private Cloud(VPC)アドレスの入力を推奨します。 説明 MySQL データベースと Realtime Compute for Apache Flink が同じ VPC 内にない場合、クロス VPC ネットワーク接続を確立するか、インターネット経由でアクセスしてください。詳細については、「ワークスペースの管理および運用」および「フルマネージド Flink クラスターがインターネットにアクセスする方法」をご参照ください。 |
username | MySQL データベースサービスのユーザー名。 | はい | STRING | なし | なし。 |
password | MySQL データベースサービスのパスワード。 | はい | STRING | なし | なし。 |
tables | 同期する MySQL データテーブル。 | はい | STRING | なし |
説明
|
tables.exclude | 同期から除外するテーブル。 | いいえ | STRING | なし |
説明 ピリオドはデータベース名とテーブル名を区切ります。ピリオドで任意の文字をマッチさせるには、バックスラッシュでエスケープします。例:db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*。 |
port | MySQL データベースサービスのポート番号。 | いいえ | INTEGER | 3306 | なし。 |
schema-change.enabled | スキーマ変更イベントを送信するかどうかを指定します。 | いいえ | BOOLEAN | true | なし。 |
server-id | データベースクライアントが同期に使用する数値 ID または範囲。 | いいえ | STRING | 5400 ~ 6400 の間のランダム値が生成されます。 | この ID は、MySQL クラスター内でグローバルに一意である必要があります。同じデータベースに接続する各ジョブに対して、異なる ID を割り当てることを推奨します。 このオプションは、5400-5408 のような ID の範囲もサポートします。増分読み取りが有効化されている場合、同時読み取りがサポートされます。この場合、各同時リーダーが異なる ID を使用できるよう、ID の範囲を指定することを推奨します。 |
jdbc.properties.* | JDBC URL のカスタム接続パラメーター。 | いいえ | STRING | なし | カスタム接続パラメーターを渡すことができます。たとえば、SSL を無効化するには、'jdbc.properties.useSSL' = 'false' を設定します。 サポートされている接続パラメーターの詳細については、「MySQL 設定プロパティ」をご参照ください。 |
debezium.* | バイナリログを読み取るためのカスタム Debezium パラメーター。 | いいえ | STRING | なし | カスタム Debezium パラメーターを渡すことができます。たとえば、解析エラーの処理方法を指定するには、'debezium.event.deserialization.failure.handling.mode'='ignore' を使用します。 |
scan.incremental.snapshot.chunk.size | 各チャンクのサイズ(行数)。 | いいえ | INTEGER | 8096 | MySQL テーブルは、読み取り用に複数のチャンクに分割されます。チャンクデータは、完全に読み取られるまでメモリ内にバッファーされます。 各チャンクに含まれる行数が少ないほど、テーブル内のチャンクの総数が多くなります。これは障害回復の粒度を小さくしますが、OOM 問題を引き起こしたり、全体的なスループットを低下させたりする可能性があります。したがって、これらの要因をバランスよく考慮し、適切なチャンクサイズを設定する必要があります。 |
scan.snapshot.fetch.size | 完全テーブルデータを読み取る際に、1 回に取得する最大レコード数。 | いいえ | INTEGER | 1024 | なし。 |
scan.startup.mode | データ消費の起動モード。 | いいえ | STRING | initial | 有効な値:
重要 earliest-offset、specific-offset、および timestamp の場合、起動時刻と指定された開始オフセット時刻の間にテーブルスキーマが変更されていると、スキーマの不一致によるジョブ失敗が発生します。言い換えると、これらの 3 つの起動モードを使用する場合、指定された binlog 消費位置とジョブ起動時刻の間にテーブルスキーマが変更されないことを確認する必要があります。 |
scan.startup.specific-offset.file | 特定のオフセット起動モードで使用する開始オフセットの binlog ファイル名。 | いいえ | STRING | なし | この設定を使用する場合、scan.startup.mode を specific-offset に設定します。ファイル名の例: |
scan.startup.specific-offset.pos | 特定のオフセット起動モードで使用する指定された binlog ファイル内の開始オフセット。 | いいえ | INTEGER | なし | この設定を使用する場合、scan.startup.mode を specific-offset に設定します。 |
scan.startup.specific-offset.gtid-set | 特定のオフセット起動モードで使用する開始オフセットの GTID セット。 | いいえ | STRING | なし | この設定を使用する場合、scan.startup.mode を specific-offset に設定します。GTID セットの例: |
scan.startup.timestamp-millis | タイムスタンプ起動モードで使用する開始オフセットのミリ秒単位のタイムスタンプ。 | いいえ | LONG | なし | この設定を使用する場合、scan.startup.mode を timestamp に設定します。タイムスタンプはミリ秒単位です。 重要 タイムスタンプを使用する場合、MySQL CDC は各 binlog ファイルの初期イベントを読み取ってそのタイムスタンプを特定し、対応する binlog ファイルを検索しようとします。指定されたタイムスタンプの binlog ファイルがデータベースからクリアされておらず、読み取り可能であることを確認してください。 |
server-time-zone | データベースで使用されるセッションタイムゾーン。 | いいえ | STRING | このオプションを指定しない場合、システムは Flink ジョブの実行環境のタイムゾーンをデータベースサーバータイムゾーンとして使用します(選択したゾーンのタイムゾーン)。 | 例:Asia/Shanghai。このオプションは、MySQL TIMESTAMP 型が STRING 型に変換される方法を制御します。詳細については、「Debezium 時間型値」をご参照ください。 |
scan.startup.specific-offset.skip-events | 特定のオフセットから読み取る際にスキップする binlog イベント数。 | いいえ | INTEGER | なし | この設定を使用する場合、scan.startup.mode を specific-offset に設定します。 |
scan.startup.specific-offset.skip-rows | 特定のオフセットから読み取る際にスキップする行変更数。単一の binlog イベントは複数の行変更に対応する場合があります。 | いいえ | INTEGER | なし | この設定を使用する場合、scan.startup.mode を specific-offset に設定します。 |
connect.timeout | MySQL データベースサーバーへの接続がタイムアウトするまでの最大待機時間(再試行する前)。 | いいえ | DURATION | 30s | なし。 |
connect.max-retries | MySQL データベースサービスへの接続失敗後の最大再試行回数。 | いいえ | INTEGER | 3 | なし。 |
connection.pool.size | データベース接続プールのサイズ。 | いいえ | INTEGER | 20 | データベース接続プールは、データベース接続数を削減するために接続を再利用します。 |
heartbeat.interval | ソースがハートビートイベントを使用して binlog オフセットを進める間隔。 | いいえ | DURATION | 30s | ハートビートイベントは、ソースで binlog オフセットを進めるのに役立ちます。これは、MySQL で更新が遅いテーブルに特に有用です。このようなテーブルでは、binlog オフセットが自動的に進まないため、ハートビートイベントによって binlog オフセットを前方に進めることで、期限切れになった binlog オフセットが原因でジョブが失敗し、ステートレスな再起動が必要になるという問題を防ぎます。 |
scan.incremental.snapshot.chunk.key-column | スナップショットフェーズ中にチャンクを分割するために使用するカラム。 | いいえ。 | STRING | なし | プライマリキーから 1 つのカラムのみを選択できます。 |
rds.region-id | Alibaba Cloud RDS MySQL インスタンスのリージョン ID。 | OSS からアーカイブログを読み取る場合に必須です。 | STRING | なし | リージョン ID の詳細については、「リージョンとゾーン」をご参照ください。 |
rds.access-key-id | Alibaba Cloud RDS MySQL アカウントの AccessKey ID。 | OSS からアーカイブログを読み取る場合に必須です。 | STRING | なし | 詳細については、「AccessKey ID および AccessKey Secret の表示方法」をご参照ください。 重要 AccessKey 情報の漏洩を防ぐため、AccessKey ID はシークレット管理を使用して管理してください。詳細については、「変数管理」をご参照ください。 |
rds.access-key-secret | Alibaba Cloud RDS MySQL アカウントの AccessKey Secret。 | OSS からアーカイブログを読み取る場合に必須です。 | STRING | なし | 詳細については、「AccessKey ID および AccessKey Secret の表示方法」をご参照ください。 重要 AccessKey 情報の漏洩を防ぐため、AccessKey Secret はシークレット管理を使用して管理してください。詳細については、「変数管理」をご参照ください。 |
rds.db-instance-id | Alibaba Cloud RDS MySQL インスタンスのインスタンス ID。 | OSS からアーカイブログを読み取る場合に必須です。 | STRING | なし | なし。 |
rds.main-db-id | Alibaba Cloud RDS MySQL インスタンスのプライマリデータベース ID。 | いいえ | STRING | なし | プライマリデータベース ID の取得方法については、「RDS MySQL ログバックアップ」をご参照ください。 |
rds.download.timeout | OSS から単一のアーカイブログをダウンロードする際のタイムアウト。 | いいえ | DURATION | 60s | なし。 |
rds.endpoint | OSS binlog 情報を取得するためのサービスエンドポイント。 | いいえ | STRING | なし | 有効な値については、「サービスエンドポイント」をご参照ください。 |
rds.binlog-directory-prefix | binlog ファイルを保存するディレクトリプレフィックス。 | いいえ | STRING | rds-binlog- | なし。 |
rds.use-intranet-link | 内部ネットワークを使用して binlog ファイルをダウンロードするかどうかを指定します。 | いいえ | BOOLEAN | true | なし。 |
rds.binlog-directories-parent-path | binlog ファイルを保存する親ディレクトリの絶対パス。 | いいえ | STRING | なし | なし。 |
chunk-meta.group.size | チャンクメタデータのサイズ。 | いいえ | INTEGER | 1000 | メタデータがこのサイズを超える場合、複数の部分に分割して送信されます。 |
chunk-key.even-distribution.factor.lower-bound | 均等シャーディングのためのチャンク分布係数の下限。 | いいえ | DOUBLE | 0.05 | この値より小さいチャンク分布係数は、不均等なシャーディングを引き起こします。 チャンク分布係数 = (MAX(chunk-key) - MIN(chunk-key) + 1) / 行数の合計。 |
chunk-key.even-distribution.factor.upper-bound | 均等シャーディングのためのチャンク分布係数の上限。 | いいえ | DOUBLE | 1000.0 | この値より大きいチャンク分布係数は、不均等なシャーディングを引き起こします。 チャンク分布係数 = (MAX(chunk-key) - MIN(chunk-key) + 1) / 行数の合計。 |
scan.incremental.close-idle-reader.enabled | スナップショットフェーズ終了後にアイドルリーダーを閉じるかどうかを指定します。 | いいえ | BOOLEAN | false | この設定が有効になるには、 |
scan.only.deserialize.captured.tables.changelog.enabled | 増分フェーズ中に、指定されたテーブルの変更イベントのみを逆シリアル化するかどうかを指定します。 | いいえ | BOOLEAN |
| 有効な値:
|
scan.parallel-deserialize-changelog.enabled | 増分フェーズ中に、マルチスレッドを使用して変更イベントを解析するかどうかを指定します。 | いいえ | BOOLEAN | false | 有効な値:
説明 VVR 8.0.11 以降でのみサポートされます。 |
scan.parallel-deserialize-changelog.handler.size | 変更イベントの解析にマルチスレッドを使用する場合のイベントハンドラーの数。 | いいえ | INTEGER | 2 | 説明 VVR 8.0.11 以降でのみサポートされます。 |
metadata-column.include-list | 下流に渡すメタデータカラム。 | いいえ | STRING | なし | 利用可能なメタデータには、 説明 MySQL CDC YAML コネクタは、データベース名、テーブル名、および 重要
|
scan.newly-added-table.enabled | チェックポイントからの再起動時に、前回の実行でマッチしなかった新しく追加されたテーブルを同期するか、現在マッチしていないテーブルを状態から削除するかを指定します。 | いいえ | BOOLEAN | false | これは、チェックポイントまたはセーブポイントからの再起動時に有効になります。 |
scan.binlog.newly-added-table.enabled | 増分フェーズ中に、パターンに一致する新しく追加されたテーブルのデータを送信するかどうかを指定します。 | いいえ | BOOLEAN | false | これは |
scan.incremental.snapshot.chunk.key-column | 特定のテーブルに対して、スナップショットフェーズ中のチャンク分割キーとして使用するカラムを指定します。 | いいえ | STRING | なし |
|
scan.parse.online.schema.changes.enabled | 増分フェーズ中に、RDS ロックレス DDL イベントの解析を試行するかどうかを指定します。 | いいえ | BOOLEAN | false | 有効な値:
これは実験的な機能です。オンラインロックレス変更を実行する前に、Flink ジョブの回復用にスナップショットを取得してください。 説明 VVR 11.0 以降でのみサポートされます。 |
scan.incremental.snapshot.backfill.skip | スナップショット読み取りフェーズ中にバックフィルをスキップするかどうかを指定します。 | いいえ | BOOLEAN | false | 有効な値:
バックフィルがスキップされた場合、スナップショットフェーズ中のテーブルの変更は、後続の増分フェーズで読み取られ、スナップショットにマージされません。 重要 バックフィルをスキップすると、スナップショットフェーズ中の変更が再再生される可能性があるため、データの不整合が発生する可能性があります。保証されるのは、最低限 1 回のセマンティクスのみです。 説明 VVR 11.1 以降でのみサポートされます。 |
treat-tinyint1-as-boolean.enabled | TINYINT(1) 型をブール値型として扱うかどうかを指定します。 | いいえ | BOOLEAN | true | 有効な値:
|
treat-timestamp-as-datetime-enabled | MySQL TIMESTAMP を DATETIME として処理するかどうかを指定します。 | いいえ | BOOLEAN | false | 有効な値:
MySQL TIMESTAMP は UTC 時間を格納し、タイムゾーンの影響を受けます。MySQL DATETIME はリテラル時間を格納し、タイムゾーンの影響を受けません。 これを有効化すると、MySQL TIMESTAMP データは server-time-zone に基づいて DATETIME に変換されます。 |
include-comments.enabled | テーブルおよびカラムのコメントを同期するかどうかを指定します。 | いいえ | BOOELEAN | false | 有効な値:
これを有効化すると、ジョブのメモリ使用量が増加します。 |
scan.incremental.snapshot.unbounded-chunk-first.enabled | スナップショット読み取りフェーズ中に、上限なしのチャンクを最初に配布するかどうかを指定します。 | いいえ | BOOELEAN | false | 有効な値:
これは実験的な機能です。これを有効化すると、TaskManager がスナップショットフェーズ中に最後のチャンクを同期する際の OOM エラーのリスクが低減します。ジョブの初回起動前にこれを追加することを推奨します。 説明 VVR 11.1 以降でのみサポートされます。 |
binlog.session.network.timeout | binlog 接続のネットワークタイムアウト。 | いいえ | DURATION | 10m | これを 0s に設定すると、MySQL サーバーのデフォルトタイムアウトが使用されます。 説明 VVR 11.5 以降でのみサポートされます。 |
scan.rate-limit.records-per-second | ソースによって 1 秒あたりに出力される最大レコード数を制限します。 | いいえ | LONG | なし | データ読み取りを制限するのに役立ちます。この制限は、完全および増分フェーズの両方に適用されます。
完全読み取り中は、 説明 VVR 11.5 以降でのみサポートされます。 |
型マッピング
以下の表に、データインジェストの型マッピングを示します。
MySQL CDC フィールド型 | CDC フィールド型 |
TINYINT(n) | TINYINT |
SMALLINT | SMALLINT |
TINYINT UNSIGNED | |
TINYINT UNSIGNED ZEROFILL | |
YEAR | |
INT | INT |
MEDIUMINT | |
MEDIUMINT UNSIGNED | |
MEDIUMINT UNSIGNED ZEROFILL | |
SMALLINT UNSIGNED | |
SMALLINT UNSIGNED ZEROFILL | |
BIGINT | BIGINT |
INT UNSIGNED | |
INT UNSIGNED ZEROFILL | |
BIGINT UNSIGNED | DECIMAL(20, 0) |
BIGINT UNSIGNED ZEROFILL | |
SERIAL | |
FLOAT [UNSIGNED] [ZEROFILL] | FLOAT |
DOUBLE [UNSIGNED] [ZEROFILL] | DOUBLE |
DOUBLE PRECISION [UNSIGNED] [ZEROFILL] | |
REAL [UNSIGNED] [ZEROFILL] | |
NUMERIC(p, s) [UNSIGNED] [ZEROFILL] where p ≤ 38 | DECIMAL(p, s) |
DECIMAL(p, s) [UNSIGNED] [ZEROFILL] where p ≤ 38 | |
FIXED(p, s) [UNSIGNED] [ZEROFILL] where p ≤ 38 | |
BOOLEAN | BOOLEAN |
BIT(1) | |
TINYINT(1) | |
DATE | DATE |
TIME [(p)] | TIME [(p)] |
DATETIME [(p)] | TIMESTAMP [(p)] |
TIMESTAMP [(p)] | フィールドマッピングは
|
CHAR(n) | CHAR(n) |
VARCHAR(n) | VARCHAR(n) |
BIT(n) | BINARY(⌈(n + 7) / 8⌉) |
BINARY(n) | BINARY(n) |
VARBINARY(N) | VARBINARY(N) |
NUMERIC(p, s) [UNSIGNED] [ZEROFILL] where 38 < p ≤ 65 | STRING 説明 MySQL では、10 進数の精度は最大 65 まで可能です。Flink では、10 進数の精度は 38 に制限されています。精度が 38 を超える 10 進カラムを定義する場合、精度の損失を回避するために文字列にマッピングしてください。 |
DECIMAL(p, s) [UNSIGNED] [ZEROFILL] where 38 < p ≤ 65 | |
FIXED(p, s) [UNSIGNED] [ZEROFILL] where 38 < p ≤ 65 | |
TINYTEXT | STRING |
TEXT | |
MEDIUMTEXT | |
LONGTEXT | |
ENUM | |
JSON | STRING 説明 JSON データ型は、Flink で JSON 形式の文字列に変換されます。 |
GEOMETRY | STRING 説明 MySQL の空間データ型は、固定の JSON 形式を持つ文字列に変換されます。詳細については、「空間データ型マッピング」をご参照ください。 |
POINT | |
LINESTRING | |
POLYGON | |
MULTIPOINT | |
MULTILINESTRING | |
MULTIPOLYGON | |
GEOMETRYCOLLECTION | |
TINYBLOB | BYTES 説明 MySQL は、最大長が 2,147,483,647(2**31-1)バイトの BLOB をサポートしています。 |
BLOB | |
MEDIUMBLOB | |
LONGBLOB |
例
CDC ソーステーブル
CREATE TEMPORARY TABLE mysqlcdc_source ( order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); CREATE TEMPORARY TABLE blackhole_sink( order_id INT, customer_name STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT order_id, customer_name FROM mysqlcdc_source;ディメンションテーブル
CREATE TEMPORARY TABLE datagen_source( a INT, b BIGINT, c STRING, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE mysql_dim ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); CREATE TEMPORARY TABLE blackhole_sink( a INT, b STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.a, H.b FROM datagen_source AS T JOIN mysql_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a = H.a;シンクテーブル
CREATE TEMPORARY TABLE datagen_source ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE mysql_sink ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); INSERT INTO mysql_sink SELECT * FROM datagen_source;データインジェストソース
source: type: mysql name: MySQL ソース hostname: ${mysql.hostname} port: ${mysql.port} username: ${mysql.username} password: ${mysql.password} tables: ${mysql.source.table} server-id: 7601-7604 sink: type: values name: Values シンク print.enabled: true sink.print.logger: true
MySQL CDC ソーステーブルについて
仕組み
MySQL CDC ソーステーブルは、起動時にテーブル全体をスキャンし、プライマリキーに基づいて複数のチャンクに分割します。現在の binlog オフセットを記録した後、増分スナップショットアルゴリズムを使用して SELECT 文で各チャンクのデータを読み取ります。ジョブは定期的にチェックポイントを実行して、完了したチャンクを記録します。フェールオーバーが発生した場合、ジョブは未完了のチャンクのみを継続して読み取ります。すべてのチャンクの読み取りが完了した後、ジョブは以前に記録された binlog オフセットから増分変更レコードを読み取ります。Flink ジョブは引き続き定期的にチェックポイントを実行して binlog オフセットを記録します。フェールオーバーが発生した場合、ジョブは最後に記録された binlog オフセットから処理を再開します。このプロセスにより、1 回限りのセマンティクスが実現されます。
増分スナップショットアルゴリズムの詳細については、「MySQL CDC コネクタ」をご参照ください。
メタデータ
メタデータは、シャーディングされたデータベースおよびテーブルのマージシナリオで非常に有用です。マージ後も、ビジネスでは各行のデータのソースデータベースおよびテーブルを識別する必要がある場合がよくあります。メタデータカラムを使用すると、この情報をアクセスできます。そのため、メタデータカラムを使用して、複数のシャーディングされたテーブルを簡単に 1 つの宛先テーブルにマージできます。
MySQL CDC ソースは、メタデータカラム構文をサポートしています。以下のメタデータにメタデータカラム経由でアクセスできます。
メタデータキー
メタデータ型
説明
database_name
STRING NOT NULL
行を含むデータベースの名前。
table_name
STRING NOT NULL
行を含むテーブルの名前。
op_ts
TIMESTAMP_LTZ(3) NOT NULL
データベース内で行が変更された時刻。レコードが binlog ではなくテーブルの既存の履歴データに由来する場合、この値は常に 0 になります。
op_type
STRING NOT NULL
行の変更タイプ。
+I:INSERT メッセージ
-D:DELETE メッセージ
-U:UPDATE_BEFORE メッセージ
+U:UPDATE_AFTER メッセージ
説明Ververica Runtime(VVR)8.0.7 以降でのみサポートされます。
query_log
STRING NOT NULL
この行に対応する MySQL クエリログレコードを読み取ります。
説明MySQL は、クエリログを記録するために binlog_rows_query_log_events パラメーターを有効化している必要があります。
次の例は、MySQL インスタンス内の異なるシャーディングされたデータベースの複数の orders テーブルをマージし、Hologres の holo_orders テーブルに同期する方法を示しています。
CREATE TEMPORARY TABLE mysql_orders ( db_name STRING METADATA FROM 'database_name' VIRTUAL, -- データベース名を読み取ります。 table_name STRING METADATA FROM 'table_name' VIRTUAL, -- テーブル名を読み取ります。 operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, -- 変更タイムスタンプを読み取ります。 op_type STRING METADATA FROM 'op_type' VIRTUAL, -- 変更タイプを読み取ります。 order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'flinkuser', 'password' = 'flinkpw', 'database-name' = 'mydb_.*', -- 正規表現を使用して複数のシャーディングされたデータベースに一致させます。 'table-name' = 'orders_.*' -- 正規表現を使用して複数のシャーディングされたテーブルに一致させます。 ); INSERT INTO holo_orders SELECT * FROM mysql_orders;上記のコードに基づき、WITH 句で scan.read-changelog-as-append-only.enabled を true に設定すると、下流テーブルのプライマリキー構成に基づいて出力が異なります:
下流テーブルのプライマリキーが order_id の場合、出力には上流テーブルの各プライマリキーの最新の変更のみが含まれます。たとえば、プライマリキーの最新の変更が削除操作の場合、下流テーブルには同じプライマリキーと op_type が -D のレコードが表示されます。
下流テーブルのプライマリキーが order_id、operation_ts、および op_type の場合、出力には上流テーブルの各プライマリキーの完全な変更履歴が含まれます。
正規表現のサポート
MySQL CDC ソーステーブルは、テーブル名またはデータベース名に正規表現を使用して、複数のテーブルまたはデータベースに一致させることをサポートしています。次の例は、正規表現を使用して複数のテーブルを指定する方法を示しています。
CREATE TABLE products ( db_name STRING METADATA FROM 'database_name' VIRTUAL, table_name STRING METADATA FROM 'table_name' VIRTUAL, operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = '123456', 'database-name' = '(^(test).*|^(tpc).*|txc|.*[p$]|t{2})', -- 正規表現を使用して複数のデータベースに一致させます。 'table-name' = '(t[5-8]|tt)' -- 正規表現を使用して複数のテーブルに一致させます。 );上記の例の正規表現の説明:
^(test).* はプレフィックス一致の例です。この式は、test1 や test2 などの test で始まるデータベース名に一致します。
.*[p$] はサフィックス一致の例です。この式は、cdcp や edcp などの p で終わるデータベース名に一致します。
txc は完全一致です。特定のデータベース名 txc に一致します。
完全修飾テーブル名を一致させる際、MySQL CDC はデータベース名とテーブル名の両方を使用してテーブルを一意に識別します。パターン database-name.table-name を使用して一致させます。たとえば、パターン (^(test).*|^(tpc).*|txc|.*[p$]|t{2}).(t[5-8]|tt) は、データベース内の txc.tt や test2.test5 などのテーブルに一致します。
重要SQL ジョブの構成では、table-name および database-name オプションは、複数のテーブルまたはデータベースを指定するためにカンマ(,)をサポートしていません。
複数のテーブルに一致させる場合や複数の正規表現を使用する場合は、縦線(|)で区切って括弧で囲む必要があります。たとえば、user テーブルと product テーブルを読み取るには、table-name を
(user|product)として構成します。正規表現にカンマが含まれている場合、縦線(|)演算子を使用して書き換える必要があります。たとえば、正規表現
mytable_\d{1, 2}は、カンマを使用しない同等の式(mytable_\d{1}|mytable_\d{2})に書き換える必要があります。
同時実行制御
MySQL コネクタは、完全データの同時読み取りをサポートしており、これによりデータ読み込み効率が向上します。Realtime Compute for Apache Flink コンソールの Autopilot と組み合わせることで、同時読み取りが完了した後の増分フェーズ中に自動的にスケールインし、コンピューティングリソースを節約します。
Realtime Compute 開発コンソールでは、リソース構成ページで基本モードまたはエキスパートモードのいずれかでジョブの並列処理の次数を設定できます。違いは次のとおりです:
基本モードで設定された並列処理の次数は、ジョブ全体のグローバルな並列処理の次数です。

エキスパートモードでは、必要に応じて特定の VERTEX の並列処理の次数を設定できます。

リソース構成の詳細については、「ジョブデプロイメント情報の設定」をご参照ください。
重要基本モードまたはエキスパートモードのいずれを使用する場合でも、テーブルで宣言された server-id 範囲は、ジョブの並列処理の次数以上である必要があります。たとえば、server-id 範囲が 5404-5412 の場合、8 個の一意な server ID があります。したがって、ジョブは最大 8 個の並列タスクを持つことができます。同じ MySQL インスタンスに対する異なるジョブは、重複しない server-id 範囲を持つ必要があります。各ジョブは、明示的に異なる server-id を設定する必要があります。
Autopilot 自動スケールイン
完全データフェーズでは、大量の履歴データが蓄積されます。読み取り効率を向上させるために、通常、履歴データは同時読み取りされます。一方、増分 binlog フェーズでは、binlog データ量が少なく、グローバルな順序付けを維持する必要があるため、通常は単一の並列処理で十分です。Autopilot は、完全フェーズと増分フェーズの間でこれらの異なる要件を満たすために、パフォーマンスとリソースのバランスを自動的に調整します。
Autopilot は、MySQL CDC ソースの各タスクのトラフィックを監視します。binlog フェーズに移行した際に、1 つのタスクのみが binlog 読み取りを処理し、他のタスクがアイドル状態になる場合、Autopilot は自動的にソースの CU 数および並列処理の次数を削減します。Autopilot を有効化するには、ジョブの運用保守ページで Autopilot モードを Active に設定します。
説明並列処理の次数をスケールダウンするためのデフォルトの最小トリガー間隔は 24 時間です。Autopilot パラメーターおよび詳細については、「Autopilot の設定」をご参照ください。
起動モード
scan.startup.mode オプションを使用して、MySQL CDC ソーステーブルの起動モードを指定できます。有効な値は次のとおりです:
initial(デフォルト):最初の起動時にデータベーステーブルの完全読み取りを実行し、その後増分モードに切り替えて binlog を読み取ります。
earliest-offset:スナップショットフェーズをスキップし、利用可能な最も古い binlog オフセットから読み取りを開始します。
latest-offset:スナップショットフェーズをスキップし、binlog の末尾から読み取りを開始します。このモードでは、ソーステーブルはジョブの起動後に発生した変更のみを読み取ります。
specific-offset:スナップショットフェーズをスキップし、指定された特定の binlog オフセットから読み取りを開始します。オフセットは、binlog ファイル名と位置、または GTID セットで指定できます。
timestamp:スナップショットフェーズをスキップし、指定されたタイムスタンプから binlog イベントの読み取りを開始します。
例:
CREATE TABLE mysql_source (...) WITH ( 'connector' = 'mysql-cdc', 'scan.startup.mode' = 'earliest-offset', -- 最も古いオフセットから開始します。 'scan.startup.mode' = 'latest-offset', -- 最新のオフセットから開始します。 'scan.startup.mode' = 'specific-offset', -- 特定のオフセットから開始します。 'scan.startup.mode' = 'timestamp', -- 特定のタイムスタンプから開始します。 'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- specific-offset モードの binlog ファイル名を指定します。 'scan.startup.specific-offset.pos' = '4', -- specific-offset モードの binlog 位置を指定します。 'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- specific-offset モードの GTID セットを指定します。 'scan.startup.timestamp-millis' = '1667232000000' -- timestamp モードの起動タイムスタンプを指定します。 ... )重要MySQL ソースは、チェックポイント中に INFO レベルで現在の位置をログに記録します。ログのプレフィックスは
Binlog offset on checkpoint {checkpoint-id}です。このログは、特定のチェックポイント位置からジョブを再起動する際に役立ちます。読み取り中のテーブルのスキーマが過去に変更されている場合、earliest-offset、specific-offset、または timestamp から開始するとエラーが発生する可能性があります。これは、Debezium リーダーが内部的に最新のスキーマを保存しており、スキーマが一致しない古いデータを正しく解析できないためです。
キーのない CDC ソーステーブル
キーのないテーブルを使用するには、scan.incremental.snapshot.chunk.key-column を設定し、NULL 不可のフィールドのみを選択する必要があります。
キーのない CDC ソーステーブルの処理セマンティクスは、scan.incremental.snapshot.chunk.key-column で指定されたカラムの動作に依存します:
指定されたカラムが更新されない場合、1 回限りのセマンティクスが保証されます。
指定されたカラムが更新される場合、最低限 1 回のセマンティクスのみが保証されます。ただし、下流システムと組み合わせて、下流のプライマリキーを指定し、べき等操作を使用することで、データの正確性を確保できます。
RDS MySQL からのバックアップログの読み取り
MySQL CDC ソーステーブルは、Alibaba Cloud RDS MySQL からのバックアップログの読み取りをサポートしています。この機能は、完全スナップショットフェーズに長い時間がかかる場合に特に有用です。この場合、ローカルの binlog ファイルは自動的にクリーンアップされる可能性がありますが、手動または自動でアップロードされたバックアップファイルは依然として存在します。
例:
CREATE TABLE mysql_source (...) WITH ( 'connector' = 'mysql-cdc', 'rds.region-id' = 'cn-beijing', 'rds.access-key-id' = 'xxxxxxxxx', 'rds.access-key-secret' = 'xxxxxxxxx', 'rds.db-instance-id' = 'rm-xxxxxxxxxxxxxxxxx', 'rds.main-db-id' = '12345678', 'rds.download.timeout' = '60s' ... )CDC ソースの再利用を有効化
複数の MySQL CDC ソーステーブルを持つ単一のジョブは、複数の binlog クライアントを起動します。すべてのソーステーブルが同じ MySQL インスタンスから読み取る場合、この方法はデータベースへの負荷を増加させます。詳細については、「MySQL CDC よくある質問」をご参照ください。
解決策
VVR 8.0.7 以降のバージョンでは、MySQL CDC ソースの再利用がサポートされています。再利用により、互換性のある MySQL CDC ソーステーブルがマージされます。ソーステーブルがデータベース名、テーブル名、および
server-idを除いて同一の構成を共有している場合にマージが発生します。エンジンは、同一ジョブ内の MySQL CDC ソースを自動的にマージします。手順
SQL ジョブで
SETコマンドを使用できます:SET 'table.optimizer.source-merge.enabled' = 'true'; # (VVR 8.0.8 および 8.0.9 バージョン)以下も設定します: SET 'sql-gateway.exec-plan.enabled' = 'false';VVR 11.1 以降のバージョンでは、再利用がデフォルトで有効化されています。
ステートレスでジョブを開始します。 ソース再利用構成を変更すると、ジョブトポロジーが変更されます。ステートなしでジョブを開始しないと、ジョブが正常に開始されなかったり、データが失われたりする可能性があります。ソースがマージされた場合、ジョブトポロジーに
MergetableSourceScanノードが表示されます。
重要再利用を有効化した後、
pipeline.operator-chainingをfalseに設定しないでください。オペレーターチェーンを無効化すると、シリアル化および逆シリアル化のオーバーヘッドが追加されます。マージされるソースが多いほど、オーバーヘッドが大きくなります。VVR 8.0.7 では、オペレーターチェーンを無効化するとシリアル化の問題が発生します。
binlog 読み取りの高速化
MySQL コネクタをソーステーブルまたはデータインジェストソースとして使用する場合、増分フェーズ中に binlog ファイルを解析してさまざまな変更メッセージを生成します。binlog ファイルは、バイナリ形式でテーブルのすべての変更を記録します。binlog ファイルの解析を高速化するには、次の方法があります:
解析フィルター構成を有効化
scan.only.deserialize.captured.tables.changelog.enabledオプションを使用して、指定されたテーブルの変更イベントのみを解析します。
Debezium オプションの最適化
debezium.max.queue.size: 162580 debezium.max.batch.size: 40960 debezium.poll.interval.ms: 50debezium.max.queue.size:ブロッキングキューが保持できる最大レコード数。Debezium がデータベースからイベントストリームを読み取る際、イベントを下流に書き込む前にブロッキングキューに配置します。デフォルト値は 8192 です。debezium.max.batch.size:1 回の反復で処理される最大イベント数。デフォルト値は 2048 です。debezium.poll.interval.ms:コネクタが新しい変更イベントを要求する前に待機するミリ秒数。デフォルト値は 1000 ms(1 秒)です。
例:
CREATE TABLE mysql_source (...) WITH (
'connector' = 'mysql-cdc',
-- Debezium 設定
'debezium.max.queue.size' = '162580',
'debezium.max.batch.size' = '40960',
'debezium.poll.interval.ms' = '50',
-- 解析フィルターを有効化
'scan.only.deserialize.captured.tables.changelog.enabled' = 'true', -- 指定されたテーブルの変更イベントのみを解析します。
...
)source:
type: mysql
name: MySQL ソース
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: ${mysql.source.table}
server-id: 7601-7604
# Debezium 設定
debezium.max.queue.size: 162580
debezium.max.batch.size: 40960
debezium.poll.interval.ms: 50
# 解析フィルターを有効化
scan.only.deserialize.captured.tables.changelog.enabled: trueMySQL CDC のエンタープライズ版は、オープンソースコミュニティ版の約 2 倍である 85 MB/s の速度で binlog を消費します。binlog 生成速度が 85 MB/s(512 MB のファイルを 6 秒ごとに生成する速度に相当)を超える場合、Flink ジョブの遅延が継続的に増加します。binlog 生成速度が低下すると、遅延は徐々に減少します。binlog ファイルに大規模なトランザクションが含まれている場合、処理遅延が一時的に増加し、トランザクションのログの読み取りが完了すると減少します。
MySQL CDC DataStream API
DataStream API を使用してデータの読み取りおよび書き込みを行うには、対応する DataStream コネクタを使用して Flink に接続する必要があります。DataStream コネクタの設定方法については、「DataStream コネクタの使用方法」をご参照ください。
次の例は、DataStream API プログラムを作成し、MySqlSource を使用する方法を示しています。必要な pom 依存関係も含まれています。
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
public class MySqlSourceExample {
public static void main(String[] args) throws Exception {
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("yourHostname")
.port(yourPort)
.databaseList("yourDatabaseName") // キャプチャ対象のデータベースを設定します。
.tableList("yourDatabaseName.yourTableName") // キャプチャ対象のテーブルを設定します。
.username("yourUsername")
.password("yourPassword")
.deserializer(new JsonDebeziumDeserializationSchema()) // SourceRecord を JSON 文字列に変換します。
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// チェックポイントを有効化します。
env.enableCheckpointing(3000);
env
.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL ソース")
// 並列ソースタスクを 4 つに設定します。
.setParallelism(4)
.print().setParallelism(1); // メッセージの順序を維持するために、シンクの並列処理の次数を 1 に設定します。
env.execute("Print MySQL Snapshot + Binlog");
}
}<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mysql</artifactId>
<version>${vvr.version}</version>
</dependency>MySqlSource をビルドする際、コード内で以下のパラメーターを指定する必要があります:
パラメーター | 説明 |
hostname | MySQL データベースの IP アドレスまたはホスト名。 |
port | MySQL データベースサービスのポート番号。 |
databaseList | MySQL データベースの名前。 説明 データベース名は、複数のデータベースからデータを読み取るために正規表現をサポートします。 |
username | MySQL データベースサービスのユーザー名。 |
password | MySQL データベースサービスのパスワード。 |
deserializer | SourceRecord オブジェクトを指定された型に変換するデシリアライザー。有効な値:
|
pom 依存関係では、以下のパラメーターを指定する必要があります:
${vvr.version} | Alibaba Cloud Realtime Compute for Apache Flink のエンジンバージョン(例: 説明 Maven に表示されるバージョン番号を使用してください。修正プログラムが定期的にリリースされており、これらの更新は他のチャネルを通じてアナウンスされない場合があります。 |
${flink.version} | Apache Flink のバージョン(例: 重要 ジョブ実行時の互換性の問題を回避するために、Realtime Compute for Apache Flink エンジンバージョンに対応する Apache Flink バージョンを使用してください。バージョンマッピングの詳細については、「エンジン」をご参照ください。 |
よくある質問
CDC ソーステーブルを使用する際に発生する可能性のある問題については、「CDC の問題」をご参照ください。