すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:MySQL

最終更新日:Feb 13, 2026

このトピックでは、MySQL コネクタの使用方法について説明します。

背景情報

MySQL コネクタは、RDS MySQL、PolarDB for MySQL、OceanBase(MySQL モード)、および自己管理型 MySQL など、MySQL プロトコルと互換性のあるすべてのデータベースをサポートしています。

重要

MySQL コネクタを使用して OceanBase から読み取る場合、OceanBase のバイナリログ(binlog)が有効化され、正しく構成されていることを確認してください。詳細については、「バイナリログ操作」をご参照ください。本機能はパブリックプレビュー版です。十分な評価を行い、慎重にご利用ください。

以下の表に、MySQL コネクタの対応状況を示します。

カテゴリ

詳細

対応タイプ

ソーステーブル、ディメンションテーブル、結果テーブル、およびデータインジェストソース

実行モード

ストリーミングモードのみ

データ形式

該当なし

特定の監視メトリック

監視メトリック

  • ソーステーブル

    • currentFetchEventTimeLag:データが生成されてから、ソースオペレーターによって取得されるまでの時間間隔

      このメトリックは、binlog フェーズでのみ有効です。スナップショットフェーズ中は、常に値が 0 になります。

    • currentEmitEventTimeLag:データが生成されてから、ソースオペレーターを離れるまでの時間間隔

      このメトリックは、binlog フェーズでのみ有効です。スナップショットフェーズ中は、常に値が 0 になります。

    • sourceIdleTime:ソーステーブルが新しいデータを生成していない期間

  • ディメンションテーブルおよび結果テーブル:該当なし

説明

これらのメトリックの詳細については、「監視メトリック」をご参照ください。

API タイプ

DataStream、SQL、およびデータインジェスト YAML

結果テーブルのデータ更新または削除の対応

はい

特徴

MySQL Change Data Capture(CDC)ソーステーブルは、まずデータベースの完全な履歴データを読み取り、その後シームレスにバイナリログ(binlog)イベントの読み取りに切り替えます。このプロセスにより、障害発生時でもデータの欠落や重複が発生せず、1 回限りのセマンティクスが保証されます。MySQL CDC ソーステーブルは、完全データの同時読み取りをサポートし、増分スナップショットアルゴリズムを用いてロックフリーの読み取りと再開可能なデータ転送を実現します。詳細については、「MySQL CDC ソーステーブルについて」をご参照ください。

  • バッチ処理とストリーミング処理の統合:別々のパイプラインを維持する必要なく、完全データおよび増分データの両方を読み取ることができます。

  • 完全データの同時読み取り:水平方向にスケールアウトしてパフォーマンスを向上させます。

  • 完全読み取りから増分読み取りへのシームレスな切り替え:自動的にスケールインしてコンピューティングリソースを節約します。

  • 再開可能なデータ転送:完全データ読み取り中の再開可能データ転送をサポートし、安定性を向上させます。

  • ロックフリー読み取り:オンラインビジネス運用に影響を与えることなく完全データを読み取れます。

  • バックアップログの読み取り:RDS MySQL のバックアップログの読み取りをサポートします。

  • 並列 binlog 解析:binlog ファイルを並列で解析することで、読み取り遅延を低減します。

前提条件

MySQL CDC ソーステーブルを使用する前に、MySQL の設定に記載されている通り、MySQL データベースを設定する必要があります。以下の設定が必要です。

RDS MySQL

  • 接続テストを Realtime Compute for Apache Flink と実行し、ネットワーク接続を確認します。

  • 対応している MySQL バージョン:5.6、5.7、および 8.0.x

  • バイナリログ(binlog)を有効化します。デフォルトで有効化されています。

  • binlog 形式を ROW に設定します。デフォルトの形式です。

  • binlog_row_image を FULL に設定します。デフォルト設定です。

  • バイナリログトランザクション圧縮を無効化します。この機能は MySQL 8.0.20 で導入され、デフォルトで無効化されています。

  • SELECT、SHOW DATABASES、REPLICATION SLAVE、および REPLICATION CLIENT 権限を持つ MySQL ユーザーが作成されました。

  • MySQL データベースおよびテーブルを作成します。詳細については、「RDS MySQL のデータベースおよびアカウントの作成」をご参照ください。権限不足による運用失敗を回避するため、MySQL データベースの作成には特権アカウントの使用を推奨します。

  • IP ホワイトリストを設定します。詳細については、「RDS MySQL のホワイトリストの設定」をご参照ください。

PolarDB for MySQL

  • 接続テストを Realtime Compute for Apache Flink と実行し、ネットワーク接続を確認します。

  • 対応している MySQL バージョン:5.6、5.7、および 8.0.x

  • バイナリログ(binlog)を有効化します。デフォルトでは無効化されています。

  • binlog 形式を ROW に設定します。デフォルトの形式です。

  • binlog_row_image を FULL に設定します。デフォルト設定です。

  • バイナリログトランザクション圧縮を無効化します。この機能は MySQL 8.0.20 で導入され、デフォルトで無効化されています。

  • MySQL ユーザーを作成し、SELECT、SHOW DATABASES、REPLICATION SLAVE、および REPLICATION CLIENT 権限を付与しました。

  • MySQL データベースおよびテーブルを作成します。詳細については、「PolarDB for MySQL のデータベースおよびアカウントの作成」をご参照ください。権限不足による運用失敗を回避するため、MySQL データベースの作成には特権アカウントの使用を推奨します。

  • IP ホワイトリストを設定します。詳細については、「PolarDB for MySQL のホワイトリストの設定」をご参照ください。

自己管理型 MySQL

  • 接続テストを Realtime Compute for Apache Flink と実行し、ネットワーク接続を確認します。

  • 対応している MySQL バージョン:5.6、5.7、および 8.0.x

  • バイナリログ(binlog)を有効化します。デフォルトでは無効化されています。

  • binlog 形式を ROW に設定します。デフォルトの形式は STATEMENT です。

  • binlog_row_image を FULL に設定します。デフォルト設定です。

  • バイナリログトランザクション圧縮を無効化します。この機能は MySQL 8.0.20 で導入され、デフォルトで無効化されています。

  • MySQL ユーザーを作成し、SELECT、SHOW DATABASES、REPLICATION SLAVE、および REPLICATION CLIENT 権限を付与済みである必要があります。

  • MySQL データベースおよびテーブルを作成します。詳細については、「自己管理型 MySQL インスタンスのデータベースおよびアカウントの作成」をご参照ください。権限不足による運用失敗を回避するため、MySQL データベースの作成には特権アカウントの使用を推奨します。

  • IP ホワイトリストを設定します。詳細については、「自己管理型 MySQL インスタンスのホワイトリストの設定」をご参照ください。

制限事項

一般的な制限事項

  • MySQL CDC ソーステーブルは、ウォーターマークの定義をサポートしていません。

  • CREATE TABLE AS SELECT(CTAS)および CREATE DATABASE AS SELECT(CDAS)ジョブにおいて、MySQL CDC ソーステーブルは一部のスキーマ変更を同期できます。対応する変更タイプの詳細については、「スキーマ進化同期ポリシー」をご参照ください。

  • MySQL CDC コネクタは、バイナリログトランザクション圧縮をサポートしていません。したがって、MySQL CDC コネクタを使用して増分データを消費する場合、この機能が無効化されていることを確認する必要があります。そうでない場合、増分データの取得に失敗する可能性があります。

RDS MySQL の制限事項

  • RDS MySQL のセカンダリデータベースまたは読み取り専用レプリカからのデータ読み取りは推奨しません。これらのインスタンスの binlog 保持期間は通常短く、binlog が期限切れになってクリアされると、ジョブが binlog データを消費できずエラーが発生する可能性があります。

  • RDS MySQL では、プライマリデータベースとセカンダリデータベース間の並列同期がデフォルトで有効化されていますが、トランザクション順序の一貫性は保証されません。これにより、プライマリ/セカンダリのスイッチオーバーおよびチェックポイント回復時に一部のデータが欠落する可能性があります。この問題を解決するには、RDS MySQL で手動で slave_preserve_commit_order オプションを有効化できます。

PolarDB for MySQL の制限事項

MySQL CDC ソーステーブルは、PolarDB for MySQL バージョン 1.0.19 以前のマルチマスタクラスターからの読み取りをサポートしていません。詳細については、「マルチマスタクラスターとは?」をご参照ください。これらのクラスターによって生成された binlog には、重複するテーブル ID が含まれている可能性があり、CDC ソーステーブルでスキーママッピングエラーが発生し、binlog 解析エラーにつながる可能性があります。

オープンソース MySQL の制限事項

MySQL では、プライマリ/レプリカのバイナリログレプリケーション中にデフォルトでトランザクション順序が維持されます。ただし、MySQL レプリカで並列レプリケーション(slave_parallel_workers > 1)が有効化され、かつ slave_preserve_commit_order=ON が設定されていない場合、トランザクションのコミット順序がプライマリデータベースと異なる可能性があります。Flink CDC がチェックポイントから回復する際に、この順序の不整合によりデータが欠落する可能性があります。そのため、MySQL レプリカで slave_preserve_commit_order = ON を設定するか、slave_parallel_workers = 1 に設定することを推奨します。なお、slave_parallel_workers を 1 に設定すると、レプリケーションパフォーマンスが低下する可能性があります。

