すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:PolarDB-X CDC (パブリックプレビュー)

最終更新日:Jun 05, 2026

PolarDB-X コネクタの使用方法について説明します。

背景情報

PolarDB for Xscale (PolarDB-X) は、Alibaba Cloud のパフォーマンス専有型クラウドネイティブ分散データベースであり、高スループット、大容量ストレージ、低レイテンシー、スケーラビリティ、超高可用性を提供します。

重要

このコネクタには、VVR 11.5 以降および PolarDB-X 2.0 以降が必要です。

PolarDB-X CDC コネクタはソーステーブルのみをサポートします。PolarDB-X インスタンスをディメンションテーブルとしてクエリしたり、結果テーブルとして書き込んだりするには、MySQL コネクタ (パブリックプレビュー) を使用してください。

カテゴリ

説明

サポートタイプ

ソーステーブル、データインジェストソース

実行モード

ストリーミングモードのみ

データフォーマット

N/A

特定の監視メトリクス

  • currentFetchEventTimeLag:データが生成されてからソースオペレーターがそれをフェッチするまでの時間。

    このメトリックは Binlog フェーズにのみ適用され、スナップショットフェーズでは 0 になります。

  • currentEmitEventTimeLag:データが生成されてからソースオペレーターがそれを発行するまでの時間。

    このメトリックは Binlog フェーズにのみ適用され、スナップショットフェーズでは 0 になります。

  • sourceIdleTime:ソーステーブルが最後にデータを生成してからの経過時間。

API タイプ

SQL、データインジェスト YAML ジョブ

結果テーブルのデータ更新または削除のサポート

いいえ

主な特徴

PolarDB-X CDC コネクタは、サーバーサイドフィルタリングによって Binlog 解析を最適化します。PolarDB-X サーバー上で無関係な Binlog データをトリミングすることで、スループットを向上させ、ネットワーク帯域幅を節約します。

オンデマンド Binlog サブスクリプション

サーバーサイドの Binlog フィルタリングにより、コネクタは必要な変更ログのみをクライアントに送信します。これにより、ネットワークトラフィックが削減され、データ消費のスループットが向上します。

例えば、ご利用の PolarDB-X サーバー内の db.table1 テーブルと db.table2 テーブルからの変更のみをサブスクライブするには、Flink SQL ジョブを次のように設定します:

CREATE TABLE polardbx_table_foo (
  ... -- ここでテーブルスキーマを定義します
) WITH (
  'connector' = 'polardbx-cdc',
  'database-name' = 'db',
  'table-name' = '.*',
  ..., -- その他のパラメーター
  'polardbx.binlog.include.tables' = 'db.table1,db.table2' -- 指定されたテーブルからの変更のみをキャプチャします
);

インスタンス全体の Binlog をロードしてクライアント側でフィルタリングする MySQL CDC コネクタとは異なり、PolarDB-X CDC コネクタはサーバーサイドでフィルタリングを実行します。この機能により、クライアントはオンデマンドでサブスクリプションが可能になり、ネットワーク I/O のオーバーヘッドが大幅に削減されます。

制限事項

サーバーサイドの Binlog フィルタリングとテーブルレベルのサブスクリプションには、PolarDB-X サーバーバージョン 2.5.0 以降および Log Service コンポーネントバージョン 5.4.20 以降が必要です。

SQL

構文

CREATE TABLE polardbx_customer_table(
  `id` STRING,
  [columnName dataType,]*
  PRIMARY KEY(`id`) NOT ENFORCED
) WITH (
  'connector' = 'polardbx-cdc',
  'hosts' = 'pxc-**************-pub.polarx.rds.aliyuncs.com',
  'username' = 'pdx_user',
  'password' = 'pdx_password',
  'database' = 'full_db',
  'collection' = 'customers'
)

WITH パラメーター

パラメーター

説明

必須

デフォルト

備考

connector

コネクタの名前。

STRING

はい

なし

値は polardbx-cdc である必要があります。

hostname

PolarDB-X データベースの IP アドレスまたはホスト名。

STRING

はい

なし

インスタンスの接続情報からクラスターアドレスを使用します。

port

PolarDB-X データベースのポート番号。

INTEGER

いいえ

3306

なし

username

PolarDB-X データベースのユーザー名。

STRING

はい

なし

なし

password

PolarDB-X データベースのパスワード。

STRING

はい

なし

なし

database-name

PolarDB-X データベースの名前。

STRING

はい

なし

正規表現を使用して、複数のデータベースからデータを読み取ることができます。

説明

