すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:MySQL

最終更新日:Jan 10, 2026

このトピックでは、MySQL コネクタの使用方法について説明します。

背景情報

MySQL コネクタは、ApsaraDB RDS for MySQL、PolarDB for MySQL、OceanBase (MySQL モード)、自己管理型 MySQL など、MySQL プロトコルと互換性のあるすべてのデータベースをサポートします。

重要

MySQL コネクタを使用して OceanBase から読み取る場合、OceanBase のバイナリログ (binlog) が有効になっており、正しく設定されていることを確認してください。詳細については、「関連操作」をご参照ください。この機能はパブリックプレビュー段階です。十分に評価し、慎重に使用することを推奨します。

MySQL コネクタは、以下をサポートします。

カテゴリ

詳細

サポートタイプ

ソーステーブル、ディメンションテーブル、結果テーブル、データインジェストソース

ランタイムモード

ストリーミングモードのみ

データフォーマット

該当なし

特定のモニタリングメトリック

モニタリングメトリック

  • ソーステーブル

    • currentFetchEventTimeLag:データが生成されてから Source Operator によってプルされるまでの間隔。

      このメトリックは binlog フェーズでのみ有効です。スナップショットフェーズでは値は 0 です。

    • currentEmitEventTimeLag:データが生成されてから Source Operator を離れるまでの間隔。

      このメトリックは binlog フェーズでのみ有効です。スナップショットフェーズでは値は 0 です。

    • sourceIdleTime:ソーステーブルが新しいデータを生成していない期間。

  • ディメンションテーブルと結果テーブル:なし。

説明

メトリックの詳細については、「メトリック」をご参照ください。

API タイプ

DataStream、SQL、データインジェスト YAML

結果テーブルのデータを更新または削除できますか?

はい

特徴

MySQL Change Data Capture (CDC) ソーステーブルは、最初にデータベースから完全な既存データを読み取るストリーミングソーステーブルです。その後、binlog の読み取りにスムーズに切り替わり、データが複数回読み取られたり、見逃されたりしないようにします。障害が発生した場合でも、データは exactly-once セマンティクスで処理されます。MySQL CDC ソーステーブルは、完全データの同時読み取りをサポートし、増分スナップショットアルゴリズムを使用して、ロックフリー読み取りと再開可能なデータ転送を実現します。詳細については、「MySQL CDC ソーステーブルについて」をご参照ください。

  • バッチ処理とストリーム処理の統合:完全データと増分データの両方を読み取るため、個別のパイプラインは不要です。

  • 完全データの同時読み取り:パフォーマンスを水平にスケールします。

  • 完全読み取りから増分読み取りへのシームレスな切り替え:コンピューティングリソースを節約するために自動的にスケールインします。

  • 再開可能なデータ転送:完全データ読み取りフェーズ中の再開可能なデータ転送をサポートし、安定性を向上させます。

  • ロックフリー読み取り:オンラインのビジネス運用に影響を与えることなく完全データを読み取ります。

  • ApsaraDB RDS for MySQL からのバックアップログの読み取りをサポートします。

  • binlog ファイルの並列解析により、読み取り遅延を削減します。

前提条件

MySQL CDC ソーステーブルを使用する前に、「MySQL の設定」の説明に従って MySQL データベースを設定する必要があります。

ApsaraDB RDS for MySQL

  • Realtime Compute for Apache Flink で接続テストを実行し、ネットワーク接続を確認します。

  • MySQL バージョン:5.6、5.7、8.0.x。

  • binlog を有効にします。これはデフォルトで有効になっています。

  • binlog フォーマットを ROW に設定します。これはデフォルトのフォーマットです。

  • binlog_row_image を FULL に設定します。これはデフォルトの設定です。

  • バイナリログトランザクション圧縮を無効にします。この機能は MySQL 8.0.20 で導入され、デフォルトで無効になっています。

  • SELECT、SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT 権限を持つ MySQL ユーザーが作成されていること。

  • MySQL データベースとテーブルを作成します。詳細については、「ApsaraDB RDS for MySQL のデータベースとアカウントの作成」をご参照ください。権限不足による操作の失敗を防ぐため、特権アカウントを使用して MySQL データベースを作成してください。

  • IP アドレスホワイトリストを設定します。詳細については、「ApsaraDB RDS for MySQL のホワイトリストの設定」をご参照ください。

PolarDB for MySQL

  • Realtime Compute for Apache Flink で接続テストを実行し、ネットワーク接続を確認します。

  • MySQL バージョン:5.6、5.7、8.0.x。

  • binlog を有効にします。これはデフォルトで無効になっています。

  • binlog フォーマットを ROW に設定します。これはデフォルトのフォーマットです。

  • binlog_row_image を FULL に設定します。これはデフォルトの設定です。

  • バイナリログトランザクション圧縮を無効にします。この機能は MySQL 8.0.20 で導入され、デフォルトで無効になっています。

  • SELECT、SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT 権限を持つ MySQL ユーザーが作成されていること。

  • MySQL データベースとテーブルを作成します。詳細については、「PolarDB for MySQL のデータベースとアカウントの作成」をご参照ください。権限不足による操作の失敗を防ぐため、特権アカウントを使用して MySQL データベースを作成してください。

  • IP アドレスホワイトリストを設定します。詳細については、「PolarDB for MySQL クラスターのホワイトリストの設定」をご参照ください。

自己管理型 MySQL

  • Realtime Compute for Apache Flink で接続テストを実行し、ネットワーク接続を確認します。

  • MySQL バージョン:5.6、5.7、8.0.x。

  • binlog を有効にします。これはデフォルトで無効になっています。

  • binlog フォーマットを ROW に設定します。デフォルトのフォーマットは STATEMENT です。

  • binlog_row_image を FULL に設定します。これはデフォルトの設定です。

  • バイナリログトランザクション圧縮を無効にします。この機能は MySQL 8.0.20 で導入され、デフォルトで無効になっています。

  • SELECT、SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT 権限を持つ MySQL ユーザーが作成されていること。

  • MySQL データベースとテーブルを作成します。詳細については、「自己管理型 MySQL インスタンスのデータベースとアカウントの作成」をご参照ください。権限不足による操作の失敗を防ぐため、特権アカウントを使用して MySQL データベースを作成してください。

  • IP アドレスホワイトリストを設定します。詳細については、「自己管理型 MySQL インスタンスのホワイトリストの設定」をご参照ください。

制限事項

一般的な制限事項

  • MySQL CDC ソーステーブルはウォーターマークの定義をサポートしていません。

  • CREATE TABLE AS SELECT (CTAS) および CREATE DATABASE AS SELECT (CDAS) ジョブでは、MySQL CDC ソーステーブルは部分的なスキーマ変更の同期をサポートしています。サポートされている変更タイプの詳細については、「スキーマ進化の同期ポリシー」をご参照ください。

  • MySQL CDC コネクタはバイナリログトランザクション圧縮をサポートしていません。そのため、MySQL CDC コネクタを使用して増分データを消費する場合は、この機能が無効になっていることを確認してください。そうしないと、増分データの取得に失敗する可能性があります。

ApsaraDB RDS for MySQL の制限事項

  • ApsaraDB RDS for MySQL では、セカンダリデータベースまたは読み取り専用レプリカからのデータ読み取りは推奨されません。これらのインスタンスの binlog 保持期間はデフォルトで短くなっています。binlog が期限切れになりクリアされると、ジョブが binlog データを消費できなくなり、エラーが発生する可能性があります。

  • デフォルトでは、ApsaraDB RDS for MySQL はプライマリデータベースとセカンダリデータベース間の並列レプリケーションを有効にしており、トランザクション順序の一貫性を保証しません。これにより、プライマリ/セカンダリ切り替えおよびチェックポイント回復中に一部のデータが失われる可能性があります。この問題に対処するには、ApsaraDB RDS for MySQL で slave_preserve_commit_order オプションを手動で有効にすることができます。

PolarDB for MySQL の制限事項

MySQL CDC ソーステーブルは、PolarDB for MySQL 1.0.19 以前のマルチマスタークラスターからの読み取りをサポートしていません。詳細については、「マルチマスタークラスターとは」をご参照ください。これらのクラスターによって生成された binlog には、重複したテーブル ID が含まれている可能性があり、CDC ソーステーブルでスキーママッピングエラーが発生し、binlog データ解析中にエラーが発生する可能性があります。

オープンソース MySQL の制限事項

デフォルトでは、MySQL はプライマリ/レプリカ Binlog レプリケーション中にトランザクション順序を維持します。MySQL レプリカが並列レプリケーション (slave_parallel_workers > 1) を有効にしているが、slave_preserve_commit_order = ON になっていない場合、そのトランザクションコミット順序はプライマリデータベースと一致しない可能性があります。Flink CDC がチェックポイントから回復するとき、この順序の不一致によりデータが欠落する可能性があります。MySQL レプリカで slave_preserve_commit_order = ON を設定するか、slave_parallel_workers = 1 を設定することを推奨します。後者のオプションは、レプリケーション性能を犠牲にする可能性があります。

注意事項

  • 各 MySQL CDC データソースに異なるサーバー ID を明示的に設定する

    サーバー ID の目的

    各 MySQL CDC データソースに異なるサーバー ID を明示的に設定する必要があります。複数の MySQL CDC データソースが同じサーバー ID を共有し、再利用できない場合、binlog オフセットが不正確になり、データが複数回読み取られたり、スキップされたりする可能性があります。

    さまざまなシナリオでのサーバー ID の設定

    DDL 文でサーバー ID を指定できますが、動的ヒントを使用して設定することを推奨します。

    • 並列度 = 1 または増分スナップショットが無効

      ## 増分スナップショットフレームワークが有効でない場合、または並列度が 1 の場合は、特定のサーバー ID を指定できます。
      SELECT * FROM source_table /*+ OPTIONS('server-id'='123456') */ ;
    • 並列度 > 1 かつ増分スナップショットが有効

      ## サーバー ID の範囲を指定する必要があります。範囲内の利用可能なサーバー ID の数は、並列度以上でなければなりません。並列度が 3 であると仮定します。
      SELECT * FROM source_table /*+ OPTIONS('server-id'='123456-123458') */ ;
    • CTAS を使用したデータ同期

      CTAS を使用してデータ同期を行う場合、CDC データソースの構成が同じであれば、ソースは自動的に再利用されます。この場合、複数の CDC データソースに同じサーバー ID を設定できます。詳細については、「例 4:複数の CTAS 文」をご参照ください。

    • 再利用できない複数の非 CTAS ソーステーブル

      ジョブに複数の MySQL CDC ソーステーブルが含まれ、同期に CTAS 文を使用しない場合、データソースは再利用できません。各 CDC ソーステーブルに異なるサーバー ID を提供する必要があります。同様に、増分スナップショットフレームワークが有効で並列度が 1 より大きい場合は、サーバー ID の範囲を指定する必要があります。

      select * from 
        source_table1 /*+ OPTIONS('server-id'='123456-123457') */
      left join 
        source_table2 /*+ OPTIONS('server-id'='123458-123459') */
      on source_table1.id=source_table2.id;
  • シンクテーブル

    • DDL 文で自動インクリメントプライマリキーを宣言しないでください。MySQL はデータ書き込み時にこのフィールドを自動的に入力します。

    • 少なくとも 1 つの非プライマリキーフィールドを宣言してください。そうしないと、エラーが報告されます。

    • DDL 文の NOT ENFORCED は、Flink がプライマリキーの有効性チェックを実行しないことを示します。プライマリキーの正確性と完全性を保証する必要があります。詳細については、「有効性チェック」をご参照ください。

  • ディメンションテーブル

    クエリの高速化にインデックスを使用するには、JOIN 句のフィールド順序がインデックス定義の順序と一致する必要があります。これは最左プレフィックスルールとして知られています。たとえば、インデックスが (a, b, c) にある場合、JOIN 条件は ON t.a = x AND t.b = y となります。

    Flink が生成した SQL はオプティマイザーによって書き換えられる可能性があり、これにより実際のデータベースクエリ中にインデックスが使用されなくなることがあります。インデックスが使用されているかどうかを確認するには、MySQL の実行計画 (EXPLAIN) またはスロークエリログをチェックして、実際に実行される SELECT 文を確認してください。

