すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:MySQL

最終更新日:Dec 16, 2025

このトピックでは、MySQL コネクタの使用方法について説明します。

背景情報

MySQL コネクタは、ApsaraDB RDS for MySQL、PolarDB for MySQL、OceanBase (MySQL モード)、セルフマネージド MySQL など、MySQL プロトコルと互換性のあるすべてのデータベースをサポートします。

重要

MySQL コネクタを使用して OceanBase からデータを読み取る場合は、バイナリログ (Binlog) が有効になっており、正しく構成されていることを確認してください。詳細については、「Binlog 関連の操作」をご参照ください。この機能はパブリックプレビュー段階です。使用する前に十分に評価することを推奨します。

MySQL コネクタは、次の機能をサポートしています。

カテゴリ

詳細

サポートされるタイプ

ソーステーブル、ディメンションテーブル、結果テーブル、データインジェストのデータソース

ランタイムモード

ストリーミングモードのみサポートされています。

データフォーマット

該当なし

特定のモニタリングメトリック

監視メトリック

  • ソーステーブル

    • currentFetchEventTimeLag:データが生成されてから Source オペレーターにプルされるまでの間隔。

      このメトリックは、バイナリログ (Binlog) フェーズでのみ有効です。スナップショットフェーズでは、この値は常に 0 です。

    • currentEmitEventTimeLag:データが生成されてから Source オペレーターを離れるまでの間隔。

      このメトリックは、Binlog フェーズでのみ有効です。スナップショットフェーズでは、この値は常に 0 です。

    • sourceIdleTime:ソーステーブルがアイドル状態であった期間。

  • ディメンションテーブルと結果テーブル:なし。

説明

メトリックの詳細については、「監視メトリック」をご参照ください。

API タイプ

DataStream、SQL、およびデータインジェスト YAML

結果テーブルのデータの更新または削除をサポート

はい

特徴

MySQL 変更データキャプチャ (CDC) ソーステーブルは、最初にデータベースから完全な既存データを読み取るストリーミングソーステーブルです。その後、スムーズにバイナリログ (Binlog) の読み取りに切り替えることで、データの欠落や重複がないことを保証します。障害が発生した場合でも、Exactly-Once セマンティクスが保証されます。MySQL CDC ソーステーブルは、完全データの同時読み取りをサポートし、増分スナップショットアルゴリズムを使用して、ロックフリーの読み取りと再開可能なデータ転送を実装します。詳細については、「MySQL CDC ソーステーブルについて」をご参照ください。

  • バッチ処理とストリーム処理の統合:コネクタは完全データと増分データの両方の読み取りをサポートしているため、個別のパイプラインは不要です。

  • 水平方向のパフォーマンススケーリングのために、完全データの同時読み取りをサポートします。

  • 完全データの読み取りから増分データの読み取りにシームレスに切り替え、自動的にスケールインして計算リソースを節約します。

  • 完全データ読み取りフェーズでの再開可能なデータ転送をサポートし、安定性を向上させます。

  • 完全データのロックフリー読み取りは、オンラインのビジネス運用に影響を与えません。

  • ApsaraDB RDS for MySQL からのバックアップログの読み取りをサポートします。

  • バイナリログファイルを並列で解析し、データ遅延を低減します。

前提条件

MySQL CDC ソーステーブルを使用する前に、「MySQL の構成」で説明されているように MySQL を構成して、前提条件を満たす必要があります。

RDS for MySQL

  • Realtime Compute for Apache Flink を使用して接続テストを実行し、ネットワーク接続を確認できます。

  • 必須の MySQL バージョン:5.6、5.7、および 8.0.x。

  • バイナリログ (Binlog) を有効にする必要があります。デフォルトで有効になっています。

  • バイナリログのフォーマットは ROW である必要があります。これはデフォルトのフォーマットです。

  • binlog_row_image を FULL に設定します。これはデフォルトの設定です。

  • バイナリログトランザクション圧縮を無効にします。この機能は MySQL 8.0.20 で導入され、デフォルトで無効になっています。

  • MySQL ユーザーを作成し、SELECT、SHOW DATABASES、REPLICATION SLAVE、および REPLICATION CLIENT 権限を付与します。

  • MySQL データベースとテーブルを作成します。詳細については、「ApsaraDB RDS for MySQL インスタンスのデータベースとアカウントの作成」をご参照ください。権限不足による操作の失敗を防ぐために、特権アカウントを使用して MySQL データベースを作成することを推奨します。

  • IP ホワイトリストを構成します。詳細については、「ApsaraDB RDS for MySQL インスタンスのホワイトリストの構成」をご参照ください。

PolarDB for MySQL

  • Realtime Compute for Apache Flink を使用して接続テストを実行し、ネットワーク接続を確認できます。

  • 必須の MySQL バージョン:5.6、5.7、および 8.0.x。

  • バイナリログ (Binlog) を有効にする必要があります。デフォルトでは無効になっています。

  • バイナリログのフォーマットは ROW である必要があります。これはデフォルトのフォーマットです。

  • binlog_row_image を FULL に設定します。これはデフォルトの設定です。

  • バイナリログトランザクション圧縮を無効にします。この機能は MySQL 8.0.20 で導入され、デフォルトで無効になっています。

  • MySQL ユーザーを作成し、SELECT、SHOW DATABASES、REPLICATION SLAVE、および REPLICATION CLIENT 権限を付与します。

  • MySQL データベースとテーブルを作成します。詳細については、「PolarDB for MySQL クラスターのデータベースとアカウントの作成」をご参照ください。権限不足による操作の失敗を防ぐために、特権アカウントを使用して MySQL データベースを作成することを推奨します。

  • IP ホワイトリストを構成します。詳細については、「PolarDB for MySQL クラスターのホワイトリストの構成」をご参照ください。

自己管理 MySQL

  • Realtime Compute for Apache Flink を使用して接続テストを実行し、ネットワーク接続を確認できます。

  • 必須の MySQL バージョン:5.6、5.7、および 8.0.x。

  • バイナリログ (Binlog) を有効にする必要があります。デフォルトでは無効になっています。

  • バイナリログのフォーマットは ROW である必要があります。デフォルトのフォーマットは STATEMENT です。

  • binlog_row_image を FULL に設定します。これはデフォルトの設定です。

  • バイナリログトランザクション圧縮を無効にします。この機能は MySQL 8.0.20 で導入され、デフォルトで無効になっています。

  • MySQL ユーザーを作成し、SELECT、SHOW DATABASES、REPLICATION SLAVE、および REPLICATION CLIENT 権限を付与します。

  • MySQL データベースとテーブルを作成します。詳細については、「自己管理型 MySQL インスタンスのデータベースとアカウントの作成」をご参照ください。権限不足による操作の失敗を防ぐために、特権アカウントを使用して MySQL データベースを作成することを推奨します。

  • IP ホワイトリストを構成します。詳細については、「自己管理型 MySQL インスタンスのホワイトリストの構成」をご参照ください。

制限事項

一般的な制限事項

  • MySQL CDC ソーステーブルはウォーターマーク定義をサポートしていません。

  • Create Table As Select (CTAS) および Create Database As Select (CDAS) ジョブでは、MySQL CDC ソーステーブルは一部のスキーマ変更を同期できます。サポートされている変更タイプの詳細については、「スキーマ進化の同期ポリシー」をご参照ください。

  • MySQL CDC コネクタは、バイナリログトランザクション圧縮機能をサポートしていません。したがって、MySQL CDC コネクタを使用して増分データを消費する場合は、この機能が無効になっていることを確認してください。そうしないと、増分データを取得できなくなる可能性があります。

RDS for MySQL の制限事項

  • RDS for MySQL の場合、セカンダリデータベースまたは読み取り専用レプリカからデータを読み取ることは推奨されません。これは、これらのインスタンス上のバイナリログのデフォルトの保持期間が非常に短いためです。バイナリログが期限切れになりクリーンアップされると、ジョブはバイナリログデータの消費に失敗し、エラーを報告します。

  • RDS for MySQL は、デフォルトでプライマリ/セカンダリ並列レプリケーションを有効にしますが、プライマリデータベースとセカンダリデータベース間のトランザクション順序の一貫性を保証しません。これにより、プライマリ/セカンダリ切り替え後のチェックポイントからのデータ回復中にデータが失われる可能性があります。この問題を回避するには、RDS for MySQL の slave_preserve_commit_order オプションを手動で有効にすることができます。

PolarDB for MySQL の制限事項

MySQL CDC ソーステーブルは、PolarDB for MySQL 1.0.19 以前のマルチマスタークラスターアーキテクチャ (詳細については、「マルチマスタークラスターとは」をご参照ください) からのデータ読み取りをサポートしていません。これらのクラスターバージョンによって生成されたバイナリログには、重複したテーブル ID が含まれている可能性があります。これにより、CDC ソーステーブルでスキーマ マッピングエラーが発生し、バイナリログデータの解析時にエラーが発生する可能性があります。

オープンソース MySQL の制限事項

デフォルトの構成では、MySQL はプライマリ-セカンダリ間のバイナリログレプリケーション中にトランザクションの順序を維持します。MySQL レプリカで並列レプリケーションが有効 (slave_parallel_workers > 1) になっているが、slave_preserve_commit_order=ON になっていない場合、そのトランザクションのコミット順序はプライマリデータベースと一致しない可能性があります。Flink CDC がチェックポイントから回復する際、順序が正しくないためにデータが欠落する可能性があります。MySQL レプリカで slave_preserve_commit_order = ON を設定することを推奨します。または、slave_parallel_workers = 1 を設定することもできますが、これによりレプリケーションのパフォーマンスが犠牲になります。