正規表現を使用する場合、文字列の先頭と末尾に一致させるために ^ および $ 文字を使用しないでください。

table-name

PolarDB-X テーブルの名前。

STRING

はい

なし

正規表現を使用して、複数のテーブルからデータを読み取ることができます。

説明

正規表現を使用する場合、文字列の先頭と末尾に一致させるために ^ および $ 文字を使用しないでください。

server-time-zone

データベース接続のセッションタイムゾーン。

STRING

いいえ

ジョブの実行環境のタイムゾーン。

Asia/Shanghai などの IANA タイムゾーン識別子を指定します。このパラメーターは、ソーステーブルの TIMESTAMP 型が STRING にどのように変換されるかを制御します。

scan.incremental.snapshot.chunk.size

増分スナップショット読み取りにおける各チャンクの行数。

INTEGER

いいえ

8096

スナップショットフェーズ中、コネクタはテーブルをチャンクに分割し、メモリにキャッシュします。チャンクサイズを小さくすると、チャンクの総数が増加します。これにより、フォールトトレランスの粒度は向上しますが、メモリ不足 (OOM) エラーのリスクが高まり、スループットが低下する可能性もあります。パフォーマンス、フォールトトレランス、メモリ消費量のバランスを取るために、適切な値を設定してください。

scan.snapshot.fetch.size

テーブルのスナップショットを読み取る際に一度にフェッチする最大行数。

INTEGER

いいえ

1024

なし

connect.timeout

PolarDB-X データベースへの接続を確立するためのタイムアウト。

DURATION

いいえ

30s

なし

connection.pool.size

データベース接続プールのサイズ。

INTEGER

いいえ

20

データベース接続プールは接続を再利用して、接続のオーバーヘッドを削減します。

connect.max-retries

PolarDB-X データベースへの接続試行が失敗した後の最大リトライ回数。

INTEGER

いいえ

3

なし

scan.startup.mode

データ消費の起動モード。

STRING

いいえ

initial

有効な値:

  • initial (デフォルト):最初の起動時に、コネクタはテーブルの完全なスナップショットを取得し、その後 Binlog から増分変更を読み取ります。

  • latest-offset:スナップショットを実行しません。最初の起動時に、コネクタは Binlog の末尾から読み取りを開始し、ジョブの開始後に行われた変更のみをキャプチャします。

  • earliest-offset:スナップショットを実行しません。利用可能な最も古い Binlog オフセットから読み取りを開始します。

  • specific-offset:スナップショットを実行しません。特定のオフセットから読み取りを開始します。オフセットは scan.startup.specific-offset.filescan.startup.specific-offset.pos を使用するか、scan.startup.specific-offset.gtid-set を使用して指定します。

  • timestamp:スナップショットを実行しません。指定されたタイムスタンプから Binlog の読み取りを開始します。タイムスタンプは scan.startup.timestamp-millis を使用してミリ秒単位で指定します。

重要

earliest-offsetspecific-offset、および timestamp 起動モードでは、起動時のテーブルスキーマが指定されたオフセットのスキーマと一致しない場合、ジョブは失敗します。指定された Binlog オフセットとジョブの起動の間にテーブルスキーマが変更されないようにしてください。

scan.startup.specific-offset.file

scan.startup.modespecific-offset の場合に読み取りを開始する Binlog ファイルの名前。

STRING

いいえ

なし

このパラメーターは、scan.startup.modespecific-offset に設定されている場合にのみ使用されます。例:mysql-bin.000003

scan.startup.specific-offset.pos

scan.startup.modespecific-offset に設定されている場合の、指定された Binlog ファイル内のオフセット。

INTEGER

いいえ

なし

このパラメーターは、scan.startup.modespecific-offset に設定されている場合にのみ使用されます。

scan.startup.specific-offset.gtid-set

scan.startup.modespecific-offset に設定されている場合に読み取りを開始する GTID セット。

STRING

いいえ

なし

このパラメーターは、scan.startup.modespecific-offset に設定されている場合にのみ使用されます。例:24DA167-0C0C-11E8-8442-00059A3C7B00:1-19

scan.startup.timestamp-millis

scan.startup.modetimestamp に設定されている場合の、ミリ秒単位の起動タイムスタンプ。

LONG

いいえ

なし

このパラメーターは、scan.startup.modetimestamp に設定されている場合にのみ使用されます。単位はミリ秒です。

scan.startup.specific-offset.skip-events

特定のオフセットから読み取る際にスキップする Binlog イベントの数。

INTEGER

いいえ

なし

このパラメーターは、scan.startup.modespecific-offset に設定されている場合にのみ使用されます。