SQL

SQL ジョブで MySQL コネクタをソーステーブル、ディメンションテーブル、または結果テーブルとして使用できます。

構文

CREATE TEMPORARY TABLE mysqlcdc_source (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

説明
  • コネクタが結果テーブルに書き込む方法:受信した各レコードに対して、コネクタは単一の SQL 文を構築して実行します。具体的な SQL 文はテーブルスキーマによって異なります:

    • プライマリキーのない結果テーブルの場合、コネクタは INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...); 文を実行します。

    • プライマリキーのある結果テーブルの場合、コネクタは INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...; 文を実行します。注意:物理テーブルにプライマリキー以外の一意なインデックス制約がある場合、異なるプライマリキーを持つが同じ一意なインデックス値を持つ 2 つのレコードを挿入すると、下流のデータベースで一意なインデックスの競合が発生し、データの上書きや潜在的なデータ損失を引き起こす可能性があります。

  • MySQL データベースで自動インクリメントプライマリキーを定義する場合、Flink DDL 文で自動インクリメントフィールドを宣言しないでください。データベースはデータ挿入時にこのフィールドを自動的に入力します。コネクタは自動インクリメントフィールドを持つデータの書き込みと削除のみをサポートし、更新はサポートしていません。

WITH パラメーター

  • 一般

    パラメーター

    説明

    必須

    データの型

    デフォルト値

    備考

    connector

    テーブルのタイプ。

    はい

    STRING

    なし

    ソーステーブルとして使用する場合は、このオプションを mysql-cdc または mysql に設定します。これらは同等です。ディメンションテーブルまたは結果テーブルとして使用する場合は、このオプションを mysql に設定します。

    hostname

    MySQL データベースの IP アドレスまたはホスト名。

    はい

    STRING

    なし

    VPC アドレスを入力することを推奨します。

    説明

    MySQL データベースと Realtime Compute for Apache Flink が同じ VPC にない場合、VPC 間のネットワーク接続を確立するか、パブリックネットワークを使用してアクセスする必要があります。詳細については、「ワークスペースの管理と運用」および「フルマネージド Flink クラスターがパブリックネットワークにアクセスする方法」をご参照ください。

    username

    MySQL データベースサービスのユーザー名。

    はい

    STRING

    なし

    なし。

    password

    MySQL データベースサービスのパスワード。

    はい

    STRING

    なし

    なし。

    database-name

    MySQL データベースの名前。

    はい

    STRING

    なし

    • データベースをソーステーブルとして使用する場合、データベース名に正規表現を使用して複数のデータベースからデータを読み取ることができます。

    • 正規表現を使用する場合、文字列の開始と終了を一致させるために ^ および $ 記号を使用しないでください。詳細については、table-name オプションの備考欄をご参照ください。

    table-name

    MySQL テーブルの名前。

    はい

    STRING

    なし

    • ソーステーブル名に正規表現を使用して、複数のテーブルからデータを読み取ることができます。

      複数の MySQL テーブルからデータを読み取る場合、複数の CTAS 文を単一のジョブとして投入します。これにより、複数のバイナリログリスナーが有効になるのを防ぎ、パフォーマンスと効率が向上します。詳細については、「複数の CTAS 文: 単一のジョブとして投入する」をご参照ください。

    • 正規表現を使用する場合、文字列の開始と終了を一致させるために ^ および $ 記号を使用しないでください。詳細については、以下の注記をご参照ください。

    説明

    MySQL CDC ソーステーブルがテーブル名を照合する際、指定した database-nametable-name を文字列 \\. (VVR 8.0.1 より前のバージョンでは文字 . が使用されます) を使用して完全パスの正規表現に結合します。その後、この正規表現を使用して MySQL データベース内のテーブルの完全修飾名を照合します。

    たとえば、'database-name'='db_.*' および 'table-name'='tb_.+' と設定した場合、コネクタは正規表現 db_.*\\.tb_.+ (8.0.1 より前のバージョンでは db_.*.tb_.+) を使用して完全修飾テーブル名を照合し、どのテーブルを読み取るかを決定します。

    port

    MySQL データベースサービスのポート番号。

    いいえ

    INTEGER

    3306

    なし。

  • ソース固有

    パラメーター

    説明

    必須

    データの型

    デフォルト値

    備考

    server-id

    データベースクライアントの数値 ID。

    いいえ

    STRING

    5400 から 6400 の間のランダムな値が生成されます。

    この ID は MySQL クラスター内でグローバルに一意である必要があります。同じデータベースに接続する各ジョブには、異なる ID を設定することを推奨します。

    このオプションは、5400-5408 のような ID 範囲もサポートします。増分読み取りが有効な場合、同時読み取りがサポートされます。この場合、各同時リーダーが異なる ID を使用するように ID 範囲を設定することを推奨します。詳細については、「サーバー ID の使用」をご参照ください。

    scan.incremental.snapshot.enabled

    増分スナップショットを有効にするかどうかを指定します。

    いいえ

    BOOLEAN

    true

    増分スナップショットはデフォルトで有効になっています。増分スナップショットは、完全なデータスナップショットを読み取るための新しいメカニズムです。古いスナップショット読み取り方法と比較して、増分スナップショットにはいくつかの利点があります:

    • ソースは完全データを並列で読み取ることができます。

    • ソースは完全データ読み取り時にチャンクレベルのチェックポイントをサポートします。

    • ソースは完全データ読み取り時にグローバル読み取りロック (FLUSH TABLES WITH read lock) を取得する必要がありません。

    ソースが同時読み取りをサポートするようにしたい場合、各同時リーダーには一意のサーバー ID が必要です。したがって、server-id は 5400-6400 のような範囲でなければならず、その範囲は並列度以上でなければなりません。

    説明

    この設定項目は Ververica Runtime (VVR) 11.1 以降では削除されています。

    scan.incremental.snapshot.chunk.size

    各チャンクのサイズ (行数)。

    いいえ

    INTEGER

    8096

    増分スナップショット読み取りが有効な場合、テーブルは複数のチャンクに分割されて読み取られます。チャンクのデータは、完全に読み取られるまでメモリにバッファリングされます。

    チャンクに含まれる行数が少ないほど、テーブル内のチャンクの総数は多くなります。これにより障害回復の粒度は小さくなりますが、メモリ不足 (OOM) の問題を引き起こし、全体のスループットを低下させる可能性があります。したがって、トレードオフを考慮し、合理的なチャンクサイズを設定する必要があります。

    scan.snapshot.fetch.size

    テーブルの完全データを読み取る際に一度にフェッチする最大レコード数。

    いいえ

    INTEGER

    1024

    なし。

    scan.startup.mode

    データ消費の起動モード。

    いいえ

    STRING

    initial

    有効な値:

    • initial (デフォルト):最初の起動時に完全な既存データをスキャンし、その後最新の binlog データを読み取ります。

    • latest-offset:最初の起動時に既存データをスキャンしません。binlog の末尾から読み取りを開始します。つまり、コネクタの起動後に行われた最新の変更のみを読み取ります。

    • earliest-offset:既存データをスキャンしません。利用可能な最も古い binlog から読み取りを開始します。

    • specific-offset:既存データをスキャンしません。指定した特定の binlog オフセットから開始します。scan.startup.specific-offset.filescan.startup.specific-offset.pos の両方を設定するか、scan.startup.specific-offset.gtid-set のみを設定して特定の GTID セットから開始することでオフセットを指定できます。

    • timestamp:既存データをスキャンしません。指定したタイムスタンプから binlog の読み取りを開始します。タイムスタンプは scan.startup.timestamp-millis でミリ秒単位で指定します。

    重要

    earliest-offsetspecific-offset、または timestamp 起動モードを使用する場合、スキーマの不一致によるエラーを避けるために、指定された binlog 消費位置とジョブの起動時刻の間で対応するテーブルのスキーマが変更されていないことを確認してください。

    scan.startup.specific-offset.file

    特定オフセット起動モードを使用する場合の開始オフセットの binlog ファイル名。

    いいえ

    STRING

    なし

    この設定を使用する場合、scan.startup.modespecific-offset に設定する必要があります。ファイル名の形式は、たとえば mysql-bin.000003 です。

    scan.startup.specific-offset.pos

    特定オフセット起動モードを使用する場合に、指定された binlog ファイル内で開始するオフセット。

    いいえ

    INTEGER

    なし

    この設定を使用する場合、scan.startup.modespecific-offset に設定する必要があります。

    scan.startup.specific-offset.gtid-set

    特定オフセット起動モードを使用する場合の開始オフセットの GTID セット。

    いいえ

    STRING

    なし

    この設定を使用する場合、scan.startup.modespecific-offset に設定する必要があります。GTID セットの形式は、たとえば 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19 です。

    scan.startup.timestamp-millis

    タイムスタンプ起動モードを使用する場合のミリ秒単位のタイムスタンプとしての開始オフセット。

    いいえ

    LONG

    なし

    この設定を使用する場合、scan.startup.modetimestamp に設定する必要があります。タイムスタンプはミリ秒単位です。

    重要

    時間を指定すると、MySQL CDC は各 binlog ファイルの初期イベントを読み取ってそのタイムスタンプを判断し、指定された時間に対応する binlog ファイルを特定しようとします。指定されたタイムスタンプの binlog ファイルがデータベースからクリアされておらず、読み取り可能であることを確認してください。

    server-time-zone

    データベースが使用するセッションタイムゾーン。

    いいえ

    STRING

    このオプションを指定しない場合、システムは Flink ジョブのランタイム環境のタイムゾーンをデータベースサーバーのタイムゾーンとして使用します。これは、選択したゾーンのタイムゾーンです。

    例:Asia/Shanghai。このオプションは、MySQL の TIMESTAMP 型が STRING 型にどのように変換されるかを制御します。詳細については、「Debezium の時間値」をご参照ください。

    debezium.min.row.count.to.stream.results

    テーブルの行数がこの値を超えると、バッチ読み取りモードが使用されます。

    いいえ

    INTEGER

    1000

    Flink は MySQL ソーステーブルから次の方法でデータを読み取ります:

    • 完全読み取り:テーブル全体のデータを直接メモリに読み込みます。この方法は高速ですが、対応する量のメモリを消費します。ソーステーブルが非常に大きい場合、OOM のリスクをもたらす可能性があります。

    • バッチ読み取り:データを複数のバッチで読み取ります。各バッチには一定数の行が含まれ、すべてのデータが読み取られるまで続きます。この方法は、大きなテーブルを読み取る際の OOM リスクを回避しますが、比較的低速です。

    connect.timeout

    MySQL データベースサーバーへの接続がタイムアウトし、再試行するまでの最大待機時間。

    いいえ

    DURATION

    30s

    なし。

    connect.max-retries

    MySQL データベースサービスへの接続に失敗した後の最大再試行回数。

    いいえ

    INTEGER

    3

    なし。

    connection.pool.size

    データベース接続プールのサイズ。

    いいえ

    INTEGER

    20

    データベース接続プールは接続を再利用するために使用され、データベース接続の数を減らすことができます。

    jdbc.properties.*

    JDBC URL のカスタム接続オプション。

    いいえ

    STRING

    なし

    カスタム接続オプションを渡すことができます。たとえば、SSL プロトコルを使用しない場合は、'jdbc.properties.useSSL' = 'false' と設定できます。

    サポートされている接続オプションの詳細については、「MySQL 設定プロパティ」をご参照ください。

    debezium.*

    Debezium が binlog を読み取るためのカスタムオプション。

    いいえ

    STRING

    なし

    カスタム Debezium オプションを渡すことができます。たとえば、'debezium.event.deserialization.failure.handling.mode'='ignore' を使用して、解析エラーの処理ロジックを指定します。

    heartbeat.interval

    ソースがハートビートイベントを使用して binlog オフセットを進める間隔。

    いいえ

    DURATION

    30s

    ハートビートイベントは、ソースの binlog オフセットを進めるために使用され、MySQL の更新が遅いテーブルに非常に役立ちます。このようなテーブルでは、binlog オフセットは自動的に進みません。ハートビートイベントは binlog オフセットを前進させ、期限切れの binlog オフセットが原因でジョブが失敗し、ステートレスな再起動が必要になる問題を回避します。

    scan.incremental.snapshot.chunk.key-column

    スナップショットフェーズ中にチャンクを分割するために使用される列。

    備考を参照

    STRING

    なし

    • プライマリキーのないテーブルでは必須です。選択された列は非ヌル型 (NOT NULL) でなければなりません。

    • プライマリキーのあるテーブルではオプションです。プライマリキーから 1 つの列のみを選択できます。

    rds.region-id

    ApsaraDB RDS for MySQL インスタンスのリージョン ID。

    OSS からアーカイブされたログを読み取る場合に必須です。

    STRING

    なし

    リージョン ID の詳細については、「リージョンとゾーン」をご参照ください。

    rds.access-key-id

    ApsaraDB RDS for MySQL インスタンスのアカウントの AccessKey ID。

    OSS からアーカイブされたログを読み取る場合に必須です。

    STRING

    なし

    詳細については、「AccessKey ID と AccessKey Secret の表示方法」をご参照ください。

    重要

    AccessKey 情報の漏洩を防ぐため、シークレット管理を使用して AccessKey ID を管理することを推奨します。詳細については、「変数の管理」をご参照ください。

    rds.access-key-secret

    ApsaraDB RDS for MySQL インスタンスのアカウントの AccessKey Secret。

    OSS からアーカイブされたログを読み取る場合に必須です。

    STRING

    なし

    詳細については、「AccessKey ID と AccessKey Secret の表示方法」をご参照ください。

    重要

    AccessKey 情報の漏洩を防ぐため、シークレット管理を使用して AccessKey Secret を管理することを推奨します。詳細については、「変数の管理」をご参照ください。

    rds.db-instance-id

    ApsaraDB RDS for MySQL インスタンスのインスタンス ID。

    OSS からアーカイブされたログを読み取る場合に必須です。

    STRING

    なし

    なし。

    rds.main-db-id

    ApsaraDB RDS for MySQL インスタンスのプライマリデータベースの ID。

    いいえ

    STRING

    なし

    rds.download.timeout

    OSS から単一のアーカイブ済みログをダウンロードするためのタイムアウト期間。

    いいえ

    DURATION

    60s

    なし。

    rds.endpoint

    OSS binlog 情報を取得するためのサービスエンドポイント。

    いいえ

    STRING

    なし

    • 有効な値の詳細については、「エンドポイント」をご参照ください。

    • VVR 8.0.8 以降でのみサポートされます。

    scan.incremental.close-idle-reader.enabled

    スナップショットフェーズ終了後にアイドル状態のリーダーを閉じるかどうかを指定します。

    いいえ

    BOOLEAN

    false

    • VVR 8.0.1 以降でのみサポートされます。

    • この設定を有効にするには、execution.checkpointing.checkpoints-after-tasks-finish.enabled を true に設定する必要があります。

    scan.read-changelog-as-append-only.enabled

    変更ログストリームを追記専用ストリームに変換するかどうかを指定します。

    いいえ

    BOOLEAN

    false

    有効な値:

    • true:INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER を含むすべてのタイプのメッセージが INSERT メッセージに変換されます。これは、上流テーブルからの削除メッセージを保存する必要があるなど、特別なシナリオでのみ有効にしてください。

    • false (デフォルト):すべてのタイプのメッセージがそのままダウンストリームに送信されます。

    説明

    VVR 8.0.8 以降でのみサポートされます。

    scan.only.deserialize.captured.tables.changelog.enabled

    増分フェーズ中に、指定されたテーブルの変更イベントのみを逆シリアル化するかどうかを指定します。

    いいえ

    BOOLEAN

    • VVR 8.x バージョンではデフォルト値は false です。

    • VVR 11.1 以降ではデフォルト値は true です。

    有効な値:

    • true:ターゲットテーブルの変更データのみを逆シリアル化して、binlog の読み取りを高速化します。

    • false (デフォルト):すべてのテーブルの変更データを逆シリアル化します。

    説明
    • VVR 8.0.7 以降でのみサポートされます。

    • VVR 8.0.8 以前で使用する場合、オプション名を debezium.scan.only.deserialize.captured.tables.changelog.enable に変更する必要があります。

    scan.parse.online.schema.changes.enabled

    増分フェーズ中に、RDS のロックレス変更 DDL イベントの解析を試みるかどうかを指定します。

    いいえ

    BOOLEAN

    false

    有効な値:

    • true:RDS のロックレス変更 DDL イベントを解析します。

    • false (デフォルト):RDS のロックレス変更 DDL イベントを解析しません。

    これは実験的な機能です。オンラインでのロックレス変更を実行する前に、回復のために Flink ジョブのスナップショットを取得することを推奨します。

    説明

    VVR 11.1 以降でのみサポートされます。

    scan.incremental.snapshot.backfill.skip

    スナップショット読み取りフェーズ中にバックフィルをスキップするかどうかを指定します。

    いいえ

    BOOLEAN

    false

    有効な値:

    • true:スナップショット読み取りフェーズ中にバックフィルをスキップします。

    • false (デフォルト):スナップショット読み取りフェーズ中にバックフィルをスキップしません。

    バックフィルをスキップすると、スナップショットフェーズ中のテーブルへの変更は、スナップショットにマージされる代わりに、後の増分フェーズで読み取られます。

    重要

    バックフィルをスキップすると、スナップショットフェーズ中に発生した変更が再実行される可能性があるため、データの一貫性が損なわれる可能性があります。少なくとも 1 回のセマンティクスのみが保証されます。

    説明

    VVR 11.1 以降でのみサポートされます。

    scan.incremental.snapshot.unbounded-chunk-first.enabled

    スナップショット読み取りフェーズ中に、境界のないチャンクを最初に分散するかどうかを指定します。

    いいえ

    BOOLEAN

    false

    有効な値:

    • true:スナップショット読み取りフェーズ中に、境界のないチャンクを最初に分散します。

    • false (デフォルト):スナップショット読み取りフェーズ中に、境界のないチャンクを最初に分散しません。

    これは実験的な機能です。有効にすると、スナップショットフェーズ中に TaskManager が最後のチャンクを同期する際の Out-of-Memory (OOM) エラーのリスクを軽減できます。ジョブの最初の起動前に追加することを推奨します。

    説明

    VVR 11.1 以降でのみサポートされます。

    binlog.session.network.timeout

    binlog 接続のネットワーク読み取り/書き込みタイムアウト。

    いいえ

    DURATION

    10m

    このオプションを 0s に設定すると、MySQL サーバーのデフォルトのタイムアウトが使用されます。

    説明

    VVR 11.5 以降でのみサポートされます。

    scan.rate-limit.records-per-second

    ソースが 1 秒あたりに送信できる最大レコード数。

    いいえ

    LONG

    なし

    このオプションは、データ読み取りを制限する必要があるシナリオに適用されます。制限は、完全フェーズと増分フェーズの両方で有効になります。

    ソースの numRecordsOutPerSecond メトリックは、データストリームが 1 秒あたりに出力するレコード数を反映します。このメトリックに基づいてこのオプションを調整できます。

    完全データ読み取りフェーズでは、通常、各バッチで読み取るデータレコードの数を減らす必要があります。scan.incremental.snapshot.chunk.size オプションの値を減らすことができます。

    説明

    VVR 11.5 以降でのみサポートされます。

  • ディメンションテーブル固有

    パラメーター

    説明

    必須

    データの型

    デフォルト値

    備考

    url

    MySQL JDBC URL。

    いいえ

    STRING

    なし

    URL の形式は次のとおりです:jdbc:mysql://<endpoint>:<port>/<database_name>

    lookup.max-retries

    データ読み取りに失敗した後の最大再試行回数。

    いいえ

    INTEGER

    3

    VVR 6.0.7 以降でのみサポートされます。

    lookup.cache.strategy

    キャッシュポリシー。

    いいえ

    STRING

    None

    None、LRU、ALL の 3 つのキャッシュポリシーをサポートします。値の詳細については、「背景情報」をご参照ください。

    説明

    LRU キャッシュポリシーを使用する場合、lookup.cache.max-rows オプションも設定する必要があります。

    lookup.cache.max-rows

    キャッシュされる最大行数。

    いいえ

    INTEGER

    100000

    • LRU キャッシュポリシーを選択した場合、キャッシュサイズを設定する必要があります。

    • ALL キャッシュポリシーを選択した場合、キャッシュサイズを設定する必要はありません。

    lookup.cache.ttl

    キャッシュの生存時間 (TTL)。

    いいえ

    DURATION

    10 s

    lookup.cache.ttl の設定は lookup.cache.strategy に依存します:

    • lookup.cache.strategyNone に設定されている場合、lookup.cache.ttl を設定する必要はありません。これはキャッシュが期限切れにならないことを意味します。

    • lookup.cache.strategyLRU に設定されている場合、lookup.cache.ttl はキャッシュの TTL です。デフォルトでは期限切れになりません。

    • lookup.cache.strategyALL に設定されている場合、lookup.cache.ttl はキャッシュの再読み込み時間です。デフォルトでは再読み込みされません。

    1min や 10s のような時間形式を使用します。

    lookup.max-join-rows

    プライマリテーブルの各レコードに対してディメンションテーブルをクエリする際に返される最大結果数。

    いいえ

    INTEGER

    1024

    なし。

    lookup.filter-push-down.enabled

    ディメンションテーブルのフィルタープッシュダウンを有効にするかどうかを指定します。

    いいえ

    BOOLEAN

    false

    有効な値:

    • true:ディメンションテーブルのフィルタープッシュダウンを有効にします。MySQL データベーステーブルからデータをロードする際、ディメンションテーブルは SQL ジョブで設定された条件に基づいて事前にデータをフィルタリングします。

    • false (デフォルト):ディメンションテーブルのフィルタープッシュダウンを無効にします。MySQL データベーステーブルからデータをロードする際、ディメンションテーブルはすべてのデータをロードします。

    説明

    VVR 8.0.7 以降でのみサポートされます。

    重要

    ディメンションテーブルのフィルタープッシュダウンは、Flink テーブルがディメンションテーブルとして使用される場合にのみ有効にすべきです。MySQL ソーステーブルはフィルタープッシュダウンの有効化をサポートしていません。Flink テーブルがソーステーブルとディメンションテーブルの両方として使用され、ディメンションテーブルでフィルタープッシュダウンが有効になっている場合、SQL ヒントを使用してソーステーブルに対してこの設定項目を明示的に false に設定する必要があります。そうしないと、ジョブが異常に実行される可能性があります。

  • シンク固有の

    パラメーター

    説明

    必須

    データの型

    デフォルト値

    備考

    url

    MySQL JDBC URL。

    いいえ

    STRING

    なし

    URL の形式は次のとおりです:jdbc:mysql://<endpoint>:<port>/<database_name>

    sink.max-retries

    データ書き込みに失敗した後の最大再試行回数。

    いいえ

    INTEGER

    3

    なし。

    sink.buffer-flush.batch-size

    1 回のバッチ書き込みのレコード数。

    いいえ

    INTEGER

    4096

    なし。

    sink.buffer-flush.max-rows

    メモリにバッファリングされるデータレコードの数。

    いいえ

    INTEGER

    10000

    このオプションは、プライマリキーが指定された後にのみ有効になります。

    sink.buffer-flush.interval

    バッファーをフラッシュする時間間隔。指定された待機時間の後もバッファー内のデータが出力条件を満たさない場合、システムはバッファー内のすべてのデータを自動的に出力します。

    いいえ

    DURATION

    1s

    なし。

    sink.ignore-delete

    DELETE 操作を無視するかどうかを指定します。

    いいえ

    BOOLEAN

    false

    Flink SQL によって生成されたストリームに削除または更新前レコードが含まれている場合、複数の出力タスクが同じテーブルの異なるフィールドを同時に更新すると、データの一貫性が損なわれる可能性があります。

    たとえば、レコードが削除された後、別のタスクが一部のフィールドのみを更新します。更新されなかったフィールドは null またはデフォルト値になり、データエラーを引き起こします。

    sink.ignore-delete を true に設定することで、上流の DELETE および UPDATE_BEFORE 操作を無視して、このような問題を回避できます。

    説明
    • UPDATE_BEFORE は Flink のリトラクションメカニズムの一部であり、更新操作で古い値を「取り消す」ために使用されます。

    • ignoreDelete = true の場合、すべての DELETE および UPDATE_BEFORE レコードはスキップされます。INSERT および UPDATE_AFTER レコードのみが処理されます。

    sink.ignore-null-when-update

    データを更新する際、入力データフィールドが null の場合に、対応するフィールドを null に更新するか、そのフィールドの更新をスキップするかを指定します。

    いいえ

    BOOLEAN

    false

    有効な値:

    • true:フィールドを更新しません。このオプションは、Flink テーブルにプライマリキーが設定されている場合にのみ true に設定できます。true に設定した場合:

      • VVR 8.0.6 以前では、結果テーブルへのデータ書き込みにバッチ実行はサポートされません。

      • VVR 8.0.7 以降では、結果テーブルへのデータ書き込みにバッチ実行がサポートされます。

        バッチ書き込みは書き込み効率と全体のスループットを大幅に向上させることができますが、データ遅延や OOM のリスクを引き起こす可能性があります。したがって、実際のビジネスシナリオに基づいてトレードオフを行ってください。

    • false:フィールドを null に更新します。

    説明

    このオプションは VVR 8.0.5 以降でのみサポートされます。