注意事項

  • 各 MySQL CDC データソースに対して、異なる Server ID を明示的に設定します

    Server ID の目的

    各 MySQL CDC データソースに対して、異なる Server ID を明示的に設定する必要があります。複数の MySQL CDC データソースが同じ Server ID を共有し、接続を再利用できない場合、binlog オフセットが乱れ、過剰読み取りまたは不足読み取りが発生します。

    さまざまなシナリオにおける Server ID の設定

    DDL 文で Server ID を指定できますが、DDL オプションではなく動的ヒントで設定することを推奨します。

    • 並列処理の次数 = 1 または増分スナップショットが無効化されている場合

      ## 増分スナップショットフレームワークが有効化されていない場合、または並列処理の次数が 1 の場合に、特定の Server ID を指定します。
      SELECT * FROM source_table /*+ OPTIONS('server-id'='123456') */ ;
    • 並列処理の次数 > 1 かつ増分スナップショットが有効化されている場合

      ## Server ID の範囲を指定します。範囲内の利用可能な Server ID の数が、並列処理の次数以上になるようにしてください。並列処理の次数が 3 であると仮定します。
      SELECT * FROM source_table /*+ OPTIONS('server-id'='123456-123458') */ ;
    • CTAS を使用したデータ同期の場合

      CTAS を使用してデータ同期を行う場合、CDC データソースの構成が同一であれば、ソースが自動的に再利用されます。この場合、複数の CDC データソースに同じ Server ID を設定できます。詳細については、「例 4:複数の CTAS ステートメント」をご参照ください。

    • 再利用できない複数の非 CTAS ソーステーブルの場合

      ジョブに複数の MySQL CDC ソーステーブルが含まれており、同期に CTAS ステートメントを使用しない場合、データソースは再利用できません。この場合、各 CDC ソーステーブルに異なる Server ID を割り当てる必要があります。同様に、増分スナップショットフレームワークが有効化されており、並列処理の次数が 1 より大きい場合、Server ID の範囲を指定する必要があります。

      select * from 
        source_table1 /*+ OPTIONS('server-id'='123456-123457') */
      left join 
        source_table2 /*+ OPTIONS('server-id'='123458-123459') */
      on source_table1.id=source_table2.id;
  • シンクテーブル

    • DDL ステートメントで自動インクリメントのプライマリキーを宣言しないでください。MySQL は、データ書き込み時にこのフィールドを自動的に埋めます。

    • 少なくとも 1 つの非プライマリキーのフィールドを宣言する必要があります。そうでない場合、エラーが発生します。

    • DDL ステートメント内の NOT ENFORCED は、Flink がプライマリキーの妥当性チェックを実施しないことを意味します。プライマリキーの正確性および整合性は、ユーザーが保証する必要があります。詳細については、「妥当性チェック」をご参照ください。

  • ディメンションテーブル

    クエリの高速化のためにインデックスを使用する場合、JOIN 句のフィールド順序は、インデックス定義の順序と一致させる必要があります。これは「左端プレフィックスルール」と呼ばれます。たとえば、インデックスが (a, b, c) 上にある場合、JOIN 条件は ON t.a = x AND t.b = y である必要があります。

    Flink によって生成された SQL は、オプティマイザーによって再書き換えられる可能性があり、実際のデータベースクエリでインデックスが使用されないことがあります。インデックスが実際に使用されているかどうかを確認するには、MySQL の実行計画(EXPLAIN)またはスロークエリログを確認し、実行された実際の SELECT ステートメントを確認してください。

SQL

MySQL コネクタは、SQL ジョブでソーステーブル、ディメンションテーブル、または結果テーブルとして使用できます。

構文

CREATE TEMPORARY TABLE mysqlcdc_source (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

説明
  • コネクタが結果テーブルに書き込む方法:受信した各レコードに対して、コネクタは単一の SQL ステートメントを構築して実行します。正確なステートメントは、テーブル構造に応じて異なります:

    • プライマリキーのない結果テーブルの場合、システムは次の SQL ステートメントを構築して実行します:INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...);

    • プライマリキーを持つ結果テーブルの場合、システムは次の SQL ステートメントを実行します:INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...; 注:物理テーブルにプライマリキーに加えて一意なインデックス制約がある場合、異なるプライマリキーだが一意なインデックスでカバーされるカラムの値が同一である 2 つのレコードを挿入すると、一意なインデックスの競合が発生します。この競合によりデータが上書きされ、出力データでデータ損失が発生します。

  • MySQL データベースで自動インクリメントのプライマリキーを定義している場合、Flink DDL ステートメントで自動インクリメントフィールドを宣言しないでください。データ挿入時にデータベースがこのフィールドを自動的に埋めます。コネクタは、自動インクリメントフィールドを含むデータの書き込みおよび削除をサポートしますが、更新はサポートしません。