注意

  • 各 MySQL CDC データソースには、明示的に異なるサーバー ID を構成する必要があります

    サーバー ID の目的

    複数の MySQL CDC データソースが同じサーバー ID を共有し、再利用できない場合、バイナリログのオフセットが乱れる可能性があります。これにより、データが複数回読み取られたり、欠落したりする可能性があります。

    異なるシナリオでのサーバー ID の構成

    DDL でサーバー ID を指定できますが、DDL パラメーターではなく動的ヒントを使用して構成することを推奨します。

    • 並列度 = 1 または増分スナップショットが無効

      ## 増分スナップショットフレームワークが無効になっているか、並列度が 1 の場合、特定のサーバー ID を指定できます。
      SELECT * FROM source_table /*+ OPTIONS('server-id'='123456') */ ;
    • 並列度 > 1 かつ増分スナップショットが有効

      ## サーバー ID の範囲を指定する必要があります。範囲内の利用可能なサーバー ID の数は、並列度以上でなければなりません。並列度が 3 であると仮定します。
      SELECT * FROM source_table /*+ OPTIONS('server-id'='123456-123458') */ ;
    • CTAS を使用したデータ同期

      CTAS を使用してデータ同期を行う場合、CDC データソースの構成が同じであれば、自動的に再利用されます。この場合、複数の CDC データソースに同じサーバー ID を構成できます。詳細については、「例 4:複数の CTAS 文」をご参照ください。

    • 再利用できない複数の非 CTAS ソーステーブル

      ジョブに複数の MySQL CDC ソーステーブルが含まれており、同期に CTAS 文を使用しない場合、データソースは再利用できません。各 CDC ソーステーブルに異なるサーバー ID を提供する必要があります。同様に、増分スナップショットフレームワークが有効で、並列度が 1 より大きい場合は、サーバー ID の範囲を指定する必要があります。

      select * from 
        source_table1 /*+ OPTIONS('server-id'='123456-123457') */
      left join 
        source_table2 /*+ OPTIONS('server-id'='123458-123459') */
      on source_table1.id=source_table2.id;
  • 結果テーブル

    • DDL で自動インクリメントプライマリキーを宣言しないでください。MySQL はデータ書き込み時に自動的にそれらを埋めます。

    • 少なくとも 1 つの非プライマリキーフィールドを宣言する必要があります。そうしないと、エラーが報告されます。

    • DDL では、NOT ENFORCED は Flink がプライマリキー制約を強制しないことを意味します。プライマリキーの正確性と完全性を保証する必要があります。詳細については、「有効性チェック」をご参照ください。

  • ディメンションテーブル

    インデックスを使用してクエリを高速化したい場合、JOIN 条件のフィールドの順序は、インデックスで定義された順序 (左端プレフィックスルール) と一致する必要があります。たとえば、インデックスが (a, b, c) の場合、JOIN 条件は ON t.a = x AND t.b = y となります。

    Flink によって生成された SQL はオプティマイザーによって再書き込みされる可能性があり、これにより実際のデータベースクエリ中にインデックスが使用されなくなることがあります。インデックスが使用されているかどうかを確認するには、実行計画 (EXPLAIN) または MySQL の Redis スローログをチェックして、実際に実行される SELECT 文を表示できます。

SQL

SQL ジョブで MySQL コネクタをソーステーブル、ディメンションテーブル、または結果テーブルとして使用できます。

構文

CREATE TEMPORARY TABLE mysqlcdc_source (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

説明
  • コネクタが結果テーブルに書き込む方法:結果テーブルに書き込む際、コネクタは受信した各データレコードに対して SQL 文を構築して実行します。実行される具体的な SQL 文は次のとおりです:

    • プライマリキーのない結果テーブルの場合、INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...); 文が実行されます。

    • プライマリキーのある結果テーブルの場合、INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...; 文が実行されます。注意:物理テーブルにプライマリキー以外の一意なインデックス制約がある場合、異なるプライマリキーを持つが同じ一意なインデックス値を持つ 2 つのレコードを挿入すると、一意なインデックスの競合が発生します。この競合により、データが上書きされて失われる可能性があります。

  • MySQL データベースで自動インクリメントプライマリキーが定義されている場合、Flink DDL で自動インクリメントフィールドを宣言しないでください。データベースはデータ書き込み中にこのフィールドを自動的に埋めます。コネクタは、自動インクリメントフィールドを持つデータの書き込みと削除のみをサポートし、更新はサポートしていません。