型マッピング

  • CDC ソーステーブル

    MySQL CDC フィールド型

    Flink フィールド型

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    TINYINT UNSIGNED

    TINYINT UNSIGNED ZEROFILL

    INT

    INT

    MEDIUMINT

    SMALLINT UNSIGNED

    SMALLINT UNSIGNED ZEROFILL

    BIGINT

    BIGINT

    INT UNSIGNED

    INT UNSIGNED ZEROFILL

    MEDIUMINT UNSIGNED

    MEDIUMINT UNSIGNED ZEROFILL

    BIGINT UNSIGNED

    DECIMAL(20, 0)

    BIGINT UNSIGNED ZEROFILL

    SERIAL

    FLOAT [UNSIGNED] [ZEROFILL]

    FLOAT

    DOUBLE [UNSIGNED] [ZEROFILL]

    DOUBLE

    DOUBLE PRECISION [UNSIGNED] [ZEROFILL]

    REAL [UNSIGNED] [ZEROFILL]

    NUMERIC(p, s) [UNSIGNED] [ZEROFILL]

    DECIMAL(p, s)

    DECIMAL(p, s) [UNSIGNED] [ZEROFILL]

    BOOLEAN

    BOOLEAN

    TINYINT(1)

    DATE

    DATE

    TIME [(p)]

    TIME [(p)] [WITHOUT TIME ZONE]

    DATETIME [(p)]

    TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    TIMESTAMP [(p)]

    TIMESTAMP [(p)]

    TIMESTAMP [(p)] WITH LOCAL TIME ZONE

    CHAR(n)

    STRING

    VARCHAR(n)

    TEXT

    BINARY

    BYTES

    VARBINARY

    BLOB

    重要

    MySQL の TINYINT(1) 型を 0 と 1 以外の値を保存するために使用しないことを推奨します。property-version = 0 の場合、MySQL CDC ソーステーブルはデフォルトで TINYINT(1) を Flink の BOOLEAN 型にマッピングし、データの不正確さを引き起こす可能性があります。TINYINT(1) 型を使用して 0 と 1 以外の値を保存するには、構成オプション catalog.table.treat-tinyint1-as-boolean をご参照ください。

  • ディメンションテーブルと結果テーブル

    MySQL フィールド型

    Flink フィールド型

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    TINYINT UNSIGNED

    INT

    INT

    MEDIUMINT

    SMALLINT UNSIGNED

    BIGINT

    BIGINT

    INT UNSIGNED

    BIGINT UNSIGNED

    DECIMAL(20, 0)

    FLOAT

    FLOAT

    DOUBLE

    DOUBLE

    DOUBLE PRECISION

    NUMERIC(p, s)

    DECIMAL(p, s)

    説明

    p は 38 以下でなければなりません。

    DECIMAL(p, s)

    BOOLEAN

    BOOLEAN

    TINYINT(1)

    DATE

    DATE

    TIME [(p)]

    TIME [(p)] [WITHOUT TIME ZONE]

    DATETIME [(p)]

    TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    TIMESTAMP [(p)]

    CHAR(n)

    CHAR(n)

    VARCHAR(n)

    VARCHAR(n)

    BIT(n)

    BINARY(⌈n/8⌉)

    BINARY(n)

    BINARY(n)

    VARBINARY(N)

    VARBINARY(N)

    TINYTEXT

    STRING

    TEXT

    MEDIUMTEXT

    LONGTEXT

    TINYBLOB

    BYTES

    重要

    Flink は MySQL BLOB 型のレコードを最大 2,147,483,647 (2^31 - 1) バイトまでしかサポートしません。

    BLOB

    MEDIUMBLOB

    LONGBLOB