WITH パラメーター

  • 一般

    パラメーター

    説明

    必須

    データ型

    デフォルト値

    備考

    connector

    テーブルのタイプ。

    はい

    STRING

    なし

    ソーステーブルとして使用する場合、このオプションを mysql-cdc または mysql に設定します。これらは同等です。ディメンションテーブルまたは結果テーブルとして使用する場合、このオプションを mysql に設定します。

    hostname

    MySQL データベースの IP アドレスまたはホスト名。

    はい

    STRING

    なし

    VPC アドレスの入力を推奨します。

    説明

    MySQL データベースと Realtime Compute for Apache Flink が同じ VPC 内にない場合、クロス VPC ネットワーク接続を確立するか、インターネット経由でアクセスしてください。詳細については、「ワークスペースの管理および運用」および「フルマネージド Flink クラスターがインターネットにアクセスする方法」をご参照ください。

    username

    MySQL データベースサービスのユーザー名。

    はい

    STRING

    なし

    なし。

    password

    MySQL データベースサービスのパスワード。

    はい

    STRING

    なし

    なし。

    database-name

    MySQL データベースの名前。

    はい

    STRING

    なし

    • ソーステーブルとして使用する場合、このオプションは複数のデータベースからデータを読み取るために正規表現をサポートします。

    • 正規表現を使用する場合、文字列の先頭および末尾をマッチさせるために ^ および $ を使用しないでください。「table-name」の備考欄をご覧ください。

    table-name

    MySQL テーブルの名前。

    はい

    STRING

    なし

    • ソーステーブルとして使用する場合、このオプションは複数のテーブルからデータを読み取るために正規表現をサポートします。

      複数の MySQL テーブルからデータを読み取る場合、複数の CTAS ステートメントを 1 つのジョブとして送信します。これにより、複数の binlog リスナーを有効化することを避け、パフォーマンスおよび効率を向上させます。詳細については、「複数の CTAS ステートメント:1 つのジョブとして送信」をご参照ください。

    • 正規表現を使用する場合、文字列の先頭および末尾をマッチさせるために ^ および $ を使用しないでください。詳細については、以下の備考をご覧ください。

    説明

    MySQL CDC ソーステーブルがテーブル名をマッチさせる場合、指定した database-name および table-name を文字列 \\.(VVR バージョン 8.0.1 より前のバージョンでは文字 .)を使用して完全パスの正規表現に結合します。その後、この正規表現を使用して、MySQL データベース内のテーブルの完全修飾名をマッチさせ、読み取るテーブルを決定します。

    たとえば、'database-name'='db_.*' および 'table-name'='tb_.+' を設定した場合、コネクタは正規表現 db_.*\\.tb_.+(または 8.0.1 より前のバージョンでは db_.*.tb_.+)を使用して完全修飾テーブル名をマッチさせ、読み取るテーブルを決定します。

    port

    MySQL データベースサービスのポート番号。

    いいえ

    INTEGER

    3306

    なし。

  • ソース固有

    パラメーター

    説明

    必須

    データ型

    デフォルト値

    備考

    server-id

    データベースクライアントの数値 ID。

    いいえ

    STRING

    5400 ~ 6400 の間のランダム値が生成されます。

    この ID は、MySQL クラスター内でグローバルに一意である必要があります。同じデータベースに接続する各ジョブに対して、異なる ID を割り当てることを推奨します。

    このオプションは、ID の範囲(例:5400-5408)もサポートします。増分読み取りが有効化されている場合、同時読み取りがサポートされます。この場合、各同時リーダーが異なる ID を使用できるよう、ID の範囲を指定することを推奨します。詳細については、「server ID の使用」をご参照ください。

    scan.incremental.snapshot.enabled

    増分スナップショットを有効化するかどうかを指定します。

    いいえ

    BOOLEAN

    true

    増分スナップショットはデフォルトで有効化されています。増分スナップショットは、完全データスナップショットを読み取るための新しいメカニズムです。従来のスナップショット方式と比較して、以下のような利点があります:

    • 完全データの読み取りを並列で実行できます。

    • 完全データの読み取りをチャンク単位でチェックポイントできます。

    • 完全データの読み取りにグローバル読み取りロック(FLUSH TABLES WITH READ LOCK)を取得する必要はありません。

    ソースが同時読み取りをサポートするようにする場合、各同時リーダーには一意の server ID が必要です。したがって、server-id は 5400-6400 のような範囲である必要があり、その範囲は並列処理の次数以上である必要があります。

    説明

    この設定項目は、Ververica Runtime(VVR)11.1 以降で削除されました。

    scan.incremental.snapshot.chunk.size

    各チャンクのサイズ(行数)。

    いいえ

    INTEGER

    8096

    増分スナップショット読み取りが有効化されている場合、テーブルは読み取り用に複数のチャンクに分割されます。チャンクデータは、完全に読み取られるまでメモリ内にバッファーされます。

    各チャンクに含まれる行数が少ないほど、テーブル内のチャンクの総数が多くなります。これは、障害回復の粒度を小さくしますが、Out Of Memory(OOM)エラーが発生したり、全体的なスループットが低下したりする可能性があります。したがって、適切なチャンクサイズを設定する必要があります。

    scan.snapshot.fetch.size

    完全テーブルデータを読み取る際に、1 回に取得する最大レコード数。

    いいえ

    INTEGER

    1024

    なし。

    scan.startup.mode

    データ消費の起動モード。

    いいえ

    STRING

    initial

    有効な値:

    • initial(デフォルト):最初の起動時に、まず完全な履歴データをスキャンし、その後最新の binlog データを読み取ります。

    • latest-offset:最初の起動時に履歴データをスキャンしません。binlog の末尾から読み取りを開始し、コネクタの起動後に発生した最新の変更のみを読み取ります。

    • earliest-offset:履歴データをスキャンしません。利用可能な最も古い binlog から読み取りを開始します。

    • specific-offset:履歴データをスキャンしません。指定した特定の binlog オフセットから読み取りを開始します。オフセットは、scan.startup.specific-offset.file および scan.startup.specific-offset.pos の両方を設定するか、scan.startup.specific-offset.gtid-set のみを設定して、特定の GTID セットから開始できます。

    • timestamp:履歴データをスキャンしません。指定したタイムスタンプから binlog イベントの読み取りを開始します。scan.startup.timestamp-millis を使用して、ミリ秒単位でタイムスタンプを指定します。

    重要

    earliest-offsetspecific-offset、または timestamp を使用する場合、指定した binlog 消費位置とジョブ起動時刻の間にテーブルスキーマが変更されていないことを確認してください。そうしないと、スキーマの不一致によるエラーが発生する可能性があります。

    scan.startup.specific-offset.file

    特定のオフセット起動モードで使用する開始オフセットの binlog ファイル名。

    いいえ

    STRING

    なし

    この設定を使用する場合、scan.startup.modespecific-offset に設定します。ファイル名の例:mysql-bin.000003

    scan.startup.specific-offset.pos

    特定のオフセット起動モードで使用する指定された binlog ファイル内の開始オフセット。

    いいえ

    INTEGER

    なし

    この設定を使用する場合、scan.startup.modespecific-offset に設定します。

    scan.startup.specific-offset.gtid-set

    特定のオフセット起動モードで使用する開始オフセットの GTID セット。

    いいえ

    STRING

    なし

    この設定を使用する場合、scan.startup.modespecific-offset に設定します。GTID セットの例:24DA167-0C0C-11E8-8442-00059A3C7B00:1-19

    scan.startup.timestamp-millis

    タイムスタンプ起動モードで使用する開始オフセットのミリ秒単位のタイムスタンプ。

    いいえ

    LONG

    なし

    この設定を使用する場合、scan.startup.modetimestamp に設定します。タイムスタンプはミリ秒単位です。

    重要

    タイムスタンプを使用する場合、MySQL CDC は各 binlog ファイルの初期イベントを読み取ってそのタイムスタンプを特定し、対応する binlog ファイルを検索しようとします。指定されたタイムスタンプの binlog ファイルがデータベースからクリアされておらず、読み取り可能であることを確認してください。

    server-time-zone

    データベースで使用されるセッションタイムゾーン。

    いいえ

    STRING

    このオプションを指定しない場合、システムは Flink ジョブの実行環境のタイムゾーンをデータベースサーバータイムゾーンとして使用します(選択したゾーンのタイムゾーン)。

    例:Asia/Shanghai。このオプションは、MySQL TIMESTAMP 型が STRING 型に変換される方法を制御します。詳細については、「Debezium 時間型値」をご参照ください。

    debezium.min.row.count.to.stream.results

    テーブルの行数がこの値を超える場合、バッチ読み取りモードを使用します。

    いいえ

    INTEGER

    1000

    Flink は、MySQL ソーステーブルからデータを次のように読み取ります:

    • 完全読み取り:テーブルの全データを直接メモリに読み込みます。これは高速ですが、データ量に比例してメモリを消費します。ソーステーブルが非常に大きい場合、OOM 問題が発生する可能性があります。

    • バッチ読み取り:固定数の行を 1 バッチとして読み取り、すべてのデータが読み取られるまで繰り返します。これは、大規模なテーブルに対する OOM リスクを回避しますが、速度は遅くなります。

    connect.timeout

    MySQL データベースサーバーへの接続がタイムアウトするまでの最大待機時間(再試行する前)。

    いいえ

    DURATION

    30s

    なし。

    connect.max-retries

    MySQL データベースサービスへの接続失敗後の最大再試行回数。

    いいえ

    INTEGER

    3

    なし。

    connection.pool.size

    データベース接続プールのサイズ。

    いいえ

    INTEGER

    20

    データベース接続プールは、データベース接続数を削減するために接続を再利用します。

    jdbc.properties.*

    JDBC URL のカスタム接続オプション。

    いいえ

    STRING

    なし

    カスタム接続オプションを渡すことができます。たとえば、SSL を無効化するには、'jdbc.properties.useSSL' = 'false' を設定します。

    サポートされている接続オプションの詳細については、「MySQL 設定プロパティ」をご参照ください。

    debezium.*

    binlog を読み取るためのカスタム Debezium オプション。

    いいえ

    STRING

    なし

    カスタム Debezium オプションを渡すことができます。たとえば、解析エラーの処理方法を指定するには、'debezium.event.deserialization.failure.handling.mode'='ignore' を使用します。

    heartbeat.interval

    ソースがハートビートイベントを使用して binlog オフセットを進める間隔。

    いいえ

    DURATION

    30s

    ハートビートイベントは、ソースで binlog オフセットを進めるのに役立ちます。これは、MySQL で更新が遅いテーブルに特に有用です。このようなテーブルでは、binlog オフセットが自動的に進まないため、ハートビートイベントによって binlog オフセットを前方に進めることで、期限切れになった binlog オフセットが原因でジョブが失敗し、ステートレスな再起動が必要になるという問題を防ぎます。

    scan.incremental.snapshot.chunk.key-column

    スナップショットフェーズ中にチャンクを分割するために使用するカラム。

    備考を参照してください。

    STRING

    なし

    • プライマリキーのないテーブルに必須です。選択したカラムは NULL 不可(NOT NULL)である必要があります。

    • プライマリキーのあるテーブルでは任意です。プライマリキーから 1 つのカラムのみを選択できます。

    rds.region-id

    Alibaba Cloud RDS MySQL インスタンスのリージョン ID。

    OSS からアーカイブログを読み取る場合に必須です。

    STRING

    なし

    リージョン ID の詳細については、「リージョンとゾーン」をご参照ください。

    rds.access-key-id

    Alibaba Cloud RDS MySQL アカウントの AccessKey ID。

    OSS からアーカイブログを読み取る場合に必須です。

    STRING

    なし

    詳細については、「AccessKey ID および AccessKey Secret の表示方法」をご参照ください。

    重要

    AccessKey 情報の漏洩を防ぐため、AccessKey ID はシークレット管理を使用して管理してください。詳細については、「変数管理」をご参照ください。

    rds.access-key-secret

    Alibaba Cloud RDS MySQL アカウントの AccessKey Secret。

    OSS からアーカイブログを読み取る場合に必須です。

    STRING

    なし

    詳細については、「AccessKey ID および AccessKey Secret の表示方法」をご参照ください。

    重要

    AccessKey 情報の漏洩を防ぐため、AccessKey Secret はシークレット管理を使用して管理してください。詳細については、「変数管理」をご参照ください。

    rds.db-instance-id

    Alibaba Cloud RDS MySQL インスタンスのインスタンス ID。

    OSS からアーカイブログを読み取る場合に必須です。

    STRING

    なし

    なし。

    rds.main-db-id

    Alibaba Cloud RDS MySQL インスタンスのプライマリデータベース ID。

    いいえ

    STRING

    なし

    • プライマリデータベース ID の取得方法については、「RDS MySQL ログバックアップ」をご参照ください。

    • VVR 8.0.7 以降でのみサポートされます。

    rds.download.timeout

    OSS から単一のアーカイブログをダウンロードする際のタイムアウト。

    いいえ

    DURATION

    60s

    なし。

    rds.endpoint

    OSS binlog 情報を取得するためのサービスエンドポイント。

    いいえ

    STRING

    なし

    scan.incremental.close-idle-reader.enabled

    スナップショットフェーズ終了後にアイドルリーダーを閉じるかどうかを指定します。

    いいえ

    BOOLEAN

    false

    • VVR 8.0.1 以降でのみサポートされます。

    • この設定は、execution.checkpointing.checkpoints-after-tasks-finish.enabled が true に設定されている場合にのみ有効です。

    scan.read-changelog-as-append-only.enabled

    チャネルログストリームを追加専用ストリームに変換するかどうかを指定します。

    いいえ

    BOOLEAN

    false

    有効な値:

    • true:INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER を含むすべてのメッセージタイプを INSERT メッセージに変換します。上流テーブルの削除メッセージを保持するなどの特殊なケースでのみ有効化してください。

    • false(デフォルト):すべてのメッセージタイプがそのまま通過します。

    説明

    VVR 8.0.8 以降でのみサポートされます。

    scan.only.deserialize.captured.tables.changelog.enabled

    増分フェーズ中に、指定されたテーブルの変更イベントのみを逆シリアル化するかどうかを指定します。

    いいえ

    BOOLEAN

    • VVR 8.x ではデフォルト値は false です。

    • VVR 11.1 以降ではデフォルト値は true です。

    有効な値:

    • true:ターゲットテーブルの変更データのみを逆シリアル化して、binlog 読み取りを高速化します。

    • false(デフォルト):すべてのテーブルの変更データを逆シリアル化します。

    説明
    • VVR 8.0.7 以降でのみサポートされます。

    • VVR 8.0.8 以前では、このパラメーターを debezium.scan.only.deserialize.captured.tables.changelog.enable に名前を変更します。

    scan.parse.online.schema.changes.enabled

    増分フェーズ中に、RDS ロックレス DDL イベントの解析を試行するかどうかを指定します。

    いいえ

    BOOLEAN

    false

    有効な値:

    • true:RDS ロックレス DDL イベントを解析します。

    • false(デフォルト):RDS ロックレス DDL イベントを解析しません。

    これは実験的な機能です。オンラインロックレス変更を実行する前に、Flink ジョブの回復用にスナップショットを取得してください。

    説明

    VVR 11.1 以降でのみサポートされます。

    scan.incremental.snapshot.backfill.skip

    スナップショット読み取りフェーズ中にバックフィルをスキップするかどうかを指定します。

    いいえ

    BOOLEAN

    false

    有効な値:

    • true:スナップショット読み取りフェーズ中にバックフィルをスキップします。

    • false(デフォルト):スナップショット読み取りフェーズ中にバックフィルをスキップしません。

    バックフィルがスキップされた場合、スナップショットフェーズ中のテーブルの変更は、後続の増分フェーズで読み取られ、スナップショットにマージされません。

    重要

    バックフィルをスキップすると、スナップショットフェーズ中の変更が再再生される可能性があるため、データの不整合が発生する可能性があります。保証されるのは、最低限 1 回のセマンティクスのみです。

    説明

    VVR 11.1 以降でのみサポートされます。

    scan.incremental.snapshot.unbounded-chunk-first.enabled

    スナップショット読み取りフェーズ中に、上限なしのチャンクを最初に配布するかどうかを指定します。

    いいえ

    BOOELEAN

    false

    有効な値:

    • true:スナップショット読み取りフェーズ中に、上限なしのチャンクを最初に配布します。

    • false(デフォルト):スナップショット読み取りフェーズ中に、上限なしのチャンクを最初に配布しません。

    これは実験的な機能です。これを有効化すると、TaskManager がスナップショットフェーズ中に最後のチャンクを同期する際の OOM エラーのリスクが低減します。ジョブの初回起動前にこれを追加することを推奨します。

    説明

    VVR 11.1 以降でのみサポートされます。

    binlog.session.network.timeout

    binlog 接続の読み取り/書き込み操作のネットワークタイムアウト。

    いいえ

    DURATION

    10m

    これを 0s に設定すると、MySQL サーバーのデフォルトタイムアウトが使用されます。

    説明

    VVR 11.5 以降でのみサポートされます。

    scan.rate-limit.records-per-second

    ソースによって 1 秒あたりに出力される最大レコード数を制限します。

    いいえ

    LONG

    なし

    データ読み取りを制限するのに役立ちます。この制限は、完全フェーズおよび増分フェーズの両方に適用されます。

    numRecordsOutPerSecond メトリックは、データフロー全体で 1 秒あたりに出力されるレコード数を反映します。このメトリックに基づいて、このパラメーターを調整してください。

    完全読み取り中は、scan.incremental.snapshot.chunk.size の値を下げることで、1 バッチあたりの行数を減らしてください。

    説明

    VVR 11.5 以降でのみサポートされます。

  • ディメンションテーブル固有

    パラメーター

    説明

    必須

    データ型

    デフォルト値

    備考

    url

    MySQL JDBC URL。

    いいえ

    STRING

    なし

    URL 形式は jdbc:mysql://<endpoint>:<port>/<database_name> です。

    lookup.max-retries

    データ読み取り失敗後の最大再試行回数。

    いいえ

    INTEGER

    3

    VVR 6.0.7 以降でのみサポートされます。

    lookup.cache.strategy

    キャッシュポリシー。

    いいえ

    STRING

    なし

    3つのキャッシュポリシーをサポートしています:なし、LRU、およびすべて。詳細については、「背景情報」をご参照ください。

    説明

    LRU キャッシュポリシーを使用する場合、lookup.cache.max-rows オプションも設定する必要があります。

    lookup.cache.max-rows

    キャッシュされる最大行数。

    いいえ

    INTEGER

    100000

    • LRU キャッシュポリシーを選択する場合、キャッシュサイズを指定する必要があります。

    • ALL キャッシュポリシーを選択する場合、任意です。

    lookup.cache.ttl

    キャッシュの生存時間(TTL)。

    いいえ

    DURATION

    10 s

    lookup.cache.ttl の設定は、lookup.cache.strategy に依存します:

    • lookup.cache.strategyNone に設定されている場合、lookup.cache.ttl は任意であり、キャッシュは期限切れになりません。

    • lookup.cache.strategyLRU に設定されている場合、lookup.cache.ttl はキャッシュの TTL です。デフォルトでは期限切れになりません。

    • lookup.cache.strategyALL に設定されている場合、lookup.cache.ttl はキャッシュの再読み込み時間です。デフォルトでは再読み込みされません。

    1min や 10s などの形式で時間を指定してください。

    lookup.max-join-rows

    プライマリテーブルの各行に対して、ディメンションテーブルからクエリする際に返される結果の最大数。

    いいえ

    INTEGER

    1024

    なし。

    lookup.filter-push-down.enabled

    ディメンションテーブルのフィルターのプッシュダウンを有効化するかどうかを指定します。

    いいえ

    BOOLEAN

    false

    有効な値:

    • true:ディメンションテーブルのフィルターのプッシュダウンを有効化します。MySQL データベーステーブルからデータを読み込む際、ディメンションテーブルは SQL ジョブで設定された条件に基づいて事前にデータをフィルターします。

    • false(デフォルト):ディメンションテーブルのフィルターのプッシュダウンを無効化します。MySQL データベーステーブルからデータを読み込む際、ディメンションテーブルはすべてのデータを読み込みます。

    説明

    VVR 8.0.7 以降でのみサポートされます。

    重要

    フィルターのプッシュダウンは、Flink テーブルがディメンションテーブルとして使用される場合にのみ有効化すべきです。MySQL ソーステーブルは、フィルターのプッシュダウンを有効化できません。Flink テーブルがソーステーブルおよびディメンションテーブルの両方として使用される場合、ディメンションテーブルでフィルターのプッシュダウンを有効化するときは、SQL ヒント を使用して、ソーステーブルでこのオプションを明示的に false に設定してください。そうしないと、ジョブが異常終了する可能性があります。

  • シンク固有の

    パラメーター

    説明

    必須

    データ型

    デフォルト値

    備考

    url

    MySQL JDBC URL。

    いいえ

    STRING

    なし

    URL 形式は jdbc:mysql://<endpoint>:<port>/<database_name> です。

    sink.max-retries

    データ書き込み失敗後の最大再試行回数。

    いいえ

    INTEGER

    3

    なし。

    sink.buffer-flush.batch-size

    単一のバッチで書き込まれるレコード数。

    いいえ

    INTEGER

    4096

    なし。

    sink.buffer-flush.max-rows

    メモリ内にバッファーされるデータレコード数。

    いいえ

    INTEGER

    10000

    このオプションは、プライマリキーが指定されている場合にのみ有効です。

    sink.buffer-flush.interval

    バッファーをフラッシュする時間間隔。指定された時間待機した後も出力条件を満たさない場合、システムは自動的にすべてのバッファーデータを出力します。

    いいえ

    DURATION

    1s

    なし。

    sink.ignore-delete

    DELETE 操作を無視するかどうかを指定します。

    いいえ

    BOOLEAN

    false

    Flink SQL によって生成されるストリームに delete または update-before レコードが含まれている場合、複数の出力タスクによる同一テーブルの異なるフィールドへの同時更新により、データの不整合が発生する可能性があります。

    たとえば、レコードが削除された後に、別のタスクが一部のフィールドのみを更新すると、更新されていないフィールドが null またはデフォルト値になるため、データエラーが発生します。

    このような問題を回避するには、sink.ignore-delete を true に設定して、上流の DELETE および UPDATE_BEFORE 操作を無視してください。

    説明
    • UPDATE_BEFORE は Flink のリトラクション機構の一部であり、更新操作で「古い値」を「取り消す」ために使用されます。

    • ignoreDelete = true の場合、すべての DELETE および UPDATE_BEFORE レコードがスキップされ、INSERT および UPDATE_AFTER レコードのみが処理されます。

    sink.ignore-null-when-update

    データを更新する際に、入力フィールド値が null の場合に、対応するフィールドを null に設定するか、更新をスキップするかを指定します。

    いいえ

    BOOLEAN

    false

    有効な値:

    • true:フィールドの更新をスキップします。Flink テーブルにプライマリキーがある場合にのみサポートされます。true に設定した場合:

      • VVR 8.0.6 以前では、結果テーブルへのデータ書き込みのバッチ実行はサポートされません。

      • VVR 8.0.7 以降では、結果テーブルへのデータ書き込みのバッチ実行がサポートされます。

        バッチ書き込みは書き込み効率および全体的なスループットを向上させますが、データ遅延および OOM リスクを引き起こす可能性があります。ビジネスシナリオに応じて、これらのトレードオフを調整してください。

    • false:フィールドを null に設定します。

    説明

    VVR 8.0.5 以降でのみサポートされます。