scan.startup.specific-offset.skip-rows

特定のオフセットから読み取る際にスキップする行変更の数。単一の Binlog イベントには複数の行変更が含まれる場合があります。

INTEGER

いいえ

なし

このパラメーターは、scan.startup.modespecific-offset に設定されている場合にのみ使用されます。

heartbeat.interval

ソースが Binlog オフセットを進めるためにハートビートイベントを送信する間隔。

DURATION

いいえ

なし

ハートビートイベントは、ソースの Binlog オフセットを進めます。これにより、アイドル状態のソースで Binlog がパージされるのを防ぎます。Binlog がパージされると、ジョブは失敗し、回復にはステートレス再起動が必要になります。

chunk-meta.group.size

チャンクメタデータのサイズ。

INTEGER

いいえ

1000

メタデータサイズがこの値を超えると、送信のために複数の部分に分割されます。

chunk-key.even-distribution.factor.upper-bound

均等パーティショニングのためのチャンク分布係数の上限。

DOUBLE

いいえ

1000.0

分布係数がこの値より大きい場合、不均等チャンキングが使用されます。

チャンク分布係数 = (MAX(チャンクキー) - MIN(チャンクキー) + 1) / 総行数。

chunk-key.even-distribution.factor.lower-bound

均等パーティショニングのためのチャンク分布係数の下限。

DOUBLE

いいえ

0.05

分布係数がこの値より小さい場合、不均等チャンキングが使用されます。

チャンク分布係数 = (MAX(チャンクキー) - MIN(チャンクキー) + 1) / 総行数。

scan.newly-added-table.enabled

チェックポイントから再起動する際に、キャプチャ基準に一致する新しく追加されたテーブルをスキャンするかどうかを指定します。

BOOLEAN

いいえ

false

有効にすると、チェックポイントまたはセーブポイントから再起動されたジョブは、「table-name」パターンに一致する新しく追加されたテーブルをスキャンして同期します。また、一致しなくなったテーブルの追跡を停止します。

scan.incremental.snapshot.chunk.key-column

スナップショットフェーズ中のデータチャンキングに使用する列を指定します。

STRING

備考を参照

なし

  • プライマリキーのないテーブルでは必須です。指定された列は NULL 不可でなければなりません。

  • プライマリキーのあるテーブルではオプションです。プライマリキーから 1 つの列のみがサポートされます。

scan.incremental.close-idle-reader.enabled

スナップショットフェーズが完了した後にアイドルリーダーを閉じるかどうかを指定します。

BOOLEAN

いいえ

false

この設定を有効にするには、execution.checkpointing.checkpoints-after-tasks-finish.enabledtrue に設定する必要があります。

scan.incremental.snapshot.backfill.skip

スナップショット読み取りフェーズ中にバックフィルプロセスをスキップするかどうかを指定します。

BOOLEAN

いいえ

false

有効な値:

  • true:スナップショット読み取りフェーズ中にバックフィルプロセスをスキップします。

  • false (デフォルト):スナップショット読み取りフェーズ中にバックフィルプロセスをスキップしません。

バックフィルがスキップされた場合、スナップショットフェーズ中にテーブルに加えられた変更は、スナップショットにマージされるのではなく、後の増分フェーズで読み取られます。

重要

バックフィルをスキップすると、スナップショット中に発生した変更が増分フェーズで再実行されるため、データ不整合が発生する可能性があります。このモードは、at-least-once セマンティクスのみを保証します。

scan.parse.online.schema.changes.enabled

増分フェーズ中に RDS のロックフリー変更に対する DDL イベントの解析を試みるかどうかを指定します。

BOOLEAN

いいえ

false

有効な値:

  • true:RDS のロックフリー変更に対する DDL イベントを解析します。

  • false (デフォルト):RDS のロックフリー変更に対する DDL イベントを解析しません。

これは実験的な機能です。本番環境でロックフリーのスキーマ変更を実行する前に、Flink ジョブのセーブポイントを作成して、回復できるようにしてください。

scan.only.deserialize.captured.tables.changelog.enabled

増分フェーズ中に、指定されたキャプチャ対象テーブルの変更イベントのみを逆シリアル化するかどうかを指定します。

BOOLEAN

いいえ

true

有効な値:

  • true (デフォルト):対象テーブルの変更データのみを逆シリアル化し、Binlog の読み取りを高速化できます。

  • false:すべてのテーブルの変更データを逆シリアル化します。

scan.read-changelog-as-append-only.enabled

変更ログストリームを追加専用ストリームに変換するかどうかを指定します。

BOOLEAN

いいえ