データインジェスト

データインジェスト YAML ジョブで MySQL コネクタをデータソースとして使用できます。

構文

source:
   type: mysql
   name: MySQL Source
   hostname: localhost
   port: 3306
   username: <username>
   password: <password>
   tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
   server-id: 5401-5404

sink:
  type: xxx

構成項目

パラメーター

説明

必須

データの型

デフォルト値

備考

type

データソースのタイプ。

はい

STRING

なし

静的フィールドは mysql に設定されます。

name

データソースの名前。

いいえ

STRING

なし

なし。

hostname

MySQL データベースの IP アドレスまたはホスト名。

はい

STRING

なし

Virtual Private Cloud (VPC) アドレスを入力することを推奨します。

説明

MySQL データベースと Realtime Compute for Apache Flink が同じ VPC にない場合、VPC 間のネットワーク接続を確立するか、パブリックネットワークを使用してアクセスする必要があります。詳細については、「ワークスペースの管理と運用」および「フルマネージド Flink クラスターがパブリックネットワークにアクセスする方法」をご参照ください。

username

MySQL データベースサービスのユーザー名。

はい

STRING

なし

なし。

password

MySQL データベースサービスのパスワード。

はい

STRING

なし

なし。

tables

同期する MySQL データテーブル。

はい

STRING

なし

  • このオプションは、複数のテーブルからデータを読み取るために正規表現をサポートします。

  • 複数の正規表現をカンマで区切ることができます。

説明
  • 正規表現に開始および終了マッチング文字 ^ および $ を使用しないでください。バージョン 11.2 では、データベースの正規表現はピリオドで分割して取得されます。開始および終了マッチング文字を使用すると、取得したデータベースの正規表現が使用できなくなります。たとえば、^db.user_[0-9]+$db.user_[0-9]+ に変更する必要があります。

  • データベース名とテーブル名はピリオドで区切られます。ピリオドを任意の文字に一致させるために使用するには、バックスラッシュでピリオドをエスケープする必要があります。例:db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*。

tables.exclude

同期から除外するテーブル。

いいえ

STRING