型マッピング

  • CDC ソーステーブル

    MySQL CDC フィールド型

    Flink フィールド型

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    TINYINT UNSIGNED

    TINYINT UNSIGNED ZEROFILL

    INT

    INT

    MEDIUMINT

    SMALLINT UNSIGNED

    SMALLINT UNSIGNED ZEROFILL

    BIGINT

    BIGINT

    INT UNSIGNED

    INT UNSIGNED ZEROFILL

    MEDIUMINT UNSIGNED

    MEDIUMINT UNSIGNED ZEROFILL

    BIGINT UNSIGNED

    DECIMAL(20, 0)

    BIGINT UNSIGNED ZEROFILL

    SERIAL

    FLOAT [UNSIGNED] [ZEROFILL]

    FLOAT

    DOUBLE [UNSIGNED] [ZEROFILL]

    DOUBLE

    DOUBLE PRECISION [UNSIGNED] [ZEROFILL]

    REAL [UNSIGNED] [ZEROFILL]

    NUMERIC(p, s) [UNSIGNED] [ZEROFILL]

    DECIMAL(p, s)

    DECIMAL(p, s) [UNSIGNED] [ZEROFILL]

    BOOLEAN

    BOOLEAN

    TINYINT(1)

    DATE

    DATE

    TIME [(p)]

    TIME [(p)] [WITHOUT TIME ZONE]

    DATETIME [(p)]

    TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    TIMESTAMP [(p)]

    TIMESTAMP [(p)]

    TIMESTAMP [(p)] WITH LOCAL TIME ZONE

    CHAR(n)

    STRING

    VARCHAR(n)

    TEXT

    BINARY

    BYTES

    VARBINARY

    BLOB

    重要

    TINYINT(1) 型を MySQL で 0 および 1 以外の値を格納するために使用しないことを推奨します。property-version=0 の場合、MySQL CDC ソーステーブルはデフォルトで TINYINT(1) を Flink の BOOLEAN 型にマッピングします。これにより、データの不正確さが発生する可能性があります。TINYINT(1) を 0 および 1 以外の値を格納するために使用する場合は、「catalog.table.treat-tinyint1-as-boolean」の設定オプションをご参照ください。

  • 次元テーブルと結果テーブル

    MySQL フィールド型

    Flink フィールド型

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    TINYINT UNSIGNED

    INT

    INT

    MEDIUMINT

    SMALLINT UNSIGNED

    BIGINT

    BIGINT

    INT UNSIGNED

    BIGINT UNSIGNED

    DECIMAL(20, 0)

    FLOAT

    FLOAT

    DOUBLE

    DOUBLE

    DOUBLE PRECISION

    NUMERIC(p, s)

    DECIMAL(p, s)

    説明

    p は ≤ 38 である必要があります。

    DECIMAL(p, s)

    BOOLEAN

    BOOLEAN

    TINYINT(1)

    DATE

    DATE

    TIME [(p)]

    TIME [(p)] [WITHOUT TIME ZONE]

    DATETIME [(p)]

    TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    TIMESTAMP [(p)]

    CHAR(n)

    CHAR(n)

    VARCHAR(n)

    VARCHAR(n)

    BIT(n)

    BINARY(⌈n/8⌉)

    BINARY(n)

    BINARY(n)

    VARBINARY(N)

    VARBINARY(N)

    TINYTEXT

    STRING

    TEXT

    MEDIUMTEXT

    LONGTEXT

    TINYBLOB

    BYTES

    重要

    Flink は、最大サイズが 2,147,483,647(2^31 - 1)の MySQL BLOB レコードをサポートしています。

    BLOB

    MEDIUMBLOB

    LONGBLOB

データインジェスト

MySQL コネクタは、データインジェスト YAML ジョブのデータソースとして使用できます。

構文

source:
   type: mysql
   name: MySQL ソース
   hostname: localhost
   port: 3306
   username: <username>
   password: <password>
   tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
   server-id: 5401-5404

sink:
  type: xxx

設定項目

パラメーター

説明

必須

データ型

デフォルト値

備考

type

データソースのタイプ。

はい

STRING

なし

このオプションを mysql に設定します。

name

データソースの名前。

いいえ

STRING

なし

なし。

hostname

MySQL データベースの IP アドレスまたはホスト名。

はい

STRING

なし

Virtual Private Cloud(VPC)アドレスの入力を推奨します。

説明

MySQL データベースと Realtime Compute for Apache Flink が同じ VPC 内にない場合、クロス VPC ネットワーク接続を確立するか、インターネット経由でアクセスしてください。詳細については、「ワークスペースの管理および運用」および「フルマネージド Flink クラスターがインターネットにアクセスする方法」をご参照ください。

username

MySQL データベースサービスのユーザー名。

はい

STRING

なし

なし。

password

MySQL データベースサービスのパスワード。

はい

STRING

なし

なし。

tables

同期する MySQL データテーブル。

はい

STRING

なし

  • テーブル名は、複数のテーブルからデータを読み取るために正規表現をサポートします。

  • 複数の正規表現をカンマで区切ります。

説明
  • 正規表現で開始および終了文字 ^ および $ を使用しないでください。バージョン 11.2 では、データベースの正規表現はピリオドで分割されます。開始および終了文字を使用すると、結果のデータベース正規表現が無効になります。たとえば、^db.user_[0-9]+$db.user_[0-9]+ に変更します。

  • ピリオドはデータベース名とテーブル名を区切ります。ピリオドで任意の文字をマッチさせるには、バックスラッシュでエスケープします。例:db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*。

tables.exclude

同期から除外するテーブル。

いいえ

STRING

なし

  • テーブル名は、複数のテーブルからデータを除外するために正規表現をサポートします。

  • 複数の正規表現をカンマで区切ります。