false

有効な値:

  • true:すべてのメッセージタイプ (INSERTDELETEUPDATE_BEFOREUPDATE_AFTER を含む) を INSERT メッセージに変換します。このオプションは、ソースから削除された行のレコードを保持するなど、特定のケースでのみ使用してください。

  • false (デフォルト):すべてのメッセージタイプはそのまま下流に送信されます。

scan.parallel-deserialize-changelog.enabled

増分フェーズ中に複数のスレッドを使用して変更イベントを逆シリアル化するかどうかを指定します。

BOOLEAN

いいえ

false

有効な値:

  • true:Binlog イベントの順序を維持しながら、複数のスレッドを使用して逆シリアル化を行い、読み取りを高速化します。

  • false (デフォルト):単一のスレッドを使用して逆シリアル化を行います。

scan.parallel-deserialize-changelog.handler.size

変更イベントを並列で逆シリアル化する際に使用するイベントハンドラの数。

INTEGER

いいえ

2

なし

scan.incremental.snapshot.unbounded-chunk-first.enabled

スナップショット読み取りフェーズ中に無制限チャンクを最初に配布するかどうかを指定します。

BOOLEAN

いいえ

false

有効な値:

  • true:スナップショット読み取りフェーズ中に無制限チャンクを最初に配布します。

  • false (デフォルト):スナップショット読み取りフェーズ中に無制限チャンクを最初に配布しません。

これは実験的な機能です。有効にすると、スナップショットの最後のチャンクを同期する際の TaskManager でのメモリ不足 (OOM) エラーのリスクを軽減できます。この設定は、ジョブの最初の起動前に設定する必要があります。

polardbx.binlog.ignore.archive-events.enabled

PolarDB-X Binlog 内のアーカイブイベント (主に DELETE イベント) を無視するかどうかを指定します。

BOOLEAN

いいえ

false

polardbx.binlog.ignore.query-events.enabled

PolarDB-X Binlog 内の ROWS_QUERY_LOG_EVENT イベントを無視するかどうかを指定します。

BOOLEAN

いいえ

false

polardbx.binlog.include.tables

指定されたテーブルからのみ Binlog イベントを読み取ります。複数のテーブル名を区切るにはコンマ (,) を使用します。

STRING

いいえ

なし

polardbx.binlog.exclude.tables

指定されたテーブルからの Binlog イベントを無視します。複数のテーブル名を区切るにはコンマ (,) を使用します。

STRING

いいえ

なし

型マッピング

PolarDB-X の型

Flink の型

TINYINT

TINYINT

SMALLINT

SMALLINT

TINYINT UNSIGNED

TINYINT UNSIGNED ZEROFILL

INT

INT

MEDIUMINT

SMALLINT UNSIGNED

SMALLINT UNSIGNED ZEROFILL

BIGINT

BIGINT

INT UNSIGNED

INT UNSIGNED ZEROFILL

MEDIUMINT UNSIGNED

MEDIUMINT UNSIGNED ZEROFILL

BIGINT UNSIGNED

DECIMAL(20, 0)

BIGINT UNSIGNED ZEROFILL

SERIAL

FLOAT [UNSIGNED] [ZEROFILL]

FLOAT

DOUBLE [UNSIGNED] [ZEROFILL]

DOUBLE

DOUBLE PRECISION [UNSIGNED] [ZEROFILL]

REAL [UNSIGNED] [ZEROFILL]

NUMERIC(p, s) [UNSIGNED] [ZEROFILL]

DECIMAL(p, s)

DECIMAL(p, s) [UNSIGNED] [ZEROFILL]

BOOLEAN

BOOLEAN

TINYINT(1)

DATE

DATE

TIME [(p)]

TIME [(p)] [WITHOUT TIME ZONE]

DATETIME [(p)]

TIMESTAMP [(p)] [WITHOUT TIME ZONE]

TIMESTAMP [(p)]

TIMESTAMP_LTZ [(p)]

TIMESTAMP [(p)] WITH LOCAL TIME ZONE

CHAR(n)

STRING

VARCHAR(n)

TEXT

BINARY

BYTES

VARBINARY

BLOB

データインジェスト

Realtime Compute for Apache Flink 11.6 は、データインジェスト YAML ジョブのデータソースとして PolarDB-X コネクタをサポートするようになりました。

構文