WITH パラメーター

  • 一般

    パラメーター

    説明

    必須

    データの型

    デフォルト値

    備考

    connector

    テーブルタイプ。

    はい

    STRING

    なし

    ソーステーブルとして使用する場合、これを mysql-cdc または mysql に設定できます。2 つの値は同等です。ディメンションテーブルまたは結果テーブルとして使用する場合、値は mysql でなければなりません。

    hostname

    MySQL データベースの IP アドレスまたはホスト名。

    はい

    STRING

    なし

    Virtual Private Cloud (VPC) アドレスを入力することを推奨します。

    説明

    MySQL データベースと Realtime Compute for Apache Flink が同じ VPC にない場合は、VPC 間のネットワーク接続を確立するか、パブリックエンドポイントを使用してアクセスする必要があります。詳細については、「ストレージ管理と操作」および「フルマネージド Flink クラスターはどのようにインターネットにアクセスできますか?」をご参照ください。

    username

    MySQL データベースサービスのユーザー名。

    はい

    STRING

    なし

    なし。

    password

    MySQL データベースサービスのパスワード。

    はい

    STRING

    なし

    なし。

    database-name

    MySQL データベース名。

    はい

    STRING

    なし

    • ソーステーブルとして使用する場合、データベース名は正規表現をサポートし、複数のデータベースからデータを読み取ることができます。

    • 正規表現を使用する場合、文字列の開始と終了を一致させるために ^ および $ 記号の使用は避けてください。具体的な理由については、table-name の備考をご参照ください。

    table-name

    MySQL テーブル名。

    はい

    STRING

    なし

    • ソーステーブルとして使用する場合、テーブル名は正規表現をサポートし、複数のテーブルからデータを読み取ることができます。

      複数の MySQL テーブルから読み取る場合、複数の Binlog リスナーを有効にすることを避けるために、複数の CTAS 文を単一のジョブとして送信することで、パフォーマンスと効率が向上します。詳細については、「複数の CTAS 文:単一のジョブとして送信」をご参照ください。

    • 正規表現を使用する場合、文字列の開始と終了を一致させるために ^ および $ 記号の使用は避けてください。理由は以下で説明します。

    説明

    正規表現でテーブル名を照合する場合、MySQL CDC ソーステーブルは、指定された database-nametable-name\\. 文字列 (または Ververica Runtime (VVR) 8.0.1 より前のバージョンでは . 文字) で連結して、完全修飾正規表現を形成します。この式は、MySQL データベース内のテーブルの完全修飾名と照合するために使用されます。

    たとえば、'database-name'='db_.*' および 'table-name'='tb_.+' と構成した場合、コネクタは正規表現 db_.*\\.tb_.+ (またはバージョン 8.0.1 より前では db_.*.tb_.+) を使用して完全修飾テーブル名を照合し、どのテーブルを読み取るかを決定します。

    port

    MySQL データベースサービスのポート番号。

    いいえ

    INTEGER

    3306

    なし。

  • ソーステーブル固有

    パラメーター

    説明

    必須

    データの型

    デフォルト値

    備考

    server-id

    データベースクライアントの数値 ID。

    いいえ

    STRING

    5400 から 6400 の間のランダムな値が生成されます。

    この ID は MySQL クラスター内でグローバルに一意でなければなりません。同じデータベースに接続する各ジョブに異なる ID を設定することを推奨します。

    このパラメーターは、5400-5408 のような ID 範囲もサポートします。増分読み取りが有効な場合、複数の同時リーダーがサポートされます。この場合、各同時リーダーが異なる ID を使用するように ID 範囲を設定することを推奨します。詳細については、「サーバー ID の使用法」をご参照ください。

    scan.incremental.snapshot.enabled

    増分スナップショットを有効にするかどうかを指定します。

    いいえ

    BOOLEAN

    true

    増分スナップショットはデフォルトで有効になっています。これは、完全なデータスナップショットを読み取るための新しいメカニズムです。古いスナップショット読み取り方法と比較して、増分スナップショットにはいくつかの利点があります:

    • ソースは完全なデータを並行して読み取ることができます。

    • ソースは完全なデータを読み取る際にチャンクレベルのチェックポイントをサポートします。

    • ソースは完全なデータを読み取る際にグローバル読み取りロック (FLUSH TABLES WITH read lock) を取得する必要がありません。

    ソースが同時読み取りをサポートするようにしたい場合、各同時リーダーには一意のサーバー ID が必要です。したがって、server-id は 5400-6400 のような範囲でなければならず、範囲のサイズは並列度以上でなければなりません。

    説明

    この設定項目は Flink コンピュートエンジン VVR 11.1 以降のバージョンでは削除されています。

    scan.incremental.snapshot.chunk.size

    各チャンクのサイズ (行数)。

    いいえ

    INTEGER

    8096

    増分スナップショット読み取りが有効な場合、テーブルは複数のチャンクに分割されて読み取られます。チャンク内のデータは、完全に読み取られるまでメモリにキャッシュされます。

    チャンクあたりの行数が少ないと、テーブル内のチャンクの総数が多くなります。これにより、障害回復の粒度は向上しますが、メモリ不足 (OOM) エラーや全体的なスループットの低下につながる可能性があります。したがって、バランスを見つけて合理的なチャンクサイズを設定する必要があります。

    scan.snapshot.fetch.size

    テーブルの完全なデータを読み取る際に一度にフェッチする最大レコード数。

    いいえ

    INTEGER

    1024

    なし。

    scan.startup.mode

    データ消費の起動モード。

    いいえ

    STRING

    initial

    有効な値:

    • initial (デフォルト):最初の起動時に、完全な既存データをスキャンし、その後最新の Binlog データを読み取ります。

    • latest-offset:最初の起動時に、既存データをスキャンせず、Binlog の末尾 (最新の位置) から読み取りを開始します。コネクタの起動後に行われた変更のみを読み取ります。

    • earliest-offset:既存データをスキャンせず、利用可能な最も古い Binlog から読み取りを開始します。

    • specific-offset:既存データをスキャンせず、指定された Binlog オフセットから開始します。scan.startup.specific-offset.filescan.startup.specific-offset.pos の両方を構成してオフセットを指定するか、scan.startup.specific-offset.gtid-set のみを構成して GTID セットを指定できます。

    • timestamp:既存データをスキャンせず、指定されたタイムスタンプから Binlog の読み取りを開始します。タイムスタンプは scan.startup.timestamp-millis でミリ秒単位で指定します。

    重要

    earliest-offsetspecific-offset、または timestamp 起動モードを使用する場合、スキーマの不一致によるエラーを避けるために、指定された Binlog 消費位置とジョブの起動時間の間でテーブルスキーマが変更されないことを確認してください。

    scan.startup.specific-offset.file

    特定オフセット起動モードを使用する場合の開始オフセットの Binlog ファイル名。

    いいえ

    STRING

    なし

    この構成を使用する場合、scan.startup.modespecific-offset に設定する必要があります。ファイル名の例:mysql-bin.000003

    scan.startup.specific-offset.pos

    特定オフセット起動モードを使用する場合の、指定された Binlog ファイル内の開始オフセット。

    いいえ

    INTEGER

    なし

    この構成を使用する場合、scan.startup.modespecific-offset に設定する必要があります。

    scan.startup.specific-offset.gtid-set

    特定オフセット起動モードを使用する場合の開始オフセットの GTID セット。

    いいえ

    STRING

    なし

    この構成を使用する場合、scan.startup.modespecific-offset に設定する必要があります。GTID セットの例:24DA167-0C0C-11E8-8442-00059A3C7B00:1-19

    scan.startup.timestamp-millis

    タイムスタンプ起動モードを使用する場合の開始オフセットのタイムスタンプ (ミリ秒)。

    いいえ

    LONG

    なし

    この構成を使用する場合、scan.startup.modetimestamp に設定する必要があります。タイムスタンプはミリ秒単位です。

    重要

    特定の時間を使用する場合、MySQL CDC は各 Binlog ファイルの初期イベントを読み取ってそのタイムスタンプを決定し、最終的に指定された時間に対応する Binlog ファイルを特定しようとします。指定されたタイムスタンプの Binlog ファイルがデータベースからクリーンアップされておらず、読み取り可能であることを確認してください。

    server-time-zone

    データベースが使用するセッションタイムゾーン。

    いいえ

    STRING

    このパラメーターを指定しない場合、システムは Flink ジョブランタイムの環境タイムゾーンをデータベースサーバーのタイムゾーンとして使用します。これは、選択したゾーンのタイムゾーンです。

    例:Asia/Shanghai。このパラメーターは、MySQL の TIMESTAMP 型が STRING 型にどのように変換されるかを制御します。詳細については、「Debezium の時間値」をご参照ください。

    debezium.min.row.count.to.stream.results

    テーブルの行数がこの値を超えると、バッチ読み取りモードが使用されます。

    いいえ

    INTEGER

    1000

    Flink は、MySQL ソーステーブルから次のいずれかの方法でデータを読み取ります:

    • 完全読み取り:テーブルデータ全体を直接メモリに読み取ります。これは高速ですが、対応する量のメモリを消費します。ソーステーブルが非常に大きい場合、OOM エラーのリスクがあります。

    • バッチ読み取り:データを複数のバッチで読み取り、すべてのデータが読み取られるまで毎回一定数の行をフェッチします。これにより、大きなテーブルでの OOM リスクを回避できますが、比較的低速です。

    connect.timeout

    MySQL データベースサーバーへの接続がタイムアウトするまで待機する最大時間 (再試行前)。

    いいえ

    DURATION

    30s

    なし。

    connect.max-retries

    MySQL データベースサービスへの接続に失敗した後の最大再試行回数。

    いいえ

    INTEGER

    3

    なし。

    connection.pool.size

    データベース接続プールのサイズ。

    いいえ

    INTEGER

    20

    データベース接続プールは接続を再利用するために使用され、データベース接続の数を減らすことができます。

    jdbc.properties.*

    JDBC URL のカスタム接続パラメーター。

    いいえ

    STRING

    なし

    カスタム接続パラメーターを渡すことができます。たとえば、SSL プロトコルを使用しないようにするには、'jdbc.properties.useSSL' = 'false' と構成できます。

    サポートされている接続パラメーターの詳細については、「MySQL 構成プロパティ」をご参照ください。

    debezium.*

    Binlog を読み取るための Debezium のカスタムパラメーター。

    いいえ

    STRING

    なし

    カスタム Debezium パラメーターを渡すことができます。たとえば、'debezium.event.deserialization.failure.handling.mode'='ignore' を使用して、解析エラーの処理ロジックを指定します。

    heartbeat.interval

    ソースがハートビートイベントを使用して Binlog オフセットを進める時間間隔。

    いいえ

    DURATION

    30s

    ハートビートイベントは、ソースの Binlog オフセットを進めるために使用されます。これは、MySQL の更新が遅いテーブルに非常に役立ちます。このようなテーブルでは、Binlog オフセットは自動的に進みません。ハートビートイベントは Binlog オフセットを前進させ、期限切れになるのを防ぎます。期限切れの Binlog オフセットはジョブの失敗を引き起こし、ステートレスな再起動が必要になります。

    scan.incremental.snapshot.chunk.key-column

    スナップショットフェーズでのシャーディングの分割キーとして使用する列を指定します。

    備考を参照

    STRING

    なし

    • プライマリキーのないテーブルには必須です。選択された列は非ヌル型 (NOT NULL) である必要があります。

    • プライマリキーのあるテーブルではオプションです。プライマリキーから 1 つの列のみを選択できます。

    rds.region-id

    Alibaba Cloud RDS for MySQL インスタンスのリージョン ID。

    OSS からアーカイブされたログを読み取る機能を使用する場合に必須です。

    STRING

    なし

    リージョン ID の詳細については、「リージョンとゾーン」をご参照ください。

    rds.access-key-id

    Alibaba Cloud RDS for MySQL アカウントの AccessKey ID。

    OSS からアーカイブされたログを読み取る機能を使用する場合に必須です。

    STRING

    なし

    詳細については、「AccessKey 情報を表示する方法」をご参照ください。

    重要

    AccessKey 情報の漏洩を防ぐため、シークレットを管理して AccessKey ID を指定することを推奨します。詳細については、「変数管理」をご参照ください。

    rds.access-key-secret

    Alibaba Cloud RDS for MySQL アカウントの AccessKey Secret。

    OSS からアーカイブされたログを読み取る機能を使用する場合に必須です。

    STRING

    なし

    詳細については、「AccessKey 情報を表示する方法」をご参照ください。

    重要

    AccessKey 情報の漏洩を防ぐため、シークレットを管理して AccessKey Secret を指定することを推奨します。詳細については、「変数管理」をご参照ください。

    rds.db-instance-id

    Alibaba Cloud RDS for MySQL インスタンスのインスタンス ID。

    OSS からアーカイブされたログを読み取る機能を使用する場合に必須です。

    STRING

    なし

    なし。

    rds.main-db-id

    Alibaba Cloud RDS for MySQL インスタンスのプライマリデータベース ID。

    いいえ

    STRING

    なし

    • プライマリデータベース ID の取得方法の詳細については、「RDS for MySQL のログバックアップ」をご参照ください。

    • Flink コンピュートエンジン VVR 8.0.7 以降のバージョンでのみサポートされています。

    rds.download.timeout

    OSS から単一のアーカイブ済みログをダウンロードするためのタイムアウト期間。

    いいえ

    DURATION

    60s

    なし。

    rds.endpoint

    OSS Binlog 情報を取得するためのエンドポイント。

    いいえ

    STRING

    なし

    • 有効な値の詳細については、「エンドポイント」をご参照ください。

    • Flink コンピュートエンジン VVR 8.0.8 以降のバージョンでのみサポートされています。

    scan.incremental.close-idle-reader.enabled

    スナップショットフェーズが終了した後にアイドル状態のリーダーを閉じるかどうかを指定します。

    いいえ

    BOOLEAN

    false

    • Flink コンピュートエンジン VVR 8.0.1 以降のバージョンでのみサポートされています。

    • この構成を有効にするには、execution.checkpointing.checkpoints-after-tasks-finish.enabled を true に設定します。

    scan.read-changelog-as-append-only.enabled

    変更ログデータストリームを追加専用データストリームに変換するかどうかを指定します。

    いいえ

    BOOLEAN

    false

    有効な値:

    • true: すべてのタイプのメッセージ (INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER を含む) が INSERT メッセージに変換されます。上流テーブルからの削除メッセージを保存する必要がある場合など、特別なシナリオでのみ有効にします。

    • false (デフォルト):すべてのメッセージタイプがそのまま下流に送信されます。

    説明

    Flink コンピュートエンジン VVR 8.0.8 以降のバージョンでのみサポートされています。

    scan.only.deserialize.captured.tables.changelog.enabled

    増分フェーズで、指定されたテーブルの変更イベントのみを逆シリアル化するかどうかを指定します。

    いいえ

    BOOLEAN

    • VVR 8.x バージョンではデフォルト値は false です。

    • VVR 11.1 以降のバージョンではデフォルト値は true です。

    有効な値:

    • true:ターゲットテーブルの変更データのみを逆シリアル化し、Binlog の読み取りを高速化します。

    • false (デフォルト):すべてのテーブルの変更データを逆シリアル化します。

    説明
    • Flink コンピュートエンジン VVR 8.0.7 以降のバージョンでのみサポートされています。

    • Flink コンピュートエンジン VVR 8.0.8 以前で使用する場合、パラメーター名を debezium.scan.only.deserialize.captured.tables.changelog.enable に変更する必要があります。

    scan.parse.online.schema.changes.enabled

    増分フェーズで、RDS のロックレス変更 DDL イベントの解析を試みるかどうかを指定します。

    いいえ

    BOOLEAN

    false

    有効な値:

    • true:RDS のロックレス変更 DDL イベントを解析します。

    • false (デフォルト):RDS のロックレス変更 DDL イベントを解析しません。

    これは実験的な機能です。オンラインでのロックレス変更を実行する前に、回復のために Flink ジョブのスナップショットを取得することを推奨します。

    説明

    Flink コンピュートエンジン VVR 11.1 以降のバージョンでのみサポートされています。

    scan.incremental.snapshot.backfill.skip

    スナップショット読み取りフェーズ中にバックフィルをスキップするかどうかを指定します。

    いいえ

    BOOLEAN

    false

    有効な値:

    • true: スナップショット読み取りフェーズ中にバックフィルをスキップします。

    • false (デフォルト):スナップショット読み取りフェーズ中にバックフィルをスキップしません。

    バックフィルをスキップすると、スナップショットフェーズ中のテーブルへの変更は、スナップショットにマージされる代わりに、後の増分フェーズで読み取られます。

    重要

    バックフィルをスキップすると、スナップショットフェーズ中に発生した変更が再実行される可能性があるため、データの不整合につながる可能性があります。少なくとも 1 回のセマンティクスのみが保証されます。

    説明

    Flink コンピュートエンジン VVR 11.1 以降のバージョンでのみサポートされています。

    scan.incremental.snapshot.unbounded-chunk-first.enabled

    スナップショット読み取りフェーズ中に、境界のないチャンクを最初に配布するかどうかを指定します。

    いいえ

    BOOELEAN

    false

    有効な値:

    • true:スナップショット読み取りフェーズ中に、境界のないチャンクを最初に配布します。

    • false (デフォルト):スナップショット読み取りフェーズ中に、境界のないチャンクを最初に配布しません。

    これは実験的な機能です。有効にすると、スナップショットフェーズ中に最後のチャンクを同期する際の TaskManager の OOM エラーのリスクを軽減できます。ジョブの最初の起動前に追加することを推奨します。

    説明

    Flink コンピュートエンジン VVR 11.1 以降のバージョンでのみサポートされています。

  • ディメンションテーブル固有

    パラメーター

    説明

    必須

    データの型

    デフォルト値

    備考

    url

    MySQL JDBC URL。

    いいえ

    STRING

    なし

    URL のフォーマットは次のとおりです:jdbc:mysql://<endpoint>:<port>/<database_name>

    lookup.max-retries

    データ読み取りに失敗した後の最大再試行回数。

    いいえ

    INTEGER

    3

    Flink コンピュートエンジン VVR 6.0.7 以降のバージョンでのみサポートされています。

    lookup.cache.strategy

    キャッシュポリシー。

    いいえ

    STRING

    なし

    None、LRU、ALL の 3 つのキャッシュポリシーをサポートします。値の詳細については、「背景情報」をご参照ください。

    説明

    最近最も使用されていない (LRU) キャッシュポリシーを使用する場合、lookup.cache.max-rows パラメーターも構成する必要があります。

    lookup.cache.max-rows

    キャッシュされる最大行数。

    いいえ

    INTEGER

    100000

    • LRU キャッシュポリシーを選択した場合、キャッシュサイズを設定する必要があります。

    • ALL キャッシュポリシーを選択した場合、キャッシュサイズを設定する必要はありません。

    lookup.cache.ttl

    キャッシュの生存時間 (TTL)。

    いいえ

    DURATION

    10 s

    lookup.cache.ttl の設定は lookup.cache.strategy に依存します:

    • lookup.cache.strategyNone に設定されている場合、lookup.cache.ttl を設定する必要はありません。これはキャッシュが期限切れにならないことを意味します。

    • lookup.cache.strategyLRU に設定されている場合、lookup.cache.ttl はキャッシュの有効期限です。デフォルトでは期限切れになりません。

    • lookup.cache.strategyALL に設定されている場合、lookup.cache.ttl はキャッシュの再読み込み時間です。デフォルトでは再読み込みされません。

    1min や 10s などの時間形式を使用します。

    lookup.max-join-rows

    プライマリテーブルのレコードがディメンションテーブルのレコードと一致した場合に返される結果の最大数。

    いいえ

    INTEGER

    1024

    なし。

    lookup.filter-push-down.enabled

    ディメンションテーブルのフィルタープッシュダウンを有効にするかどうかを指定します。

    いいえ

    BOOLEAN

    false

    有効な値:

    • true:ディメンションテーブルのフィルタープッシュダウンを有効にします。MySQL データベーステーブルからデータをロードする際、ディメンションテーブルは SQL ジョブで設定された条件に基づいてデータを事前にフィルタリングします。

    • false (デフォルト):ディメンションテーブルのフィルタープッシュダウンを無効にします。ディメンションテーブルは MySQL データベーステーブルからロードする際にすべてのデータをロードします。

    説明

    Flink コンピュートエンジン VVR 8.0.7 以降のバージョンでのみサポートされています。

    重要

    ディメンションテーブルのプッシュダウンは、Flink テーブルがディメンションテーブルとして使用される場合にのみ有効にすべきです。MySQL ソーステーブルはフィルタープッシュダウンの有効化をサポートしていません。Flink テーブルがソーステーブルとディメンションテーブルの両方として使用され、ディメンションテーブルでフィルタープッシュダウンが有効になっている場合、ソーステーブルとして使用する際には SQL ヒントを使用してこの構成項目を明示的に false に設定する必要があります。そうしないと、ジョブが異常に実行される可能性があります。

  • 結果テーブル固有

    パラメーター

    説明

    必須

    データの型

    デフォルト値

    備考

    url

    MySQL JDBC URL。

    いいえ

    STRING

    なし

    URL のフォーマットは次のとおりです:jdbc:mysql://<endpoint>:<port>/<database_name>

    sink.max-retries

    データ書き込みに失敗した後の最大再試行回数。

    いいえ

    INTEGER

    3

    なし。

    sink.buffer-flush.batch-size

    1 回のバッチで書き込むレコード数。

    いいえ

    INTEGER

    4096

    なし。

    sink.buffer-flush.max-rows

    メモリにキャッシュするデータレコードの数。

    いいえ

    INTEGER

    10000

    このパラメーターは、プライマリキーが指定された後にのみ有効になります。

    sink.buffer-flush.interval

    キャッシュをフラッシュする時間間隔。指定された待機時間後にキャッシュされたデータが出力条件を満たさない場合、システムは自動的にキャッシュ内のすべてのデータを出力します。

    いいえ

    DURATION

    1s

    なし。

    sink.ignore-delete

    データの削除操作を無視するかどうかを指定します。

    いいえ

    BOOLEAN

    false

    Flink SQL によって生成されたストリームに削除または更新前レコードが含まれている場合、複数の出力タスクが同じテーブルの異なるフィールドを同時に更新すると、データの不整合が発生する可能性があります。

    たとえば、レコードが削除された後、別のタスクが一部のフィールドのみを更新します。更新されなかったフィールドは null またはデフォルト値になり、データエラーが発生します。

    sink.ignore-delete を true に設定することで、上流の DELETE および UPDATE_BEFORE 操作を無視して、このような問題を回避できます。

    説明
    • UPDATE_BEFORE は Flink のリトラクションメカニズムの一部であり、更新操作で古い値を「リトラクト」するために使用されます。

    • ignoreDelete = true の場合、すべての DELETE および UPDATE_BEFORE タイプのレコードはスキップされます。INSERT および UPDATE_AFTER レコードのみが処理されます。

    sink.ignore-null-when-update

    データを更新する際、入力データフィールドの値が null の場合に、対応するフィールドを null に更新するか、そのフィールドの更新をスキップするかを指定します。

    いいえ

    BOOLEAN

    false

    有効な値:

    • true:フィールドを更新しません。このパラメーターは、Flink テーブルにプライマリキーが設定されている場合にのみ true に設定できます。true に設定した場合:

      • VVR 8.0.6 以前の場合、結果テーブルへのデータ書き込みはバッチ実行をサポートしません。

      • VVR 8.0.7 以降の場合、結果テーブルへのデータ書き込みはバッチ実行をサポートします。

        バッチ書き込みは書き込み効率と全体のスループットを大幅に向上させることができますが、データ遅延や OOM エラーのリスクを引き起こす可能性があります。したがって、実際のビジネスシナリオに基づいてトレードオフを行ってください。

    • false: フィールドを null に更新します。

    説明

    このパラメーターは、リアルタイムコンピューティングエンジン VVR 8.0.5 以降のバージョンでのみサポートされています。