説明

ピリオドはデータベース名とテーブル名を区切ります。ピリオドで任意の文字をマッチさせるには、バックスラッシュでエスケープします。例:db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*。

port

MySQL データベースサービスのポート番号。

いいえ

INTEGER

3306

なし。

schema-change.enabled

スキーマ変更イベントを送信するかどうかを指定します。

いいえ

BOOLEAN

true

なし。

server-id

データベースクライアントが同期に使用する数値 ID または範囲。

いいえ

STRING

5400 ~ 6400 の間のランダム値が生成されます。

この ID は、MySQL クラスター内でグローバルに一意である必要があります。同じデータベースに接続する各ジョブに対して、異なる ID を割り当てることを推奨します。

このオプションは、5400-5408 のような ID の範囲もサポートします。増分読み取りが有効化されている場合、同時読み取りがサポートされます。この場合、各同時リーダーが異なる ID を使用できるよう、ID の範囲を指定することを推奨します。

jdbc.properties.*

JDBC URL のカスタム接続パラメーター。

いいえ

STRING

なし

カスタム接続パラメーターを渡すことができます。たとえば、SSL を無効化するには、'jdbc.properties.useSSL' = 'false' を設定します。

サポートされている接続パラメーターの詳細については、「MySQL 設定プロパティ」をご参照ください。

debezium.*

バイナリログを読み取るためのカスタム Debezium パラメーター。

いいえ

STRING

なし

カスタム Debezium パラメーターを渡すことができます。たとえば、解析エラーの処理方法を指定するには、'debezium.event.deserialization.failure.handling.mode'='ignore' を使用します。

scan.incremental.snapshot.chunk.size

各チャンクのサイズ(行数)。

いいえ

INTEGER

8096

MySQL テーブルは、読み取り用に複数のチャンクに分割されます。チャンクデータは、完全に読み取られるまでメモリ内にバッファーされます。

各チャンクに含まれる行数が少ないほど、テーブル内のチャンクの総数が多くなります。これは障害回復の粒度を小さくしますが、OOM 問題を引き起こしたり、全体的なスループットを低下させたりする可能性があります。したがって、これらの要因をバランスよく考慮し、適切なチャンクサイズを設定する必要があります。

scan.snapshot.fetch.size

完全テーブルデータを読み取る際に、1 回に取得する最大レコード数。

いいえ

INTEGER

1024

なし。

scan.startup.mode

データ消費の起動モード。

いいえ

STRING

initial

有効な値:

  • initial(デフォルト):最初の起動時に完全な履歴データをスキャンし、その後最新の binlog データを読み取ります。

  • latest-offset:スナップショットフェーズをスキップし、binlog の末尾から読み取りを開始します。このモードでは、ソーステーブルはジョブの起動後に発生した変更のみを読み取ります。

  • earliest-offset: 既存データをスキャンしません。利用可能な最も古い binlog から読み取りを開始します。

  • specific-offset:スナップショットフェーズをスキップし、指定した特定の binlog オフセットから読み取りを開始します。オフセットは、scan.startup.specific-offset.file および scan.startup.specific-offset.pos の両方を設定するか、scan.startup.specific-offset.gtid-set のみを設定して、特定の GTID セットから開始できます。

  • timestamp:スナップショットフェーズをスキップし、指定したタイムスタンプから binlog イベントの読み取りを開始します。タイムスタンプは、scan.startup.timestamp-millis を使用してミリ秒単位で指定します。

重要

earliest-offsetspecific-offset、および timestamp の場合、起動時刻と指定された開始オフセット時刻の間にテーブルスキーマが変更されていると、スキーマの不一致によるジョブ失敗が発生します。言い換えると、これらの 3 つの起動モードを使用する場合、指定された binlog 消費位置とジョブ起動時刻の間にテーブルスキーマが変更されないことを確認する必要があります。

scan.startup.specific-offset.file

特定のオフセット起動モードで使用する開始オフセットの binlog ファイル名。

いいえ

STRING

なし

この設定を使用する場合、scan.startup.modespecific-offset に設定します。ファイル名の例:mysql-bin.000003

scan.startup.specific-offset.pos

特定のオフセット起動モードで使用する指定された binlog ファイル内の開始オフセット。

いいえ

INTEGER

なし

この設定を使用する場合、scan.startup.modespecific-offset に設定します。

scan.startup.specific-offset.gtid-set

特定のオフセット起動モードで使用する開始オフセットの GTID セット。

いいえ

STRING

なし

この設定を使用する場合、scan.startup.modespecific-offset に設定します。GTID セットの例:24DA167-0C0C-11E8-8442-00059A3C7B00:1-19

scan.startup.timestamp-millis

タイムスタンプ起動モードで使用する開始オフセットのミリ秒単位のタイムスタンプ。

いいえ

LONG

なし

この設定を使用する場合、scan.startup.modetimestamp に設定します。タイムスタンプはミリ秒単位です。

重要

タイムスタンプを使用する場合、MySQL CDC は各 binlog ファイルの初期イベントを読み取ってそのタイムスタンプを特定し、対応する binlog ファイルを検索しようとします。指定されたタイムスタンプの binlog ファイルがデータベースからクリアされておらず、読み取り可能であることを確認してください。

server-time-zone

データベースで使用されるセッションタイムゾーン。

いいえ

STRING

このオプションを指定しない場合、システムは Flink ジョブの実行環境のタイムゾーンをデータベースサーバータイムゾーンとして使用します(選択したゾーンのタイムゾーン)。

例:Asia/Shanghai。このオプションは、MySQL TIMESTAMP 型が STRING 型に変換される方法を制御します。詳細については、「Debezium 時間型値」をご参照ください。

scan.startup.specific-offset.skip-events

特定のオフセットから読み取る際にスキップする binlog イベント数。

いいえ

INTEGER

なし

この設定を使用する場合、scan.startup.modespecific-offset に設定します。

scan.startup.specific-offset.skip-rows

特定のオフセットから読み取る際にスキップする行変更数。単一の binlog イベントは複数の行変更に対応する場合があります。

いいえ

INTEGER

なし

この設定を使用する場合、scan.startup.modespecific-offset に設定します。

connect.timeout

MySQL データベースサーバーへの接続がタイムアウトするまでの最大待機時間(再試行する前)。

いいえ

DURATION

30s

なし。

connect.max-retries

MySQL データベースサービスへの接続失敗後の最大再試行回数。

いいえ

INTEGER

3

なし。

connection.pool.size

データベース接続プールのサイズ。

いいえ

INTEGER

20

データベース接続プールは、データベース接続数を削減するために接続を再利用します。

heartbeat.interval

ソースがハートビートイベントを使用して binlog オフセットを進める間隔。

いいえ

DURATION

30s

ハートビートイベントは、ソースで binlog オフセットを進めるのに役立ちます。これは、MySQL で更新が遅いテーブルに特に有用です。このようなテーブルでは、binlog オフセットが自動的に進まないため、ハートビートイベントによって binlog オフセットを前方に進めることで、期限切れになった binlog オフセットが原因でジョブが失敗し、ステートレスな再起動が必要になるという問題を防ぎます。

scan.incremental.snapshot.chunk.key-column

スナップショットフェーズ中にチャンクを分割するために使用するカラム。

いいえ。

STRING

なし

プライマリキーから 1 つのカラムのみを選択できます。

rds.region-id

Alibaba Cloud RDS MySQL インスタンスのリージョン ID。

OSS からアーカイブログを読み取る場合に必須です。

STRING

なし

リージョン ID の詳細については、「リージョンとゾーン」をご参照ください。

rds.access-key-id

Alibaba Cloud RDS MySQL アカウントの AccessKey ID。

OSS からアーカイブログを読み取る場合に必須です。

STRING

なし

詳細については、「AccessKey ID および AccessKey Secret の表示方法」をご参照ください。

重要

AccessKey 情報の漏洩を防ぐため、AccessKey ID はシークレット管理を使用して管理してください。詳細については、「変数管理」をご参照ください。

rds.access-key-secret

Alibaba Cloud RDS MySQL アカウントの AccessKey Secret。

OSS からアーカイブログを読み取る場合に必須です。

STRING

なし

詳細については、「AccessKey ID および AccessKey Secret の表示方法」をご参照ください。

重要

AccessKey 情報の漏洩を防ぐため、AccessKey Secret はシークレット管理を使用して管理してください。詳細については、「変数管理」をご参照ください。

rds.db-instance-id

Alibaba Cloud RDS MySQL インスタンスのインスタンス ID。

OSS からアーカイブログを読み取る場合に必須です。

STRING

なし

なし。

rds.main-db-id

Alibaba Cloud RDS MySQL インスタンスのプライマリデータベース ID。

いいえ

STRING

なし

プライマリデータベース ID の取得方法については、「RDS MySQL ログバックアップ」をご参照ください。

rds.download.timeout

OSS から単一のアーカイブログをダウンロードする際のタイムアウト。

いいえ

DURATION

60s

なし。

rds.endpoint

OSS binlog 情報を取得するためのサービスエンドポイント。

いいえ

STRING

なし

有効な値については、「サービスエンドポイント」をご参照ください。

rds.binlog-directory-prefix

binlog ファイルを保存するディレクトリプレフィックス。

いいえ

STRING

rds-binlog-

なし。

rds.use-intranet-link

内部ネットワークを使用して binlog ファイルをダウンロードするかどうかを指定します。

いいえ

BOOLEAN

true

なし。

rds.binlog-directories-parent-path

binlog ファイルを保存する親ディレクトリの絶対パス。

いいえ

STRING

なし

なし。

chunk-meta.group.size

チャンクメタデータのサイズ。

いいえ

INTEGER

1000

メタデータがこのサイズを超える場合、複数の部分に分割して送信されます。

chunk-key.even-distribution.factor.lower-bound

均等シャーディングのためのチャンク分布係数の下限。

いいえ

DOUBLE

0.05

この値より小さいチャンク分布係数は、不均等なシャーディングを引き起こします。

チャンク分布係数 = (MAX(chunk-key) - MIN(chunk-key) + 1) / 行数の合計。

chunk-key.even-distribution.factor.upper-bound

均等シャーディングのためのチャンク分布係数の上限。

いいえ

DOUBLE

1000.0

この値より大きいチャンク分布係数は、不均等なシャーディングを引き起こします。

チャンク分布係数 = (MAX(chunk-key) - MIN(chunk-key) + 1) / 行数の合計。

scan.incremental.close-idle-reader.enabled

スナップショットフェーズ終了後にアイドルリーダーを閉じるかどうかを指定します。

いいえ

BOOLEAN

false

この設定が有効になるには、execution.checkpointing.checkpoints-after-tasks-finish.enabled を true に設定する必要があります。

scan.only.deserialize.captured.tables.changelog.enabled

増分フェーズ中に、指定されたテーブルの変更イベントのみを逆シリアル化するかどうかを指定します。