source:
   type: polardbx
   name: PolarDB-X Source
   hostname: localhost
   port: 3306
   username: <username>
   password: <password>
   tables: pdb.order_table
   # Binlog 内のアーカイブイベントを無視
   polardbx.binlog.ignore.archive-events.enabled: true
   # Binlog 内のクエリイベントを無視
   polardbx.binlog.ignore.query-events.enabled: true
   # 帯域幅を節約するために、pdb.order_table の Binlog のみをサブスクライブ
   polardbx.binlog.include.tables: pdb.order_table

sink:
  type: values

パラメーター

パラメーター

説明

必須

デフォルト値

備考

type

データソースのタイプ。

はい

STRING

N/A

値は polardbx である必要があります。

name

データソースの名前。

いいえ

STRING

N/A

N/A

hostname

PolarDB-X インスタンスの IP アドレスまたはホスト名。

はい

STRING

N/A

VPC アドレスを推奨します。

説明

ご利用の PolarDB-X インスタンスと Realtime Compute for Apache Flink ワークスペースが同じ VPC にない場合は、VPC 間接続を確立するか、パブリックネットワーク経由でインスタンスにアクセスする必要があります。詳細については、「ワークスペースの管理と運用」および「フルマネージド Flink クラスターがパブリックネットワークにアクセスする方法」をご参照ください。

username

PolarDB-X データベースに接続するためのユーザー名。

はい

STRING

N/A

N/A

password

指定されたユーザー名のパスワード。

はい

STRING

N/A

N/A

tables

同期する PolarDB-X テーブル。

はい

STRING

N/A

  • 正規表現をサポートし、複数のテーブルからデータを読み取ることができます。

  • 複数の正規表現をコンマ (,) で区切ることができます。

説明
  • 正規表現では、開始アンカー ^ と終了アンカー $ を使用しないでください。

  • データベース名とテーブル名はピリオド (.) で区切られます。リテラルなピリオドに一致させるには、バックスラッシュでエスケープする必要があります。例:db0.\.*db1.user_table_[0-9]+、または db[1-2].[app|web]order_\.*

tables.exclude

同期から除外するテーブル。

いいえ

STRING

N/A

  • 正規表現をサポートし、複数のテーブルを除外できます。

  • 複数の正規表現をコンマ (,) で区切ることができます。

説明

データベース名とテーブル名はピリオド (.) で区切られます。リテラルなピリオドに一致させるには、バックスラッシュでエスケープする必要があります。例:db0.\.*db1.user_table_[0-9]+、または db[1-2].[app|web]order_\.*

port

PolarDB-X インスタンスのポート。

いいえ

INTEGER

3306

N/A

schema-change.enabled

スキーマ変更イベントを発行するかどうかを指定します。

いいえ

BOOLEAN

true

N/A

jdbc.properties.*

JDBC URL のカスタム接続パラメーター。

いいえ

STRING

N/A

カスタム接続パラメーターを渡すことができます。例えば、SSL を無効にするには、'jdbc.properties.useSSL' = 'false' を設定します。

debezium.*

Binlog を読み取るためのカスタム Debezium パラメーター。

いいえ

STRING

N/A

カスタム Debezium パラメーターを渡すことができます。例えば、'debezium.event.deserialization.failure.handling.mode' = 'ignore' を設定して、コネクタが逆シリアル化エラーをどのように処理するかを定義します。

scan.incremental.snapshot.chunk.size

各チャンクのサイズ (行数)。

いいえ

INTEGER

8096

PolarDB-X テーブルは、読み取りのために複数のチャンクに分割されます。各チャンクのデータは、完全に読み取られるまでメモリにキャッシュされます。

チャンクサイズを小さくすると、チャンクの総数が増加し、障害回復の粒度が細かくなりますが、OOM のリスクが高まり、全体のスループットが低下する可能性があります。これらのトレードオフのバランスを取り、適切なチャンクサイズを設定する必要があります。

scan.snapshot.fetch.size

完全なテーブルスナップショット中に一度にフェッチされる最大レコード数。

いいえ

INTEGER

1024

N/A

scan.startup.mode

データを消費するための起動モード。

いいえ

STRING

initial

有効な値:

  • initial (デフォルト):テーブルの初期スナップショットを取得し、その後 Binlog から最新の変更を読み取ります。

  • latest-offset:スナップショットフェーズをスキップし、最新の位置から Binlog を読み取り、コネクタの起動後に行われた変更のみをキャプチャします。

  • earliest-offset:スナップショットフェーズをスキップし、利用可能な最も古い Binlog の位置から読み取りを開始します。

  • specific-offset:スナップショットフェーズをスキップし、特定の位置から読み取りを開始します。位置は scan.startup.specific-offset.filescan.startup.specific-offset.pos を使用するか、scan.startup.specific-offset.gtid-set を使用して指定します。

  • timestamp:スナップショットフェーズをスキップし、scan.startup.timestamp-millis パラメーターでミリ秒単位で指定された特定のタイムスタンプから Binlog の読み取りを開始します。

