このトピックでは、MySQL コネクタの使用方法について説明します。
背景情報
MySQL コネクタは、ApsaraDB RDS for MySQL、PolarDB for MySQL、OceanBase (MySQL モード)、セルフマネージド MySQL など、MySQL プロトコルと互換性のあるすべてのデータベースをサポートします。
MySQL コネクタを使用して OceanBase からデータを読み取る場合は、バイナリログ (Binlog) が有効になっており、正しく構成されていることを確認してください。詳細については、「Binlog 関連の操作」をご参照ください。この機能はパブリックプレビュー段階です。使用する前に十分に評価することを推奨します。
MySQL コネクタは、次の機能をサポートしています。
カテゴリ | 詳細 |
サポートされるタイプ | ソーステーブル、ディメンションテーブル、結果テーブル、データインジェストのデータソース |
ランタイムモード | ストリーミングモードのみサポートされています。 |
データフォーマット | 該当なし |
特定のモニタリングメトリック | |
API タイプ | DataStream、SQL、およびデータインジェスト YAML |
結果テーブルのデータの更新または削除をサポート | はい |
特徴
MySQL 変更データキャプチャ (CDC) ソーステーブルは、最初にデータベースから完全な既存データを読み取るストリーミングソーステーブルです。その後、スムーズにバイナリログ (Binlog) の読み取りに切り替えることで、データの欠落や重複がないことを保証します。障害が発生した場合でも、Exactly-Once セマンティクスが保証されます。MySQL CDC ソーステーブルは、完全データの同時読み取りをサポートし、増分スナップショットアルゴリズムを使用して、ロックフリーの読み取りと再開可能なデータ転送を実装します。詳細については、「MySQL CDC ソーステーブルについて」をご参照ください。
バッチ処理とストリーム処理の統合:コネクタは完全データと増分データの両方の読み取りをサポートしているため、個別のパイプラインは不要です。
水平方向のパフォーマンススケーリングのために、完全データの同時読み取りをサポートします。
完全データの読み取りから増分データの読み取りにシームレスに切り替え、自動的にスケールインして計算リソースを節約します。
完全データ読み取りフェーズでの再開可能なデータ転送をサポートし、安定性を向上させます。
完全データのロックフリー読み取りは、オンラインのビジネス運用に影響を与えません。
ApsaraDB RDS for MySQL からのバックアップログの読み取りをサポートします。
バイナリログファイルを並列で解析し、データ遅延を低減します。
前提条件
MySQL CDC ソーステーブルを使用する前に、「MySQL の構成」で説明されているように MySQL を構成して、前提条件を満たす必要があります。
RDS for MySQL
Realtime Compute for Apache Flink を使用して接続テストを実行し、ネットワーク接続を確認できます。
必須の MySQL バージョン:5.6、5.7、および 8.0.x。
バイナリログ (Binlog) を有効にする必要があります。デフォルトで有効になっています。
バイナリログのフォーマットは ROW である必要があります。これはデフォルトのフォーマットです。
binlog_row_image を FULL に設定します。これはデフォルトの設定です。
バイナリログトランザクション圧縮を無効にします。この機能は MySQL 8.0.20 で導入され、デフォルトで無効になっています。
MySQL ユーザーを作成し、SELECT、SHOW DATABASES、REPLICATION SLAVE、および REPLICATION CLIENT 権限を付与します。
MySQL データベースとテーブルを作成します。詳細については、「ApsaraDB RDS for MySQL インスタンスのデータベースとアカウントの作成」をご参照ください。権限不足による操作の失敗を防ぐために、特権アカウントを使用して MySQL データベースを作成することを推奨します。
IP ホワイトリストを構成します。詳細については、「ApsaraDB RDS for MySQL インスタンスのホワイトリストの構成」をご参照ください。
PolarDB for MySQL
Realtime Compute for Apache Flink を使用して接続テストを実行し、ネットワーク接続を確認できます。
必須の MySQL バージョン:5.6、5.7、および 8.0.x。
バイナリログ (Binlog) を有効にする必要があります。デフォルトでは無効になっています。
バイナリログのフォーマットは ROW である必要があります。これはデフォルトのフォーマットです。
binlog_row_image を FULL に設定します。これはデフォルトの設定です。
バイナリログトランザクション圧縮を無効にします。この機能は MySQL 8.0.20 で導入され、デフォルトで無効になっています。
MySQL ユーザーを作成し、SELECT、SHOW DATABASES、REPLICATION SLAVE、および REPLICATION CLIENT 権限を付与します。
MySQL データベースとテーブルを作成します。詳細については、「PolarDB for MySQL クラスターのデータベースとアカウントの作成」をご参照ください。権限不足による操作の失敗を防ぐために、特権アカウントを使用して MySQL データベースを作成することを推奨します。
IP ホワイトリストを構成します。詳細については、「PolarDB for MySQL クラスターのホワイトリストの構成」をご参照ください。
自己管理 MySQL
Realtime Compute for Apache Flink を使用して接続テストを実行し、ネットワーク接続を確認できます。
必須の MySQL バージョン:5.6、5.7、および 8.0.x。
バイナリログ (Binlog) を有効にする必要があります。デフォルトでは無効になっています。
バイナリログのフォーマットは ROW である必要があります。デフォルトのフォーマットは STATEMENT です。
binlog_row_image を FULL に設定します。これはデフォルトの設定です。
バイナリログトランザクション圧縮を無効にします。この機能は MySQL 8.0.20 で導入され、デフォルトで無効になっています。
MySQL ユーザーを作成し、SELECT、SHOW DATABASES、REPLICATION SLAVE、および REPLICATION CLIENT 権限を付与します。
MySQL データベースとテーブルを作成します。詳細については、「自己管理型 MySQL インスタンスのデータベースとアカウントの作成」をご参照ください。権限不足による操作の失敗を防ぐために、特権アカウントを使用して MySQL データベースを作成することを推奨します。
IP ホワイトリストを構成します。詳細については、「自己管理型 MySQL インスタンスのホワイトリストの構成」をご参照ください。
制限事項
一般的な制限事項
MySQL CDC ソーステーブルはウォーターマーク定義をサポートしていません。
Create Table As Select (CTAS) および Create Database As Select (CDAS) ジョブでは、MySQL CDC ソーステーブルは一部のスキーマ変更を同期できます。サポートされている変更タイプの詳細については、「スキーマ進化の同期ポリシー」をご参照ください。
MySQL CDC コネクタは、バイナリログトランザクション圧縮機能をサポートしていません。したがって、MySQL CDC コネクタを使用して増分データを消費する場合は、この機能が無効になっていることを確認してください。そうしないと、増分データを取得できなくなる可能性があります。
RDS for MySQL の制限事項
RDS for MySQL の場合、セカンダリデータベースまたは読み取り専用レプリカからデータを読み取ることは推奨されません。これは、これらのインスタンス上のバイナリログのデフォルトの保持期間が非常に短いためです。バイナリログが期限切れになりクリーンアップされると、ジョブはバイナリログデータの消費に失敗し、エラーを報告します。
RDS for MySQL は、デフォルトでプライマリ/セカンダリ並列レプリケーションを有効にしますが、プライマリデータベースとセカンダリデータベース間のトランザクション順序の一貫性を保証しません。これにより、プライマリ/セカンダリ切り替え後のチェックポイントからのデータ回復中にデータが失われる可能性があります。この問題を回避するには、RDS for MySQL の slave_preserve_commit_order オプションを手動で有効にすることができます。
PolarDB for MySQL の制限事項
MySQL CDC ソーステーブルは、PolarDB for MySQL 1.0.19 以前のマルチマスタークラスターアーキテクチャ (詳細については、「マルチマスタークラスターとは」をご参照ください) からのデータ読み取りをサポートしていません。これらのクラスターバージョンによって生成されたバイナリログには、重複したテーブル ID が含まれている可能性があります。これにより、CDC ソーステーブルでスキーマ マッピングエラーが発生し、バイナリログデータの解析時にエラーが発生する可能性があります。
オープンソース MySQL の制限事項
デフォルトの構成では、MySQL はプライマリ-セカンダリ間のバイナリログレプリケーション中にトランザクションの順序を維持します。MySQL レプリカで並列レプリケーションが有効 (slave_parallel_workers > 1) になっているが、slave_preserve_commit_order=ON になっていない場合、そのトランザクションのコミット順序はプライマリデータベースと一致しない可能性があります。Flink CDC がチェックポイントから回復する際、順序が正しくないためにデータが欠落する可能性があります。MySQL レプリカで slave_preserve_commit_order = ON を設定することを推奨します。または、slave_parallel_workers = 1 を設定することもできますが、これによりレプリケーションのパフォーマンスが犠牲になります。
注意
結果テーブル
DDL で自動インクリメントプライマリキーを宣言しないでください。MySQL はデータ書き込み時に自動的にそれらを埋めます。
少なくとも 1 つの非プライマリキーフィールドを宣言する必要があります。そうしないと、エラーが報告されます。
DDL では、NOT ENFORCED は Flink がプライマリキー制約を強制しないことを意味します。プライマリキーの正確性と完全性を保証する必要があります。詳細については、「有効性チェック」をご参照ください。
ディメンションテーブル
インデックスを使用してクエリを高速化したい場合、JOIN 条件のフィールドの順序は、インデックスで定義された順序 (左端プレフィックスルール) と一致する必要があります。たとえば、インデックスが (a, b, c) の場合、JOIN 条件は
ON t.a = x AND t.b = yとなります。Flink によって生成された SQL はオプティマイザーによって再書き込みされる可能性があり、これにより実際のデータベースクエリ中にインデックスが使用されなくなることがあります。インデックスが使用されているかどうかを確認するには、実行計画 (EXPLAIN) または MySQL の Redis スローログをチェックして、実際に実行される SELECT 文を表示できます。
SQL
SQL ジョブで MySQL コネクタをソーステーブル、ディメンションテーブル、または結果テーブルとして使用できます。
構文
CREATE TEMPORARY TABLE mysqlcdc_source (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql',
'hostname' = '<yourHostname>',
'port' = '3306',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'database-name' = '<yourDatabaseName>',
'table-name' = '<yourTableName>'
);コネクタが結果テーブルに書き込む方法:結果テーブルに書き込む際、コネクタは受信した各データレコードに対して SQL 文を構築して実行します。実行される具体的な SQL 文は次のとおりです:
プライマリキーのない結果テーブルの場合、
INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...);文が実行されます。プライマリキーのある結果テーブルの場合、
INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...;文が実行されます。注意:物理テーブルにプライマリキー以外の一意なインデックス制約がある場合、異なるプライマリキーを持つが同じ一意なインデックス値を持つ 2 つのレコードを挿入すると、一意なインデックスの競合が発生します。この競合により、データが上書きされて失われる可能性があります。
MySQL データベースで自動インクリメントプライマリキーが定義されている場合、Flink DDL で自動インクリメントフィールドを宣言しないでください。データベースはデータ書き込み中にこのフィールドを自動的に埋めます。コネクタは、自動インクリメントフィールドを持つデータの書き込みと削除のみをサポートし、更新はサポートしていません。
WITH パラメーター
一般
パラメーター
説明
必須
データの型
デフォルト値
備考
connector
テーブルタイプ。
はい
STRING
なし
ソーステーブルとして使用する場合、これを
mysql-cdcまたはmysqlに設定できます。2 つの値は同等です。ディメンションテーブルまたは結果テーブルとして使用する場合、値はmysqlでなければなりません。hostname
MySQL データベースの IP アドレスまたはホスト名。
はい
STRING
なし
Virtual Private Cloud (VPC) アドレスを入力することを推奨します。
説明MySQL データベースと Realtime Compute for Apache Flink が同じ VPC にない場合は、VPC 間のネットワーク接続を確立するか、パブリックエンドポイントを使用してアクセスする必要があります。詳細については、「ストレージ管理と操作」および「フルマネージド Flink クラスターはどのようにインターネットにアクセスできますか?」をご参照ください。
username
MySQL データベースサービスのユーザー名。
はい
STRING
なし
なし。
password
MySQL データベースサービスのパスワード。
はい
STRING
なし
なし。
database-name
MySQL データベース名。
はい
STRING
なし
ソーステーブルとして使用する場合、データベース名は正規表現をサポートし、複数のデータベースからデータを読み取ることができます。
正規表現を使用する場合、文字列の開始と終了を一致させるために ^ および $ 記号の使用は避けてください。具体的な理由については、table-name の備考をご参照ください。
table-name
MySQL テーブル名。
はい
STRING
なし
ソーステーブルとして使用する場合、テーブル名は正規表現をサポートし、複数のテーブルからデータを読み取ることができます。
複数の MySQL テーブルから読み取る場合、複数の Binlog リスナーを有効にすることを避けるために、複数の CTAS 文を単一のジョブとして送信することで、パフォーマンスと効率が向上します。詳細については、「複数の CTAS 文:単一のジョブとして送信」をご参照ください。
正規表現を使用する場合、文字列の開始と終了を一致させるために ^ および $ 記号の使用は避けてください。理由は以下で説明します。
説明正規表現でテーブル名を照合する場合、MySQL CDC ソーステーブルは、指定された database-name と table-name を \\. 文字列 (または Ververica Runtime (VVR) 8.0.1 より前のバージョンでは . 文字) で連結して、完全修飾正規表現を形成します。この式は、MySQL データベース内のテーブルの完全修飾名と照合するために使用されます。
たとえば、'database-name'='db_.*' および 'table-name'='tb_.+' と構成した場合、コネクタは正規表現 db_.*\\.tb_.+ (またはバージョン 8.0.1 より前では db_.*.tb_.+) を使用して完全修飾テーブル名を照合し、どのテーブルを読み取るかを決定します。
port
MySQL データベースサービスのポート番号。
いいえ
INTEGER
3306
なし。
ソーステーブル固有
パラメーター
説明
必須
データの型
デフォルト値
備考
server-id
データベースクライアントの数値 ID。
いいえ
STRING
5400 から 6400 の間のランダムな値が生成されます。
この ID は MySQL クラスター内でグローバルに一意でなければなりません。同じデータベースに接続する各ジョブに異なる ID を設定することを推奨します。
このパラメーターは、5400-5408 のような ID 範囲もサポートします。増分読み取りが有効な場合、複数の同時リーダーがサポートされます。この場合、各同時リーダーが異なる ID を使用するように ID 範囲を設定することを推奨します。詳細については、「サーバー ID の使用法」をご参照ください。
scan.incremental.snapshot.enabled
増分スナップショットを有効にするかどうかを指定します。
いいえ
BOOLEAN
true
増分スナップショットはデフォルトで有効になっています。これは、完全なデータスナップショットを読み取るための新しいメカニズムです。古いスナップショット読み取り方法と比較して、増分スナップショットにはいくつかの利点があります:
ソースは完全なデータを並行して読み取ることができます。
ソースは完全なデータを読み取る際にチャンクレベルのチェックポイントをサポートします。
ソースは完全なデータを読み取る際にグローバル読み取りロック (FLUSH TABLES WITH read lock) を取得する必要がありません。
ソースが同時読み取りをサポートするようにしたい場合、各同時リーダーには一意のサーバー ID が必要です。したがって、server-id は 5400-6400 のような範囲でなければならず、範囲のサイズは並列度以上でなければなりません。
説明この設定項目は Flink コンピュートエンジン VVR 11.1 以降のバージョンでは削除されています。
scan.incremental.snapshot.chunk.size
各チャンクのサイズ (行数)。
いいえ
INTEGER
8096
増分スナップショット読み取りが有効な場合、テーブルは複数のチャンクに分割されて読み取られます。チャンク内のデータは、完全に読み取られるまでメモリにキャッシュされます。
チャンクあたりの行数が少ないと、テーブル内のチャンクの総数が多くなります。これにより、障害回復の粒度は向上しますが、メモリ不足 (OOM) エラーや全体的なスループットの低下につながる可能性があります。したがって、バランスを見つけて合理的なチャンクサイズを設定する必要があります。
scan.snapshot.fetch.size
テーブルの完全なデータを読み取る際に一度にフェッチする最大レコード数。
いいえ
INTEGER
1024
なし。
scan.startup.mode
データ消費の起動モード。
いいえ
STRING
initial
有効な値:
initial (デフォルト):最初の起動時に、完全な既存データをスキャンし、その後最新の Binlog データを読み取ります。
latest-offset:最初の起動時に、既存データをスキャンせず、Binlog の末尾 (最新の位置) から読み取りを開始します。コネクタの起動後に行われた変更のみを読み取ります。
earliest-offset:既存データをスキャンせず、利用可能な最も古い Binlog から読み取りを開始します。
specific-offset:既存データをスキャンせず、指定された Binlog オフセットから開始します。scan.startup.specific-offset.file と scan.startup.specific-offset.pos の両方を構成してオフセットを指定するか、scan.startup.specific-offset.gtid-set のみを構成して GTID セットを指定できます。
timestamp:既存データをスキャンせず、指定されたタイムスタンプから Binlog の読み取りを開始します。タイムスタンプは scan.startup.timestamp-millis でミリ秒単位で指定します。
重要earliest-offset、specific-offset、または timestamp 起動モードを使用する場合、スキーマの不一致によるエラーを避けるために、指定された Binlog 消費位置とジョブの起動時間の間でテーブルスキーマが変更されないことを確認してください。
scan.startup.specific-offset.file
特定オフセット起動モードを使用する場合の開始オフセットの Binlog ファイル名。
いいえ
STRING
なし
この構成を使用する場合、scan.startup.mode は specific-offset に設定する必要があります。ファイル名の例:
mysql-bin.000003。scan.startup.specific-offset.pos
特定オフセット起動モードを使用する場合の、指定された Binlog ファイル内の開始オフセット。
いいえ
INTEGER
なし
この構成を使用する場合、scan.startup.mode は specific-offset に設定する必要があります。
scan.startup.specific-offset.gtid-set
特定オフセット起動モードを使用する場合の開始オフセットの GTID セット。
いいえ
STRING
なし
この構成を使用する場合、scan.startup.mode は specific-offset に設定する必要があります。GTID セットの例:
24DA167-0C0C-11E8-8442-00059A3C7B00:1-19。scan.startup.timestamp-millis
タイムスタンプ起動モードを使用する場合の開始オフセットのタイムスタンプ (ミリ秒)。
いいえ
LONG
なし
この構成を使用する場合、scan.startup.mode は timestamp に設定する必要があります。タイムスタンプはミリ秒単位です。
重要特定の時間を使用する場合、MySQL CDC は各 Binlog ファイルの初期イベントを読み取ってそのタイムスタンプを決定し、最終的に指定された時間に対応する Binlog ファイルを特定しようとします。指定されたタイムスタンプの Binlog ファイルがデータベースからクリーンアップされておらず、読み取り可能であることを確認してください。
server-time-zone
データベースが使用するセッションタイムゾーン。
いいえ
STRING
このパラメーターを指定しない場合、システムは Flink ジョブランタイムの環境タイムゾーンをデータベースサーバーのタイムゾーンとして使用します。これは、選択したゾーンのタイムゾーンです。
例:Asia/Shanghai。このパラメーターは、MySQL の TIMESTAMP 型が STRING 型にどのように変換されるかを制御します。詳細については、「Debezium の時間値」をご参照ください。
debezium.min.row.count.to.stream.results
テーブルの行数がこの値を超えると、バッチ読み取りモードが使用されます。
いいえ
INTEGER
1000
Flink は、MySQL ソーステーブルから次のいずれかの方法でデータを読み取ります:
完全読み取り:テーブルデータ全体を直接メモリに読み取ります。これは高速ですが、対応する量のメモリを消費します。ソーステーブルが非常に大きい場合、OOM エラーのリスクがあります。
バッチ読み取り:データを複数のバッチで読み取り、すべてのデータが読み取られるまで毎回一定数の行をフェッチします。これにより、大きなテーブルでの OOM リスクを回避できますが、比較的低速です。
connect.timeout
MySQL データベースサーバーへの接続がタイムアウトするまで待機する最大時間 (再試行前)。
いいえ
DURATION
30s
なし。
connect.max-retries
MySQL データベースサービスへの接続に失敗した後の最大再試行回数。
いいえ
INTEGER
3
なし。
connection.pool.size
データベース接続プールのサイズ。
いいえ
INTEGER
20
データベース接続プールは接続を再利用するために使用され、データベース接続の数を減らすことができます。
jdbc.properties.*
JDBC URL のカスタム接続パラメーター。
いいえ
STRING
なし
カスタム接続パラメーターを渡すことができます。たとえば、SSL プロトコルを使用しないようにするには、'jdbc.properties.useSSL' = 'false' と構成できます。
サポートされている接続パラメーターの詳細については、「MySQL 構成プロパティ」をご参照ください。
debezium.*
Binlog を読み取るための Debezium のカスタムパラメーター。
いいえ
STRING
なし
カスタム Debezium パラメーターを渡すことができます。たとえば、'debezium.event.deserialization.failure.handling.mode'='ignore' を使用して、解析エラーの処理ロジックを指定します。
heartbeat.interval
ソースがハートビートイベントを使用して Binlog オフセットを進める時間間隔。
いいえ
DURATION
30s
ハートビートイベントは、ソースの Binlog オフセットを進めるために使用されます。これは、MySQL の更新が遅いテーブルに非常に役立ちます。このようなテーブルでは、Binlog オフセットは自動的に進みません。ハートビートイベントは Binlog オフセットを前進させ、期限切れになるのを防ぎます。期限切れの Binlog オフセットはジョブの失敗を引き起こし、ステートレスな再起動が必要になります。
scan.incremental.snapshot.chunk.key-column
スナップショットフェーズでのシャーディングの分割キーとして使用する列を指定します。
備考を参照
STRING
なし
プライマリキーのないテーブルには必須です。選択された列は非ヌル型 (NOT NULL) である必要があります。
プライマリキーのあるテーブルではオプションです。プライマリキーから 1 つの列のみを選択できます。
rds.region-id
Alibaba Cloud RDS for MySQL インスタンスのリージョン ID。
OSS からアーカイブされたログを読み取る機能を使用する場合に必須です。
STRING
なし
リージョン ID の詳細については、「リージョンとゾーン」をご参照ください。
rds.access-key-id
Alibaba Cloud RDS for MySQL アカウントの AccessKey ID。
OSS からアーカイブされたログを読み取る機能を使用する場合に必須です。
STRING
なし
詳細については、「AccessKey 情報を表示する方法」をご参照ください。
重要AccessKey 情報の漏洩を防ぐため、シークレットを管理して AccessKey ID を指定することを推奨します。詳細については、「変数管理」をご参照ください。
rds.access-key-secret
Alibaba Cloud RDS for MySQL アカウントの AccessKey Secret。
OSS からアーカイブされたログを読み取る機能を使用する場合に必須です。
STRING
なし
詳細については、「AccessKey 情報を表示する方法」をご参照ください。
重要AccessKey 情報の漏洩を防ぐため、シークレットを管理して AccessKey Secret を指定することを推奨します。詳細については、「変数管理」をご参照ください。
rds.db-instance-id
Alibaba Cloud RDS for MySQL インスタンスのインスタンス ID。
OSS からアーカイブされたログを読み取る機能を使用する場合に必須です。
STRING
なし
なし。
rds.main-db-id
Alibaba Cloud RDS for MySQL インスタンスのプライマリデータベース ID。
いいえ
STRING
なし
プライマリデータベース ID の取得方法の詳細については、「RDS for MySQL のログバックアップ」をご参照ください。
Flink コンピュートエンジン VVR 8.0.7 以降のバージョンでのみサポートされています。
rds.download.timeout
OSS から単一のアーカイブ済みログをダウンロードするためのタイムアウト期間。
いいえ
DURATION
60s
なし。
rds.endpoint
OSS Binlog 情報を取得するためのエンドポイント。
いいえ
STRING
なし
有効な値の詳細については、「エンドポイント」をご参照ください。
Flink コンピュートエンジン VVR 8.0.8 以降のバージョンでのみサポートされています。
scan.incremental.close-idle-reader.enabled
スナップショットフェーズが終了した後にアイドル状態のリーダーを閉じるかどうかを指定します。
いいえ
BOOLEAN
false
Flink コンピュートエンジン VVR 8.0.1 以降のバージョンでのみサポートされています。
この構成を有効にするには、execution.checkpointing.checkpoints-after-tasks-finish.enabled を true に設定します。
scan.read-changelog-as-append-only.enabled
変更ログデータストリームを追加専用データストリームに変換するかどうかを指定します。
いいえ
BOOLEAN
false
有効な値:
true: すべてのタイプのメッセージ (INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER を含む) が INSERT メッセージに変換されます。上流テーブルからの削除メッセージを保存する必要がある場合など、特別なシナリオでのみ有効にします。
false (デフォルト):すべてのメッセージタイプがそのまま下流に送信されます。
説明Flink コンピュートエンジン VVR 8.0.8 以降のバージョンでのみサポートされています。
scan.only.deserialize.captured.tables.changelog.enabled
増分フェーズで、指定されたテーブルの変更イベントのみを逆シリアル化するかどうかを指定します。
いいえ
BOOLEAN
VVR 8.x バージョンではデフォルト値は false です。
VVR 11.1 以降のバージョンではデフォルト値は true です。
有効な値:
true:ターゲットテーブルの変更データのみを逆シリアル化し、Binlog の読み取りを高速化します。
false (デフォルト):すべてのテーブルの変更データを逆シリアル化します。
説明Flink コンピュートエンジン VVR 8.0.7 以降のバージョンでのみサポートされています。
Flink コンピュートエンジン VVR 8.0.8 以前で使用する場合、パラメーター名を debezium.scan.only.deserialize.captured.tables.changelog.enable に変更する必要があります。
scan.parse.online.schema.changes.enabled
増分フェーズで、RDS のロックレス変更 DDL イベントの解析を試みるかどうかを指定します。
いいえ
BOOLEAN
false
有効な値:
true:RDS のロックレス変更 DDL イベントを解析します。
false (デフォルト):RDS のロックレス変更 DDL イベントを解析しません。
これは実験的な機能です。オンラインでのロックレス変更を実行する前に、回復のために Flink ジョブのスナップショットを取得することを推奨します。
説明Flink コンピュートエンジン VVR 11.1 以降のバージョンでのみサポートされています。
scan.incremental.snapshot.backfill.skip
スナップショット読み取りフェーズ中にバックフィルをスキップするかどうかを指定します。
いいえ
BOOLEAN
false
有効な値:
true: スナップショット読み取りフェーズ中にバックフィルをスキップします。
false (デフォルト):スナップショット読み取りフェーズ中にバックフィルをスキップしません。
バックフィルをスキップすると、スナップショットフェーズ中のテーブルへの変更は、スナップショットにマージされる代わりに、後の増分フェーズで読み取られます。
重要バックフィルをスキップすると、スナップショットフェーズ中に発生した変更が再実行される可能性があるため、データの不整合につながる可能性があります。少なくとも 1 回のセマンティクスのみが保証されます。
説明Flink コンピュートエンジン VVR 11.1 以降のバージョンでのみサポートされています。
scan.incremental.snapshot.unbounded-chunk-first.enabled
スナップショット読み取りフェーズ中に、境界のないチャンクを最初に配布するかどうかを指定します。
いいえ
BOOELEAN
false
有効な値:
true:スナップショット読み取りフェーズ中に、境界のないチャンクを最初に配布します。
false (デフォルト):スナップショット読み取りフェーズ中に、境界のないチャンクを最初に配布しません。
これは実験的な機能です。有効にすると、スナップショットフェーズ中に最後のチャンクを同期する際の TaskManager の OOM エラーのリスクを軽減できます。ジョブの最初の起動前に追加することを推奨します。
説明Flink コンピュートエンジン VVR 11.1 以降のバージョンでのみサポートされています。
ディメンションテーブル固有
パラメーター
説明
必須
データの型
デフォルト値
備考
url
MySQL JDBC URL。
いいえ
STRING
なし
URL のフォーマットは次のとおりです:
jdbc:mysql://<endpoint>:<port>/<database_name>。lookup.max-retries
データ読み取りに失敗した後の最大再試行回数。
いいえ
INTEGER
3
Flink コンピュートエンジン VVR 6.0.7 以降のバージョンでのみサポートされています。
lookup.cache.strategy
キャッシュポリシー。
いいえ
STRING
なし
None、LRU、ALL の 3 つのキャッシュポリシーをサポートします。値の詳細については、「背景情報」をご参照ください。
説明最近最も使用されていない (LRU) キャッシュポリシーを使用する場合、lookup.cache.max-rows パラメーターも構成する必要があります。
lookup.cache.max-rows
キャッシュされる最大行数。
いいえ
INTEGER
100000
LRU キャッシュポリシーを選択した場合、キャッシュサイズを設定する必要があります。
ALL キャッシュポリシーを選択した場合、キャッシュサイズを設定する必要はありません。
lookup.cache.ttl
キャッシュの生存時間 (TTL)。
いいえ
DURATION
10 s
lookup.cache.ttl の設定は lookup.cache.strategy に依存します:
lookup.cache.strategy が None に設定されている場合、lookup.cache.ttl を設定する必要はありません。これはキャッシュが期限切れにならないことを意味します。
lookup.cache.strategy が LRU に設定されている場合、lookup.cache.ttl はキャッシュの有効期限です。デフォルトでは期限切れになりません。
lookup.cache.strategy が ALL に設定されている場合、lookup.cache.ttl はキャッシュの再読み込み時間です。デフォルトでは再読み込みされません。
1min や 10s などの時間形式を使用します。
lookup.max-join-rows
プライマリテーブルのレコードがディメンションテーブルのレコードと一致した場合に返される結果の最大数。
いいえ
INTEGER
1024
なし。
lookup.filter-push-down.enabled
ディメンションテーブルのフィルタープッシュダウンを有効にするかどうかを指定します。
いいえ
BOOLEAN
false
有効な値:
true:ディメンションテーブルのフィルタープッシュダウンを有効にします。MySQL データベーステーブルからデータをロードする際、ディメンションテーブルは SQL ジョブで設定された条件に基づいてデータを事前にフィルタリングします。
false (デフォルト):ディメンションテーブルのフィルタープッシュダウンを無効にします。ディメンションテーブルは MySQL データベーステーブルからロードする際にすべてのデータをロードします。
説明Flink コンピュートエンジン VVR 8.0.7 以降のバージョンでのみサポートされています。
重要ディメンションテーブルのプッシュダウンは、Flink テーブルがディメンションテーブルとして使用される場合にのみ有効にすべきです。MySQL ソーステーブルはフィルタープッシュダウンの有効化をサポートしていません。Flink テーブルがソーステーブルとディメンションテーブルの両方として使用され、ディメンションテーブルでフィルタープッシュダウンが有効になっている場合、ソーステーブルとして使用する際には SQL ヒントを使用してこの構成項目を明示的に false に設定する必要があります。そうしないと、ジョブが異常に実行される可能性があります。
結果テーブル固有
パラメーター
説明
必須
データの型
デフォルト値
備考
url
MySQL JDBC URL。
いいえ
STRING
なし
URL のフォーマットは次のとおりです:
jdbc:mysql://<endpoint>:<port>/<database_name>。sink.max-retries
データ書き込みに失敗した後の最大再試行回数。
いいえ
INTEGER
3
なし。
sink.buffer-flush.batch-size
1 回のバッチで書き込むレコード数。
いいえ
INTEGER
4096
なし。
sink.buffer-flush.max-rows
メモリにキャッシュするデータレコードの数。
いいえ
INTEGER
10000
このパラメーターは、プライマリキーが指定された後にのみ有効になります。
sink.buffer-flush.interval
キャッシュをフラッシュする時間間隔。指定された待機時間後にキャッシュされたデータが出力条件を満たさない場合、システムは自動的にキャッシュ内のすべてのデータを出力します。
いいえ
DURATION
1s
なし。
sink.ignore-delete
データの削除操作を無視するかどうかを指定します。
いいえ
BOOLEAN
false
Flink SQL によって生成されたストリームに削除または更新前レコードが含まれている場合、複数の出力タスクが同じテーブルの異なるフィールドを同時に更新すると、データの不整合が発生する可能性があります。
たとえば、レコードが削除された後、別のタスクが一部のフィールドのみを更新します。更新されなかったフィールドは null またはデフォルト値になり、データエラーが発生します。
sink.ignore-delete を true に設定することで、上流の DELETE および UPDATE_BEFORE 操作を無視して、このような問題を回避できます。
説明UPDATE_BEFORE は Flink のリトラクションメカニズムの一部であり、更新操作で古い値を「リトラクト」するために使用されます。
ignoreDelete = true の場合、すべての DELETE および UPDATE_BEFORE タイプのレコードはスキップされます。INSERT および UPDATE_AFTER レコードのみが処理されます。
sink.ignore-null-when-update
データを更新する際、入力データフィールドの値が null の場合に、対応するフィールドを null に更新するか、そのフィールドの更新をスキップするかを指定します。
いいえ
BOOLEAN
false
有効な値:
true:フィールドを更新しません。このパラメーターは、Flink テーブルにプライマリキーが設定されている場合にのみ true に設定できます。true に設定した場合:
VVR 8.0.6 以前の場合、結果テーブルへのデータ書き込みはバッチ実行をサポートしません。
VVR 8.0.7 以降の場合、結果テーブルへのデータ書き込みはバッチ実行をサポートします。
バッチ書き込みは書き込み効率と全体のスループットを大幅に向上させることができますが、データ遅延や OOM エラーのリスクを引き起こす可能性があります。したがって、実際のビジネスシナリオに基づいてトレードオフを行ってください。
false: フィールドを null に更新します。
説明このパラメーターは、リアルタイムコンピューティングエンジン VVR 8.0.5 以降のバージョンでのみサポートされています。
データ型マッピング
CDC ソーステーブル
MySQL CDC フィールドタイプ
Flink フィールドタイプ
TINYINT
TINYINT
SMALLINT
SMALLINT
TINYINT UNSIGNED
TINYINT UNSIGNED ZEROFILL
INT
INT
MEDIUMINT
SMALLINT UNSIGNED
SMALLINT UNSIGNED ZEROFILL
BIGINT
BIGINT
INT UNSIGNED
INT UNSIGNED ZEROFILL
MEDIUMINT UNSIGNED
MEDIUMINT UNSIGNED ZEROFILL
BIGINT UNSIGNED
DECIMAL(20, 0)
BIGINT UNSIGNED ZEROFILL
SERIAL
FLOAT [UNSIGNED] [ZEROFILL]
FLOAT
DOUBLE [UNSIGNED] [ZEROFILL]
DOUBLE
DOUBLE PRECISION [UNSIGNED] [ZEROFILL]
REAL [UNSIGNED] [ZEROFILL]
NUMERIC(p, s) [UNSIGNED] [ZEROFILL]
DECIMAL(p, s)
DECIMAL(p, s) [UNSIGNED] [ZEROFILL]
BOOLEAN
BOOLEAN
TINYINT(1)
DATE
DATE
TIME [(p)]
TIME [(p)] [WITHOUT TIME ZONE]
DATETIME [(p)]
TIMESTAMP [(p)] [WITHOUT TIME ZONE]
TIMESTAMP [(p)]
TIMESTAMP [(p)]
TIMESTAMP [(p)] WITH LOCAL TIME ZONE
CHAR(n)
STRING
VARCHAR(n)
TEXT
BINARY
BYTES
VARBINARY
BLOB
重要MySQL の TINYINT(1) 型を 0 と 1 以外の値を格納するために使用しないことを推奨します。property-version が 0 に設定されている場合、MySQL CDC ソーステーブルはデフォルトで TINYINT(1) を Flink の BOOLEAN 型にマッピングするため、データの不正確さを引き起こす可能性があります。TINYINT(1) を 0 と 1 以外の値を格納するために使用するには、構成パラメーター catalog.table.treat-tinyint1-as-boolean をご参照ください。
ディメンションテーブルと結果テーブル
MySQL フィールドタイプ
Flink フィールドタイプ
TINYINT
TINYINT
SMALLINT
SMALLINT
TINYINT UNSIGNED
INT
INT
MEDIUMINT
SMALLINT UNSIGNED
BIGINT
BIGINT
INT UNSIGNED
BIGINT UNSIGNED
DECIMAL(20, 0)
FLOAT
FLOAT
DOUBLE
DOUBLE
DOUBLE PRECISION
NUMERIC(p, s)
DECIMAL(p, s)
説明ここで p <= 38 です。
DECIMAL(p, s)
BOOLEAN
BOOLEAN
TINYINT(1)
DATE
DATE
TIME [(p)]
TIME [(p)] [WITHOUT TIME ZONE]
DATETIME [(p)]
TIMESTAMP [(p)] [WITHOUT TIME ZONE]
TIMESTAMP [(p)]
CHAR(n)
CHAR(n)
VARCHAR(n)
VARCHAR(n)
BIT(n)
BINARY(⌈n/8⌉)
BINARY(n)
BINARY(n)
VARBINARY(N)
VARBINARY(N)
TINYTEXT
STRING
TEXT
MEDIUMTEXT
LONGTEXT
TINYBLOB
BYTES
重要Flink は、2,147,483,647 (2^31 - 1) バイト以下の MySQL BLOB 型レコードのみをサポートします。
BLOB
MEDIUMBLOB
LONGBLOB
データインジェスト
データインジェスト YAML ジョブで MySQL コネクタをデータソースとして使用できます。
構文
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: <username>
password: <password>
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5401-5404
sink:
type: xxxパラメーター
パラメーター | 説明 | 必須 | データの型 | デフォルト値 | 備考 |
type | データソースのタイプ。 | はい | STRING | なし | 値は mysql でなければなりません。 |
name | データソース名。 | いいえ | STRING | なし | なし。 |
hostname | MySQL データベースの IP アドレスまたはホスト名。 | はい | STRING | なし | VPC アドレスを入力することを推奨します。 説明 MySQL データベースと Realtime Compute for Apache Flink が同じ VPC にない場合は、VPC 間のネットワーク接続を確立するか、パブリックエンドポイントを使用してアクセスする必要があります。詳細については、「ストレージ管理と操作」および「フルマネージド Flink クラスターはどのようにインターネットにアクセスできますか?」をご参照ください。 |
username | MySQL データベースサービスのユーザー名。 | はい | STRING | なし | なし。 |
password | MySQL データベースサービスのパスワード。 | はい | STRING | なし | なし。 |
tables | 同期する MySQL データテーブル。 | はい | STRING | なし |
説明
|
tables.exclude | 同期から除外するテーブル。 | いいえ | STRING | なし |
説明 ピリオドはデータベース名とテーブル名を区切るために使用されます。ピリオドを任意の文字に一致させるために使用するには、バックスラッシュでエスケープする必要があります。例:db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*。 |
port | MySQL データベースサービスのポート番号。 | いいえ | INTEGER | 3306 | なし。 |
schema-change.enabled | スキーマ変更イベントを送信するかどうかを指定します。 | いいえ | BOOLEAN | true | なし。 |
server-id | 同期に使用されるデータベースクライアントの数値 ID または範囲。 | いいえ | STRING | 5400 から 6400 の間のランダムな値が生成されます。 | この ID は MySQL クラスター内でグローバルに一意でなければなりません。同じデータベースに接続する各ジョブに異なる ID を設定することを推奨します。 このパラメーターは、5400-5408 のような ID 範囲もサポートします。増分読み取りが有効な場合、複数の同時リーダーがサポートされます。この場合、各同時リーダーが異なる ID を使用するように ID 範囲を設定することを推奨します。 |
jdbc.properties.* | JDBC URL のカスタム接続パラメーター。 | いいえ | STRING | なし | カスタム接続パラメーターを渡すことができます。たとえば、SSL プロトコルを使用しないようにするには、'jdbc.properties.useSSL' = 'false' と構成できます。 サポートされている接続パラメーターの詳細については、「MySQL 構成プロパティ」をご参照ください。 |
debezium.* | Binlog を読み取るための Debezium のカスタムパラメーター。 | いいえ | STRING | なし | カスタム Debezium パラメーターを渡すことができます。たとえば、'debezium.event.deserialization.failure.handling.mode'='ignore' を使用して、解析エラーの処理ロジックを指定します。 |
scan.incremental.snapshot.chunk.size | 各チャンクのサイズ (行数)。 | いいえ | INTEGER | 8096 | MySQL テーブルは複数のチャンクに分割されて読み取られます。チャンク内のデータは、完全に読み取られるまでメモリにキャッシュされます。 チャンクあたりの行数が少ないと、テーブル内のチャンクの総数が多くなります。これにより、障害回復の粒度は向上しますが、メモリ不足 (OOM) エラーや全体的なスループットの低下につながる可能性があります。したがって、バランスを見つけて合理的なチャンクサイズを設定する必要があります。 |
scan.snapshot.fetch.size | テーブルの完全なデータを読み取る際に一度にフェッチする最大レコード数。 | いいえ | INTEGER | 1024 | なし。 |
scan.startup.mode | データ消費の起動モード。 | いいえ | STRING | initial | 有効な値:
重要 earliest-offset、specific-offset、および timestamp 起動モードの場合、起動時のテーブルスキーマが指定された開始オフセット時のスキーマと異なる場合、ジョブは失敗します。つまり、これら 3 つのモードを使用する場合、指定された Binlog 消費位置とジョブの起動時間の間でテーブルスキーマが変更されないことを確認する必要があります。 |
scan.startup.specific-offset.file | 特定オフセット起動モードを使用する場合の開始オフセットの Binlog ファイル名。 | いいえ | STRING | なし | この構成を使用する場合、scan.startup.mode は specific-offset に設定する必要があります。ファイル名の例: |
scan.startup.specific-offset.pos | 特定オフセット起動モードを使用する場合の、指定された Binlog ファイル内の開始オフセット。 | いいえ | INTEGER | なし | この構成を使用する場合、scan.startup.mode は specific-offset に設定する必要があります。 |
scan.startup.specific-offset.gtid-set | 特定オフセット起動モードを使用する場合の開始オフセットの GTID セット。 | いいえ | STRING | なし | この構成を使用する場合、scan.startup.mode は specific-offset に設定する必要があります。GTID セットの例: |
scan.startup.timestamp-millis | タイムスタンプ起動モードを使用する場合の開始オフセットのタイムスタンプ (ミリ秒)。 | いいえ | LONG | なし | この構成を使用する場合、scan.startup.mode は timestamp に設定する必要があります。タイムスタンプはミリ秒単位です。 重要 特定の時間を使用する場合、MySQL CDC は各 Binlog ファイルの初期イベントを読み取ってそのタイムスタンプを決定し、最終的に指定された時間に対応する Binlog ファイルを特定しようとします。指定されたタイムスタンプの Binlog ファイルがデータベースからクリーンアップされておらず、読み取り可能であることを確認してください。 |
server-time-zone | データベースが使用するセッションタイムゾーン。 | いいえ | STRING | このパラメーターを指定しない場合、システムは Flink ジョブランタイムの環境タイムゾーンをデータベースサーバーのタイムゾーンとして使用します。これは、選択したゾーンのタイムゾーンです。 | 例:Asia/Shanghai。このパラメーターは、MySQL の TIMESTAMP 型が STRING 型にどのように変換されるかを制御します。詳細については、「Debezium の時間値」をご参照ください。 |
scan.startup.specific-offset.skip-events | 指定されたオフセットから読み取る際にスキップする Binlog イベントの数。 | いいえ | INTEGER | なし | この構成を使用する場合、scan.startup.mode は specific-offset に設定する必要があります。 |
scan.startup.specific-offset.skip-rows | 指定されたオフセットから読み取る際にスキップする行の変更数。単一の Binlog イベントは複数の行の変更に対応する場合があります。 | いいえ | INTEGER | なし | この構成を使用する場合、scan.startup.mode は specific-offset に設定する必要があります。 |
connect.timeout | MySQL データベースサーバーへの接続がタイムアウトするまで待機する最大時間 (再試行前)。 | いいえ | DURATION | 30s | なし。 |
connect.max-retries | MySQL データベースサービスへの接続に失敗した後の最大再試行回数。 | いいえ | INTEGER | 3 | なし。 |
connection.pool.size | データベース接続プールのサイズ。 | いいえ | INTEGER | 20 | データベース接続プールは接続を再利用するために使用され、データベース接続の数を減らすことができます。 |
heartbeat.interval | ソースがハートビートイベントを使用して Binlog オフセットを進める時間間隔。 | いいえ | DURATION | 30s | ハートビートイベントは、ソースの Binlog オフセットを進めるために使用されます。これは、MySQL の更新が遅いテーブルに非常に役立ちます。このようなテーブルでは、Binlog オフセットは自動的に進みません。ハートビートイベントは Binlog オフセットを前進させ、期限切れになるのを防ぎます。期限切れの Binlog オフセットはジョブの失敗を引き起こし、ステートレスな再起動が必要になります。 |
scan.incremental.snapshot.chunk.key-column | スナップショットフェーズでのシャーディングの分割キーとして使用する列を指定します。 | いいえ。 | STRING | なし | プライマリキーから 1 つの列のみを選択できます。 |
rds.region-id | Alibaba Cloud RDS for MySQL インスタンスのリージョン ID。 | OSS からアーカイブされたログを読み取る機能を使用する場合に必須です。 | STRING | なし | リージョン ID の詳細については、「リージョンとゾーン」をご参照ください。 |
rds.access-key-id | Alibaba Cloud RDS for MySQL アカウントの AccessKey ID。 | OSS からアーカイブされたログを読み取る機能を使用する場合に必須です。 | STRING | なし | 詳細については、「AccessKey 情報を表示する方法」をご参照ください。 重要 AccessKey 情報の漏洩を防ぐため、シークレットを管理して AccessKey ID を指定することを推奨します。詳細については、「変数管理」をご参照ください。 |
rds.access-key-secret | Alibaba Cloud RDS for MySQL アカウントの AccessKey Secret。 | OSS からアーカイブされたログを読み取る機能を使用する場合に必須です。 | STRING | なし | 詳細については、「AccessKey 情報を表示する方法」をご参照ください。 重要 AccessKey 情報の漏洩を防ぐため、シークレットを管理して AccessKey Secret を指定することを推奨します。詳細については、「変数管理」をご参照ください。 |
rds.db-instance-id | Alibaba Cloud RDS for MySQL インスタンスのインスタンス ID。 | OSS からアーカイブされたログを読み取る機能を使用する場合に必須です。 | STRING | なし | なし。 |
rds.main-db-id | Alibaba Cloud RDS for MySQL インスタンスのプライマリデータベース ID。 | いいえ | STRING | なし | プライマリデータベース ID の取得方法の詳細については、「RDS for MySQL のログバックアップ」をご参照ください。 |
rds.download.timeout | OSS から単一のアーカイブ済みログをダウンロードするためのタイムアウト期間。 | いいえ | DURATION | 60s | なし。 |
rds.endpoint | OSS Binlog 情報を取得するためのエンドポイント。 | いいえ | STRING | なし | 有効な値の詳細については、「エンドポイント」をご参照ください。 |
rds.binlog-directory-prefix | Binlog ファイルを格納するディレクトリのプレフィックス。 | いいえ | STRING | rds-binlog- | なし。 |
rds.use-intranet-link | Binlog ファイルのダウンロードに内部ネットワークリンクを使用するかどうかを指定します。 | いいえ | BOOLEAN | true | なし。 |
rds.binlog-directories-parent-path | Binlog ファイルが格納されている親ディレクトリの絶対パス。 | いいえ | STRING | なし | なし。 |
chunk-meta.group.size | チャンクメタデータのサイズ。 | いいえ | INTEGER | 1000 | メタデータがこの値より大きい場合、送信のために複数の部分に分割されます。 |
chunk-key.even-distribution.factor.lower-bound | 均等シャーディングのためのチャンク分布係数の下限。 | いいえ | DOUBLE | 0.05 | 分布係数がこの値より小さい場合、不均等シャーディングが使用されます。 チャンク分布係数 = (MAX(chunk-key) - MIN(chunk-key) + 1) / データ行の総数。 |
chunk-key.even-distribution.factor.upper-bound | 均等シャーディングのためのチャンク分布係数の上限。 | いいえ | DOUBLE | 1000.0 | 分布係数がこの値より大きい場合、不均等シャーディングが使用されます。 チャンク分布係数 = (MAX(chunk-key) - MIN(chunk-key) + 1) / データ行の総数。 |
scan.incremental.close-idle-reader.enabled | スナップショットフェーズが終了した後にアイドル状態のリーダーを閉じるかどうかを指定します。 | いいえ | BOOLEAN | false | この構成を有効にするには、 |
scan.only.deserialize.captured.tables.changelog.enabled | 増分フェーズで、指定されたテーブルの変更イベントのみを逆シリアル化するかどうかを指定します。 | いいえ | BOOLEAN |
| 有効な値:
|
scan.parallel-deserialize-changelog.enabled | 増分フェーズで、複数のスレッドを使用して変更イベントを解析するかどうかを指定します。 | いいえ | BOOLEAN | false | 有効な値:
説明 Flink コンピュートエンジン VVR 8.0.11 以降のバージョンでのみサポートされています。 |
scan.parallel-deserialize-changelog.handler.size | 複数のスレッドを使用して変更イベントを解析する場合のイベントハンドラの数。 | いいえ | INTEGER | 2 | 説明 Flink コンピュートエンジン VVR 8.0.11 以降のバージョンでのみサポートされています。 |
メタデータ-カラム.インクルード-リスト | 下流のシンクに渡すメタデータ列。 | いいえ | 文字列 | なし | 利用可能なメタデータには、 説明 MySQL CDC YAML コネクタは、 重要
|
scan.newly-added-table.enabled | チェックポイントから再起動する際に、前回の起動時に一致しなかった新しく追加されたテーブルを同期するか、または現在一致しないテーブルを状態から削除するかを指定します。 | いいえ | BOOLEAN | false | チェックポイントまたはセーブポイントから再起動する際に有効になります。 |
scan.binlog.newly-added-table.enabled | 増分フェーズで、一致する新しく追加されたテーブルからデータを送信するかどうかを指定します。 | いいえ | BOOLEAN | false |
|
scan.incremental.snapshot.chunk.key-column | 特定のテーブルに対して、スナップショットフェーズでのシャーディングの分割キーとして使用する列を指定します。 | いいえ | STRING | なし |
|
scan.parse.online.schema.changes.enabled | 増分フェーズで、RDS のロックレス変更 DDL イベントの解析を試みるかどうかを指定します。 | いいえ | BOOLEAN | false | 有効な値:
これは実験的な機能です。オンラインでのロックレス変更を実行する前に、回復のために Flink ジョブのスナップショットを取得することを推奨します。 説明 Flink コンピュートエンジン VVR 11.0 以降のバージョンでのみサポートされています。 |
scan.incremental.snapshot.backfill.skip | スナップショット読み取りフェーズ中にバックフィルをスキップするかどうかを指定します。 | いいえ | BOOLEAN | false | 有効な値:
バックフィルをスキップすると、スナップショットフェーズ中のテーブルへの変更は、スナップショットにマージされる代わりに、後の増分フェーズで読み取られます。 重要 バックフィルをスキップすると、スナップショットフェーズ中に発生した変更が再実行される可能性があるため、データの不整合につながる可能性があります。少なくとも 1 回のセマンティクスのみが保証されます。 説明 Flink コンピュートエンジン VVR 11.1 以降のバージョンでのみサポートされています。 |
treat-tinyint1-as-boolean.enabled | TINYINT(1) 型をブール型として扱うかどうかを指定します。 | いいえ | ブール値 | true | 有効な値:
|
treat-timestamp-as-datetime-enabled | TIMESTAMP 型を DATETIME 型として扱うかどうかを指定します。 | いいえ | BOOLEAN | false | 有効な値:
MySQL の TIMESTAMP 型は UTC 時刻を格納し、タイムゾーンの影響を受けます。MySQL の DATETIME 型はリテラル時刻を格納し、タイムゾーンの影響を受けません。 有効にすると、server-time-zone に基づいて MySQL の TIMESTAMP 型データを DATETIME 型に変換します。 |
include-comments.enabled | テーブルとフィールドのコメントを同期するかどうかを指定します。 | いいえ | BOOELEAN | false | 有効な値:
これを有効にすると、ジョブのメモリ使用量が増加します。 |
scan.incremental.snapshot.unbounded-chunk-first.enabled | スナップショット読み取りフェーズ中に、境界のないチャンクを最初に配布するかどうかを指定します。 | いいえ | BOOELEAN | false | このパラメーターは次の値をサポートします:
これは実験的な機能です。この機能を有効にすると、TaskManager がスナップショットフェーズ中に最後のシャードを同期する際のメモリ不足 (OOM) エラーのリスクが軽減されます。ジョブが初めて開始される前に、このパラメーターを追加してください。 説明 Flink コンピュートエンジン Ververica Runtime (VVR) 11.1 以降でのみサポートされています。 |
型マッピング
次の表は、データインジェストのデータ型マッピングを示しています。
MySQL CDC フィールドタイプ | CDC フィールドタイプ |
TINYINT(n) | TINYINT |
SMALLINT | SMALLINT |
TINYINT UNSIGNED | |
TINYINT UNSIGNED ZEROFILL | |
YEAR | |
INT | INT |
MEDIUMINT | |
MEDIUMINT UNSIGNED | |
MEDIUMINT UNSIGNED ZEROFILL | |
SMALLINT UNSIGNED | |
SMALLINT UNSIGNED ZEROFILL | |
BIGINT | BIGINT |
INT UNSIGNED | |
INT UNSIGNED ZEROFILL | |
BIGINT UNSIGNED | DECIMAL(20, 0) |
BIGINT UNSIGNED ZEROFILL | |
SERIAL | |
FLOAT [UNSIGNED] [ZEROFILL] | FLOAT |
DOUBLE [UNSIGNED] [ZEROFILL] | DOUBLE |
DOUBLE PRECISION [UNSIGNED] [ZEROFILL] | |
REAL [UNSIGNED] [ZEROFILL] | |
NUMERIC(p, s) [UNSIGNED] [ZEROFILL] ただし p <= 38 | DECIMAL(p, s) |
DECIMAL(p, s) [UNSIGNED] [ZEROFILL] ただし p <= 38 | |
FIXED(p, s) [UNSIGNED] [ZEROFILL] ただし p <= 38 | |
BOOLEAN | BOOLEAN |
BIT(1) | |
TINYINT(1) | |
DATE | DATE |
TIME [(p)] | TIME [(p)] |
DATETIME [(p)] | TIMESTAMP [(p)] |
TIMESTAMP [(p)] |
|
CHAR(n) | CHAR(n) |
VARCHAR(n) | VARCHAR(n) |
BIT(n) | BINARY(⌈(n + 7) / 8⌉) |
BINARY(n) | BINARY(n) |
VARBINARY(N) | VARBINARY(N) |
NUMERIC(p, s) [UNSIGNED] [ZEROFILL] ただし 38 < p <= 65 | STRING 説明 MySQL では、decimal データ型の精度は最大 65 ですが、Flink では、decimal データ型の精度は 38 に制限されています。したがって、精度が 38 を超える decimal 列を定義する場合は、精度の損失を避けるために文字列にマッピングする必要があります。 |
DECIMAL(p, s) [UNSIGNED] [ZEROFILL] ただし 38 < p <= 65 | |
FIXED(p, s) [UNSIGNED] [ZEROFILL] ただし 38 < p <= 65 | |
TINYTEXT | STRING |
TEXT | |
MEDIUMTEXT | |
LONGTEXT | |
ENUM | |
JSON | STRING 説明 JSON データ型は Flink で JSON 形式の文字列に変換されます。 |
GEOMETRY | STRING 説明 MySQL の空間データ型は、固定の JSON 形式の文字列に変換されます。詳細については、MySQL の空間データ型マッピングをご参照ください。 |
POINT | |
LINESTRING | |
POLYGON | |
MULTIPOINT | |
MULTILINESTRING | |
MULTIPOLYGON | |
GEOMETRYCOLLECTION | |
TINYBLOB | BYTES 説明 MySQL の BLOB データ型については、長さが 2,147,483,647 (2**31-1) 以下の BLOB のみがサポートされています。 |
BLOB | |
MEDIUMBLOB | |
LONGBLOB |
使用例
CDC ソーステーブル
CREATE TEMPORARY TABLE mysqlcdc_source ( order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); CREATE TEMPORARY TABLE blackhole_sink( order_id INT, customer_name STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT order_id, customer_name FROM mysqlcdc_source;ディメンションテーブル
CREATE TEMPORARY TABLE datagen_source( a INT, b BIGINT, c STRING, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE mysql_dim ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); CREATE TEMPORARY TABLE blackhole_sink( a INT, b STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.a, H.b FROM datagen_source AS T JOIN mysql_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a = H.a;シンクテーブル
CREATE TEMPORARY TABLE datagen_source ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE mysql_sink ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); INSERT INTO mysql_sink SELECT * FROM datagen_source;データインジェストのデータソース
source: type: mysql name: MySQL Source hostname: ${mysql.hostname} port: ${mysql.port} username: ${mysql.username} password: ${mysql.password} tables: ${mysql.source.table} server-id: 7601-7604 sink: type: values name: Values Sink print.enabled: true sink.print.logger: true
MySQL CDC ソーステーブルについて
実装原理
MySQL CDC ソーステーブルが起動すると、テーブル全体をスキャンし、プライマリキーに基づいて複数のチャンクに分割し、現在のバイナリログオフセットを記録します。次に、増分スナップショットアルゴリズムを使用して、SELECT 文で各チャンクからデータを読み取ります。ジョブは定期的にチェックポイントを実行して、完了したチャンクを記録します。フェールオーバーが発生した場合、ジョブは未完了のチャンクのみを読み取る必要があります。すべてのチャンクが読み取られた後、ジョブは以前に記録されたバイナリログオフセットから増分変更レコードの読み取りを開始します。Flink ジョブは定期的なチェックポイントを実行し続け、バイナリログオフセットを記録します。ジョブがフェールオーバーした場合、最後に記録されたバイナリログオフセットから処理を再開します。このプロセスにより、Exactly-Once セマンティクスが実現されます。
増分スナップショットアルゴリズムの詳細な説明については、「MySQL CDC コネクタ」をご参照ください。
メタデータ
メタデータは、シャーディングされたデータベースとテーブルをマージして同期するシナリオで役立ちます。これは、マージ後、ビジネスでは各データのソースデータベースとテーブルを区別したいことが多いためです。メタデータ列を使用して、ソーステーブルのデータベースとテーブル名情報にアクセスできます。したがって、メタデータ列を使用して、複数のシャーディングされたテーブルを単一の宛先テーブルに簡単にマージできます。
MySQL CDC ソースはメタデータ列の構文をサポートしています。メタデータ列を使用して、次のメタデータにアクセスできます。
メタデータキー
メタデータ型
説明
database_name
STRING NOT NULL
行を含むデータベースの名前。
table_name
STRING NOT NULL
行を含むテーブルの名前。
op_ts
TIMESTAMP_LTZ(3) NOT NULL
データベースで変更が行われた時刻。レコードがテーブルの既存データからのものであり、Binlog からのものでない場合、この値は常に 0 です。
op_type
STRING NOT NULL
行の変更タイプ。
+I:INSERT メッセージ
-D:DELETE メッセージ
-U:UPDATE_BEFORE メッセージ
+U:UPDATE_AFTER メッセージ
説明Realtime Compute for Apache Flink VVR 8.0.7 以降のバージョンでのみサポートされています。
query_log
STRING NOT NULL
読み取られた行に対応する MySQL クエリログレコード。
説明MySQL は、クエリログを記録するために binlog_rows_query_log_events パラメーターを有効にする必要があります。
次のコード例は、MySQL インスタンス内の複数のシャーディングされたデータベースから複数の orders テーブルを、下流の Hologres インスタンスの holo_orders テーブルにマージして同期する方法を示しています。
CREATE TEMPORARY TABLE mysql_orders ( db_name STRING METADATA FROM 'database_name' VIRTUAL, -- データベース名を読み取ります。 table_name STRING METADATA FROM 'table_name' VIRTUAL, -- テーブル名を読み取ります。 operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, -- 変更タイムスタンプを読み取ります。 op_type STRING METADATA FROM 'op_type' VIRTUAL, -- 変更タイプを読み取ります。 order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'flinkuser', 'password' = 'flinkpw', 'database-name' = 'mydb_.*', -- 複数のシャーディングされたデータベースに一致する正規表現。 'table-name' = 'orders_.*' -- 複数のシャーディングされたテーブルに一致する正規表現。 ); INSERT INTO holo_orders SELECT * FROM mysql_orders;上記のコードに基づき、WITH 句で scan.read-changelog-as-append-only.enabled パラメーターが true に設定されている場合、出力結果は下流テーブルのプライマリキー設定によって異なります:
下流テーブルのプライマリキーが order_id の場合、出力結果には上流テーブルの各プライマリキーの最後の変更のみが含まれます。たとえば、最後の変更が削除操作であったプライマリキーの場合、下流テーブルには同じプライマリキーと op_type が -D のレコードが表示されます。
下流テーブルのプライマリキーが order_id、operation_ts、および op_type の複合である場合、出力結果には上流テーブルの各プライマリキーの完全な変更履歴が含まれます。
正規表現のサポート
MySQL CDC ソーステーブルは、テーブル名またはデータベース名に正規表現を使用して、複数のテーブルまたはデータベースに一致させることをサポートしています。次のコード例は、正規表現を使用して複数のテーブルを指定する方法を示しています。
CREATE TABLE products ( db_name STRING METADATA FROM 'database_name' VIRTUAL, table_name STRING METADATA FROM 'table_name' VIRTUAL, operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = '123456', 'database-name' = '(^(test).*|^(tpc).*|txc|.*[p$]|t{2})', -- 複数のデータベースに一致する正規表現。 'table-name' = '(t[5-8]|tt)' -- 複数のテーブルに一致する正規表現。 );次の表は、上記の例の正規表現を説明しています:
^(test).* はプレフィックスマッチングの例です。この式は、「test1」や「test2」など、「test」で始まるデータベース名に一致します。
.*[p$] はサフィックスマッチングの例です。この式は、「cdcp」や「edcp」など、「p」で終わるデータベース名に一致します。
txc は特定のマッチです。これは、正確に「txc」であるデータベース名に一致します。
完全修飾テーブル名を照合する場合、MySQL CDC はデータベース名とテーブル名を使用してテーブルを一意に識別します。パターンは database-name.table-name です。たとえば、照合パターン (^(test).*|^(tpc).*|txc|.*[p$]|t{2}).(t[ 5-8]|tt) は、txc.tt や test2.test5 などのデータベース内のテーブルに一致します。
重要SQL ジョブの構成では、table-name と database-name はカンマ (,) を使用して複数のテーブルまたはデータベースを指定することをサポートしていません。
複数のテーブルを照合したり、複数の正規表現を使用したりするには、それらを縦棒 (|) で接続し、括弧で囲むことができます。たとえば、テーブル「user」と「product」を読み取るには、table-name を
(user|product)として構成できます。正規表現にカンマが含まれている場合は、縦棒 (|) 演算子を使用して書き直す必要があります。たとえば、正規表現
mytable_\d{1, 2}は、カンマを使用しないように、同等の(mytable_\d{1}|mytable_\d{2})に書き直す必要があります。
同時実行制御
MySQL コネクタは、複数の同時実行スレッドで完全なデータを読み取ることをサポートしており、データ読み込み効率を向上させることができます。Realtime Compute for Apache Flink コンソールの Autopilot 自動チューニング機能と組み合わせることで、コネクタはマルチスレッド読み取りが完了した後、増分フェーズ中に自動的にスケールインして計算リソースを節約できます。
Realtime Compute for Apache Flink 開発コンソールでは、リソース構成ページで基本モードまたはエキスパートモードでジョブの同時実行数を設定できます。違いは次のとおりです:
基本モードで設定された同時実行数は、ジョブ全体のグローバルな同時実行数です。

エキスパートモードは、必要に応じて特定の VERTEX の同時実行数を設定することをサポートしています。

リソース構成の詳細については、「ジョブのデプロイメント情報の構成」をご参照ください。
重要基本モードまたはエキスパートモードのいずれを使用する場合でも、同時実行数を設定する際には、テーブルで宣言されている server-id の範囲がジョブの同時実行数以上である必要があります。たとえば、server-id の範囲が 5404-5412 の場合、8 つの一意の server-id があります。この場合、ジョブは最大 8 つの同時実行スレッドを持つことができます。同じ MySQL インスタンスに対する異なるジョブは、server-id の範囲が重複してはなりません。つまり、各ジョブには明示的に異なる server-id を構成する必要があります。
Autopilot 自動スケールイン
完全データフェーズでは、大量の既存データが蓄積されます。読み取り効率を向上させるために、既存データは通常、同時に読み取られます。増分バイナリログフェーズでは、バイナリログデータの量が少なく、グローバルな順序を保証する必要があるため、通常は単一の同時読み取りで十分です。完全フェーズと増分フェーズの異なるリソース要件は、自動チューニング機能によって自動的にバランスが取られ、高性能とリソース効率の両方を実現できます。
自動チューニングは、MySQL CDC ソースの各タスクのトラフィックを監視します。ジョブがバイナリログフェーズに入ると、1 つのタスクのみがバイナリログの読み取りを担当し、他のタスクがアイドル状態の場合、自動チューニングは自動的にソースの CU 数と同時実行数を削減します。自動チューニングを有効にするには、ジョブの O&M ページで自動チューニングモードをアクティブに設定します。
説明並列処理の次数を減らすためのデフォルトの最小トリガー間隔は 24 時間です。自動チューニングのパラメーターと詳細については、「自動チューニングの構成」をご参照ください。
起動モード
scan.startup.mode 構成オプションを使用して、MySQL CDC ソーステーブルの起動モードを指定できます。利用可能なオプションは次のとおりです:
initial (デフォルト):最初の起動時に、コネクタはデータベーステーブルの完全な読み取りを実行し、その後、増分モードに切り替えてバイナリログを読み取ります。
earliest-offset:スナップショットフェーズをスキップし、利用可能な最も古いバイナリログオフセットから読み取りを開始します。
latest-offset:スナップショットフェーズをスキップし、バイナリログの末尾から読み取りを開始します。このモードでは、ソーステーブルはジョブの開始後に発生したデータ変更のみを読み取ることができます。
specific-offset:スナップショットフェーズをスキップし、指定されたバイナリログオフセットから読み取りを開始します。オフセットは、バイナリログのファイル名と位置、または GTID セットで指定できます。
timestamp:スナップショットフェーズをスキップし、指定されたタイムスタンプからバイナリログイベントの読み取りを開始します。
使用例:
CREATE TABLE mysql_source (...) WITH ( 'connector' = 'mysql-cdc', 'scan.startup.mode' = 'earliest-offset', -- 最も古いオフセットから開始します。 'scan.startup.mode' = 'latest-offset', -- 最新のオフセットから開始します。 'scan.startup.mode' = 'specific-offset', -- 特定のオフセットから開始します。 'scan.startup.mode' = 'timestamp', -- 特定のタイムスタンプから開始します。 'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- specific-offset モードで Binlog ファイル名を指定します。 'scan.startup.specific-offset.pos' = '4', -- specific-offset モードで Binlog 位置を指定します。 'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- specific-offset モードで GTID セットを指定します。 'scan.startup.timestamp-millis' = '1667232000000' -- timestamp モードで起動タイムスタンプを指定します。 ... )重要MySQL ソースは、チェックポイント中に現在のオフセットを INFO レベルでログに出力します。ログのプレフィックスは
Binlog offset on checkpoint {checkpoint-id}です。このログは、特定のチェックポイントオフセットからジョブを開始するのに役立ちます。読み取られるテーブルがスキーマ変更を経た場合、最も古いオフセット (earliest-offset)、特定のオフセット (specific-offset)、またはタイムスタンプ (timestamp) から開始するとエラーが発生する可能性があります。これは、Debezium リーダーが内部的に最新のテーブルスキーマを保存しており、スキーマが一致しない古いデータは正しく解析できないためです。
プライマリキーのない CDC ソーステーブルについて
プライマリキーのないテーブルを使用する場合、scan.incremental.snapshot.chunk.key-column を設定する必要があります。非ヌルフィールドのみを選択できます。
プライマリキーのない CDC ソーステーブルの処理セマンティクスは、scan.incremental.snapshot.chunk.key-column で指定された列の動作によって決まります:
指定された列が更新されない場合、Exactly-Once セマンティクスが保証されます。
指定された列が更新される場合、At-Least-Once セマンティクスのみが保証されます。ただし、プライマリキーを持ち、べき等操作を使用する下流のシンクと列を組み合わせることで、データの正確性を保証できます。
Alibaba Cloud RDS for MySQL バックアップログの読み取り
MySQL CDC ソーステーブルは、Alibaba Cloud RDS for MySQL からのバックアップログの読み取りをサポートしています。これは、完全データフェーズに時間がかかり、ローカルのバイナリログファイルが自動的にクリーンアップされたが、自動または手動でアップロードされたバックアップファイルがまだ存在する場合に役立ちます。
使用例:
CREATE TABLE mysql_source (...) WITH ( 'connector' = 'mysql-cdc', 'rds.region-id' = 'cn-beijing', 'rds.access-key-id' = 'xxxxxxxxx', 'rds.access-key-secret' = 'xxxxxxxxx', 'rds.db-instance-id' = 'rm-xxxxxxxxxxxxxxxxx', 'rds.main-db-id' = '12345678', 'rds.download.timeout' = '60s' ... )CDC ソースの再利用を有効にする
同じインスタンスから複数の MySQL CDC ソーステーブルを使用するジョブは、複数の Binlog クライアントを起動し、データベースの負荷を増加させます。詳細については、「MySQL CDC よくある質問」をご参照ください。
ソリューション
Realtime Compute for Apache Flink VVR 8.0.7 以降のバージョンは、MySQL CDC ソースの再利用をサポートしています。再利用は、マージ可能な MySQL CDC ソーステーブルをマージします。マージは、ソーステーブルがデータベース名、テーブル名、および
server-idを除く同じ構成項目を持つ場合に発生します。エンジンは、同じジョブ内の MySQL CDC ソースを自動的にマージします。手順
SQL ジョブで
SETコマンドを使用します:SET 'table.optimizer.source-merge.enabled' = 'true'; # (VVR 8.0.8 および 8.0.9 の場合) さらにこの項目を設定します: SET 'sql-gateway.exec-plan.enabled' = 'false';VVR 11.1 以降のバージョンでは、再利用はデフォルトで有効になっています。
ジョブをステートレスで開始します。ソースの再利用構成を変更すると、ジョブのトポロジーが変更されます。ジョブをステートレスで開始する必要があります。そうしないと、ジョブの開始に失敗したり、データが失われたりする可能性があります。ソースがマージされた場合、
MergetableSourceScanノードが表示されます。
重要再利用を有効にした後、演算子チェーンを無効にすることは推奨されません。
pipeline.operator-chainingをfalseに設定すると、データのシリアル化と逆シリアル化のオーバーヘッドが増加します。マージされるソースが多いほど、オーバーヘッドは大きくなります。VVR 8.0.7 では、演算子チェーンを無効にするとシリアル化の問題が発生します。
Binlog 読み取りの高速化
MySQL コネクタをソーステーブルまたはデータインジェストのデータソースとして使用する場合、増分フェーズ中にバイナリログファイルを解析してさまざまな変更メッセージを生成します。バイナリログファイルは、すべてのテーブルの変更をバイナリ形式で記録します。次の方法でバイナリログファイルの解析を高速化できます。
解析フィルター構成の有効化
構成項目
scan.only.deserialize.captured.tables.changelog.enabledを使用します:指定されたテーブルの変更イベントのみを解析します。
Debezium パラメーターの最適化
debezium.max.queue.size: 162580 debezium.max.batch.size: 40960 debezium.poll.interval.ms: 50debezium.max.queue.size: ブロッキングキューが保持できるレコードの最大数です。Debezium がデータベースからイベントストリームを読み取ると、イベントをダウンストリームに書き込む前にブロッキングキューに配置します。デフォルト値は 8192 です。debezium.max.batch.size: 各反復でコネクタが処理するイベントの最大数です。デフォルト値は 2048 です。debezium.poll.interval.ms: コネクタが新しい変更イベントをリクエストする前に待機するミリ秒数です。デフォルト値は 1000 ミリ秒、つまり 1 秒です。
使用例:
CREATE TABLE mysql_source (...) WITH (
'connector' = 'mysql-cdc',
-- Debezium 構成
'debezium.max.queue.size' = '162580',
'debezium.max.batch.size' = '40960',
'debezium.poll.interval.ms' = '50',
-- 解析フィルターを有効にする
'scan.only.deserialize.captured.tables.changelog.enabled' = 'true', -- 指定されたテーブルの変更イベントのみを解析します。
...
)source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: ${mysql.source.table}
server-id: 7601-7604
# Debezium 構成
debezium.max.queue.size: 162580
debezium.max.batch.size: 40960
debezium.poll.interval.ms: 50
# 解析フィルターを有効にする
scan.only.deserialize.captured.tables.changelog.enabled: trueMySQL CDC Enterprise Edition のバイナリログ消費能力は 85 MB/s で、オープンソースコミュニティバージョンの約 2 倍です。バイナリログファイルの生成速度が 85 MB/s を超える場合 (つまり、512 MB のファイルが 6 秒ごとに 1 つ生成される場合)、Flink ジョブの遅延は上昇し続けます。バイナリログファイルの生成速度が遅くなると、処理遅延は徐々に減少します。バイナリログファイルに大きなトランザクションが含まれている場合、処理遅延が一時的に増加する可能性があります。トランザクションログが読み取られた後、遅延は減少します。
MySQL CDC DataStream API
DataStream API を使用してデータを読み書きする場合、対応する DataStream コネクタを使用して Flink に接続する必要があります。DataStream コネクタの設定方法の詳細については、「DataStream コネクタの使用方法」をご参照ください。
DataStream API プログラムを作成し、MySqlSource を使用できます。以下にコードと pom 依存関係の例を示します:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
public class MySqlSourceExample {
public static void main(String[] args) throws Exception {
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("yourHostname")
.port(yourPort)
.databaseList("yourDatabaseName") // キャプチャするデータベースを設定
.tableList("yourDatabaseName.yourTableName") // キャプチャするテーブルを設定
.username("yourUsername")
.password("yourPassword")
.deserializer(new JsonDebeziumDeserializationSchema()) // SourceRecord を JSON 文字列に変換
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// チェックポイントを有効にする
env.enableCheckpointing(3000);
env
.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
// 4 つの並列ソースタスクを設定
.setParallelism(4)
.print().setParallelism(1); // メッセージの順序を保つためにシンクに並列度 1 を使用
env.execute("Print MySQL Snapshot + Binlog");
}
}<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mysql</artifactId>
<version>${vvr.version}</version>
</dependency>MySqlSource をビルドする際には、コードで次のパラメーターを指定する必要があります:
パラメーター | 説明 |
hostname | MySQL データベースの IP アドレスまたはホスト名。 |
port | MySQL データベースサービスのポート番号。 |
databaseList | MySQL データベース名。 説明 データベース名は正規表現をサポートしており、複数のデータベースからデータを読み取ることができます。 |
username | MySQL データベースサービスのユーザー名。 |
password | MySQL データベースサービスのパスワード。 |
deserializer | SourceRecord 型のレコードを指定された型に逆シリアル化するデシリアライザ。有効な値:
|
pom の依存関係では、次のパラメーターを指定する必要があります:
${vvr.version} | Alibaba Cloud Realtime Compute for Apache Flink のエンジンバージョン。例: 説明 定期的に Hotfix バージョンをリリースする可能性があるため、Maven に表示されているバージョン番号を使用してください。これらの更新は他のチャネルを通じて発表されない場合があります。 |
${flink.version} | Apache Flink のバージョン。例: 重要 ジョブの実行時に互換性の問題が発生しないように、Alibaba Cloud Realtime Compute for Apache Flink のエンジンバージョンに対応する Apache Flink のバージョンを使用してください。バージョンの対応については、「DPI エンジン」をご参照ください。 |
よくある質問
CDC ソーステーブルを使用する際に発生する可能性のある問題については、「CDC の問題」をご参照ください。