いいえ

BOOLEAN

  • VVR 8.x ではデフォルト値は false です。

  • VVR 11.1 以降ではデフォルト値は true です。

有効な値:

  • true:ターゲットテーブルの変更データのみを逆シリアル化して、binlog 読み取りを高速化します。

  • false(デフォルト):すべてのテーブルの変更データを逆シリアル化します。

scan.parallel-deserialize-changelog.enabled

増分フェーズ中に、マルチスレッドを使用して変更イベントを解析するかどうかを指定します。

いいえ

BOOLEAN

false

有効な値:

  • true:逆シリアル化中にマルチスレッドを使用して、binlog イベントの順序を維持しながら読み取りを高速化します。

  • false(デフォルト):逆シリアル化中にシングルスレッドを使用します。

説明

VVR 8.0.11 以降でのみサポートされます。

scan.parallel-deserialize-changelog.handler.size

変更イベントの解析にマルチスレッドを使用する場合のイベントハンドラーの数。

いいえ

INTEGER

2

説明

VVR 8.0.11 以降でのみサポートされます。

metadata-column.include-list

下流に渡すメタデータカラム。

いいえ

STRING

なし

利用可能なメタデータには、op_tses_tsquery_logfile、および pos があります。これらはカンマで区切ることができます。

説明

MySQL CDC YAML コネクタは、データベース名、テーブル名、および op_type のメタデータカラムを追加する必要はなく、またサポートしていません。Transform 式で __data_event_type__ を直接使用して変更データタイプを取得したり、__schema_name__ および __table_name__ を使用してデータベース名およびテーブル名を取得したりできます。

重要

file メタデータカラムは、データを含む binlog ファイルを表します。完全フェーズ中は空("")であり、増分フェーズ中は binlog ファイル名が含まれます。pos メタデータカラムは、binlog ファイル内のデータのオフセットを表します。完全フェーズ中は "0" であり、増分フェーズ中は実際のオフセットが含まれます。これらのメタデータカラムは VVR 11.5 以降でサポートされます。

es_ts は、MySQL におけるチャネルログに関連付けられたトランザクションの開始時間を表します。MySQL 8.0.x のみでサポートされます。それより前の MySQL バージョンを使用する場合は、このメタデータカラムを追加しないでください。

scan.newly-added-table.enabled

チェックポイントからの再起動時に、前回の実行でマッチしなかった新しく追加されたテーブルを同期するか、現在マッチしていないテーブルを状態から削除するかを指定します。

いいえ

BOOLEAN

false

これは、チェックポイントまたはセーブポイントからの再起動時に有効になります。

scan.binlog.newly-added-table.enabled

増分フェーズ中に、パターンに一致する新しく追加されたテーブルのデータを送信するかどうかを指定します。

いいえ

BOOLEAN

false

これは scan.newly-added-table.enabled と同時に有効化できません。

scan.incremental.snapshot.chunk.key-column

特定のテーブルに対して、スナップショットフェーズ中のチャンク分割キーとして使用するカラムを指定します。

いいえ

STRING

なし

  • テーブル名とフィールド名をコロン(:)で結合してルールを作成します。テーブル名には正規表現を使用できます。複数のルールはセミコロン(;)で区切ります。例:db1.user_table_[0-9]+:col1;db[1-2].[app|web]_order_\\.*:col2

  • プライマリキーのないテーブルに必須です。選択したカラムは NULL 不可(NOT NULL)である必要があります。プライマリキーのあるテーブルでは任意です。プライマリキーから 1 つのカラムのみを選択できます。

scan.parse.online.schema.changes.enabled

増分フェーズ中に、RDS ロックレス DDL イベントの解析を試行するかどうかを指定します。

いいえ

BOOLEAN

false

有効な値:

  • true:RDS ロックレス DDL イベントを解析します。

  • false(デフォルト):RDS ロックレス DDL イベントを解析しません。

これは実験的な機能です。オンラインロックレス変更を実行する前に、Flink ジョブの回復用にスナップショットを取得してください。

説明

VVR 11.0 以降でのみサポートされます。

scan.incremental.snapshot.backfill.skip

スナップショット読み取りフェーズ中にバックフィルをスキップするかどうかを指定します。

いいえ

BOOLEAN

false

有効な値:

  • true:スナップショット読み取りフェーズ中にバックフィルをスキップします。

  • false(デフォルト):スナップショット読み取りフェーズ中にバックフィルをスキップしません。

バックフィルがスキップされた場合、スナップショットフェーズ中のテーブルの変更は、後続の増分フェーズで読み取られ、スナップショットにマージされません。

重要

バックフィルをスキップすると、スナップショットフェーズ中の変更が再再生される可能性があるため、データの不整合が発生する可能性があります。保証されるのは、最低限 1 回のセマンティクスのみです。

説明

VVR 11.1 以降でのみサポートされます。

treat-tinyint1-as-boolean.enabled

TINYINT(1) 型をブール値型として扱うかどうかを指定します。

いいえ

BOOLEAN

true

有効な値:

  • true(デフォルト):TINYINT(1) をブール値型として扱います。

  • false:TINYINT(1) をブール値型として扱いません。

treat-timestamp-as-datetime-enabled

MySQL TIMESTAMP を DATETIME として処理するかどうかを指定します。

いいえ

BOOLEAN

false

有効な値:

  • true:MySQL TIMESTAMP データを DATETIME データとして処理し、CDC TIMESTAMP 型にマッピングします。

  • false(デフォルト):MySQL TIMESTAMP データを CDC TIMESTAMP_LTZ 型にマッピングします。

MySQL TIMESTAMP は UTC 時間を格納し、タイムゾーンの影響を受けます。MySQL DATETIME はリテラル時間を格納し、タイムゾーンの影響を受けません。

これを有効化すると、MySQL TIMESTAMP データは server-time-zone に基づいて DATETIME に変換されます。

include-comments.enabled

テーブルおよびカラムのコメントを同期するかどうかを指定します。

いいえ

BOOELEAN

false

有効な値:

  • true:テーブルおよびカラムのコメントを同期します。

  • false(デフォルト):テーブルおよびカラムのコメントを同期しません。

これを有効化すると、ジョブのメモリ使用量が増加します。

scan.incremental.snapshot.unbounded-chunk-first.enabled

スナップショット読み取りフェーズ中に、上限なしのチャンクを最初に配布するかどうかを指定します。

いいえ

BOOELEAN

false

有効な値:

  • true:スナップショット読み取りフェーズ中に、上限なしのチャンクを最初に配布します。

  • false (デフォルト): スナップショットの読み取りフェーズ中に、境界のないチャンクを最初に分配しません。

これは実験的な機能です。これを有効化すると、TaskManager がスナップショットフェーズ中に最後のチャンクを同期する際の OOM エラーのリスクが低減します。ジョブの初回起動前にこれを追加することを推奨します。

説明

VVR 11.1 以降でのみサポートされます。

binlog.session.network.timeout

binlog 接続のネットワークタイムアウト。

いいえ

DURATION

10m

これを 0s に設定すると、MySQL サーバーのデフォルトタイムアウトが使用されます。

説明

VVR 11.5 以降でのみサポートされます。

scan.rate-limit.records-per-second

ソースによって 1 秒あたりに出力される最大レコード数を制限します。

いいえ

LONG

なし

データ読み取りを制限するのに役立ちます。この制限は、完全および増分フェーズの両方に適用されます。

numRecordsOutPerSecond メトリックは、データフロー全体で 1 秒あたりに出力されるレコード数を反映します。このメトリックに基づいて、このパラメーターを調整してください。

完全読み取り中は、scan.incremental.snapshot.chunk.size の値を下げることで、1 バッチあたりの行数を減らしてください。

説明

VVR 11.5 以降でのみサポートされます。

型マッピング

以下の表に、データインジェストの型マッピングを示します。

MySQL CDC フィールド型

CDC フィールド型

TINYINT(n)

TINYINT

SMALLINT

SMALLINT

TINYINT UNSIGNED

TINYINT UNSIGNED ZEROFILL

YEAR

INT

INT

MEDIUMINT

MEDIUMINT UNSIGNED

MEDIUMINT UNSIGNED ZEROFILL

SMALLINT UNSIGNED

SMALLINT UNSIGNED ZEROFILL

BIGINT

BIGINT

INT UNSIGNED

INT UNSIGNED ZEROFILL

BIGINT UNSIGNED

DECIMAL(20, 0)

BIGINT UNSIGNED ZEROFILL

SERIAL

FLOAT [UNSIGNED] [ZEROFILL]

FLOAT

DOUBLE [UNSIGNED] [ZEROFILL]

DOUBLE

DOUBLE PRECISION [UNSIGNED] [ZEROFILL]

REAL [UNSIGNED] [ZEROFILL]

NUMERIC(p, s) [UNSIGNED] [ZEROFILL] where p ≤ 38

DECIMAL(p, s)

DECIMAL(p, s) [UNSIGNED] [ZEROFILL] where p ≤ 38

FIXED(p, s) [UNSIGNED] [ZEROFILL] where p ≤ 38

BOOLEAN

BOOLEAN

BIT(1)

TINYINT(1)

DATE

DATE

TIME [(p)]

TIME [(p)]

DATETIME [(p)]

TIMESTAMP [(p)]

TIMESTAMP [(p)]

フィールドマッピングは treat-timestamp-as-datetime-enabled オプションに依存します:

true:TIMESTAMP[(p)]

false:TIMESTAMP_LTZ[(p)]

CHAR(n)

CHAR(n)

VARCHAR(n)

VARCHAR(n)

BIT(n)

BINARY(⌈(n + 7) / 8⌉)

BINARY(n)

BINARY(n)

VARBINARY(N)

VARBINARY(N)

NUMERIC(p, s) [UNSIGNED] [ZEROFILL] where 38 < p ≤ 65

STRING

説明

MySQL では、10 進数の精度は最大 65 まで可能です。Flink では、10 進数の精度は 38 に制限されています。精度が 38 を超える 10 進カラムを定義する場合、精度の損失を回避するために文字列にマッピングしてください。

DECIMAL(p, s) [UNSIGNED] [ZEROFILL] where 38 < p ≤ 65

FIXED(p, s) [UNSIGNED] [ZEROFILL] where 38 < p ≤ 65

TINYTEXT

STRING

TEXT

MEDIUMTEXT

LONGTEXT

ENUM

JSON

STRING

説明

JSON データ型は、Flink で JSON 形式の文字列に変換されます。

GEOMETRY

STRING

説明

MySQL の空間データ型は、固定の JSON 形式を持つ文字列に変換されます。詳細については、「空間データ型マッピング」をご参照ください。

POINT

LINESTRING

POLYGON

MULTIPOINT

MULTILINESTRING

MULTIPOLYGON

GEOMETRYCOLLECTION

TINYBLOB

BYTES

説明