重要

earliest-offsetspecific-offset、および timestamp 起動モードでは、起動時のテーブルスキーマが指定された起動位置のテーブルスキーマと異なる場合、ジョブはエラーで失敗します。つまり、これら 3 つの起動モードを使用する場合、対応するテーブルのスキーマが、指定された Binlog 消費位置とジョブの起動時間の間に変更されないようにする必要があります。

scan.startup.specific-offset.file

起動位置の Binlog ファイル名。

いいえ

STRING

N/A

scan.startup.modespecific-offset に設定されている場合に必須です。フォーマット例:mysql-bin.000003

scan.startup.specific-offset.pos

指定された Binlog ファイル内で読み取りを開始するオフセット。

いいえ

INTEGER

N/A

scan.startup.modespecific-offset に設定されている場合に必須です。

scan.startup.specific-offset.gtid-set

起動位置の GTID セット。

いいえ

STRING

N/A

scan.startup.modespecific-offset に設定されている場合に使用します。GTID セットのフォーマット例:24DA167-0C0C-11E8-8442-00059A3C7B00:1-19

scan.startup.timestamp-millis

起動位置のタイムスタンプ (ミリ秒単位)。

いいえ

LONG

N/A

scan.startup.modetimestamp に設定されている場合に必須です。

重要

タイムスタンプを指定すると、コネクタは各 Binlog ファイルの最初のイベントを読み取ってタイムスタンプを確認し、正しい開始ファイルを見つけます。指定されたタイムスタンプの対象となる Binlog ファイルがデータベースからパージされておらず、読み取り可能であることを確認してください。

server-time-zone

データベースが使用するセッションタイムゾーン。

いいえ

STRING

Flink ジョブの実行環境のタイムゾーン (ご利用のワークスペースのアベイラビリティゾーン)。

例:Asia/Shanghai。このパラメーターは、PolarDB-X の TIMESTAMP 型が STRING にどのように変換されるかを制御します。詳細については、「Debezium の時間型に関するドキュメント」をご参照ください。

scan.startup.specific-offset.skip-events

特定の位置から読み取る際にスキップする Binlog イベントの数。

いいえ

INTEGER

N/A

scan.startup.modespecific-offset に設定されている場合に使用します。

scan.startup.specific-offset.skip-rows

特定の位置から読み取る際にスキップする行変更の数。単一の Binlog イベントには複数の行変更が含まれる場合があります。

いいえ

INTEGER

N/A

scan.startup.modespecific-offset に設定されている場合に使用します。

connect.timeout

データベース接続をリトライする前に待機する最大時間。

いいえ

DURATION

30s

N/A

connect.max-retries

失敗したデータベース接続の最大リトライ回数。

いいえ

INTEGER

3

N/A

connection.pool.size

データベース接続プールのサイズ。

いいえ

INTEGER

20

接続プールは接続を再利用して、接続のオーバーヘッドを削減します。

heartbeat.interval

ソースが Binlog の位置を進めるためにハートビートイベントを送信する間隔。

いいえ

DURATION

30s

ハートビートイベントは、更新頻度の低いテーブルの Binlog の位置を進めます。これがないと、Binlog の位置が進まず、Binlog の有効期限が切れる可能性があります。Binlog の有効期限が切れるとジョブは失敗し、回復にはステートレス再起動が必要になります。

scan.incremental.snapshot.chunk.key-column

スナップショットフェーズ中にチャンクの分割キーとして使用する列を指定します。

いいえ

STRING

N/A

プライマリキーから 1 つの列のみを選択できます。

chunk-meta.group.size

チャンクメタデータのサイズ。

いいえ

INTEGER

1000

メタデータサイズがこの値を超えると、送信のために複数の部分に分割されます。

chunk-key.even-distribution.factor.lower-bound

均等チャンキングを使用するかどうかを決定するためのチャンク分布係数の下限。

いいえ

DOUBLE

0.05

分布係数がこの値より小さい場合、不均等チャンキングが使用されます。

チャンク分布係数 = (MAX(チャンクキー) - MIN(チャンクキー) + 1) / データ行の総数。

chunk-key.even-distribution.factor.upper-bound

均等チャンキングを使用するかどうかを決定するためのチャンク分布係数の上限。

いいえ

DOUBLE

1000.0

分布係数がこの値より大きい場合、不均等チャンキングが使用されます。