データ型マッピング

  • CDC ソーステーブル

    MySQL CDC フィールドタイプ

    Flink フィールドタイプ

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    TINYINT UNSIGNED

    TINYINT UNSIGNED ZEROFILL

    INT

    INT

    MEDIUMINT

    SMALLINT UNSIGNED

    SMALLINT UNSIGNED ZEROFILL

    BIGINT

    BIGINT

    INT UNSIGNED

    INT UNSIGNED ZEROFILL

    MEDIUMINT UNSIGNED

    MEDIUMINT UNSIGNED ZEROFILL

    BIGINT UNSIGNED

    DECIMAL(20, 0)

    BIGINT UNSIGNED ZEROFILL

    SERIAL

    FLOAT [UNSIGNED] [ZEROFILL]

    FLOAT

    DOUBLE [UNSIGNED] [ZEROFILL]

    DOUBLE

    DOUBLE PRECISION [UNSIGNED] [ZEROFILL]

    REAL [UNSIGNED] [ZEROFILL]

    NUMERIC(p, s) [UNSIGNED] [ZEROFILL]

    DECIMAL(p, s)

    DECIMAL(p, s) [UNSIGNED] [ZEROFILL]

    BOOLEAN

    BOOLEAN

    TINYINT(1)

    DATE

    DATE

    TIME [(p)]

    TIME [(p)] [WITHOUT TIME ZONE]

    DATETIME [(p)]

    TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    TIMESTAMP [(p)]

    TIMESTAMP [(p)]

    TIMESTAMP [(p)] WITH LOCAL TIME ZONE

    CHAR(n)

    STRING

    VARCHAR(n)

    TEXT

    BINARY

    BYTES

    VARBINARY

    BLOB

    重要

    MySQL の TINYINT(1) 型を 0 と 1 以外の値を格納するために使用しないことを推奨します。property-version が 0 に設定されている場合、MySQL CDC ソーステーブルはデフォルトで TINYINT(1) を Flink の BOOLEAN 型にマッピングするため、データの不正確さを引き起こす可能性があります。TINYINT(1) を 0 と 1 以外の値を格納するために使用するには、構成パラメーター catalog.table.treat-tinyint1-as-boolean をご参照ください。

  • ディメンションテーブルと結果テーブル

    MySQL フィールドタイプ

    Flink フィールドタイプ

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    TINYINT UNSIGNED

    INT

    INT

    MEDIUMINT

    SMALLINT UNSIGNED

    BIGINT

    BIGINT

    INT UNSIGNED

    BIGINT UNSIGNED

    DECIMAL(20, 0)

    FLOAT

    FLOAT

    DOUBLE

    DOUBLE

    DOUBLE PRECISION

    NUMERIC(p, s)

    DECIMAL(p, s)

    説明

    ここで p <= 38 です。

    DECIMAL(p, s)

    BOOLEAN

    BOOLEAN

    TINYINT(1)

    DATE

    DATE

    TIME [(p)]

    TIME [(p)] [WITHOUT TIME ZONE]

    DATETIME [(p)]

    TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    TIMESTAMP [(p)]

    CHAR(n)

    CHAR(n)

    VARCHAR(n)

    VARCHAR(n)

    BIT(n)

    BINARY(⌈n/8⌉)

    BINARY(n)

    BINARY(n)

    VARBINARY(N)

    VARBINARY(N)

    TINYTEXT

    STRING

    TEXT

    MEDIUMTEXT

    LONGTEXT

    TINYBLOB

    BYTES

    重要

    Flink は、2,147,483,647 (2^31 - 1) バイト以下の MySQL BLOB 型レコードのみをサポートします。

    BLOB

    MEDIUMBLOB

    LONGBLOB

データインジェスト

データインジェスト YAML ジョブで MySQL コネクタをデータソースとして使用できます。

構文

source:
   type: mysql
   name: MySQL Source
   hostname: localhost
   port: 3306
   username: <username>
   password: <password>
   tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
   server-id: 5401-5404

sink:
  type: xxx

パラメーター

パラメーター

説明

必須

データの型

デフォルト値

備考

type

データソースのタイプ。

はい

STRING

なし

値は mysql でなければなりません。

name

データソース名。

いいえ

STRING

なし

なし。

hostname

MySQL データベースの IP アドレスまたはホスト名。

はい

STRING

なし

VPC アドレスを入力することを推奨します。

説明

MySQL データベースと Realtime Compute for Apache Flink が同じ VPC にない場合は、VPC 間のネットワーク接続を確立するか、パブリックエンドポイントを使用してアクセスする必要があります。詳細については、「ストレージ管理と操作」および「フルマネージド Flink クラスターはどのようにインターネットにアクセスできますか?」をご参照ください。

username

MySQL データベースサービスのユーザー名。

はい

STRING

なし

なし。

password

MySQL データベースサービスのパスワード。

はい

STRING

なし

なし。

tables

同期する MySQL データテーブル。

はい

STRING

なし

  • テーブル名は正規表現をサポートし、複数のテーブルからデータを読み取ることができます。

  • 複数の正規表現はカンマで区切ることができます。

説明
  • 正規表現では、文字列の開始 (^) と終了 ($) のアンカーを使用しないでください。バージョン 11.2 では、データベースの正規表現はピリオドで分割して取得されます。開始と終了のアンカーがあると、結果のデータベース正規表現が使用できなくなります。たとえば、^db.user_[0-9]+$db.user_[0-9]+ に変更する必要があります。

  • ピリオドはデータベース名とテーブル名を区切るために使用されます。ピリオドを任意の文字に一致させるために使用するには、バックスラッシュでエスケープする必要があります。例:db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*。