なし

  • このオプションは、複数のテーブルからデータを除外するために正規表現をサポートします。

  • 複数の正規表現をカンマで区切ることができます。

説明

データベース名とテーブル名はピリオドで区切られます。ピリオドを任意の文字に一致させるために使用するには、バックスラッシュでピリオドをエスケープする必要があります。例:db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*。

port

MySQL データベースサービスのポート番号。

いいえ

INTEGER

3306

なし。

schema-change.enabled

スキーマ変更イベントを送信するかどうかを指定します。

いいえ

BOOLEAN

true

なし。

server-id

同期に使用されるデータベースクライアントの数値 ID または ID の範囲。

いいえ

STRING

5400 から 6400 の間のランダムな値が生成されます。

この ID は MySQL クラスター内でグローバルに一意である必要があります。同じデータベースに接続する各ジョブには、異なる ID を設定することを推奨します。

このオプションは、5400-5408 のような ID 範囲もサポートします。増分読み取りが有効な場合、同時読み取りがサポートされます。この場合、各同時リーダーが異なる ID を使用するように ID 範囲を設定することを推奨します。

jdbc.properties.*

JDBC URL のカスタム接続オプション。

いいえ

STRING

なし

カスタム接続オプションを渡すことができます。たとえば、SSL プロトコルを使用しない場合は、'jdbc.properties.useSSL' = 'false' と設定できます。

サポートされている接続オプションの詳細については、「MySQL 設定プロパティ」をご参照ください。

debezium.*

Debezium が binlog を読み取るためのカスタムオプション。

いいえ

STRING

なし

カスタム Debezium オプションを渡すことができます。たとえば、'debezium.event.deserialization.failure.handling.mode'='ignore' を使用して、解析エラーの処理ロジックを指定します。

scan.incremental.snapshot.chunk.size

各チャンクのサイズ (行数)。

いいえ

INTEGER

8096

MySQL テーブルは複数のチャンクに分割されて読み取られます。チャンクのデータは、完全に読み取られるまでメモリにバッファリングされます。

チャンクあたりの行数が少ないほど、障害回復の粒度は細かくなりますが、メモリ不足 (OOM) エラーを引き起こし、全体のスループットを低下させる可能性もあります。したがって、これらのトレードオフのバランスをとる合理的なチャンクサイズを設定する必要があります。

scan.snapshot.fetch.size

テーブルの完全データを読み取る際に一度にフェッチする最大レコード数。

いいえ

INTEGER

1024

なし。

scan.startup.mode

データ消費の起動モード。

いいえ

STRING

initial

有効な値:

  • initial (デフォルト):最初の起動時に完全な既存データをスキャンし、その後最新の binlog データを読み取ります。

  • latest-offset:最初の起動時に既存データをスキャンしません。binlog の末尾から読み取りを開始します。つまり、コネクタの起動後に行われた最新の変更のみを読み取ります。

  • earliest-offset:既存データをスキャンしません。利用可能な最も古い binlog から読み取りを開始します。

  • specific-offset:既存データをスキャンしません。指定した特定の binlog オフセットから開始します。scan.startup.specific-offset.filescan.startup.specific-offset.pos の両方を設定するか、scan.startup.specific-offset.gtid-set のみを設定して特定の GTID セットから開始することでオフセットを指定できます。

  • timestamp:既存データをスキャンしません。指定したタイムスタンプから binlog の読み取りを開始します。タイムスタンプは scan.startup.timestamp-millis でミリ秒単位で指定します。

重要

earliest-offsetspecific-offset、および timestamp 起動モードでは、起動時のテーブルスキーマが指定された開始オフセット時刻のスキーマと異なる場合、スキーマの不一致によりジョブは失敗します。つまり、これら 3 つの起動モードを使用する場合、指定された binlog 消費位置とジョブの起動時刻の間で対応するテーブルのスキーマが変更されていないことを確認する必要があります。

scan.startup.specific-offset.file

特定オフセット起動モードを使用する場合の開始オフセットの binlog ファイル名。

いいえ

STRING

なし

この設定を使用する場合、scan.startup.modespecific-offset に設定する必要があります。ファイル名の形式は、たとえば mysql-bin.000003 です。

scan.startup.specific-offset.pos

特定オフセット起動モードを使用する場合に、指定された binlog ファイル内で開始するオフセット。

いいえ

INTEGER

なし

この設定を使用する場合、scan.startup.modespecific-offset に設定する必要があります。

scan.startup.specific-offset.gtid-set

特定オフセット起動モードを使用する場合の開始オフセットの GTID セット。

いいえ

STRING

なし

この設定を使用する場合、scan.startup.modespecific-offset に設定する必要があります。GTID セットの形式は、たとえば 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19 です。

scan.startup.timestamp-millis

タイムスタンプ起動モードを使用する場合のミリ秒単位のタイムスタンプとしての開始オフセット。

いいえ

LONG

なし

この設定を使用する場合、scan.startup.modetimestamp に設定する必要があります。タイムスタンプはミリ秒単位です。

重要

時間を指定すると、MySQL CDC は各 binlog ファイルの初期イベントを読み取ってそのタイムスタンプを判断し、指定された時間に対応する binlog ファイルを特定しようとします。指定されたタイムスタンプの binlog ファイルがデータベースからクリアされておらず、読み取り可能であることを確認してください。

server-time-zone

データベースが使用するセッションタイムゾーン。

いいえ

STRING

このオプションを指定しない場合、システムは Flink ジョブのランタイム環境のタイムゾーンをデータベースサーバーのタイムゾーンとして使用します。これは、選択したゾーンのタイムゾーンです。

例:Asia/Shanghai。このオプションは、MySQL の TIMESTAMP 型が STRING 型にどのように変換されるかを制御します。詳細については、「Debezium の時間値」をご参照ください。

scan.startup.specific-offset.skip-events

特定のオフセットから読み取る際にスキップする binlog イベントの数。

いいえ

INTEGER

なし

この設定を使用する場合、scan.startup.modespecific-offset に設定する必要があります。

scan.startup.specific-offset.skip-rows

特定のオフセットから読み取る際にスキップする行の変更数。単一の binlog イベントは複数の行の変更に対応する場合があります。

いいえ

INTEGER

なし

この設定を使用する場合、scan.startup.modespecific-offset に設定する必要があります。

connect.timeout

MySQL データベースサーバーへの接続がタイムアウトし、再試行するまでの最大待機時間。

いいえ

DURATION

30s

なし。

connect.max-retries

MySQL データベースサービスへの接続に失敗した後の最大再試行回数。

いいえ

INTEGER

3

なし。

connection.pool.size

データベース接続プールのサイズ。

いいえ

INTEGER

20

データベース接続プールは接続を再利用するために使用され、データベース接続の数を減らすことができます。

heartbeat.interval

ソースがハートビートイベントを使用して binlog オフセットを進める間隔。

いいえ

DURATION

30s

ハートビートイベントは、ソースの binlog オフセットを進めるために使用され、MySQL の更新が遅いテーブルに非常に役立ちます。このようなテーブルでは、binlog オフセットは自動的に進みません。ハートビートイベントは binlog オフセットを前進させ、期限切れの binlog オフセットが原因でジョブが失敗し、ステートレスな再起動が必要になる問題を回避します。

scan.incremental.snapshot.chunk.key-column

スナップショットフェーズ中にチャンクを分割するために使用される列。

いいえ。

STRING

なし

プライマリキーから 1 つの列のみを選択できます。

rds.region-id

ApsaraDB RDS for MySQL インスタンスのリージョン ID。

OSS からアーカイブされたログを読み取る場合に必須です。

STRING

なし

リージョン ID の詳細については、「リージョンとゾーン」をご参照ください。

rds.access-key-id

ApsaraDB RDS for MySQL インスタンスのアカウントの AccessKey ID。

OSS からアーカイブされたログを読み取る場合に必須です。

STRING

なし

詳細については、「AccessKey ID と AccessKey Secret の表示方法」をご参照ください。

重要

AccessKey 情報の漏洩を防ぐため、シークレット管理を使用して AccessKey ID を管理することを推奨します。詳細については、「変数の管理」をご参照ください。

rds.access-key-secret

ApsaraDB RDS for MySQL インスタンスのアカウントの AccessKey Secret。

OSS からアーカイブされたログを読み取る場合に必須です。

STRING

なし

詳細については、「AccessKey ID と AccessKey Secret の表示方法」をご参照ください。

重要

AccessKey 情報の漏洩を防ぐため、シークレット管理を使用して AccessKey Secret を管理することを推奨します。詳細については、「変数の管理」をご参照ください。

rds.db-instance-id

ApsaraDB RDS for MySQL インスタンスのインスタンス ID。

OSS からアーカイブされたログを読み取る場合に必須です。

STRING

なし

なし。

rds.main-db-id

ApsaraDB RDS for MySQL インスタンスのプライマリデータベースの ID。

いいえ

STRING

なし

プライマリデータベース ID の取得方法の詳細については、「ApsaraDB RDS for MySQL のログバックアップ」をご参照ください。

rds.download.timeout

OSS から単一のアーカイブ済みログをダウンロードするためのタイムアウト期間。

いいえ

DURATION

60s

なし。

rds.endpoint

OSS binlog 情報を取得するためのサービスエンドポイント。

いいえ

STRING

なし

有効な値の詳細については、「エンドポイント」をご参照ください。

rds.binlog-directory-prefix

binlog ファイルを格納するディレクトリのプレフィックス。

いいえ

STRING

rds-binlog-

なし。

rds.use-intranet-link

binlog ファイルのダウンロードに内部ネットワークを使用するかどうかを指定します。

いいえ

BOOLEAN

true

なし。

rds.binlog-directories-parent-path

binlog ファイルが格納されている親ディレクトリの絶対パス。

いいえ

STRING

なし

なし。

chunk-meta.group.size

チャンクメタデータのサイズ。

いいえ

INTEGER

1000

メタデータがこの値より大きい場合、複数回に分けて送信されます。

chunk-key.even-distribution.factor.lower-bound

均等分割のためのチャンク分布係数の下限。

いいえ

DOUBLE

0.05

分布係数がこのパラメーターの値より小さい場合、チャンクは均等に分散されません。

チャンク分布係数 = (MAX(chunk-key) - MIN(chunk-key) + 1) / データ行の総数。

chunk-key.even-distribution.factor.upper-bound

均等分割のためのチャンク分布係数の上限。

いいえ

DOUBLE

1000.0

分布係数がこの値より大きい場合、不均等分割が使用されます。

チャンク分布係数 = (MAX(chunk-key) - MIN(chunk-key) + 1) / 行の総数。

scan.incremental.close-idle-reader.enabled

スナップショットフェーズ終了後にアイドル状態のリーダーを閉じるかどうかを指定します。

いいえ

BOOLEAN

false

この設定を有効にするには、execution.checkpointing.checkpoints-after-tasks-finish.enabled を true に設定する必要があります。