チャンク分布係数 = (MAX(チャンクキー) - MIN(チャンクキー) + 1) / データ行の総数。

scan.incremental.close-idle-reader.enabled

スナップショットフェーズが完了した後にアイドルリーダーを閉じるかどうかを指定します。

いいえ

BOOLEAN

false

この設定を有効にするには、execution.checkpointing.checkpoints-after-tasks-finish.enabledtrue に設定する必要があります。

scan.only.deserialize.captured.tables.changelog.enabled

増分フェーズ中に、指定されたテーブルの変更イベントのみを逆シリアル化するかどうかを指定します。

いいえ

BOOLEAN

true

有効な値:

  • true:対象テーブルの変更データのみを逆シリアル化し、Binlog の読み取りを高速化します。

  • false:すべてのテーブルの変更データを逆シリアル化します。

scan.parallel-deserialize-changelog.enabled

増分フェーズ中に複数のスレッドを使用して変更イベントを解析するかどうかを指定します。

いいえ

BOOLEAN

false

有効な値:

  • true:Binlog イベントの順序を維持しながら、複数のスレッドを使用して変更イベントの逆シリアル化を行い、読み取りを高速化します。

  • false (デフォルト):単一のスレッドを使用してイベントの逆シリアル化を行います。

scan.parallel-deserialize-changelog.handler.size

変更イベントを並列で解析する際に使用するイベントハンドラの数。

いいえ

INTEGER

2

N/A

metadata-column.include-list

下流のシンクに渡すメタデータ列。

いいえ

STRING

N/A

利用可能なメタデータには op_tses_tsquery_logfile、および pos が含まれます。複数のメタデータ列をコンマで区切ることができます。

説明

PolarDB-X CDC YAML コネクタは、データベース名、テーブル名、または op_type のメタデータ列の追加を要求またはサポートしません。変換式で __data_event_type__ を使用して変更データの型を取得したり、__schema_name____table_name__ を使用してデータベース名とテーブル名を取得したりできます。

重要

file 列には Binlog ファイル名 (増分フェーズ) または空の文字列 (スナップショットフェーズ) が含まれます。pos 列には Binlog オフセット (増分フェーズ) または 0 (スナップショットフェーズ) が含まれます。

scan.newly-added-table.enabled

チェックポイントまたはセーブポイントから再起動する際に、テーブルパターンに一致する新しく追加されたテーブルを同期し、一致しなくなったテーブルの追跡を停止するかどうかを指定します。

いいえ

BOOLEAN

false

この設定は、チェックポイントまたはセーブポイントから再起動する場合にのみ有効です。

scan.binlog.newly-added-table.enabled

増分フェーズ中に、新しく一致したテーブルのデータを送信するかどうかを指定します。

いいえ

BOOLEAN

false

scan.newly-added-table.enabled と同時に有効にすることはできません。

scan.incremental.snapshot.chunk.key-column

特定のテーブルのスナップショットフェーズ中にチャンキングするための分割キー列を指定します。

いいえ

STRING

N/A

  • テーブル名と列名をコロン (:) で接続してルールを定義します。テーブル名は正規表現にすることができます。複数のルールをセミコロン (;) で区切って定義できます。例:db1.user_table_[0-9]+:col1;db[1-2].[app|web]_order_\\.*:col2

  • このパラメーターはプライマリキーのないテーブルでは必須であり、選択された列は非 NULL (NOT NULL) 型でなければなりません。プライマリキーのあるテーブルでは、このパラメーターはオプションであり、プライマリキーから 1 つの列のみを選択できます。

scan.parse.online.schema.changes.enabled

増分フェーズ中に、ApsaraDB RDS のロックフリーのスキーマ変更に対する DDL イベントを解析するかどうかを指定します。

いいえ

BOOLEAN

false

有効な値:

  • true:ApsaraDB RDS のロックフリーのスキーマ変更に対する DDL イベントを解析します。

  • false (デフォルト):ApsaraDB RDS のロックフリーのスキーマ変更に対する DDL イベントを解析しません。

これは実験的な機能です。オンラインでロックフリーの変更を実行する前に、Flink ジョブのセーブポイントを作成して、必要に応じて復元できるようにすることを推奨します。

scan.incremental.snapshot.backfill.skip

スナップショット読み取りフェーズ中にバックフィルをスキップするかどうかを指定します。

いいえ

BOOLEAN

false

有効な値:

  • true:スナップショット読み取りフェーズ中にバックフィルをスキップします。

  • false (デフォルト):スナップショット読み取りフェーズ中にバックフィルをスキップしません。