tables.exclude

同期から除外するテーブル。

いいえ

STRING

なし

  • テーブル名は正規表現をサポートし、複数のテーブルからデータを除外できます。

  • 複数の正規表現はカンマで区切ることができます。

説明

ピリオドはデータベース名とテーブル名を区切るために使用されます。ピリオドを任意の文字に一致させるために使用するには、バックスラッシュでエスケープする必要があります。例:db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*。

port

MySQL データベースサービスのポート番号。

いいえ

INTEGER

3306

なし。

schema-change.enabled

スキーマ変更イベントを送信するかどうかを指定します。

いいえ

BOOLEAN

true

なし。

server-id

同期に使用されるデータベースクライアントの数値 ID または範囲。

いいえ

STRING

5400 から 6400 の間のランダムな値が生成されます。

この ID は MySQL クラスター内でグローバルに一意でなければなりません。同じデータベースに接続する各ジョブに異なる ID を設定することを推奨します。

このパラメーターは、5400-5408 のような ID 範囲もサポートします。増分読み取りが有効な場合、複数の同時リーダーがサポートされます。この場合、各同時リーダーが異なる ID を使用するように ID 範囲を設定することを推奨します。

jdbc.properties.*

JDBC URL のカスタム接続パラメーター。

いいえ

STRING

なし

カスタム接続パラメーターを渡すことができます。たとえば、SSL プロトコルを使用しないようにするには、'jdbc.properties.useSSL' = 'false' と構成できます。

サポートされている接続パラメーターの詳細については、「MySQL 構成プロパティ」をご参照ください。

debezium.*

Binlog を読み取るための Debezium のカスタムパラメーター。

いいえ

STRING

なし

カスタム Debezium パラメーターを渡すことができます。たとえば、'debezium.event.deserialization.failure.handling.mode'='ignore' を使用して、解析エラーの処理ロジックを指定します。

scan.incremental.snapshot.chunk.size

各チャンクのサイズ (行数)。

いいえ

INTEGER

8096

MySQL テーブルは複数のチャンクに分割されて読み取られます。チャンク内のデータは、完全に読み取られるまでメモリにキャッシュされます。

チャンクあたりの行数が少ないと、テーブル内のチャンクの総数が多くなります。これにより、障害回復の粒度は向上しますが、メモリ不足 (OOM) エラーや全体的なスループットの低下につながる可能性があります。したがって、バランスを見つけて合理的なチャンクサイズを設定する必要があります。

scan.snapshot.fetch.size

テーブルの完全なデータを読み取る際に一度にフェッチする最大レコード数。

いいえ

INTEGER

1024

なし。

scan.startup.mode

データ消費の起動モード。

いいえ

STRING

initial

有効な値:

  • initial (デフォルト):最初の起動時に、完全な既存データをスキャンし、その後最新の Binlog データを読み取ります。

  • latest-offset:最初の起動時に、既存データをスキャンせず、Binlog の末尾 (最新の位置) から読み取りを開始します。コネクタの起動後に行われた変更のみを読み取ります。

  • earliest-offset:既存データをスキャンせず、利用可能な最も古い Binlog から読み取りを開始します。

  • specific-offset:既存データをスキャンせず、指定された Binlog オフセットから開始します。scan.startup.specific-offset.filescan.startup.specific-offset.pos の両方を構成してオフセットを指定するか、scan.startup.specific-offset.gtid-set のみを構成して GTID セットを指定できます。

  • timestamp:既存データをスキャンせず、指定されたタイムスタンプから Binlog の読み取りを開始します。タイムスタンプは scan.startup.timestamp-millis でミリ秒単位で指定します。

重要

earliest-offsetspecific-offset、および timestamp 起動モードの場合、起動時のテーブルスキーマが指定された開始オフセット時のスキーマと異なる場合、ジョブは失敗します。つまり、これら 3 つのモードを使用する場合、指定された Binlog 消費位置とジョブの起動時間の間でテーブルスキーマが変更されないことを確認する必要があります。

scan.startup.specific-offset.file

特定オフセット起動モードを使用する場合の開始オフセットの Binlog ファイル名。

いいえ

STRING

なし

この構成を使用する場合、scan.startup.modespecific-offset に設定する必要があります。ファイル名の例:mysql-bin.000003

scan.startup.specific-offset.pos

特定オフセット起動モードを使用する場合の、指定された Binlog ファイル内の開始オフセット。

いいえ

INTEGER

なし

この構成を使用する場合、scan.startup.modespecific-offset に設定する必要があります。

scan.startup.specific-offset.gtid-set

特定オフセット起動モードを使用する場合の開始オフセットの GTID セット。

いいえ

STRING

なし

この構成を使用する場合、scan.startup.modespecific-offset に設定する必要があります。GTID セットの例:24DA167-0C0C-11E8-8442-00059A3C7B00:1-19

scan.startup.timestamp-millis

タイムスタンプ起動モードを使用する場合の開始オフセットのタイムスタンプ (ミリ秒)。

いいえ

LONG

なし

この構成を使用する場合、scan.startup.modetimestamp に設定する必要があります。タイムスタンプはミリ秒単位です。

重要

特定の時間を使用する場合、MySQL CDC は各 Binlog ファイルの初期イベントを読み取ってそのタイムスタンプを決定し、最終的に指定された時間に対応する Binlog ファイルを特定しようとします。指定されたタイムスタンプの Binlog ファイルがデータベースからクリーンアップされておらず、読み取り可能であることを確認してください。

server-time-zone

データベースが使用するセッションタイムゾーン。

いいえ

STRING

このパラメーターを指定しない場合、システムは Flink ジョブランタイムの環境タイムゾーンをデータベースサーバーのタイムゾーンとして使用します。これは、選択したゾーンのタイムゾーンです。

例:Asia/Shanghai。このパラメーターは、MySQL の TIMESTAMP 型が STRING 型にどのように変換されるかを制御します。詳細については、「Debezium の時間値」をご参照ください。

scan.startup.specific-offset.skip-events

指定されたオフセットから読み取る際にスキップする Binlog イベントの数。

いいえ

INTEGER

なし

この構成を使用する場合、scan.startup.modespecific-offset に設定する必要があります。

scan.startup.specific-offset.skip-rows

指定されたオフセットから読み取る際にスキップする行の変更数。単一の Binlog イベントは複数の行の変更に対応する場合があります。

いいえ

INTEGER

なし

この構成を使用する場合、scan.startup.modespecific-offset に設定する必要があります。

connect.timeout

MySQL データベースサーバーへの接続がタイムアウトするまで待機する最大時間 (再試行前)。

いいえ

DURATION

30s

なし。

connect.max-retries

MySQL データベースサービスへの接続に失敗した後の最大再試行回数。

いいえ

INTEGER

3

なし。

connection.pool.size

データベース接続プールのサイズ。

いいえ

INTEGER

20

データベース接続プールは接続を再利用するために使用され、データベース接続の数を減らすことができます。

heartbeat.interval

ソースがハートビートイベントを使用して Binlog オフセットを進める時間間隔。

いいえ

DURATION

30s

ハートビートイベントは、ソースの Binlog オフセットを進めるために使用されます。これは、MySQL の更新が遅いテーブルに非常に役立ちます。このようなテーブルでは、Binlog オフセットは自動的に進みません。ハートビートイベントは Binlog オフセットを前進させ、期限切れになるのを防ぎます。期限切れの Binlog オフセットはジョブの失敗を引き起こし、ステートレスな再起動が必要になります。

scan.incremental.snapshot.chunk.key-column

スナップショットフェーズでのシャーディングの分割キーとして使用する列を指定します。

いいえ。

STRING

なし

プライマリキーから 1 つの列のみを選択できます。

rds.region-id

Alibaba Cloud RDS for MySQL インスタンスのリージョン ID。

OSS からアーカイブされたログを読み取る機能を使用する場合に必須です。

STRING

なし

リージョン ID の詳細については、「リージョンとゾーン」をご参照ください。

rds.access-key-id

Alibaba Cloud RDS for MySQL アカウントの AccessKey ID。

OSS からアーカイブされたログを読み取る機能を使用する場合に必須です。

STRING

なし

詳細については、「AccessKey 情報を表示する方法」をご参照ください。

重要

AccessKey 情報の漏洩を防ぐため、シークレットを管理して AccessKey ID を指定することを推奨します。詳細については、「変数管理」をご参照ください。

rds.access-key-secret

Alibaba Cloud RDS for MySQL アカウントの AccessKey Secret。

OSS からアーカイブされたログを読み取る機能を使用する場合に必須です。

STRING

なし

詳細については、「AccessKey 情報を表示する方法」をご参照ください。

重要

AccessKey 情報の漏洩を防ぐため、シークレットを管理して AccessKey Secret を指定することを推奨します。詳細については、「変数管理」をご参照ください。

rds.db-instance-id

Alibaba Cloud RDS for MySQL インスタンスのインスタンス ID。

OSS からアーカイブされたログを読み取る機能を使用する場合に必須です。

STRING

なし

なし。

rds.main-db-id

Alibaba Cloud RDS for MySQL インスタンスのプライマリデータベース ID。

いいえ

STRING

なし

プライマリデータベース ID の取得方法の詳細については、「RDS for MySQL のログバックアップ」をご参照ください。

rds.download.timeout

OSS から単一のアーカイブ済みログをダウンロードするためのタイムアウト期間。

いいえ

DURATION

60s

なし。

rds.endpoint

OSS Binlog 情報を取得するためのエンドポイント。

いいえ

STRING

なし

有効な値の詳細については、「エンドポイント」をご参照ください。

rds.binlog-directory-prefix

Binlog ファイルを格納するディレクトリのプレフィックス。

いいえ

STRING

rds-binlog-

なし。

rds.use-intranet-link

Binlog ファイルのダウンロードに内部ネットワークリンクを使用するかどうかを指定します。

いいえ

BOOLEAN

true

なし。

rds.binlog-directories-parent-path

Binlog ファイルが格納されている親ディレクトリの絶対パス。

いいえ

STRING

なし

なし。

chunk-meta.group.size

チャンクメタデータのサイズ。

いいえ

INTEGER

1000

メタデータがこの値より大きい場合、送信のために複数の部分に分割されます。

chunk-key.even-distribution.factor.lower-bound

均等シャーディングのためのチャンク分布係数の下限。

いいえ

DOUBLE

0.05

分布係数がこの値より小さい場合、不均等シャーディングが使用されます。

チャンク分布係数 = (MAX(chunk-key) - MIN(chunk-key) + 1) / データ行の総数。

chunk-key.even-distribution.factor.upper-bound

均等シャーディングのためのチャンク分布係数の上限。

いいえ

DOUBLE

1000.0

分布係数がこの値より大きい場合、不均等シャーディングが使用されます。