scan.only.deserialize.captured.tables.changelog.enabled

増分フェーズ中に、指定されたテーブルの変更イベントのみを逆シリアル化するかどうかを指定します。

いいえ

BOOLEAN

  • VVR 8.x バージョンではデフォルト値は false です。

  • VVR 11.1 以降ではデフォルト値は true です。

有効な値:

  • true:ターゲットテーブルの変更データのみを逆シリアル化して、binlog の読み取りを高速化します。

  • false (デフォルト):すべてのテーブルの変更データを逆シリアル化します。

scan.parallel-deserialize-changelog.enabled

増分フェーズ中に、複数のスレッドを使用して変更イベントを解析するかどうかを指定します。

いいえ

BOOLEAN

false

有効な値:

  • true:binlog イベントの順序を維持しながら、変更イベントの逆シリアル化フェーズで複数のスレッドを使用し、読み取りを高速化します。

  • false (デフォルト):イベントの逆シリアル化フェーズで単一のスレッドを使用します。

説明

VVR 8.0.11 以降でのみサポートされます。

scan.parallel-deserialize-changelog.handler.size

複数のスレッドを使用して変更イベントを解析する場合のイベントハンドラの数。

いいえ

INTEGER

2

説明

VVR 8.0.11 以降でのみサポートされます。

metadata-column.include-list

ダウンストリームに渡すメタデータ列。

いいえ

STRING

なし

利用可能なメタデータには、table_namedatabase_nameop_tses_tsquery_logfile、および pos が含まれます。カンマで区切って使用できます。

説明

MySQL CDC YAML コネクタは、op_type メタデータ列の追加を必要とせず、サポートもしていません。Transform 式で直接 __data_event_type__ を使用して変更データタイプを取得できます。

重要

file メタデータ列は、データが存在する binlog ファイルを表します。完全フェーズでは "" であり、増分フェーズでは binlog ファイル名です。pos メタデータ列は、binlog ファイル内のデータのオフセットを表します。完全フェーズでは "0" であり、増分フェーズでは binlog ファイル内のデータのオフセットです。これらの 2 つのメタデータ列は VVR 11.5 からサポートされています。

es_ts メタデータ列は、変更ログに対応する MySQL のトランザクションの開始時刻を表します。MySQL バージョン 8.0.x を使用している場合にのみ追加できます。以前のバージョンの MySQL を使用している場合は、このメタデータ列を追加しないでください。

scan.newly-added-table.enabled

チェックポイントから再起動する際に、前回の実行で一致しなかった新しく追加されたテーブルを同期するか、または現在一致しないが状態に保存されているテーブルを削除するかを指定します。

いいえ

BOOLEAN

false

これは、チェックポイントまたはセーブポイントから再起動するときに有効になります。

scan.binlog.newly-added-table.enabled

増分フェーズ中に、一致した新しく追加されたテーブルからデータを送信するかどうかを指定します。

いいえ

BOOLEAN

false

scan.newly-added-table.enabled と同時に有効にすることはできません。

scan.incremental.snapshot.chunk.key-column

スナップショットフェーズ中にチャンクの分割キーとして使用する特定のテーブルの列を指定します。

いいえ

STRING

なし

  • テーブル名とフィールド名をコロン (:) で接続してルールを形成します。テーブル名は正規表現にすることができます。複数のルールをセミコロン (;) で区切って定義できます。例:db1.user_table_[0-9]+:col1;db[1-2].[app|web]_order_\\.*:col2

  • プライマリキーのないテーブルでは必須です。選択された列は null であってはなりません (NOT NULL)。プライマリキーのあるテーブルではオプションです。プライマリキーから 1 つの列のみを選択できます。

scan.parse.online.schema.changes.enabled

増分フェーズ中に、RDS のロックレス変更 DDL イベントの解析を試みるかどうかを指定します。

いいえ

ブール

false

有効な値:

  • true:RDS のロックレス変更 DDL イベントを解析します。

  • false (デフォルト):RDS のロックレス変更 DDL イベントを解析しません。

これは実験的な機能です。オンラインでロックレス変更を実行する前に、回復用に Flink ジョブのスナップショットを作成することを推奨します。

説明

VVR 11.0 以降でのみサポートされています。

scan.incremental.snapshot.backfill.skip

スナップショットの読み取りフェーズでバックフィルをスキップするかどうかを指定します。

いいえ

ブーリアン

false

有効な値:

  • true:スナップショット読み取りフェーズでバックフィルをスキップします。

  • false (デフォルト):スナップショット読み取りフェーズでバックフィルをスキップしません。

バックフィルがスキップされると、スナップショットフェーズ中のテーブルへの変更は、スナップショットにマージされる代わりに、後の増分フェーズで読み取られます。

重要

バックフィルをスキップすると、スナップショットフェーズ中に発生した変更が再実行される可能性があるため、データの一貫性が損なわれる可能性があり、at-least-once セマンティクスのみが保証されます。

説明

この機能は、Flink コンピュートエンジン Ververica Runtime (VVR) 11.1 以降でのみ利用可能です。

treat-tinyint1-as-boolean.enabled

TINYINT(1) 型をブール値として扱うかどうかを指定します。

いいえ

ブーリアン

true

有効な値:

  • true (デフォルト):TINYINT(1) 型をブール値型として扱います。

  • false:TINYINT(1) 型をブール値型として扱いません。

タイムスタンプの日時としての扱いが有効

TIMESTAMP 型を DATETIME 型として扱うかどうかを指定します。

いいえ

ブーリアン

有効な値:

  • true:MySQL TIMESTAMP 型を DATETIME 型として扱い、CDC TIMESTAMP 型にマッピングします。

  • false (デフォルト):MySQL TIMESTAMP 型を CDC TIMESTAMP_LTZ 型にマッピングします。

MySQL TIMESTAMP 型は UTC 時刻を格納し、タイムゾーンの影響を受けます。MySQL DATETIME 型はリテラル時刻を格納し、タイムゾーンの影響を受けません。

有効にすると、この機能は server-time-zone に基づいて MySQL TIMESTAMP データを DATETIME 型に変換します。

include-comments.enabled

テーブルコメントとフィールドコメントを同期するかどうかを指定します。

いいえ

ブール値

false

有効な値:

  • true:テーブルコメントとフィールドコメントを同期します。

  • false (デフォルト):テーブルコメントとフィールドコメントを同期しません。

この機能を有効にすると、ジョブのメモリ使用量が増加します。

scan.incremental.snapshot.unbounded-chunk-first.enabled

スナップショット読み取りフェーズ中に、境界のないシャードを最初に分散するかどうかを指定します。

いいえ

ブール

有効な値:

  • true:スナップショット読み取りフェーズ中に、境界のないシャードを最初に分散します。

  • false (デフォルト):スナップショット読み取りフェーズ中に、境界のないシャードを最初に分散しません。

これは実験的な機能です。有効にすると、スナップショットフェーズ中に TaskManager が最後のシャードを同期する際に Out-of-Memory (OOM) エラーが発生する脅威を軽減できます。ジョブを初めて開始する前に、このパラメーターを追加してください。

説明

この機能は、Flink コンピュートエンジン VVR 11.1 以降でのみ利用可能です。

binlog.session.network.timeout

バイナリロギング接続のネットワークタイムアウト期間。

いいえ

期間

10分

このパラメーターを 0 s に設定すると、MySQL のデフォルトのサーバー側タイムアウト期間が使用されます。

説明

この機能は、Flink コンピュートエンジン VVR 11.5 以降でのみ利用可能です。

scan.rate-limit.records-per-second

ソースが毎秒送信できるレコードの最大数です。

いいえ

ロング

なし

このパラメーターは、データ読み取りを制限する必要があるシナリオに適用されます。この制限は、完全フェーズと増分フェーズの両方で有効です。

ソースの numRecordsOutPerSecond メトリックは、データストリーム全体が出力する 1 秒あたりのレコード数を反映します。このメトリックを使用してパラメーターを調整します。

完全読み取りフェーズでは、各バッチで読み取るレコード数も減らします。これを行うには、scan.incremental.snapshot.chunk.size パラメーターの値を減らします。

説明

この機能は、Flink コンピュートエンジン VVR 11.5 以降でのみ利用可能です。

型マッピング

次の表に、データインジェストの型マッピングを示します。

MySQL CDC フィールド型

CDC フィールド型

TINYINT(n)

TINYINT

SMALLINT

SMALLINT

TINYINT UNSIGNED

TINYINT UNSIGNED ZEROFILL

YEAR

INT

INT

MEDIUMINT

MEDIUMINT UNSIGNED

MEDIUMINT UNSIGNED ZEROFILL

SMALLINT UNSIGNED

SMALLINT UNSIGNED ZEROFILL

BIGINT

BIGINT

INT UNSIGNED

INT UNSIGNED ZEROFILL

BIGINT UNSIGNED

DECIMAL(20, 0)

BIGINT UNSIGNED ZEROFILL

SERIAL

FLOAT [UNSIGNED] [ZEROFILL]

FLOAT

DOUBLE [UNSIGNED] [ZEROFILL]

DOUBLE

DOUBLE PRECISION [UNSIGNED] [ZEROFILL]

REAL [UNSIGNED] [ZEROFILL]

NUMERIC(p, s) [UNSIGNED] [ZEROFILL] where p <= 38

DECIMAL(p, s)

DECIMAL(p, s) [UNSIGNED] [ZEROFILL] where p <= 38

FIXED(p, s) [UNSIGNED] [ZEROFILL] where p <= 38

BOOLEAN

BOOLEAN

BIT(1)

TINYINT(1)

DATE

DATE

TIME [(p)]

TIME [(p)]

DATETIME [(p)]

TIMESTAMP [(p)]

TIMESTAMP [(p)]

treat-timestamp-as-datetime-enabled パラメーターの値に応じて、異なるマッピングフィールドが使用されます:

true:TIMESTAMP[(p)]

false:TIMESTAMP_LTZ[(p)]

CHAR(n)

CHAR(n)

VARCHAR(n)

VARCHAR(n)

BIT(n)

BINARY(⌈(n + 7) / 8⌉)

BINARY(n)

BINARY(n)

VARBINARY(N)

VARBINARY(N)

NUMERIC(p, s) [UNSIGNED] [ZEROFILL] where 38 < p <= 65

STRING

説明

MySQL は最大 65 の 10 進数精度をサポートします。Flink は 10 進数精度を 38 に制限します。10 進数列の精度が 38 を超える場合、精度損失を防ぐために文字列にマッピングします。

DECIMAL(p, s) [UNSIGNED] [ZEROFILL] where 38 < p <= 65

FIXED(p, s) [UNSIGNED] [ZEROFILL] where 38 < p <= 65

TINYTEXT

STRING

TEXT

MEDIUMTEXT

LONGTEXT

ENUM

JSON

STRING

説明

JSON データ型は Flink で JSON 形式の文字列に変換されます。