MySQL は、最大長が 2,147,483,647(2**31-1)バイトの BLOB をサポートしています。

BLOB

MEDIUMBLOB

LONGBLOB

  • CDC ソーステーブル

    CREATE TEMPORARY TABLE mysqlcdc_source (
       order_id INT,
       order_date TIMESTAMP(0),
       customer_name STRING,
       price DECIMAL(10, 5),
       product_id INT,
       order_status BOOLEAN,
       PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      order_id INT,
      customer_name STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT order_id, customer_name FROM mysqlcdc_source;
  • ディメンションテーブル

    CREATE TEMPORARY TABLE datagen_source(
      a INT,
      b BIGINT,
      c STRING,
      `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE mysql_dim (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      a INT,
      b STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT T.a, H.b
    FROM datagen_source AS T JOIN mysql_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a = H.a;
  • シンクテーブル

    CREATE TEMPORARY TABLE datagen_source (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE mysql_sink (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    INSERT INTO mysql_sink
    SELECT * FROM datagen_source;
  • データインジェストソース

    source:
      type: mysql
      name: MySQL ソース
      hostname: ${mysql.hostname}
      port: ${mysql.port}
      username: ${mysql.username}
      password: ${mysql.password}
      tables: ${mysql.source.table}
      server-id: 7601-7604
    
    sink:
      type: values
      name: Values シンク
      print.enabled: true
      sink.print.logger: true

MySQL CDC ソーステーブルについて

  • 仕組み

    MySQL CDC ソーステーブルは、起動時にテーブル全体をスキャンし、プライマリキーに基づいて複数のチャンクに分割します。現在の binlog オフセットを記録した後、増分スナップショットアルゴリズムを使用して SELECT 文で各チャンクのデータを読み取ります。ジョブは定期的にチェックポイントを実行して、完了したチャンクを記録します。フェールオーバーが発生した場合、ジョブは未完了のチャンクのみを継続して読み取ります。すべてのチャンクの読み取りが完了した後、ジョブは以前に記録された binlog オフセットから増分変更レコードを読み取ります。Flink ジョブは引き続き定期的にチェックポイントを実行して binlog オフセットを記録します。フェールオーバーが発生した場合、ジョブは最後に記録された binlog オフセットから処理を再開します。このプロセスにより、1 回限りのセマンティクスが実現されます。

    増分スナップショットアルゴリズムの詳細については、「MySQL CDC コネクタ」をご参照ください。

  • メタデータ

    メタデータは、シャーディングされたデータベースおよびテーブルのマージシナリオで非常に有用です。マージ後も、ビジネスでは各行のデータのソースデータベースおよびテーブルを識別する必要がある場合がよくあります。メタデータカラムを使用すると、この情報をアクセスできます。そのため、メタデータカラムを使用して、複数のシャーディングされたテーブルを簡単に 1 つの宛先テーブルにマージできます。

    MySQL CDC ソースは、メタデータカラム構文をサポートしています。以下のメタデータにメタデータカラム経由でアクセスできます。

    メタデータキー

    メタデータ型

    説明

    database_name

    STRING NOT NULL

    行を含むデータベースの名前。

    table_name

    STRING NOT NULL

    行を含むテーブルの名前。

    op_ts

    TIMESTAMP_LTZ(3) NOT NULL

    データベース内で行が変更された時刻。レコードが binlog ではなくテーブルの既存の履歴データに由来する場合、この値は常に 0 になります。

    op_type

    STRING NOT NULL

    行の変更タイプ。

    • +I:INSERT メッセージ

    • -D:DELETE メッセージ

    • -U:UPDATE_BEFORE メッセージ

    • +U:UPDATE_AFTER メッセージ

    説明

    Ververica Runtime(VVR)8.0.7 以降でのみサポートされます。

    query_log

    STRING NOT NULL

    この行に対応する MySQL クエリログレコードを読み取ります。

    説明

    MySQL は、クエリログを記録するために binlog_rows_query_log_events パラメーターを有効化している必要があります。

    次の例は、MySQL インスタンス内の異なるシャーディングされたデータベースの複数の orders テーブルをマージし、Hologres の holo_orders テーブルに同期する方法を示しています。

    CREATE TEMPORARY TABLE mysql_orders (
      db_name STRING METADATA FROM 'database_name' VIRTUAL,  -- データベース名を読み取ります。
      table_name STRING METADATA  FROM 'table_name' VIRTUAL, -- テーブル名を読み取ります。
      operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, -- 変更タイムスタンプを読み取ります。
      op_type STRING METADATA FROM 'op_type' VIRTUAL, -- 変更タイプを読み取ります。
      order_id INT,
      order_date TIMESTAMP(0),
      customer_name STRING,
      price DECIMAL(10, 5),
      product_id INT,
      order_status BOOLEAN,
      PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'localhost',
      'port' = '3306',
      'username' = 'flinkuser',
      'password' = 'flinkpw',
      'database-name' = 'mydb_.*', -- 正規表現を使用して複数のシャーディングされたデータベースに一致させます。
      'table-name' = 'orders_.*'   -- 正規表現を使用して複数のシャーディングされたテーブルに一致させます。
    );
    
    INSERT INTO holo_orders SELECT * FROM mysql_orders;

    上記のコードに基づき、WITH 句で scan.read-changelog-as-append-only.enabled を true に設定すると、下流テーブルのプライマリキー構成に基づいて出力が異なります:

    • 下流テーブルのプライマリキーが order_id の場合、出力には上流テーブルの各プライマリキーの最新の変更のみが含まれます。たとえば、プライマリキーの最新の変更が削除操作の場合、下流テーブルには同じプライマリキーと op_type が -D のレコードが表示されます。

    • 下流テーブルのプライマリキーが order_id、operation_ts、および op_type の場合、出力には上流テーブルの各プライマリキーの完全な変更履歴が含まれます。

  • 正規表現のサポート

    MySQL CDC ソーステーブルは、テーブル名またはデータベース名に正規表現を使用して、複数のテーブルまたはデータベースに一致させることをサポートしています。次の例は、正規表現を使用して複数のテーブルを指定する方法を示しています。

    CREATE TABLE products (
      db_name STRING METADATA FROM 'database_name' VIRTUAL,
      table_name STRING METADATA  FROM 'table_name' VIRTUAL,
      operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
      order_id INT,
      order_date TIMESTAMP(0),
      customer_name STRING,
      price DECIMAL(10, 5),
      product_id INT,
      order_status BOOLEAN,
      PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'localhost',
      'port' = '3306',
      'username' = 'root',
      'password' = '123456',
      'database-name' = '(^(test).*|^(tpc).*|txc|.*[p$]|t{2})', -- 正規表現を使用して複数のデータベースに一致させます。
      'table-name' = '(t[5-8]|tt)' -- 正規表現を使用して複数のテーブルに一致させます。
    );

    上記の例の正規表現の説明:

    • ^(test).* はプレフィックス一致の例です。この式は、test1 や test2 などの test で始まるデータベース名に一致します。

    • .*[p$] はサフィックス一致の例です。この式は、cdcp や edcp などの p で終わるデータベース名に一致します。

    • txc は完全一致です。特定のデータベース名 txc に一致します。

    完全修飾テーブル名を一致させる際、MySQL CDC はデータベース名とテーブル名の両方を使用してテーブルを一意に識別します。パターン database-name.table-name を使用して一致させます。たとえば、パターン (^(test).*|^(tpc).*|txc|.*[p$]|t{2}).(t[5-8]|tt) は、データベース内の txc.tt や test2.test5 などのテーブルに一致します。

    重要

    SQL ジョブの構成では、table-name および database-name オプションは、複数のテーブルまたはデータベースを指定するためにカンマ(,)をサポートしていません。

    • 複数のテーブルに一致させる場合や複数の正規表現を使用する場合は、縦線(|)で区切って括弧で囲む必要があります。たとえば、user テーブルと product テーブルを読み取るには、table-name を (user|product) として構成します。

    • 正規表現にカンマが含まれている場合、縦線(|)演算子を使用して書き換える必要があります。たとえば、正規表現 mytable_\d{1, 2} は、カンマを使用しない同等の式 (mytable_\d{1}|mytable_\d{2}) に書き換える必要があります。

  • 同時実行制御

    MySQL コネクタは、完全データの同時読み取りをサポートしており、これによりデータ読み込み効率が向上します。Realtime Compute for Apache Flink コンソールの Autopilot と組み合わせることで、同時読み取りが完了した後の増分フェーズ中に自動的にスケールインし、コンピューティングリソースを節約します。

    Realtime Compute 開発コンソールでは、リソース構成ページで基本モードまたはエキスパートモードのいずれかでジョブの並列処理の次数を設定できます。違いは次のとおりです:

    • 基本モードで設定された並列処理の次数は、ジョブ全体のグローバルな並列処理の次数です。基础模式

    • エキスパートモードでは、必要に応じて特定の VERTEX の並列処理の次数を設定できます。vertex并发

    リソース構成の詳細については、「ジョブデプロイメント情報の設定」をご参照ください。

    重要

    基本モードまたはエキスパートモードのいずれを使用する場合でも、テーブルで宣言された server-id 範囲は、ジョブの並列処理の次数以上である必要があります。たとえば、server-id 範囲が 5404-5412 の場合、8 個の一意な server ID があります。したがって、ジョブは最大 8 個の並列タスクを持つことができます。同じ MySQL インスタンスに対する異なるジョブは、重複しない server-id 範囲を持つ必要があります。各ジョブは、明示的に異なる server-id を設定する必要があります。

  • Autopilot 自動スケールイン

    完全データフェーズでは、大量の履歴データが蓄積されます。読み取り効率を向上させるために、通常、履歴データは同時読み取りされます。一方、増分 binlog フェーズでは、binlog データ量が少なく、グローバルな順序付けを維持する必要があるため、通常は単一の並列処理で十分です。Autopilot は、完全フェーズと増分フェーズの間でこれらの異なる要件を満たすために、パフォーマンスとリソースのバランスを自動的に調整します。

    Autopilot は、MySQL CDC ソースの各タスクのトラフィックを監視します。binlog フェーズに移行した際に、1 つのタスクのみが binlog 読み取りを処理し、他のタスクがアイドル状態になる場合、Autopilot は自動的にソースの CU 数および並列処理の次数を削減します。Autopilot を有効化するには、ジョブの運用保守ページで Autopilot モードを Active に設定します。

    説明

    並列処理の次数をスケールダウンするためのデフォルトの最小トリガー間隔は 24 時間です。Autopilot パラメーターおよび詳細については、「Autopilot の設定」をご参照ください。

  • 起動モード

    scan.startup.mode オプションを使用して、MySQL CDC ソーステーブルの起動モードを指定できます。有効な値は次のとおりです:

    • initial(デフォルト):最初の起動時にデータベーステーブルの完全読み取りを実行し、その後増分モードに切り替えて binlog を読み取ります。

    • earliest-offset:スナップショットフェーズをスキップし、利用可能な最も古い binlog オフセットから読み取りを開始します。

    • latest-offset:スナップショットフェーズをスキップし、binlog の末尾から読み取りを開始します。このモードでは、ソーステーブルはジョブの起動後に発生した変更のみを読み取ります。

    • specific-offset:スナップショットフェーズをスキップし、指定された特定の binlog オフセットから読み取りを開始します。オフセットは、binlog ファイル名と位置、または GTID セットで指定できます。

    • timestamp:スナップショットフェーズをスキップし、指定されたタイムスタンプから binlog イベントの読み取りを開始します。

    例:

    CREATE TABLE mysql_source (...) WITH (
        'connector' = 'mysql-cdc',
        'scan.startup.mode' = 'earliest-offset', -- 最も古いオフセットから開始します。
        'scan.startup.mode' = 'latest-offset', -- 最新のオフセットから開始します。
        'scan.startup.mode' = 'specific-offset', -- 特定のオフセットから開始します。
        'scan.startup.mode' = 'timestamp', -- 特定のタイムスタンプから開始します。
        'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- specific-offset モードの binlog ファイル名を指定します。
        'scan.startup.specific-offset.pos' = '4', -- specific-offset モードの binlog 位置を指定します。
        'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- specific-offset モードの GTID セットを指定します。
        'scan.startup.timestamp-millis' = '1667232000000' -- timestamp モードの起動タイムスタンプを指定します。
        ...
    )
    重要
    • MySQL ソースは、チェックポイント中に INFO レベルで現在の位置をログに記録します。ログのプレフィックスは Binlog offset on checkpoint {checkpoint-id} です。このログは、特定のチェックポイント位置からジョブを再起動する際に役立ちます。

    • 読み取り中のテーブルのスキーマが過去に変更されている場合、earliest-offset、specific-offset、または timestamp から開始するとエラーが発生する可能性があります。これは、Debezium リーダーが内部的に最新のスキーマを保存しており、スキーマが一致しない古いデータを正しく解析できないためです。

  • キーのない CDC ソーステーブル

    • キーのないテーブルを使用するには、scan.incremental.snapshot.chunk.key-column を設定し、NULL 不可のフィールドのみを選択する必要があります。

    • キーのない CDC ソーステーブルの処理セマンティクスは、scan.incremental.snapshot.chunk.key-column で指定されたカラムの動作に依存します:

      • 指定されたカラムが更新されない場合、1 回限りのセマンティクスが保証されます。

      • 指定されたカラムが更新される場合、最低限 1 回のセマンティクスのみが保証されます。ただし、下流システムと組み合わせて、下流のプライマリキーを指定し、べき等操作を使用することで、データの正確性を確保できます。

  • RDS MySQL からのバックアップログの読み取り

    MySQL CDC ソーステーブルは、Alibaba Cloud RDS MySQL からのバックアップログの読み取りをサポートしています。この機能は、完全スナップショットフェーズに長い時間がかかる場合に特に有用です。この場合、ローカルの binlog ファイルは自動的にクリーンアップされる可能性がありますが、手動または自動でアップロードされたバックアップファイルは依然として存在します。

    例:

    CREATE TABLE mysql_source (...) WITH (
        'connector' = 'mysql-cdc',
        'rds.region-id' = 'cn-beijing',
        'rds.access-key-id' = 'xxxxxxxxx', 
        'rds.access-key-secret' = 'xxxxxxxxx', 
        'rds.db-instance-id' = 'rm-xxxxxxxxxxxxxxxxx', 
        'rds.main-db-id' = '12345678',
        'rds.download.timeout' = '60s'
        ...
    )
  • CDC ソースの再利用を有効化

    複数の MySQL CDC ソーステーブルを持つ単一のジョブは、複数の binlog クライアントを起動します。すべてのソーステーブルが同じ MySQL インスタンスから読み取る場合、この方法はデータベースへの負荷を増加させます。詳細については、「MySQL CDC よくある質問」をご参照ください。

    解決策

    VVR 8.0.7 以降のバージョンでは、MySQL CDC ソースの再利用がサポートされています。再利用により、互換性のある MySQL CDC ソーステーブルがマージされます。ソーステーブルがデータベース名、テーブル名、および server-id を除いて同一の構成を共有している場合にマージが発生します。エンジンは、同一ジョブ内の MySQL CDC ソースを自動的にマージします。

    手順

    1. SQL ジョブで SET コマンドを使用できます:

      SET 'table.optimizer.source-merge.enabled' = 'true';
      
      # (VVR 8.0.8 および 8.0.9 バージョン)以下も設定します:
      SET 'sql-gateway.exec-plan.enabled' = 'false';
      VVR 11.1 以降のバージョンでは、再利用がデフォルトで有効化されています。
    2. ステートレスでジョブを開始します。 ソース再利用構成を変更すると、ジョブトポロジーが変更されます。ステートなしでジョブを開始しないと、ジョブが正常に開始されなかったり、データが失われたりする可能性があります。ソースがマージされた場合、ジョブトポロジーに MergetableSourceScan ノードが表示されます。

    重要
    • 再利用を有効化した後、pipeline.operator-chainingfalse に設定しないでください。オペレーターチェーンを無効化すると、シリアル化および逆シリアル化のオーバーヘッドが追加されます。マージされるソースが多いほど、オーバーヘッドが大きくなります。

    • VVR 8.0.7 では、オペレーターチェーンを無効化するとシリアル化の問題が発生します。

binlog 読み取りの高速化

MySQL コネクタをソーステーブルまたはデータインジェストソースとして使用する場合、増分フェーズ中に binlog ファイルを解析してさまざまな変更メッセージを生成します。binlog ファイルは、バイナリ形式でテーブルのすべての変更を記録します。binlog ファイルの解析を高速化するには、次の方法があります:

  • 解析フィルター構成を有効化

    • scan.only.deserialize.captured.tables.changelog.enabled オプションを使用して、指定されたテーブルの変更イベントのみを解析します。

  • Debezium オプションの最適化

    debezium.max.queue.size: 162580
    debezium.max.batch.size: 40960
    debezium.poll.interval.ms: 50
    • debezium.max.queue.size:ブロッキングキューが保持できる最大レコード数。Debezium がデータベースからイベントストリームを読み取る際、イベントを下流に書き込む前にブロッキングキューに配置します。デフォルト値は 8192 です。

    • debezium.max.batch.size:1 回の反復で処理される最大イベント数。デフォルト値は 2048 です。

    • debezium.poll.interval.ms:コネクタが新しい変更イベントを要求する前に待機するミリ秒数。デフォルト値は 1000 ms(1 秒)です。

例:

CREATE TABLE mysql_source (...) WITH (
    'connector' = 'mysql-cdc',
    -- Debezium 設定
    'debezium.max.queue.size' = '162580',
    'debezium.max.batch.size' = '40960',
    'debezium.poll.interval.ms' = '50',
    -- 解析フィルターを有効化
    'scan.only.deserialize.captured.tables.changelog.enabled' = 'true',  -- 指定されたテーブルの変更イベントのみを解析します。
    ...
)
source:
  type: mysql
  name: MySQL ソース
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604
  # Debezium 設定
  debezium.max.queue.size: 162580
  debezium.max.batch.size: 40960
  debezium.poll.interval.ms: 50
  # 解析フィルターを有効化
  scan.only.deserialize.captured.tables.changelog.enabled: true

MySQL CDC のエンタープライズ版は、オープンソースコミュニティ版の約 2 倍である 85 MB/s の速度で binlog を消費します。binlog 生成速度が 85 MB/s(512 MB のファイルを 6 秒ごとに生成する速度に相当)を超える場合、Flink ジョブの遅延が継続的に増加します。binlog 生成速度が低下すると、遅延は徐々に減少します。binlog ファイルに大規模なトランザクションが含まれている場合、処理遅延が一時的に増加し、トランザクションのログの読み取りが完了すると減少します。

MySQL CDC DataStream API

重要

DataStream API を使用してデータの読み取りおよび書き込みを行うには、対応する DataStream コネクタを使用して Flink に接続する必要があります。DataStream コネクタの設定方法については、「DataStream コネクタの使用方法」をご参照ください。

次の例は、DataStream API プログラムを作成し、MySqlSource を使用する方法を示しています。必要な pom 依存関係も含まれています。

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
public class MySqlSourceExample {
  public static void main(String[] args) throws Exception {
    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("yourHostname")
        .port(yourPort)
        .databaseList("yourDatabaseName") // キャプチャ対象のデータベースを設定します。
        .tableList("yourDatabaseName.yourTableName") // キャプチャ対象のテーブルを設定します。
        .username("yourUsername")
        .password("yourPassword")
        .deserializer(new JsonDebeziumDeserializationSchema()) // SourceRecord を JSON 文字列に変換します。
        .build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // チェックポイントを有効化します。
    env.enableCheckpointing(3000);
    env
      .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL ソース")
      // 並列ソースタスクを 4 つに設定します。
      .setParallelism(4)
      .print().setParallelism(1); // メッセージの順序を維持するために、シンクの並列処理の次数を 1 に設定します。
    env.execute("Print MySQL Snapshot + Binlog");
  }
}
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mysql</artifactId>
    <version>${vvr.version}</version>
</dependency>

MySqlSource をビルドする際、コード内で以下のパラメーターを指定する必要があります:

パラメーター

説明

hostname

MySQL データベースの IP アドレスまたはホスト名。

port

MySQL データベースサービスのポート番号。

databaseList

MySQL データベースの名前。

説明

データベース名は、複数のデータベースからデータを読み取るために正規表現をサポートします。.* を使用してすべてのデータベースに一致させます。

username

MySQL データベースサービスのユーザー名。

password

MySQL データベースサービスのパスワード。

deserializer

SourceRecord オブジェクトを指定された型に変換するデシリアライザー。有効な値:

  • RowDataDebeziumDeserializeSchema:SourceRecord を RowData(Flink Table または SQL の内部データ構造)に変換します。

  • JsonDebeziumDeserializationSchema:SourceRecord を JSON 形式の文字列に変換します。

pom 依存関係では、以下のパラメーターを指定する必要があります:

${vvr.version}

Alibaba Cloud Realtime Compute for Apache Flink のエンジンバージョン(例:1.17-vvr-8.0.4-3)。

説明

Maven に表示されるバージョン番号を使用してください。修正プログラムが定期的にリリースされており、これらの更新は他のチャネルを通じてアナウンスされない場合があります。

${flink.version}

Apache Flink のバージョン(例:1.17.2)。

重要

ジョブ実行時の互換性の問題を回避するために、Realtime Compute for Apache Flink エンジンバージョンに対応する Apache Flink バージョンを使用してください。バージョンマッピングの詳細については、「エンジン」をご参照ください。

よくある質問

CDC ソーステーブルを使用する際に発生する可能性のある問題については、「CDC の問題」をご参照ください。