このトピックでは、MySQL コネクタのベストプラクティスについて説明します。
サーバー ID を設定してバイナリログ消費の競合を回避
データベースデータを同期する各クライアントには、一意のサーバー ID があります。複数の MySQL CDC ソーステーブルが同じサーバー ID を共有している場合、サーバー ID の競合エラーが発生するため、クライアントごとに異なるサーバー ID を割り当てることをお勧めします。
サーバー ID の構成
サーバー ID は、Flink テーブルの DDL 文または SQL ヒントで設定できます。
SQL ヒントを使用してサーバー ID を割り当てることをお勧めします。 SQL ヒントの詳細については、「SQL ヒント」をご参照ください。
さまざまなシナリオでのサーバー ID の構成
シナリオ 1:増分スナップショットが無効になっているか、並列度が 1 です。
増分スナップショットフレームが無効であるか、並列度が 1 の場合は、サーバー ID を指定できます。
SELECT * FROM source_table /*+ OPTIONS('server-id'='123456') */ ;シナリオ 2:増分スナップショットが有効になっており、並列度が 1 より大きいです。
サーバー ID の範囲を指定し、範囲内の使用可能なサーバー ID の数が並列度以上であることを確認します。たとえば、並列度が 3 の場合は、次の文を実行してサーバー ID 範囲を設定します。
SELECT * FROM source_table /*+ OPTIONS('server-id'='123456-123458') */ ;CTAS を使用したデータ同期
CTAS を使用してデータを同期し、複数の MySQL CDC ソースが同じ構成を共有している場合、ソーステーブルは自動的にマージされます。この場合、複数の MySQL CDC ソーステーブルに同じサーバー ID を指定できます。詳細については、「CREATE TABLE AS 文」トピックの「例 4:複数の CREATE TABLE AS 文の実行」セクションをご参照ください。
シナリオ 4:ジョブに複数の MySQL CDC ソーステーブルが含まれており、CTAS 文はデータ同期に使用されていません。
ジョブに複数の MySQL CDC ソーステーブルが含まれ、CTAS ステートメントを使用して同期されず、ソースの再利用が無効になっている場合は、各 CDC ソーステーブルに一意のサーバー ID を指定する必要があります (詳細については、「ソースの再利用を有効にしてバイナリログデータ接続を削減」をご参照ください)。同様に、増分スナップショットフレームワークが有効で、並列度が 1 より大きい場合は、サーバー ID の範囲を指定する必要があります。
select * from source_table1 /*+ OPTIONS('server-id'='123456-123457') */ left join source_table2 /*+ OPTIONS('server-id'='123458-123459') */ on source_table1.id=source_table2.id;
メモリ最適化のためにチャンクオプションを構成する
MySQL CDC ソースコネクタが起動されると、コネクタはデータを読み取る必要のあるテーブル全体をスキャンし、プライマリキーに基づいてテーブルを複数のチャンクに分割し、この時点でのバイナリログファイルの位置を記録します。次に、MySQL CDC ソースコネクタは増分スナップショットメカニズムを実装して、各チャンクからデータを読み取ります。 Flink ジョブは定期的にチェックポイントを生成して、データが読み取られたチャンクを記録します。フェールオーバーが発生した場合、MySQL CDC コネクタは、データが読み取られていないチャンクからのデータの読み取りのみを続行する必要があります。すべてのチャンクのデータが読み取られた後、増分変更レコードが以前のバイナリログファイルの位置から読み取られます。 Flink ジョブは、バイナリログファイルの位置を記録するために、定期的にチェックポイントを生成し続けます。フェールオーバーが発生した場合、MySQL CDC コネクタは以前のバイナリログファイルの位置からデータを処理します。このようにして、1 回限りのセマンティクスが実装されます。
増分セーブポイントアルゴリズムの詳細については、Apache Flink ドキュメントのMySQL CDC コネクタをご参照ください。
デフォルトでは、単一列のプライマリキーを持つテーブルは、そのキーに基づいてチャンクに分割されます。複合プライマリキーを持つ物理テーブルは、デフォルトではキーの最初の列によってチャンク化されます。 Ververica Runtime(VVR)6.0.7 以降を使用する Realtime Compute for Apache Flink は、プライマリキーのないテーブルからのデータの読み取りをサポートしています。このようなテーブルのデータは、scan.incremental.snapshot.chunk.key-column で指定された NULL でない列に基づいてチャンクに配布されます。
シャーディングパラメーターの最適化
チャンクデータとメタデータは両方ともメモリに保存されます。メモリ不足(OOM)が発生した場合は、特定の状況に基づいてチャンク関連のコネクタオプションを調整します。
JobManager
チャンク数が多すぎると、すべてのチャンクに関するデータを保存する JobManager の OOM が発生する可能性があります。 JobManager OOM を回避するには、scan.incremental.snapshot.chunk.size を大きい値に設定してチャンク数を減らします。 jobmanager.memory.heap.size を構成して、JobManager の JVM ヒープメモリを増やすこともできます。詳細については、Apache Flink ドキュメントのメモリ構成をご参照ください。
TaskManager
チャンク内の行数が多すぎると、各チャンクからデータを読み取る TaskManager の OOM が発生する可能性があります。 TaskManager OOM エラーを回避するには、scan.incremental.snapshot.chunk.size を小さい値に設定して、各チャンク内の行数を減らします。 taskmanager.memory.framework.heap.size に大きい値を割り当てて、TaskManager の JVM ヒープメモリサイズを増やすこともできます。
VVR 8.0.8 以前を使用する Realtime Compute for Apache Flink では、最後のチャンクのデータサイズは通常大きく、TaskManager OOM エラーが発生する可能性があります。この問題を解決するには、VVR 8.0.9 以降にアップグレードします。
複合キーの最初の列に重複値が多い場合、最初のプライマリキー列に依存するデフォルトのチャンクメカニズムにより、チャンクサイズが大きくなり、TaskManager OOM エラーが発生する可能性があります。これを回避するには、scan.incremental.snapshot.chunk.key-column を構成して、別のプライマリキー列でテーブルを分割します。
ジョブ設定を調整してフルフェーズでの読み取りを高速化
フル読み取りフェーズ中、MySQL ソーステーブルは Java Database Connectivity (JDBC) 接続を介してスナップショットデータを読み取ります。次の方法でフルフェーズでの読み取りを高速化できます。
ソースの並列度を上げて、フルフェーズでの読み取りを高速化します。
scan.incremental.snapshot.chunk.size の値を増やして、単一のチャンクで取得されるデータ量を増やします。
ダウンストリームのシンクテーブルにプライマリキーがあり、べき等書き込みをサポートしている場合は、scan.incremental.snapshot.backfill.skip を有効にして、バックフィル部分のバイナリログの読み取りをスキップできます。これにより、フルフェーズでの処理が高速化されます。
ソースの再利用を有効にしてバイナリログデータ接続を削減
ソースマージは、複数の MySQL CDC ソーステーブルを持つジョブに役立ちます。これにより、Flink は MySQL に必要な最小限の接続数でバイナリログにアクセスできるようになり、MySQL データベースの負荷が軽減されます。この機能は、MySQL CDC の Realtime Compute for Apache Flink コネクタでのみサポートされています。 MySQL CDC の Apache Flink コネクタはこれをサポートしていません。
SQL ジョブで SET コマンドを使用して、ソースの再利用機能を有効にできます。
SET 'table.optimizer.source-merge.enabled' = 'true';新しく作成されたジョブに対して、ソースの再利用機能を有効にすることを推奨します。既存のジョブでこの機能を有効にする場合は、ステートレス起動を実行する必要があります。これは、ソースの再利用によってジョブトポロジーが変更されるためです。以前のステートからジョブを開始すると、起動に失敗したり、データが失われたりする可能性があります。
ソースマージが有効になると、同じ構成の MySQL CDC ソーステーブルがマージされます。すべての MySQL CDC ソーステーブルが同じ構成を共有している場合、対応する Flink ジョブの MySQL 接続数は次のとおりです。
スナップショット読み取り中は、接続数はソースの並列度と同じです。
増分読み取り中は、接続数は 1 です。
VVR 8.0.8 および 8.0.9 では、CDC ソースマージを有効にする場合、
SET 'sql-gateway.exec-plan.enabled' = 'false';も設定する必要があります。CDC ソースマージを有効にした後、
pipeline.operator-chainingジョブ設定項目を false に設定することは推奨されません。オペレーターチェーンが切断されると、ソースからダウンストリームオペレーターに送信されるデータのシリアル化および逆シリアル化のオーバーヘッドが増加します。マージされるソースが多いほど、オーバーヘッドは大きくなります。VVR 8.0.7 リアルタイムコンピューティングエンジンでは、
pipeline.operator-chainingを false に設定すると、シリアル化の問題が発生します。
バイナリログ解析オプションを構成して増分読み取りを高速化する
MySQL CDC コネクタがソーステーブルまたはデータインジェスチョンソースとして使用される場合、MySQL CDC コネクタはバイナリログファイルを解析して変更メッセージを生成します。バイナリログファイルには、すべてのテーブルの変更データが記録されます。バイナリログデータの解析を高速化するには、次の方法を使用できます。
並列解析と解析フィルタを有効にします。この機能は、VVR 8.0.7 以降の MySQL CDC の Realtime Compute for Apache Flink コネクタでのみサポートされています。 MySQL CDC の Apache Flink コネクタではサポートされていません。
scan.only.deserialize.captured.tables.changelog.enabled設定項目を有効にします。この項目は、指定されたテーブルの変更イベントのみを解析します。scan.parallel-deserialize-changelog.enabled設定項目を有効にします。この項目は、複数のスレッドを使用してバイナリログファイルを解析し、解析されたデータを順次消費キューに送信します。この設定を有効にする場合、Task Manager CPU を増やす必要がある場合があります。
Debezium 関連のオプションを最適化する
debezium.max.queue.size: 162580 debezium.max.batch.size: 40960 debezium.poll.interval.ms: 50debezium.max.queue.size:ブロッキングキューが保持できるレコードの最大数。Debezium がデータベースからイベントストリームを読み取ると、イベントをダウンストリームに書き込む前にブロッキングキューに配置します。デフォルト値は 8192 です。debezium.max.batch.size:コネクタが各反復で処理できるイベントの最大数。デフォルト値は 2048 です。debezium.poll.interval.ms:コネクタが新しい変更イベントをリクエストする前に待機するミリ秒数。デフォルト値は 1000 ミリ秒 (1 秒) です。
例:
CREATE TABLE mysql_source (...) WITH (
'connector' = 'mysql-cdc',
-- Debezium 関連のオプションを構成します。
'debezium.max.queue.size' = '162580',
'debezium.max.batch.size' = '40960',
'debezium.poll.interval.ms' = '50',
-- 並列解析と解析フィルタ構成を有効にします。
'scan.only.deserialize.captured.tables.changelog.enabled' = 'true', -- 指定されたテーブルの変更イベントのみを解析します。
'scan.parallel-deserialize-changelog.enabled' = 'true' -- バイナリログファイルのイベントを解析するために複数のスレッドを使用します。
...
)データ遅延の分析とジョブスループットの最適化
増分フェーズ中にデータ遅延が発生した場合は、次の手順に従って問題を分析できます。
概要 の `currentFetchEventTimeLag` および `currentEmitEventTimeLag` メトリックを参照します。`currentFetchEventTimeLag` メトリックはバイナリログからデータを読み取る際の遅延を表し、`currentEmitEventTimeLag` メトリックはジョブに関連するテーブルのこのプロセスの遅延を表します。
シナリオの説明
詳細
currentFetchEventTimeLag は比較的小さいですが、currentEmitEventTimeLag は比較的大きく、頻繁には更新されません。
currentFetchEventTimeLag の値が小さい場合は、データベースからバイナリログをプルする際のレイテンシーが低いことを示します。ただし、バイナリログにジョブが読み取るテーブルのデータがほとんど含まれていない場合、currentEmitEventTimeLag はほとんど更新されません。これは正常な状態です。
currentFetchEventTimeLag と currentEmitEventTimeLag の両方が大きい状態です。
これは、ソーステーブルのプルパフォーマンスが低いことを示しています。このセクションの手順に従って最適化してください。
ダウンストリーム演算子へのデータのストリーミングを遅くする逆圧力が存在するかどうかを確認します。逆圧力が存在する場合、sourceIdleTime メトリックの値が定期的に増加し、currentFetchEventTimeLag メトリックと currentEmitEventTimeLag メトリックの値が継続的に増加する可能性があります。この問題を解決するには、低速の演算子を特定し、その並列度を上げます。
JVM のTM CPU 使用率メトリックとTM GC 時間メトリックを確認して、CPU またはメモリリソースが使い果たされているかどうかを確認します。リソースの枯渇が発生している場合は、リソースをスケールアップして読み取りパフォーマンスを最適化することを検討してください。 miniBatch オプションを構成して読み取りスループットを向上させることもできます。詳細については、「Flink SQL の最適化」をご参照ください。
状態が大きいジョブに SinkUpsertMaterializer 演算子が存在する場合、読み取りパフォーマンスが低下します。この場合、ジョブの並列度を上げるか、SinkUpsertMaterializer 演算子を使用しないでください。詳細については、「SinkUpsertMaterializer の使用を避ける」をご参照ください。 SinkUpsertMaterializer 演算子を削除した後、ステートレス起動を実行します。これは、ジョブグラフが変更されたため、ステートフル起動が失敗したり、データ損失が発生したりする可能性があるため、必要です。
RDS バイナリログの読み取りを有効にして有効期限切れを防止
ApsaraDB RDS for MySQL をデータソースとして使用できます。これにより、OSS に保存されているログバックアップを読み取ることができます。リクエストされたバイナリログファイル(タイムスタンプまたは位置で指定)が OSS に保存されている場合、Flink は処理する前にバイナリログをクラスターにダウンロードします。リクエストされたバイナリログファイルがローカルで利用可能な場合、Flink はデータベース接続に自動的に切り替えてバイナリログを読み取ります。この機能は、MySQL CDC の Realtime Compute for Apache Flink コネクタでのみサポートされています。 MySQL CDC の Apache Flink コネクタはこれをサポートしていません。
OSS からバイナリログを読み取るには、ApsaraDB RDS for MySQL 接続オプションを次のように構成します。
CREATE TABLE mysql_source (...) WITH (
'connector' = 'mysql-cdc',
'rds.region-id' = 'cn-beijing',
'rds.access-key-id' = 'your_access_key_id',
'rds.access-key-secret' = 'your_access_key_secret',
'rds.db-instance-id' = 'rm-xxxxxxxx', -- ApsaraDB RDS for MySQL インスタンス ID。
'rds.main-db-id' = '12345678', -- プライマリデータベース ID。
'rds.endpoint' = 'rds.aliyuncs.com'
...
)データベースデータとスキーマの変更を同期する
データ同期タスクには、データ統合シナリオ向けに最適化されたデータインジェストデプロイメントを作成することを推奨します。詳細については、「データインジェストのための YAML デプロイメント入門」および「Flink CDC ジョブの開発 (ベータ)」をご参照ください。
次のコードスニペットは、データインジェスチョンデプロイメントが app_db という名前の MySQL データベースから Hologres にデータとスキーマの変更を同期する方法を示しています。
source:
type: mysql
hostname: <hostname>
port: 3306
username: ${secret_values.mysqlusername}
password: ${secret_values.mysqlpassword}
tables: app_db.\.*
server-id: 5400-5404
sink:
type: hologres
name: Hologres シンク
endpoint: <endpoint>
dbname: <database-name>
username: ${secret_values.holousername}
password: ${secret_values.holopassword}
pipeline:
name: MySQL データベースを Hologres に同期するデータインジェストコネクタのテーブル追加機能
MySQL データインジェストコネクタは、2 つのシナリオで新しいテーブルをサポートするための設定項目を提供します。
設定項目 | 説明 | 注記 |
| チェックポイントからデプロイメントが再起動されたときに、新しいテーブル(以前の起動時に検出されなかったテーブル)を同期するかどうかを指定します。このオプションが有効になっている場合、Flink は新しいテーブルからスナップショットデータと増分データを読み取ります。 | この項目は、 |
| 増分読み取り中に新しいテーブルを同期するかどうかを指定します。 |
|
scan.newly-added-table.enabled と scan.binlog.newly-added-table.enabled を同時に有効にすることは推奨されません。両方の設定項目を有効にすると、データが重複して発生します。