このトピックでは、MySQL コネクタのベストプラクティスについて説明します。
クライアントごとに異なるサーバー ID を設定する
データベースデータを同期する各クライアントには、一意のサーバー ID があります。複数の MySQL CDC ソーステーブルが同じサーバー ID を共有している場合、サーバー ID の競合エラーが発生するため、クライアントごとに異なるサーバー ID を割り当てることをお勧めします。
サーバー ID の構成
サーバー ID は、Flink テーブルの DDL 文または SQL ヒントで設定できます。
SQL ヒントを使用してサーバー ID を割り当てることをお勧めします。 SQL ヒントの詳細については、「SQL ヒント」をご参照ください。
さまざまなシナリオでのサーバー ID の構成
シナリオ 1:増分スナップショットが無効になっているか、並列度が 1 です。
一意のサーバー ID を指定します。
SELECT * FROM source_table /*+ OPTIONS('server-id'='123456') */ ;シナリオ 2:増分スナップショットが有効になっており、並列度が 1 より大きいです。
サーバー ID の範囲を指定し、範囲内の使用可能なサーバー ID の数が並列度以上であることを確認します。たとえば、並列度が 3 の場合は、次の文を実行してサーバー ID 範囲を設定します。
SELECT * FROM source_table /*+ OPTIONS('server-id'='123456-123458') */ ;シナリオ 3:CREATE TABLE AS(CTAS)文を使用してデータを同期します。
CTAS を使用してデータを同期し、複数の MySQL CDC ソースが同じ構成を共有している場合、ソーステーブルは自動的にマージされます。この場合、複数の MySQL CDC ソーステーブルに同じサーバー ID を指定できます。詳細については、「CREATE TABLE AS 文」トピックの「例 4:複数の CREATE TABLE AS 文の実行」セクションをご参照ください。
シナリオ 4:ジョブに複数の MySQL CDC ソーステーブルが含まれており、CTAS 文はデータ同期に使用されていません。
この場合、MySQL CDC ソーステーブルはマージできません。したがって、MySQL CDC ソーステーブルごとに異なるサーバー ID を割り当てる必要があります。増分スナップショットが有効になっており、並列度が 1 より大きい場合は、サーバー ID 範囲を指定する必要があります。
select * from source_table1 /*+ OPTIONS('server-id'='123456-123457') */ left join source_table2 /*+ OPTIONS('server-id'='123458-123459') */ on source_table1.id=source_table2.id;
メモリ最適化のためにチャンクオプションを構成する
MySQL CDC ソースコネクタが起動されると、コネクタはデータを読み取る必要のあるテーブル全体をスキャンし、プライマリキーに基づいてテーブルを複数のチャンクに分割し、この時点でのバイナリログファイルの位置を記録します。次に、MySQL CDC ソースコネクタは増分スナップショットメカニズムを実装して、各チャンクからデータを読み取ります。 Flink ジョブは定期的にチェックポイントを生成して、データが読み取られたチャンクを記録します。フェールオーバーが発生した場合、MySQL CDC コネクタは、データが読み取られていないチャンクからのデータの読み取りのみを続行する必要があります。すべてのチャンクのデータが読み取られた後、増分変更レコードが以前のバイナリログファイルの位置から読み取られます。 Flink ジョブは、バイナリログファイルの位置を記録するために、定期的にチェックポイントを生成し続けます。フェールオーバーが発生した場合、MySQL CDC コネクタは以前のバイナリログファイルの位置からデータを処理します。このようにして、1 回限りのセマンティクスが実装されます。
増分セーブポイントアルゴリズムの詳細については、Apache Flink ドキュメントのMySQL CDC コネクタをご参照ください。
デフォルトでは、単一列のプライマリキーを持つテーブルは、そのキーに基づいてチャンクに分割されます。複合プライマリキーを持つ物理テーブルは、デフォルトではキーの最初の列によってチャンク化されます。 Ververica Runtime(VVR)6.0.7 以降を使用する Realtime Compute for Apache Flink は、プライマリキーのないテーブルからのデータの読み取りをサポートしています。このようなテーブルのデータは、scan.incremental.snapshot.chunk.key-column で指定された NULL でない列に基づいてチャンクに配布されます。
メモリ最適化戦略
チャンクデータとメタデータは両方ともメモリに保存されます。メモリ不足(OOM)が発生した場合は、特定の状況に基づいてチャンク関連のコネクタオプションを調整します。
JobManager
チャンク数が多すぎると、すべてのチャンクに関するデータを保存する JobManager の OOM が発生する可能性があります。 JobManager OOM を回避するには、scan.incremental.snapshot.chunk.size を大きい値に設定してチャンク数を減らします。 jobmanager.memory.heap.size を構成して、JobManager の JVM ヒープメモリを増やすこともできます。詳細については、Apache Flink ドキュメントのメモリ構成をご参照ください。
TaskManager
チャンク内の行数が多すぎると、各チャンクからデータを読み取る TaskManager の OOM が発生する可能性があります。 TaskManager OOM エラーを回避するには、scan.incremental.snapshot.chunk.size を小さい値に設定して、各チャンク内の行数を減らします。 taskmanager.memory.framework.heap.size に大きい値を割り当てて、TaskManager の JVM ヒープメモリサイズを増やすこともできます。
VVR 8.0.8 以前を使用する Realtime Compute for Apache Flink では、最後のチャンクのデータサイズは通常大きく、TaskManager OOM エラーが発生する可能性があります。この問題を解決するには、VVR 8.0.9 以降にアップグレードします。
複合キーの最初の列に重複値が多い場合、最初のプライマリキー列に依存するデフォルトのチャンクメカニズムにより、チャンクサイズが大きくなり、TaskManager OOM エラーが発生する可能性があります。これを回避するには、scan.incremental.snapshot.chunk.key-column を構成して、別のプライマリキー列でテーブルを分割します。
ソースマージを有効にして データベース接続数を削減する
ソースマージは、複数の MySQL CDC ソーステーブルを持つジョブに役立ちます。これにより、Flink は MySQL に必要な最小限の接続数でバイナリログにアクセスできるようになり、MySQL データベースの負荷が軽減されます。この機能は、MySQL CDC の Realtime Compute for Apache Flink コネクタでのみサポートされています。 MySQL CDC の Apache Flink コネクタはこれをサポートしていません。
ソースマージを有効にするには、SQL ドラフトに SET コマンドを含めます。
SET 'table.optimizer.source-merge.enabled' = 'true';MySQL CDC コネクタを使用する SQL ドラフトを作成するときに、この機能を有効にすることをお勧めします。既存のデプロイメントに対してソースマージを有効にした後、ステートレス起動を実行する必要があります。ソースマージはジョブグラフを変更するため、ステートフル起動が失敗し、データ損失が発生する可能性があります。
ソースマージが有効になると、同じ構成の MySQL CDC ソーステーブルがマージされます。すべての MySQL CDC ソーステーブルが同じ構成を共有している場合、対応する Flink ジョブの MySQL 接続数は次のとおりです。
スナップショット読み取り中は、接続数はソースの並列度と同じです。
増分読み取り中は、接続数は 1 です。
VVR 8.0.8 および VVR 8.0.9 を使用する Realtime Compute for Apache Flink では、MySQL CDC ソーステーブルのソースマージを有効にする場合は、
SET 'sql-gateway.exec-plan.enabled' = 'false';も指定する必要があります。演算子チェーンが切断されている場合、ソースからダウンストリーム演算子へのデータのシリアル化と逆シリアル化のオーバーヘッドが増加します。マージされる MySQL CDC ソーステーブルが多いほど、生成されるオーバーヘッドが高くなります。したがって、ソースマージを有効にした後、
pipeline.operator-chainingを false に設定することはお勧めしません。VVR 8.0.7 を使用する Realtime Compute for Apache Flink では、
pipeline.operator-chainingを false に設定すると、シリアル化の問題が発生する可能性があります。
バイナリログ解析オプションを構成して増分読み取りを高速化する
MySQL CDC コネクタがソーステーブルまたはデータインジェスチョンソースとして使用される場合、MySQL CDC コネクタはバイナリログファイルを解析して変更メッセージを生成します。バイナリログファイルには、すべてのテーブルの変更データが記録されます。バイナリログデータの解析を高速化するには、次の方法を使用できます。
並列解析と解析フィルタを有効にします。この機能は、VVR 8.0.7 以降の MySQL CDC の Realtime Compute for Apache Flink コネクタでのみサポートされています。 MySQL CDC の Apache Flink コネクタではサポートされていません。
scan.only.deserialize.captured.tables.changelog.enabledを有効にする:指定されたテーブルの変更イベントのみを解析します。scan.only.deserialize.captured.tables.changelog.enabledを有効にする:複数のスレッドを使用してバイナリログファイルのイベントを解析し、解析されたイベントを消費キューに順番に送信します。このオプションを構成する場合は、コンソールで タスクマネージャー CPU の値を増やすことをお勧めします。
Debezium 関連のオプションを最適化する
debezium.max.queue.size: 162580 debezium.max.batch.size: 40960 debezium.poll.interval.ms: 50debezium.max.queue.size:ブロッキングキューが保持できるデータレコードの最大数。 Debezium はデータベースからイベントストリームを読み取ると、イベントをダウンストリームシステムに書き込む前にブロッキングキューに配置します。デフォルト値:8192。debezium.max.batch.size:コネクタがバッチで処理できるイベントの最大数。デフォルト値:2048。debezium.poll.interval.ms:コネクタが新しい変更イベントをリクエストする前に待機する必要がある期間。デフォルト値:1000。単位:ミリ秒。
サンプルコード:
CREATE TABLE mysql_source (...) WITH (
'connector' = 'mysql-cdc',
-- Debezium 関連のオプションを構成します。
'debezium.max.queue.size' = '162580',
'debezium.max.batch.size' = '40960',
'debezium.poll.interval.ms' = '50',
-- 並列解析と解析フィルタ構成を有効にします。
'scan.only.deserialize.captured.tables.changelog.enabled' = 'true', -- 指定されたテーブルの変更イベントのみを解析します。
'scan.parallel-deserialize-changelog.enabled' = 'true' -- バイナリログファイルのイベントを解析するために複数のスレッドを使用します。
...
)データレイテンシを分析し、読み取りスループットを最適化する
増分読み取り中にデータレイテンシが発生した場合、次の手順を実行して読み取りスループットを最適化できます。
currentFetchEventTimeLag メトリックと currentFetchEventTimeLag メトリックの値を確認します。 currentFetchEventTimeLag は MySQL から Flink への送信レイテンシを示し、currentFetchEventTimeLag は処理レイテンシです。詳細については、「概要」のメトリックのセクションをご参照ください。
シナリオの説明
分析
currentFetchEventTimeLag は比較的小さく、currentEmitEventTimeLag は比較的小さく、頻繁に更新されません。
currentFetchEventTimeLag が小さいということは、MySQL データベースからのデータ取得のレイテンシが低いことを示しています。 currentEmitEventTimeLag が一貫して大きいということは、Flink ジョブによって処理される関連データの量が小さいことを示しています。このシナリオは一般的であり、期待どおりです。
currentFetchEventTimeLag と currentEmitEventTimeLag の両方が大きいです。
MySQL からデータをプルするソースの機能が弱いです。最適化のために、このセクションの次の手順を実行します。
ダウンストリーム演算子へのデータのストリーミングを遅くする逆圧力が存在するかどうかを確認します。逆圧力が存在する場合、sourceIdleTime メトリックの値が定期的に増加し、currentFetchEventTimeLag メトリックと currentEmitEventTimeLag メトリックの値が継続的に増加する可能性があります。この問題を解決するには、低速の演算子を特定し、その並列度を上げます。
JVM のTM CPU 使用率メトリックとTM GC 時間メトリックを確認して、CPU またはメモリリソースが使い果たされているかどうかを確認します。リソースの枯渇が発生している場合は、リソースをスケールアップして読み取りパフォーマンスを最適化することを検討してください。 miniBatch オプションを構成して読み取りスループットを向上させることもできます。詳細については、「Flink SQL の最適化」をご参照ください。
状態が大きいジョブに SinkUpsertMaterializer 演算子が存在する場合、読み取りパフォーマンスが低下します。この場合、ジョブの並列度を上げるか、SinkUpsertMaterializer 演算子を使用しないでください。詳細については、「SinkUpsertMaterializer の使用を避ける」をご参照ください。 SinkUpsertMaterializer 演算子を削除した後、ステートレス起動を実行します。これは、ジョブグラフが変更されたため、ステートフル起動が失敗したり、データ損失が発生したりする可能性があるため、必要です。
ApsaraDB RDS for MySQL を有効にして永続的なバイナリログストレージを実現する
ApsaraDB RDS for MySQL をデータソースとして使用できます。これにより、OSS に保存されているログバックアップを読み取ることができます。リクエストされたバイナリログファイル(タイムスタンプまたは位置で指定)が OSS に保存されている場合、Flink は処理する前にバイナリログをクラスターにダウンロードします。リクエストされたバイナリログファイルがローカルで利用可能な場合、Flink はデータベース接続に自動的に切り替えてバイナリログを読み取ります。この機能は、MySQL CDC の Realtime Compute for Apache Flink コネクタでのみサポートされています。 MySQL CDC の Apache Flink コネクタはこれをサポートしていません。
OSS からバイナリログを読み取るには、ApsaraDB RDS for MySQL 接続オプションを次のように構成します。
CREATE TABLE mysql_source (...) WITH (
'connector' = 'mysql-cdc',
'rds.region-id' = 'cn-beijing',
'rds.access-key-id' = 'your_access_key_id',
'rds.access-key-secret' = 'your_access_key_secret',
'rds.db-instance-id' = 'rm-xxxxxxxx', -- ApsaraDB RDS for MySQL インスタンス ID。
'rds.main-db-id' = '12345678', -- プライマリデータベース ID。
'rds.endpoint' = 'rds.aliyuncs.com'
...
)データベースデータとスキーマの変更を同期する
データ同期タスクの場合、データ統合シナリオ向けに最適化されたデータインジェスチョンデプロイメントを作成することをお勧めします。詳細については、「データインジェスチョンの YAML デプロイメントの概要」および「データインジェスチョンの YAML ドラフトを作成する(パブリックプレビュー)」をご参照ください。
次のコードスニペットは、データインジェスチョンデプロイメントが app_db という名前の MySQL データベースから Hologres にデータとスキーマの変更を同期する方法を示しています。
source:
type: mysql
hostname: <hostname>
port: 3306
username: ${secret_values.mysqlusername}
password: ${secret_values.mysqlpassword}
tables: app_db.\.*
server-id: 5400-5404
sink:
type: hologres
name: Hologres シンク
endpoint: <endpoint>
dbname: <database-name>
username: ${secret_values.holousername}
password: ${secret_values.holopassword}
pipeline:
name: MySQL データベースを Hologres に同期する新しいテーブルを同期する
データインジェスチョンデプロイメントを介して MySQL から新しいテーブルを同期するには、必要に応じて次のオプションを構成します。
オプション | 説明 | 注記 |
| チェックポイントからデプロイメントが再起動されたときに、新しいテーブル(以前の起動時に検出されなかったテーブル)を同期するかどうかを指定します。このオプションが有効になっている場合、Flink は新しいテーブルからスナップショットデータと増分データを読み取ります。 | このオプションは、 |
| 増分読み取り中に新しいテーブルを同期するかどうかを指定します。 |
|
データの重複を防ぐために、scan.newly-added-table.enabled と scan.binlog.newly-added-table.enabled を同時に有効にしないでください。