GEOMETRY

STRING

説明

MySQL 空間データ型は、固定 JSON 形式の文字列に変換されます。詳細については、MySQL 空間データ型マッピングをご参照ください。

POINT

LINESTRING

POLYGON

MULTIPOINT

MULTILINESTRING

MULTIPOLYGON

GEOMETRYCOLLECTION

TINYBLOB

BYTES

説明

MySQL BLOB データ型の場合、サポートされる最大長は 2,147,483,647 (2**31 - 1) です。

BLOB

MEDIUMBLOB

LONGBLOB

  • CDC ソーステーブル

    CREATE TEMPORARY TABLE mysqlcdc_source (
       order_id INT,
       order_date TIMESTAMP(0),
       customer_name STRING,
       price DECIMAL(10, 5),
       product_id INT,
       order_status BOOLEAN,
       PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      order_id INT,
      customer_name STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT order_id, customer_name FROM mysqlcdc_source;
  • ディメンションテーブル

    CREATE TEMPORARY TABLE datagen_source(
      a INT,
      b BIGINT,
      c STRING,
      `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE mysql_dim (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      a INT,
      b STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT T.a, H.b
    FROM datagen_source AS T JOIN mysql_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a = H.a;
  • シンクテーブル

    CREATE TEMPORARY TABLE datagen_source (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE mysql_sink (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    INSERT INTO mysql_sink
    SELECT * FROM datagen_source;
  • データインジェストソース

    source:
      type: mysql
      name: MySQL Source
      hostname: ${mysql.hostname}
      port: ${mysql.port}
      username: ${mysql.username}
      password: ${mysql.password}
      tables: ${mysql.source.table}
      server-id: 7601-7604
    
    sink:
      type: values
      name: Values Sink
      print.enabled: true
      sink.print.logger: true

MySQL CDC ソーステーブルについて

  • 仕組み

    MySQL CDC ソーステーブルが起動すると、テーブル全体をスキャンし、プライマリキーに基づいて複数のチャンクに分割し、現在の binlog 位置を記録します。次に、増分スナップショットアルゴリズムを使用して、SELECT 文で各チャンクからデータを読み取ります。ジョブは定期的にチェックポイントを実行して、完了したチャンクを記録します。フェールオーバーが発生した場合、ジョブは未完了のチャンクから読み取りを続行します。すべてのチャンクが読み取られた後、ジョブは以前に記録された binlog 位置から増分変更レコードの読み取りを開始します。Flink ジョブは定期的にチェックポイントを実行し続けて binlog 位置を記録します。ジョブが失敗した場合、最後に記録された binlog 位置から処理を再開して、exactly-once セマンティクスを実現します。

    増分スナップショットアルゴリズムの詳細な説明については、「MySQL CDC コネクタ」をご参照ください。

  • メタデータ

    メタデータは、シャード化されたデータベースとテーブルをマージするのに役立ちます。マージ後、データの各行のソースデータベースとテーブルを特定する必要がある場合があります。メタデータ列を使用すると、この情報にアクセスでき、複数のシャード化されたテーブルを単一の宛先テーブルに簡単にマージできます。

    MySQL CDC ソースはメタデータ列の構文をサポートしています。メタデータ列を介して次のメタデータにアクセスできます。

    メタデータキー

    メタデータ型

    説明

    database_name

    STRING NOT NULL

    行を含むデータベースの名前。

    table_name

    STRING NOT NULL

    行を含むテーブルの名前。

    op_ts

    TIMESTAMP_LTZ(3) NOT NULL

    データベースで行が変更された時刻。レコードが binlog ではなくテーブルの既存の既存データからのものである場合、この値は常に 0 です。

    op_type

    STRING NOT NULL

    行の変更タイプ。

    • +I:INSERT メッセージ

    • -D:DELETE メッセージ

    • -U:UPDATE_BEFORE メッセージ

    • +U:UPDATE_AFTER メッセージ

    説明

    VVR 8.0.7 以降でのみサポートされます。

    query_log

    STRING NOT NULL

    読み取られた行に対応する MySQL クエリログレコード。

    説明

    クエリをログに記録するには、MySQL で binlog_rows_query_log_events パラメーターを有効にする必要があります。

    次の例は、MySQL インスタンス内の異なるシャードデータベースから複数の orders テーブルをマージし、それらを Hologres の holo_orders テーブルに同期する方法を示しています。

    CREATE TEMPORARY TABLE mysql_orders (
      db_name STRING METADATA FROM 'database_name' VIRTUAL,  -- データベース名を読み取る
      table_name STRING METADATA  FROM 'table_name' VIRTUAL, -- テーブル名を読み取る
      operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, -- 変更タイムスタンプを読み取る
      op_type STRING METADATA FROM 'op_type' VIRTUAL, -- 変更タイプを読み取る
      order_id INT,
      order_date TIMESTAMP(0),
      customer_name STRING,
      price DECIMAL(10, 5),
      product_id INT,
      order_status BOOLEAN,
      PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'localhost',
      'port' = '3306',
      'username' = 'flinkuser',
      'password' = 'flinkpw',
      'database-name' = 'mydb_.*', -- 正規表現を使用して複数のシャードデータベースを照合
      'table-name' = 'orders_.*'   -- 正規表現を使用して複数のシャードテーブルを照合
    );
    
    INSERT INTO holo_orders SELECT * FROM mysql_orders;

    上記のコードに基づいて、WITH 句で scan.read-changelog-as-append-only.enabled オプションを true に設定した場合、出力はダウンストリームテーブルのプライマリキー設定によって異なります:

    • 下流テーブルのプライマリキーが order_id の場合、出力にはソーステーブルの各プライマリキーの最後の変更のみが含まれます。たとえば、プライマリキーの最後の変更が削除操作だった場合、同じプライマリキーと op_type が -D のレコードが下流テーブルに表示されます。

    • ダウンストリームテーブルのプライマリキーが order_id、operation_ts、op_type の複合である場合、出力にはソーステーブルの各プライマリキーの完全な変更履歴が含まれます。

  • 正規表現のサポート

    MySQL CDC ソーステーブルは、テーブル名またはデータベース名に正規表現を使用して、複数のテーブルまたはデータベースを照合することをサポートしています。次の例は、正規表現を使用して複数のテーブルを指定する方法を示しています。

    CREATE TABLE products (
      db_name STRING METADATA FROM 'database_name' VIRTUAL,
      table_name STRING METADATA  FROM 'table_name' VIRTUAL,
      operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
      order_id INT,
      order_date TIMESTAMP(0),
      customer_name STRING,
      price DECIMAL(10, 5),
      product_id INT,
      order_status BOOLEAN,
      PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'localhost',
      'port' = '3306',
      'username' = 'root',
      'password' = '123456',
      'database-name' = '(^(test).*|^(tpc).*|txc|.*[p$]|t{2})', -- 正規表現を使用して複数のデータベースを照合
      'table-name' = '(t[5-8]|tt)' -- 正規表現を使用して複数のテーブルを照合
    );

    前述の例の正規表現の説明:

    • ^(test).* はプレフィックスマッチの例です。この式は、test1 や test2 のように test で始まるデータベース名に一致します。

    • .*[p$] はサフィックスマッチの例です。この式は、cdcp や edcp のように p で終わるデータベース名に一致します。

    • txc は特定のマッチです。txc のような特定のデータベース名に一致します。

    完全修飾テーブル名を照合する場合、MySQL CDC はデータベース名とテーブル名の両方を使用してテーブルを一意に識別します。照合には database-name.table-name のパターンを使用します。たとえば、パターン (^(test).*|^(tpc).*|txc|.*[p$]|t{2}).(t[ 5-8]|tt) は、データベース内の txc.tt や test2.test5 のようなテーブルに一致します。

    重要

    SQL ジョブ設定では、table-name および database-name オプションは、カンマ (,) を使用して複数のテーブルまたはデータベースを指定することをサポートしていません。

    • 複数のテーブルを照合したり、複数の正規表現を使用したりするには、それらを縦棒 (|) で接続し、括弧で囲みます。たとえば、user テーブルと product テーブルを読み取るには、table-name を (user|product) と設定できます。

    • 正規表現にカンマが含まれている場合は、縦棒 (|) 演算子を使用して書き換える必要があります。たとえば、正規表現 mytable_\d{1, 2} は、カンマを使用しないように同等の (mytable_\d{1}|mytable_\d{2}) に書き換える必要があります。

  • 同時実行制御

    MySQL コネクタは、完全データの同時読み取りをサポートしており、データロード効率を向上させます。Realtime Compute for Apache Flink コンソールの Autopilot と組み合わせることで、同時読み取りが完了した後の増分フェーズで自動的にスケールインし、コンピューティングリソースを節約できます。

    Realtime Compute for Apache Flink 開発コンソールでは、基本モードまたはエキスパートモードのリソース設定ページでジョブの並列度を設定できます。違いは次のとおりです:

    • 基本モードで設定された並列度は、ジョブ全体のグローバルな並列度です。基础模式

    • エキスパートモードでは、必要に応じて特定の VERTEX の並列度を設定できます。vertex并发

    リソース設定の詳細については、「ジョブデプロイメントの設定」をご参照ください。

    重要

    基本モードまたはエキスパートモードのいずれを使用する場合でも、テーブルで宣言された server-id の範囲は、ジョブの並列度以上でなければなりません。たとえば、server-id の範囲が 5404-5412 の場合、範囲には 9 つの一意のサーバー ID が含まれ、最大ジョブ並列度は 9 になります。同じ MySQL インスタンスに対する異なるジョブは、重複しない server-id 範囲を持つ必要があり、つまり、各ジョブは一意で明示的に構成された server-id を持つ必要があります。

  • Autopilot 自動スケーリング

    完全データフェーズでは、大量の既存データが蓄積されます。読み取り効率を向上させるために、既存データは通常、同時に読み取られます。しかし、増分 binlog フェーズでは、binlog データの量が少なく、グローバルな順序を維持する必要があるため、通常は単一の並列度で十分です。Autopilot は、パフォーマンスとリソースのバランスを自動的に調整して、完全フェーズと増分フェーズのこれらの異なる要件を満たすことができます。

    Autopilot は、MySQL CDC ソースの各タスクのトラフィックを監視します。ジョブが binlog フェーズに入ると、1 つのタスクのみが binlog の読み取りを担当し、他のタスクがアイドル状態の場合、Autopilot はソースの CU 数と並列度を自動的に削減します。Autopilot を有効にするには、ジョブの O&M ページで Autopilot モードをアクティブに設定します。

    説明

    並列度を削減するための最小トリガー間隔は、デフォルトで 24 時間です。Autopilot のオプションと詳細については、「Autopilot の設定」をご参照ください。

  • 起動モード

    scan.startup.mode オプションを使用して、MySQL CDC ソーステーブルの起動モードを指定できます。オプションには次のものがあります:

    • initial (デフォルト):最初の起動時にデータベーステーブルの完全読み取りを実行し、その後増分モードに切り替えて binlog を読み取ります。

    • earliest-offset:スナップショットフェーズをスキップし、利用可能な最も古い binlog 位置から読み取りを開始します。

    • latest-offset:スナップショットフェーズをスキップし、binlog の末尾から読み取りを開始します。このモードでは、ソーステーブルはジョブの開始後に発生したデータ変更のみを読み取ることができます。

    • specific-offset:スナップショットフェーズをスキップし、指定された binlog 位置から読み取りを開始します。位置は、binlog ファイル名と位置、または GTID セットを使用して指定できます。

    • timestamp:スナップショットフェーズをスキップし、指定されたタイムスタンプから binlog イベントの読み取りを開始します。

    例:

    CREATE TABLE mysql_source (...) WITH (
        'connector' = 'mysql-cdc',
        'scan.startup.mode' = 'earliest-offset', -- 最も古いオフセットから開始
        'scan.startup.mode' = 'latest-offset', -- 最新のオフセットから開始
        'scan.startup.mode' = 'specific-offset', -- 特定のオフセットから開始
        'scan.startup.mode' = 'timestamp', -- 特定のタイムスタンプから開始
        'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- specific-offset モードの binlog ファイル名を指定
        'scan.startup.specific-offset.pos' = '4', -- specific-offset モードの binlog 位置を指定
        'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- specific-offset モードの GTID セットを指定
        'scan.startup.timestamp-millis' = '1667232000000' -- timestamp モードの起動タイムスタンプを指定
        ...
    )
    重要
    • MySQL ソースは、チェックポイント中に現在の位置を INFO レベルでログに記録します。ログのプレフィックスは Binlog offset on checkpoint {checkpoint-id} です。このログは、特定のチェックポイント位置からジョブを再起動するのに役立ちます。

    • 読み取られるテーブルのスキーマが過去に変更されている場合、earliest-offset、specific-offset、または timestamp から開始するとエラーが発生する可能性があります。これは、Debezium リーダーが内部的に最新のスキーマを保存しており、スキーマが一致しない古いデータを正しく解析できないためです。

  • キーなし CDC ソーステーブルについて

    • キーレステーブルを使用するには、scan.incremental.snapshot.chunk.key-column を設定し、非ヌル列を指定する必要があります。

    • キーなし CDC ソーステーブルの処理セマンティクスは、scan.incremental.snapshot.chunk.key-column で指定された列の動作によって決まります:

      • 指定された列が更新されない場合、厳密に 1 回のセマンティクスが保証されます。

      • 指定された列が更新される場合、at-least-once セマンティクスのみが保証されます。ただし、下流システムと組み合わせ、下流のプライマリキーを指定し、べき等操作を使用することで、データの正確性を保証できます。

  • ApsaraDB RDS for MySQL からのバックアップログの読み取り

    MySQL CDC ソーステーブルは、ApsaraDB RDS for MySQL からのバックアップログの読み取りをサポートしています。これは、完全スナップショットフェーズに時間がかかり、ローカルの binlog ファイルが読み取られる前にクリーンアップされてしまう場合に役立ちます。バックアップファイルが利用可能な場合、コネクタはそこから読み取ることができます。

    例:

    CREATE TABLE mysql_source (...) WITH (
        'connector' = 'mysql-cdc',
        'rds.region-id' = 'cn-beijing',
        'rds.access-key-id' = 'xxxxxxxxx', 
        'rds.access-key-secret' = 'xxxxxxxxx', 
        'rds.db-instance-id' = 'rm-xxxxxxxxxxxxxxxxx', 
        'rds.main-db-id' = '12345678',
        'rds.download.timeout' = '60s'
        ...
    )
  • CDC ソースの再利用を有効にする

    同じジョブ内で、複数の MySQL CDC ソーステーブルが複数の binlog クライアントを起動します。すべてのソーステーブルが同じインスタンスにある場合、これはデータベースの負荷を増加させます。詳細については、「MySQL CDC FAQ」をご参照ください。

    ソリューション

    VVR 8.0.7 以降は、MySQL CDC ソースの再利用をサポートしています。この機能は、適格な MySQL CDC ソーステーブルをマージします。ソーステーブルは、データベース名、テーブル名、および server-id を除き、構成項目が同一である場合にマージの対象となります。エンジンは、同じジョブ内の MySQL CDC ソースを自動的にマージします。

    手順

    1. SQL ジョブで SET コマンドを使用します:

      SET 'table.optimizer.source-merge.enabled' = 'true';
      
      # (VVR 8.0.8 および 8.0.9 の場合) さらにこの項目を設定します:
      SET 'sql-gateway.exec-plan.enabled' = 'false';
      VVR 11.1 以降では、再利用はデフォルトで有効になっています。
    2. 状態なしでジョブを開始します。ソースの再利用構成項目を変更すると、ジョブのトポロジーが変更されます。状態なしでジョブを開始する必要があります。そうしないと、ジョブの開始に失敗したり、データが失われたりする可能性があります。ソースがマージされると、MergetableSourceScan ノードが表示されます。

    重要
    • 再利用を有効にした後、演算子チェーンを無効にすることは推奨されません。pipeline.operator-chainingfalse に設定すると、データのシリアル化と逆シリアル化のオーバーヘッドが増加します。マージされるソースが多いほど、オーバーヘッドは大きくなります。

    • VVR 8.0.7 では、演算子チェーンを無効にするとシリアル化の問題が発生します。

binlog 読み取りの高速化

MySQL コネクタをソーステーブルまたはデータインジェストソースとして使用する場合、増分フェーズ中に binlog ファイルを解析してさまざまな変更メッセージを生成します。binlog ファイルは、すべてのテーブルの変更をバイナリ形式で記録します。次の方法で binlog ファイルの解析を高速化できます。

  • 解析フィルターを有効にする

    • scan.only.deserialize.captured.tables.changelog.enabled オプションを使用して、指定されたテーブルの変更イベントのみを解析します。

  • Debezium オプションを最適化する

    debezium.max.queue.size: 162580
    debezium.max.batch.size: 40960
    debezium.poll.interval.ms: 50
    • debezium.max.queue.size:ブロッキングキューが保持できる最大レコード数。Debezium がデータベースからイベントストリームを読み取るとき、イベントをダウンストリームに書き込む前にブロッキングキューに配置します。デフォルト値は 8192 です。

    • debezium.max.batch.size:コネクタが各反復で処理する最大イベント数。デフォルト値は 2048 です。

    • debezium.poll.interval.ms:コネクタが新しい変更イベントを要求する前に待機するミリ秒数。デフォルト値は 1000 ミリ秒、つまり 1 秒です。

例:

CREATE TABLE mysql_source (...) WITH (
    'connector' = 'mysql-cdc',
    -- Debezium 設定
    'debezium.max.queue.size' = '162580',
    'debezium.max.batch.size' = '40960',
    'debezium.poll.interval.ms' = '50',
    -- 解析フィルターを有効にする
    'scan.only.deserialize.captured.tables.changelog.enabled' = 'true',  -- 指定されたテーブルの変更イベントのみを解析
    ...
)
source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604
  # Debezium 設定
  debezium.max.queue.size: 162580
  debezium.max.batch.size: 40960
  debezium.poll.interval.ms: 50
  # 解析フィルターを有効にする
  scan.only.deserialize.captured.tables.changelog.enabled: true

MySQL CDC の Enterprise Edition は、85 MB/s の binlog 消費能力を持ち、これはオープンソースコミュニティバージョンの約 2 倍です。binlog の生成速度が 85 MB/s (512 MB のファイルを 6 秒ごとに 1 つに相当) を超えると、Flink ジョブの遅延は増加し続けます。binlog の生成速度が遅くなると、処理遅延は徐々に減少します。binlog ファイルに大きなトランザクションが含まれている場合、処理遅延は一時的に増加し、トランザクションログが読み取られた後に減少する可能性があります。

MySQL CDC DataStream API

重要

DataStream API を使用してデータを読み書きするには、対応する DataStream コネクタを使用して Flink に接続する必要があります。DataStream コネクタの設定方法については、「DataStream プログラムでのコネクタの統合と使用」をご参照ください。

次の例は、DataStream API プログラムを作成し、MySqlSource を使用する方法を示しています。必要な pom 依存関係も含まれています。

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
public class MySqlSourceExample {
  public static void main(String[] args) throws Exception {
    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("yourHostname")
        .port(yourPort)
        .databaseList("yourDatabaseName") // キャプチャするデータベースを設定
        .tableList("yourDatabaseName.yourTableName") // キャプチャするテーブルを設定
        .username("yourUsername")
        .password("yourPassword")
        .deserializer(new JsonDebeziumDeserializationSchema()) // SourceRecord を JSON 文字列に変換
        .build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // チェックポイントを有効にする
    env.enableCheckpointing(3000);
    env
      .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
      // 4 つの並列ソースタスクを設定
      .setParallelism(4)
      .print().setParallelism(1); // メッセージの順序を維持するためにシンクの並列度を 1 に設定
    env.execute("Print MySQL Snapshot + Binlog");
  }
}
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mysql</artifactId>
    <version>${vvr.version}</version>
</dependency>

MySqlSource をビルドする際には、コード内で次のパラメーターを指定する必要があります:

パラメーター

説明

hostname

MySQL データベースの IP アドレスまたはホスト名。

port

MySQL データベースサービスのポート番号。

databaseList

MySQL データベースの名前。

説明

データベース名は、複数のデータベースからデータを読み取るために正規表現をサポートしています。.* を使用してすべてのデータベースを照合します。

username

MySQL データベースサービスのユーザー名。

password

MySQL データベースサービスのパスワード。

deserializer

SourceRecord 型のレコードを指定された型に逆シリアル化するデシリアライザ。パラメーターは次のいずれかの値に設定できます:

  • RowDataDebeziumDeserializeSchema:SourceRecord を RowData に変換します。これは Flink Table または SQL の内部データ構造です。

  • JsonDebeziumDeserializationSchema:SourceRecord を JSON 形式の文字列に変換します。

pom 依存関係で次のパラメーターを指定する必要があります:

${vvr.version}

Realtime Compute for Apache Flink エンジンのバージョン。例:1.17-vvr-8.0.4-3

説明

Maven に表示されるバージョン番号を使用してください。定期的に修正プログラムバージョンをリリースしており、これらの更新は他のチャネルを通じて発表されない場合があります。

${flink.version}

Apache Flink のバージョン。例:1.17.2

重要

ジョブの実行時に互換性の問題が発生しないように、Apache Flink のバージョンが Realtime Compute for Apache Flink のエンジンバージョンと一致していることを確認してください。バージョンマッピングの詳細については、「エンジン」をご参照ください。

よくある質問

CDC ソーステーブルを使用する際に発生する可能性のある問題については、「CDC の問題」をご参照ください。