チャンク分布係数 = (MAX(chunk-key) - MIN(chunk-key) + 1) / データ行の総数。

scan.incremental.close-idle-reader.enabled

スナップショットフェーズが終了した後にアイドル状態のリーダーを閉じるかどうかを指定します。

いいえ

BOOLEAN

false

この構成を有効にするには、execution.checkpointing.checkpoints-after-tasks-finish.enabled を true に設定します。

scan.only.deserialize.captured.tables.changelog.enabled

増分フェーズで、指定されたテーブルの変更イベントのみを逆シリアル化するかどうかを指定します。

いいえ

BOOLEAN

  • VVR 8.x バージョンではデフォルト値は false です。

  • VVR 11.1 以降のバージョンではデフォルト値は true です。

有効な値:

  • true:ターゲットテーブルの変更データのみを逆シリアル化し、Binlog の読み取りを高速化します。

  • false (デフォルト):すべてのテーブルの変更データを逆シリアル化します。

scan.parallel-deserialize-changelog.enabled

増分フェーズで、複数のスレッドを使用して変更イベントを解析するかどうかを指定します。

いいえ

BOOLEAN

false

有効な値:

  • true:Binlog イベントの順序を維持しながら、マルチスレッドを使用して変更イベントを逆シリアル化し、読み取りを高速化します。

  • false (デフォルト):単一のスレッドを使用してイベントを逆シリアル化します。

説明

Flink コンピュートエンジン VVR 8.0.11 以降のバージョンでのみサポートされています。

scan.parallel-deserialize-changelog.handler.size

複数のスレッドを使用して変更イベントを解析する場合のイベントハンドラの数。

いいえ

INTEGER

2

説明

Flink コンピュートエンジン VVR 8.0.11 以降のバージョンでのみサポートされています。

メタデータ-カラム.インクルード-リスト

下流のシンクに渡すメタデータ列。

いいえ

文字列

なし

利用可能なメタデータには、table_namedatabase_nameop_tses_ts、および query_log があります。複数のメタデータ列はカンマで区切ることができます。

説明

MySQL CDC YAML コネクタは、op_type メタデータ列の追加を要求またはサポートしていません。Transform 式で __data_event_type__ を直接使用して、変更データタイプを取得できます。

重要

es_ts メタデータ列は、トランザクションの開始時刻を表します。MySQL バージョン 8.0.x を使用している場合にのみ追加できます。以前のバージョンの MySQL を使用している場合は、このメタデータ列を追加しないでください。

scan.newly-added-table.enabled

チェックポイントから再起動する際に、前回の起動時に一致しなかった新しく追加されたテーブルを同期するか、または現在一致しないテーブルを状態から削除するかを指定します。

いいえ

BOOLEAN

false

チェックポイントまたはセーブポイントから再起動する際に有効になります。

scan.binlog.newly-added-table.enabled

増分フェーズで、一致する新しく追加されたテーブルからデータを送信するかどうかを指定します。

いいえ

BOOLEAN

false

scan.newly-added-table.enabled と同時に有効にすることはできません。

scan.incremental.snapshot.chunk.key-column

特定のテーブルに対して、スナップショットフェーズでのシャーディングの分割キーとして使用する列を指定します。

いいえ

STRING

なし

  • テーブル名とフィールド名をコロン (:) で接続してルールを定義します。テーブル名は正規表現にすることができます。複数のルールをセミコロン (;) で区切って定義できます。例:db1.user_table_[0-9]+:col1;db[1-2].[app|web]_order_\\.*:col2

  • これはプライマリキーのないテーブルでは必須であり、選択された列は非ヌル型 (NOT NULL) でなければなりません。プライマリキーのあるテーブルではオプションであり、プライマリキーから 1 つの列のみを選択できます。

scan.parse.online.schema.changes.enabled

増分フェーズで、RDS のロックレス変更 DDL イベントの解析を試みるかどうかを指定します。

いいえ

BOOLEAN

false

有効な値:

  • true: RDS のロックレス変更 DDL イベントを解析します。

  • false (デフォルト): RDS のロックレス変更 DDL イベントを解析しません。

これは実験的な機能です。オンラインでのロックレス変更を実行する前に、回復のために Flink ジョブのスナップショットを取得することを推奨します。

説明

Flink コンピュートエンジン VVR 11.0 以降のバージョンでのみサポートされています。

scan.incremental.snapshot.backfill.skip

スナップショット読み取りフェーズ中にバックフィルをスキップするかどうかを指定します。

いいえ

BOOLEAN

false

有効な値:

  • true:スナップショット読み取りフェーズ中にバックフィルをスキップします。

  • false (デフォルト):スナップショット読み取りフェーズ中にバックフィルをスキップしません。

バックフィルをスキップすると、スナップショットフェーズ中のテーブルへの変更は、スナップショットにマージされる代わりに、後の増分フェーズで読み取られます。

重要

バックフィルをスキップすると、スナップショットフェーズ中に発生した変更が再実行される可能性があるため、データの不整合につながる可能性があります。少なくとも 1 回のセマンティクスのみが保証されます。

説明

Flink コンピュートエンジン VVR 11.1 以降のバージョンでのみサポートされています。

treat-tinyint1-as-boolean.enabled

TINYINT(1) 型をブール型として扱うかどうかを指定します。

いいえ

ブール値

true

有効な値:

  • true (デフォルト):TINYINT(1) 型をブール型として扱います。

  • false:TINYINT(1) 型をブール型として扱いません。

treat-timestamp-as-datetime-enabled

TIMESTAMP 型を DATETIME 型として扱うかどうかを指定します。

いいえ

BOOLEAN

false

有効な値:

  • true:MySQL の TIMESTAMP 型を DATETIME 型として扱い、CDC の TIMESTAMP 型にマッピングします。

  • false (デフォルト):MySQL の TIMESTAMP 型を CDC の TIMESTAMP_LTZ 型にマッピングします。

MySQL の TIMESTAMP 型は UTC 時刻を格納し、タイムゾーンの影響を受けます。MySQL の DATETIME 型はリテラル時刻を格納し、タイムゾーンの影響を受けません。

有効にすると、server-time-zone に基づいて MySQL の TIMESTAMP 型データを DATETIME 型に変換します。

include-comments.enabled

テーブルとフィールドのコメントを同期するかどうかを指定します。

いいえ

BOOELEAN

false

有効な値:

  • true:テーブルとフィールドのコメントを同期します。

  • false (デフォルト):テーブルとフィールドのコメントを同期しません。

これを有効にすると、ジョブのメモリ使用量が増加します。

scan.incremental.snapshot.unbounded-chunk-first.enabled

スナップショット読み取りフェーズ中に、境界のないチャンクを最初に配布するかどうかを指定します。

いいえ

BOOELEAN

false

このパラメーターは次の値をサポートします:

  • true: スナップショット読み取りフェーズ中に無制限のシャードの分散を優先します。

  • false (デフォルト): スナップショット読み取りフェーズ中に無制限のシャードの分散を優先しません。

これは実験的な機能です。この機能を有効にすると、TaskManager がスナップショットフェーズ中に最後のシャードを同期する際のメモリ不足 (OOM) エラーのリスクが軽減されます。ジョブが初めて開始される前に、このパラメーターを追加してください。

説明

Flink コンピュートエンジン Ververica Runtime (VVR) 11.1 以降でのみサポートされています。

型マッピング

次の表は、データインジェストのデータ型マッピングを示しています。

MySQL CDC フィールドタイプ

CDC フィールドタイプ

TINYINT(n)

TINYINT

SMALLINT

SMALLINT

TINYINT UNSIGNED

TINYINT UNSIGNED ZEROFILL

YEAR

INT

INT

MEDIUMINT

MEDIUMINT UNSIGNED

MEDIUMINT UNSIGNED ZEROFILL

SMALLINT UNSIGNED

SMALLINT UNSIGNED ZEROFILL

BIGINT

BIGINT

INT UNSIGNED

INT UNSIGNED ZEROFILL

BIGINT UNSIGNED

DECIMAL(20, 0)

BIGINT UNSIGNED ZEROFILL

SERIAL

FLOAT [UNSIGNED] [ZEROFILL]

FLOAT

DOUBLE [UNSIGNED] [ZEROFILL]

DOUBLE

DOUBLE PRECISION [UNSIGNED] [ZEROFILL]

REAL [UNSIGNED] [ZEROFILL]

NUMERIC(p, s) [UNSIGNED] [ZEROFILL] ただし p <= 38

DECIMAL(p, s)

DECIMAL(p, s) [UNSIGNED] [ZEROFILL] ただし p <= 38

FIXED(p, s) [UNSIGNED] [ZEROFILL] ただし p <= 38

BOOLEAN

BOOLEAN

BIT(1)

TINYINT(1)

DATE

DATE

TIME [(p)]

TIME [(p)]

DATETIME [(p)]

TIMESTAMP [(p)]

TIMESTAMP [(p)]

treat-timestamp-as-datetime-enabled パラメーターの値に基づいて、フィールドは次のようにマッピングされます:

true:TIMESTAMP[(p)]

false:TIMESTAMP_LTZ[(p)]

CHAR(n)

CHAR(n)

VARCHAR(n)

VARCHAR(n)

BIT(n)

BINARY(⌈(n + 7) / 8⌉)

BINARY(n)

BINARY(n)

VARBINARY(N)

VARBINARY(N)

NUMERIC(p, s) [UNSIGNED] [ZEROFILL] ただし 38 < p <= 65

STRING

説明

MySQL では、decimal データ型の精度は最大 65 ですが、Flink では、decimal データ型の精度は 38 に制限されています。したがって、精度が 38 を超える decimal 列を定義する場合は、精度の損失を避けるために文字列にマッピングする必要があります。

DECIMAL(p, s) [UNSIGNED] [ZEROFILL] ただし 38 < p <= 65

FIXED(p, s) [UNSIGNED] [ZEROFILL] ただし 38 < p <= 65

TINYTEXT

STRING

TEXT

MEDIUMTEXT

LONGTEXT

ENUM

JSON

STRING

説明

JSON データ型は Flink で JSON 形式の文字列に変換されます。

GEOMETRY

STRING

説明

MySQL の空間データ型は、固定の JSON 形式の文字列に変換されます。詳細については、MySQL の空間データ型マッピングをご参照ください。

POINT

LINESTRING

POLYGON

MULTIPOINT

MULTILINESTRING

MULTIPOLYGON

GEOMETRYCOLLECTION

TINYBLOB

BYTES

説明

MySQL の BLOB データ型については、長さが 2,147,483,647 (2**31-1) 以下の BLOB のみがサポートされています。

BLOB

MEDIUMBLOB

LONGBLOB