バックフィルがスキップされた場合、スナップショットフェーズ中に発生するテーブルへの変更は、スナップショットにマージされるのではなく、後の増分フェーズで読み取られます。

重要

バックフィルをスキップすると、スナップショットフェーズからの変更が再実行される可能性があるため、データの不整合が発生する可能性があります。このモードは at-least-once セマンティクスを提供します。

treat-tinyint1-as-boolean.enabled

TINYINT(1) データ型を BOOLEAN として扱うかどうかを指定します。

いいえ

BOOLEAN

true

有効な値:

  • true (デフォルト):TINYINT(1) 型を BOOLEAN として扱います。

  • falseTINYINT(1) 型を BOOLEAN として扱いません。

treat-timestamp-as-datetime-enabled

TIMESTAMP データ型を DATETIME として扱うかどうかを指定します。

いいえ

BOOLEAN

false

有効な値:

  • true:PolarDB-X の TIMESTAMP 型を DATETIME として扱い、CDC の TIMESTAMP 型にマッピングします。

  • false (デフォルト):PolarDB-X の TIMESTAMP 型を CDC の TIMESTAMP_LTZ 型にマッピングします。

PolarDB-X の TIMESTAMP 型は UTC 時間を格納し、タイムゾーンの影響を受けます。PolarDB-X の DATETIME 型はリテラルな時間を格納し、タイムゾーンの影響を受けません。

有効にすると、このパラメーターは server-time-zone に基づいて PolarDB-X の TIMESTAMP データを DATETIME 型に変換します。

include-comments.enabled

テーブルと列のコメントを同期するかどうかを指定します。

いいえ

BOOLEAN

false

有効な値:

  • true:テーブルと列のコメントを同期します。

  • false (デフォルト):テーブルと列のコメントを同期しません。

このパラメーターを有効にすると、ジョブのメモリ使用量が増加します。

scan.incremental.snapshot.unbounded-chunk-first.enabled

スナップショット読み取りフェーズ中に無制限チャンクを最初にディスパッチするかどうかを指定します。

いいえ

BOOLEAN

false

有効な値:

  • true:スナップショット読み取りフェーズ中に無制限チャンクを最初にディスパッチします。

  • false (デフォルト):スナップショット読み取りフェーズ中に無制限チャンクを最初にディスパッチしません。

これは実験的な機能です。有効にすると、最後のチャンクが同期される際の TaskManager での OOM エラーのリスクを軽減できます。ジョブの最初の起動前にこのパラメーターを追加することを推奨します。

binlog.session.network.timeout

Binlog 接続のネットワークタイムアウト。

いいえ

DURATION

10m

0s に設定すると、サーバー側のデフォルトのタイムアウトが使用されます。

scan.rate-limit.records-per-second

ソースが 1 秒あたりに出力できる最大レコード数。

いいえ

LONG

N/A

このパラメーターを使用して、データ読み取りのスループットを制限します。この制限は、スナップショットフェーズと増分フェーズの両方に適用されます。

ソースの numRecordsOutPerSecond メトリックは、データストリーム全体で毎秒出力されるレコード数を示します。このメトリックを使用して、このパラメーターを調整できます。

スナップショット読み取りフェーズ中に、scan.incremental.snapshot.chunk.size パラメーターの値を減らすことで、バッチごとに読み取る行数を減らす必要がある場合もあります。

include-binlog-meta.enable

GTID や Binlog の位置など、生の PolarDB-X Binlog メタデータを出力レコードに含めるかどうかを指定します。

いいえ

BOOLEAN

false

既存の Canal ベースの同期パイプラインを置き換えるなど、生の Binlog 同期シナリオに適用できます。

polardbx.binlog.ignore.archive-events.enabled

PolarDB-X Binlog 内のアーカイブイベント (主に DELETE イベント) を無視するかどうかを指定します。

いいえ

BOOLEAN

false

polardbx.binlog.ignore.query-events.enabled

PolarDB-X Binlog 内の ROWS_QUERY_LOG_EVENT イベントを無視するかどうかを指定します。

いいえ

BOOLEAN

false

polardbx.binlog.include.tables

Binlog イベントをサブスクライブするテーブルのコンマ区切りリスト。

いいえ

STRING

N/A

説明

このパラメーターは増分 (Binlog) フェーズにのみ影響し、スナップショットフェーズには適用されません。

polardbx.binlog.exclude.tables

Binlog イベントを無視するテーブルのコンマ区切りリスト。

いいえ

STRING

N/A

説明

このパラメーターは増分 (Binlog) フェーズにのみ影響し、スナップショットフェーズには適用されません。