使用例

  • CDC ソーステーブル

    CREATE TEMPORARY TABLE mysqlcdc_source (
       order_id INT,
       order_date TIMESTAMP(0),
       customer_name STRING,
       price DECIMAL(10, 5),
       product_id INT,
       order_status BOOLEAN,
       PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      order_id INT,
      customer_name STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT order_id, customer_name FROM mysqlcdc_source;
  • ディメンションテーブル

    CREATE TEMPORARY TABLE datagen_source(
      a INT,
      b BIGINT,
      c STRING,
      `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE mysql_dim (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      a INT,
      b STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT T.a, H.b
    FROM datagen_source AS T JOIN mysql_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a = H.a;
  • シンクテーブル

    CREATE TEMPORARY TABLE datagen_source (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE mysql_sink (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    INSERT INTO mysql_sink
    SELECT * FROM datagen_source;
  • データインジェストのデータソース

    source:
      type: mysql
      name: MySQL Source
      hostname: ${mysql.hostname}
      port: ${mysql.port}
      username: ${mysql.username}
      password: ${mysql.password}
      tables: ${mysql.source.table}
      server-id: 7601-7604
    
    sink:
      type: values
      name: Values Sink
      print.enabled: true
      sink.print.logger: true

MySQL CDC ソーステーブルについて

  • 実装原理

    MySQL CDC ソーステーブルが起動すると、テーブル全体をスキャンし、プライマリキーに基づいて複数のチャンクに分割し、現在のバイナリログオフセットを記録します。次に、増分スナップショットアルゴリズムを使用して、SELECT 文で各チャンクからデータを読み取ります。ジョブは定期的にチェックポイントを実行して、完了したチャンクを記録します。フェールオーバーが発生した場合、ジョブは未完了のチャンクのみを読み取る必要があります。すべてのチャンクが読み取られた後、ジョブは以前に記録されたバイナリログオフセットから増分変更レコードの読み取りを開始します。Flink ジョブは定期的なチェックポイントを実行し続け、バイナリログオフセットを記録します。ジョブがフェールオーバーした場合、最後に記録されたバイナリログオフセットから処理を再開します。このプロセスにより、Exactly-Once セマンティクスが実現されます。

    増分スナップショットアルゴリズムの詳細な説明については、「MySQL CDC コネクタ」をご参照ください。

  • メタデータ

    メタデータは、シャーディングされたデータベースとテーブルをマージして同期するシナリオで役立ちます。これは、マージ後、ビジネスでは各データのソースデータベースとテーブルを区別したいことが多いためです。メタデータ列を使用して、ソーステーブルのデータベースとテーブル名情報にアクセスできます。したがって、メタデータ列を使用して、複数のシャーディングされたテーブルを単一の宛先テーブルに簡単にマージできます。

    MySQL CDC ソースはメタデータ列の構文をサポートしています。メタデータ列を使用して、次のメタデータにアクセスできます。

    メタデータキー

    メタデータ型

    説明

    database_name

    STRING NOT NULL

    行を含むデータベースの名前。

    table_name

    STRING NOT NULL

    行を含むテーブルの名前。

    op_ts

    TIMESTAMP_LTZ(3) NOT NULL

    データベースで変更が行われた時刻。レコードがテーブルの既存データからのものであり、Binlog からのものでない場合、この値は常に 0 です。

    op_type

    STRING NOT NULL

    行の変更タイプ。

    • +I:INSERT メッセージ

    • -D:DELETE メッセージ

    • -U:UPDATE_BEFORE メッセージ

    • +U:UPDATE_AFTER メッセージ

    説明

    Realtime Compute for Apache Flink VVR 8.0.7 以降のバージョンでのみサポートされています。

    query_log

    STRING NOT NULL

    読み取られた行に対応する MySQL クエリログレコード。

    説明

    MySQL は、クエリログを記録するために binlog_rows_query_log_events パラメーターを有効にする必要があります。

    次のコード例は、MySQL インスタンス内の複数のシャーディングされたデータベースから複数の orders テーブルを、下流の Hologres インスタンスの holo_orders テーブルにマージして同期する方法を示しています。

    CREATE TEMPORARY TABLE mysql_orders (
      db_name STRING METADATA FROM 'database_name' VIRTUAL,  -- データベース名を読み取ります。
      table_name STRING METADATA  FROM 'table_name' VIRTUAL, -- テーブル名を読み取ります。
      operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, -- 変更タイムスタンプを読み取ります。
      op_type STRING METADATA FROM 'op_type' VIRTUAL, -- 変更タイプを読み取ります。
      order_id INT,
      order_date TIMESTAMP(0),
      customer_name STRING,
      price DECIMAL(10, 5),
      product_id INT,
      order_status BOOLEAN,
      PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'localhost',
      'port' = '3306',
      'username' = 'flinkuser',
      'password' = 'flinkpw',
      'database-name' = 'mydb_.*', -- 複数のシャーディングされたデータベースに一致する正規表現。
      'table-name' = 'orders_.*'   -- 複数のシャーディングされたテーブルに一致する正規表現。
    );
    
    INSERT INTO holo_orders SELECT * FROM mysql_orders;

    上記のコードに基づき、WITH 句で scan.read-changelog-as-append-only.enabled パラメーターが true に設定されている場合、出力結果は下流テーブルのプライマリキー設定によって異なります:

    • 下流テーブルのプライマリキーが order_id の場合、出力結果には上流テーブルの各プライマリキーの最後の変更のみが含まれます。たとえば、最後の変更が削除操作であったプライマリキーの場合、下流テーブルには同じプライマリキーと op_type が -D のレコードが表示されます。

    • 下流テーブルのプライマリキーが order_id、operation_ts、および op_type の複合である場合、出力結果には上流テーブルの各プライマリキーの完全な変更履歴が含まれます。

  • 正規表現のサポート

    MySQL CDC ソーステーブルは、テーブル名またはデータベース名に正規表現を使用して、複数のテーブルまたはデータベースに一致させることをサポートしています。次のコード例は、正規表現を使用して複数のテーブルを指定する方法を示しています。

    CREATE TABLE products (
      db_name STRING METADATA FROM 'database_name' VIRTUAL,
      table_name STRING METADATA  FROM 'table_name' VIRTUAL,
      operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
      order_id INT,
      order_date TIMESTAMP(0),
      customer_name STRING,
      price DECIMAL(10, 5),
      product_id INT,
      order_status BOOLEAN,
      PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'localhost',
      'port' = '3306',
      'username' = 'root',
      'password' = '123456',
      'database-name' = '(^(test).*|^(tpc).*|txc|.*[p$]|t{2})', -- 複数のデータベースに一致する正規表現。
      'table-name' = '(t[5-8]|tt)' -- 複数のテーブルに一致する正規表現。
    );

    次の表は、上記の例の正規表現を説明しています:

    • ^(test).* はプレフィックスマッチングの例です。この式は、「test1」や「test2」など、「test」で始まるデータベース名に一致します。

    • .*[p$] はサフィックスマッチングの例です。この式は、「cdcp」や「edcp」など、「p」で終わるデータベース名に一致します。

    • txc は特定のマッチです。これは、正確に「txc」であるデータベース名に一致します。

    完全修飾テーブル名を照合する場合、MySQL CDC はデータベース名とテーブル名を使用してテーブルを一意に識別します。パターンは database-name.table-name です。たとえば、照合パターン (^(test).*|^(tpc).*|txc|.*[p$]|t{2}).(t[ 5-8]|tt) は、txc.tt や test2.test5 などのデータベース内のテーブルに一致します。

    重要

    SQL ジョブの構成では、table-name と database-name はカンマ (,) を使用して複数のテーブルまたはデータベースを指定することをサポートしていません。

    • 複数のテーブルを照合したり、複数の正規表現を使用したりするには、それらを縦棒 (|) で接続し、括弧で囲むことができます。たとえば、テーブル「user」と「product」を読み取るには、table-name を (user|product) として構成できます。

    • 正規表現にカンマが含まれている場合は、縦棒 (|) 演算子を使用して書き直す必要があります。たとえば、正規表現 mytable_\d{1, 2} は、カンマを使用しないように、同等の (mytable_\d{1}|mytable_\d{2}) に書き直す必要があります。

  • 同時実行制御

    MySQL コネクタは、複数の同時実行スレッドで完全なデータを読み取ることをサポートしており、データ読み込み効率を向上させることができます。Realtime Compute for Apache Flink コンソールの Autopilot 自動チューニング機能と組み合わせることで、コネクタはマルチスレッド読み取りが完了した後、増分フェーズ中に自動的にスケールインして計算リソースを節約できます。

    Realtime Compute for Apache Flink 開発コンソールでは、リソース構成ページで基本モードまたはエキスパートモードでジョブの同時実行数を設定できます。違いは次のとおりです:

    • 基本モードで設定された同時実行数は、ジョブ全体のグローバルな同時実行数です。基础模式

    • エキスパートモードは、必要に応じて特定の VERTEX の同時実行数を設定することをサポートしています。vertex并发

    リソース構成の詳細については、「ジョブのデプロイメント情報の構成」をご参照ください。

    重要

    基本モードまたはエキスパートモードのいずれを使用する場合でも、同時実行数を設定する際には、テーブルで宣言されている server-id の範囲がジョブの同時実行数以上である必要があります。たとえば、server-id の範囲が 5404-5412 の場合、8 つの一意の server-id があります。この場合、ジョブは最大 8 つの同時実行スレッドを持つことができます。同じ MySQL インスタンスに対する異なるジョブは、server-id の範囲が重複してはなりません。つまり、各ジョブには明示的に異なる server-id を構成する必要があります。

  • Autopilot 自動スケールイン

    完全データフェーズでは、大量の既存データが蓄積されます。読み取り効率を向上させるために、既存データは通常、同時に読み取られます。増分バイナリログフェーズでは、バイナリログデータの量が少なく、グローバルな順序を保証する必要があるため、通常は単一の同時読み取りで十分です。完全フェーズと増分フェーズの異なるリソース要件は、自動チューニング機能によって自動的にバランスが取られ、高性能とリソース効率の両方を実現できます。

    自動チューニングは、MySQL CDC ソースの各タスクのトラフィックを監視します。ジョブがバイナリログフェーズに入ると、1 つのタスクのみがバイナリログの読み取りを担当し、他のタスクがアイドル状態の場合、自動チューニングは自動的にソースの CU 数と同時実行数を削減します。自動チューニングを有効にするには、ジョブの O&M ページで自動チューニングモードをアクティブに設定します。

    説明

    並列処理の次数を減らすためのデフォルトの最小トリガー間隔は 24 時間です。自動チューニングのパラメーターと詳細については、「自動チューニングの構成」をご参照ください。

  • 起動モード

    scan.startup.mode 構成オプションを使用して、MySQL CDC ソーステーブルの起動モードを指定できます。利用可能なオプションは次のとおりです:

    • initial (デフォルト):最初の起動時に、コネクタはデータベーステーブルの完全な読み取りを実行し、その後、増分モードに切り替えてバイナリログを読み取ります。

    • earliest-offset:スナップショットフェーズをスキップし、利用可能な最も古いバイナリログオフセットから読み取りを開始します。

    • latest-offset:スナップショットフェーズをスキップし、バイナリログの末尾から読み取りを開始します。このモードでは、ソーステーブルはジョブの開始後に発生したデータ変更のみを読み取ることができます。

    • specific-offset:スナップショットフェーズをスキップし、指定されたバイナリログオフセットから読み取りを開始します。オフセットは、バイナリログのファイル名と位置、または GTID セットで指定できます。

    • timestamp:スナップショットフェーズをスキップし、指定されたタイムスタンプからバイナリログイベントの読み取りを開始します。

    使用例:

    CREATE TABLE mysql_source (...) WITH (
        'connector' = 'mysql-cdc',
        'scan.startup.mode' = 'earliest-offset', -- 最も古いオフセットから開始します。
        'scan.startup.mode' = 'latest-offset', -- 最新のオフセットから開始します。
        'scan.startup.mode' = 'specific-offset', -- 特定のオフセットから開始します。
        'scan.startup.mode' = 'timestamp', -- 特定のタイムスタンプから開始します。
        'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- specific-offset モードで Binlog ファイル名を指定します。
        'scan.startup.specific-offset.pos' = '4', -- specific-offset モードで Binlog 位置を指定します。
        'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- specific-offset モードで GTID セットを指定します。
        'scan.startup.timestamp-millis' = '1667232000000' -- timestamp モードで起動タイムスタンプを指定します。
        ...
    )
    重要
    • MySQL ソースは、チェックポイント中に現在のオフセットを INFO レベルでログに出力します。ログのプレフィックスは Binlog offset on checkpoint {checkpoint-id} です。このログは、特定のチェックポイントオフセットからジョブを開始するのに役立ちます。

    • 読み取られるテーブルがスキーマ変更を経た場合、最も古いオフセット (earliest-offset)、特定のオフセット (specific-offset)、またはタイムスタンプ (timestamp) から開始するとエラーが発生する可能性があります。これは、Debezium リーダーが内部的に最新のテーブルスキーマを保存しており、スキーマが一致しない古いデータは正しく解析できないためです。

  • プライマリキーのない CDC ソーステーブルについて

    • プライマリキーのないテーブルを使用する場合、scan.incremental.snapshot.chunk.key-column を設定する必要があります。非ヌルフィールドのみを選択できます。

    • プライマリキーのない CDC ソーステーブルの処理セマンティクスは、scan.incremental.snapshot.chunk.key-column で指定された列の動作によって決まります:

      • 指定された列が更新されない場合、Exactly-Once セマンティクスが保証されます。

      • 指定された列が更新される場合、At-Least-Once セマンティクスのみが保証されます。ただし、プライマリキーを持ち、べき等操作を使用する下流のシンクと列を組み合わせることで、データの正確性を保証できます。

  • Alibaba Cloud RDS for MySQL バックアップログの読み取り

    MySQL CDC ソーステーブルは、Alibaba Cloud RDS for MySQL からのバックアップログの読み取りをサポートしています。これは、完全データフェーズに時間がかかり、ローカルのバイナリログファイルが自動的にクリーンアップされたが、自動または手動でアップロードされたバックアップファイルがまだ存在する場合に役立ちます。

    使用例:

    CREATE TABLE mysql_source (...) WITH (
        'connector' = 'mysql-cdc',
        'rds.region-id' = 'cn-beijing',
        'rds.access-key-id' = 'xxxxxxxxx', 
        'rds.access-key-secret' = 'xxxxxxxxx', 
        'rds.db-instance-id' = 'rm-xxxxxxxxxxxxxxxxx', 
        'rds.main-db-id' = '12345678',
        'rds.download.timeout' = '60s'
        ...
    )
  • CDC ソースの再利用を有効にする

    同じインスタンスから複数の MySQL CDC ソーステーブルを使用するジョブは、複数の Binlog クライアントを起動し、データベースの負荷を増加させます。詳細については、「MySQL CDC よくある質問」をご参照ください。

    ソリューション

    Realtime Compute for Apache Flink VVR 8.0.7 以降のバージョンは、MySQL CDC ソースの再利用をサポートしています。再利用は、マージ可能な MySQL CDC ソーステーブルをマージします。マージは、ソーステーブルがデータベース名、テーブル名、および server-id を除く同じ構成項目を持つ場合に発生します。エンジンは、同じジョブ内の MySQL CDC ソースを自動的にマージします。

    手順

    1. SQL ジョブで SET コマンドを使用します:

      SET 'table.optimizer.source-merge.enabled' = 'true';
      
      # (VVR 8.0.8 および 8.0.9 の場合) さらにこの項目を設定します:
      SET 'sql-gateway.exec-plan.enabled' = 'false';
      VVR 11.1 以降のバージョンでは、再利用はデフォルトで有効になっています。
    2. ジョブをステートレスで開始します。ソースの再利用構成を変更すると、ジョブのトポロジーが変更されます。ジョブをステートレスで開始する必要があります。そうしないと、ジョブの開始に失敗したり、データが失われたりする可能性があります。ソースがマージされた場合、MergetableSourceScan ノードが表示されます。

    重要
    • 再利用を有効にした後、演算子チェーンを無効にすることは推奨されません。pipeline.operator-chainingfalse に設定すると、データのシリアル化と逆シリアル化のオーバーヘッドが増加します。マージされるソースが多いほど、オーバーヘッドは大きくなります。

    • VVR 8.0.7 では、演算子チェーンを無効にするとシリアル化の問題が発生します。

Binlog 読み取りの高速化

MySQL コネクタをソーステーブルまたはデータインジェストのデータソースとして使用する場合、増分フェーズ中にバイナリログファイルを解析してさまざまな変更メッセージを生成します。バイナリログファイルは、すべてのテーブルの変更をバイナリ形式で記録します。次の方法でバイナリログファイルの解析を高速化できます。

  • 解析フィルター構成の有効化

    • 構成項目 scan.only.deserialize.captured.tables.changelog.enabled を使用します:指定されたテーブルの変更イベントのみを解析します。

  • Debezium パラメーターの最適化

    debezium.max.queue.size: 162580
    debezium.max.batch.size: 40960
    debezium.poll.interval.ms: 50
    • debezium.max.queue.size: ブロッキングキューが保持できるレコードの最大数です。Debezium がデータベースからイベントストリームを読み取ると、イベントをダウンストリームに書き込む前にブロッキングキューに配置します。デフォルト値は 8192 です。

    • debezium.max.batch.size: 各反復でコネクタが処理するイベントの最大数です。デフォルト値は 2048 です。

    • debezium.poll.interval.ms: コネクタが新しい変更イベントをリクエストする前に待機するミリ秒数です。デフォルト値は 1000 ミリ秒、つまり 1 秒です。

使用例:

CREATE TABLE mysql_source (...) WITH (
    'connector' = 'mysql-cdc',
    -- Debezium 構成
    'debezium.max.queue.size' = '162580',
    'debezium.max.batch.size' = '40960',
    'debezium.poll.interval.ms' = '50',
    -- 解析フィルターを有効にする
    'scan.only.deserialize.captured.tables.changelog.enabled' = 'true',  -- 指定されたテーブルの変更イベントのみを解析します。
    ...
)
source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604
  # Debezium 構成
  debezium.max.queue.size: 162580
  debezium.max.batch.size: 40960
  debezium.poll.interval.ms: 50
  # 解析フィルターを有効にする
  scan.only.deserialize.captured.tables.changelog.enabled: true

MySQL CDC Enterprise Edition のバイナリログ消費能力は 85 MB/s で、オープンソースコミュニティバージョンの約 2 倍です。バイナリログファイルの生成速度が 85 MB/s を超える場合 (つまり、512 MB のファイルが 6 秒ごとに 1 つ生成される場合)、Flink ジョブの遅延は上昇し続けます。バイナリログファイルの生成速度が遅くなると、処理遅延は徐々に減少します。バイナリログファイルに大きなトランザクションが含まれている場合、処理遅延が一時的に増加する可能性があります。トランザクションログが読み取られた後、遅延は減少します。

MySQL CDC DataStream API

重要

DataStream API を使用してデータを読み書きする場合、対応する DataStream コネクタを使用して Flink に接続する必要があります。DataStream コネクタの設定方法の詳細については、「DataStream コネクタの使用方法」をご参照ください。

DataStream API プログラムを作成し、MySqlSource を使用できます。以下にコードと pom 依存関係の例を示します:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
public class MySqlSourceExample {
  public static void main(String[] args) throws Exception {
    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("yourHostname")
        .port(yourPort)
        .databaseList("yourDatabaseName") // キャプチャするデータベースを設定
        .tableList("yourDatabaseName.yourTableName") // キャプチャするテーブルを設定
        .username("yourUsername")
        .password("yourPassword")
        .deserializer(new JsonDebeziumDeserializationSchema()) // SourceRecord を JSON 文字列に変換
        .build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // チェックポイントを有効にする
    env.enableCheckpointing(3000);
    env
      .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
      // 4 つの並列ソースタスクを設定
      .setParallelism(4)
      .print().setParallelism(1); // メッセージの順序を保つためにシンクに並列度 1 を使用
    env.execute("Print MySQL Snapshot + Binlog");
  }
}
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mysql</artifactId>
    <version>${vvr.version}</version>
</dependency>

MySqlSource をビルドする際には、コードで次のパラメーターを指定する必要があります:

パラメーター

説明

hostname

MySQL データベースの IP アドレスまたはホスト名。

port

MySQL データベースサービスのポート番号。

databaseList

MySQL データベース名。

説明

データベース名は正規表現をサポートしており、複数のデータベースからデータを読み取ることができます。.* を使用してすべてのデータベースに一致させることができます。

username

MySQL データベースサービスのユーザー名。

password

MySQL データベースサービスのパスワード。

deserializer

SourceRecord 型のレコードを指定された型に逆シリアル化するデシリアライザ。有効な値:

  • RowDataDebeziumDeserializeSchema:SourceRecord を Flink Table または SQL の内部データ構造 RowData に変換します。

  • JsonDebeziumDeserializationSchema:SourceRecord を JSON 形式の文字列に変換します。

pom の依存関係では、次のパラメーターを指定する必要があります:

${vvr.version}

Alibaba Cloud Realtime Compute for Apache Flink のエンジンバージョン。例:1.17-vvr-8.0.4-3

説明

定期的に Hotfix バージョンをリリースする可能性があるため、Maven に表示されているバージョン番号を使用してください。これらの更新は他のチャネルを通じて発表されない場合があります。

${flink.version}

Apache Flink のバージョン。例:1.17.2

重要

ジョブの実行時に互換性の問題が発生しないように、Alibaba Cloud Realtime Compute for Apache Flink のエンジンバージョンに対応する Apache Flink のバージョンを使用してください。バージョンの対応については、「DPI エンジン」をご参照ください。

よくある質問

CDC ソーステーブルを使用する際に発生する可能性のある問題については、「CDC の問題」をご参照ください。