すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:PolarDB-X CDC (パブリックプレビュー)

最終更新日:Apr 10, 2026

PolarDB-X コネクタの使用方法について説明します。

背景情報

PolarDB for Xscale (PolarDB-X) は、Alibaba Cloud のパフォーマンス専有型、クラウドネイティブな分散データベースであり、高スループット、大容量ストレージ、低レイテンシー、スケーラビリティ、および超高可用性を提供します。

重要

このコネクタは、VVR 11.5 以降および PolarDB-X 2.0 以降が必要です。

PolarDB-X CDC コネクタはソーステーブルのみをサポートします。PolarDB-X インスタンスをディメンションテーブルとしてクエリしたり、結果テーブルとして書き込んだりするには、MySQL コネクタ (パブリックプレビュー) をご使用ください。

カテゴリ

説明

サポートタイプ

ソーステーブル

実行モード

ストリーミングモードのみ

データフォーマット

N/A

特定の監視メトリクス

  • currentFetchEventTimeLag:データが生成されてから Source Operator がそれをフェッチするまでの時間。

    このメトリックは Binlog フェーズにのみ適用され、スナップショットフェーズでは 0 になります。

  • currentEmitEventTimeLag:データが生成されてから Source Operator がそれを発行するまでの時間。

    このメトリックは Binlog フェーズにのみ適用され、スナップショットフェーズでは 0 になります。

  • sourceIdleTime:ソーステーブルが最後にデータを生成してからの経過時間。

API タイプ

SQL

結果テーブルのデータの更新または削除のサポート

いいえ

主な特徴

PolarDB-X CDC コネクタは、サーバー側フィルタリングによって Binlog の解析を最適化します。PolarDB-X サーバー上で関連性のない Binlog データをトリミングすることで、スループットを向上させ、ネットワーク帯域幅を節約します。

オンデマンドの Binlog サブスクリプション

サーバー側の Binlog フィルタリングにより、コネクタは必要な変更ログのみをクライアントに送信します。これにより、ネットワークトラフィックが削減され、データ消費のスループットが向上します。

例えば、PolarDB-X サーバー内の db.table1db.table2 テーブルからの変更のみをサブスクライブするには、Flink SQL ジョブを次のように設定します。

CREATE TABLE polardbx_table_foo (
  ... -- ここでテーブルスキーマを定義します
) WITH (
  'connector' = 'polardbx-cdc',
  'database-name' = 'db',
  'table-name' = '.*',
  ..., -- その他のパラメーター
  'polardbx.binlog.include.tables' = 'db.table1,db.table2' -- 指定されたテーブルからの変更のみをキャプチャします
);

クライアント側でのフィルタリングのためにインスタンス全体の Binlog をロードする MySQL CDC コネクタとは異なり、PolarDB-X CDC コネクタはサーバー側でフィルタリングを実行します。この機能により、オンデマンドのクライアントサブスクリプションが可能になり、ネットワーク I/O オーバーヘッドが大幅に削減されます。

制限事項

サーバー側の Binlog フィルタリングとテーブルレベルのサブスクリプションには、PolarDB-X サーバーバージョン 2.5.0 以降および Log Service コンポーネントバージョン 5.4.20 以降が必要です。

SQL

構文

CREATE TABLE polardbx_customer_table(
  `id` STRING,
  [columnName dataType,]*
  PRIMARY KEY(`id`) NOT ENFORCED
) WITH (
  'connector' = 'polardbx-cdc',
  'hosts' = 'pxc-**************-pub.polarx.rds.aliyuncs.com',
  'username' = 'pdx_user',
  'password' = 'pdx_password',
  'database' = 'full_db',
  'collection' = 'customers'
)

WITH パラメーター

パラメーター

説明

必須

デフォルト

備考

connector

コネクタの名前。

STRING

はい

なし

値は polardbx-cdc である必要があります。

hostname

PolarDB-X データベースの IP アドレスまたはホスト名。

STRING

はい

なし

インスタンスの接続情報からクラスターアドレスを使用します。

port

PolarDB-X データベースのポート番号。

INTEGER

いいえ

3306

なし

username

PolarDB-X データベースのユーザー名。

STRING

はい

なし

なし

password

PolarDB-X データベースのパスワード。

STRING

はい

なし

なし

database-name

PolarDB-X データベースの名前。

STRING

はい

なし

正規表現を使用して、複数のデータベースからデータを読み取ることができます。

説明

正規表現を使用する場合、文字列の開始と終了に一致させるために ^$ の文字を使用しないでください。

table-name

PolarDB-X テーブルの名前。

STRING

はい

なし

正規表現を使用して、複数のテーブルからデータを読み取ることができます。

説明

正規表現を使用する場合、文字列の開始と終了に一致させるために ^$ の文字を使用しないでください。

server-time-zone

データベース接続のセッションタイムゾーン。

STRING

いいえ

ジョブの実行環境のタイムゾーン。

Asia/Shanghai などの IANA タイムゾーン識別子を指定します。このパラメーターは、ソーステーブルの TIMESTAMP 型が STRING にどのように変換されるかを制御します。

scan.incremental.snapshot.chunk.size

増分スナップショット読み取りにおける各チャンクの行数。

INTEGER

いいえ

8096

スナップショットフェーズ中、コネクタはテーブルをチャンクに分割し、メモリにキャッシュします。チャンクサイズを小さくすると、チャンクの総数が増加します。これにより、フォールトトレランスの粒度は向上しますが、メモリ不足 (OOM) エラーのリスクが高まり、スループットが低下します。パフォーマンス、フォールトトレランス、メモリ消費のバランスを取るために、適切な値を設定してください。

scan.snapshot.fetch.size

テーブルのスナップショットを読み取る際に一度にフェッチする最大行数。

INTEGER

いいえ

1024

なし

connect.timeout

PolarDB-X データベースへの接続を確立するためのタイムアウト。

DURATION

いいえ

30s

なし

connection.pool.size

データベース接続プールのサイズ。

INTEGER

いいえ

20

データベース接続プールは接続を再利用して、接続のオーバーヘッドを削減します。

connect.max-retries

PolarDB-X データベースへの接続試行が失敗した後の最大リトライ回数。

INTEGER

いいえ

3

なし

scan.startup.mode

データ消費の起動モード。

STRING

いいえ

initial

有効な値:

  • initial (デフォルト):初回起動時、コネクタはテーブルの完全なスナップショットを取得し、その後 Binlog から増分変更を読み取ります。

  • latest-offset:スナップショットを実行しません。初回起動時、コネクタは Binlog の末尾から読み取りを開始し、ジョブ開始後に行われた変更のみをキャプチャします。

  • earliest-offset:スナップショットを一切実行しません。利用可能な最も古い Binlog オフセットから読み取りを開始します。

  • specific-offset:スナップショットを一切実行しません。特定のオフセットから読み取りを開始します。オフセットは scan.startup.specific-offset.filescan.startup.specific-offset.pos を使用するか、scan.startup.specific-offset.gtid-set を使用して指定します。

  • timestamp:スナップショットを一切実行しません。指定されたタイムスタンプから Binlog の読み取りを開始します。タイムスタンプは scan.startup.timestamp-millis を使用してミリ秒単位で指定します。

重要

earliest-offsetspecific-offset、および timestamp の起動モードでは、起動時のテーブルスキーマが指定されたオフセットのスキーマと一致しない場合、ジョブは失敗します。指定された Binlog オフセットとジョブの起動の間にテーブルスキーマが変更されないようにしてください。

scan.startup.specific-offset.file

scan.startup.modespecific-offset の場合に開始する Binlog ファイルの名前。

STRING

いいえ

なし

このパラメーターは scan.startup.modespecific-offset に設定されている場合にのみ使用されます。例:mysql-bin.000003

scan.startup.specific-offset.pos

scan.startup.modespecific-offset に設定されている場合の、指定された Binlog ファイル内のオフセット。

INTEGER

いいえ

なし

このパラメーターは scan.startup.modespecific-offset に設定されている場合にのみ使用されます。

scan.startup.specific-offset.gtid-set

scan.startup.modespecific-offset に設定されている場合に開始する GTID セット。

STRING

いいえ

なし

このパラメーターは scan.startup.modespecific-offset に設定されている場合にのみ使用されます。例:24DA167-0C0C-11E8-8442-00059A3C7B00:1-19

scan.startup.timestamp-millis

scan.startup.modetimestamp に設定されている場合の、ミリ秒単位の起動タイムスタンプ。

LONG

いいえ

なし

このパラメーターは scan.startup.modetimestamp に設定されている場合にのみ使用されます。単位はミリ秒です。

scan.startup.specific-offset.skip-events

特定のオフセットから読み取る際にスキップする Binlog イベントの数。

INTEGER

いいえ

なし

このパラメーターは scan.startup.modespecific-offset に設定されている場合にのみ使用されます。

scan.startup.specific-offset.skip-rows

特定のオフセットから読み取る際にスキップする行変更の数。単一の Binlog イベントには複数の行変更が含まれることがあります。

INTEGER

いいえ

なし

このパラメーターは scan.startup.modespecific-offset に設定されている場合にのみ使用されます。

heartbeat.interval

ソースが Binlog オフセットを進めるためにハートビートイベントを送信する間隔。

DURATION

いいえ

なし

ハートビートイベントはソースの Binlog オフセットを進めます。これにより、アイドルのソースで Binlog がパージされるのを防ぎます。Binlog がパージされると、ジョブは失敗し、回復するにはステートレス再起動が必要になります。

chunk-meta.group.size

チャンクメタデータのサイズ。

INTEGER

いいえ

1000

メタデータサイズがこの値を超えると、送信のために複数の部分に分割されます。

chunk-key.even-distribution.factor.upper-bound

均等パーティショニングのためのチャンクディストリビューションファクターの上限。

DOUBLE

いいえ

1000.0

ディストリビューションファクターがこの値より大きい場合、不均等なチャンク分割が使用されます。

チャンクディストリビューションファクター = (MAX(chunk-key) - MIN(chunk-key) + 1) / 総行数。

chunk-key.even-distribution.factor.lower-bound

均等パーティショニングのためのチャンクディストリビューションファクターの下限。

DOUBLE

いいえ

0.05

ディストリビューションファクターがこの値より小さい場合、不均等なチャンク分割が使用されます。

チャンクディストリビューションファクター = (MAX(chunk-key) - MIN(chunk-key) + 1) / 総行数。

scan.newly-added-table.enabled

チェックポイントから再起動する際に、キャプチャ条件に一致する新しく追加されたテーブルをスキャンするかどうかを指定します。

BOOLEAN

いいえ

false

有効にすると、チェックポイントまたはセーブポイントから再起動されたジョブは、「table-name」パターンに一致する新しく追加されたテーブルをスキャンして同期します。また、一致しなくなったテーブルの追跡を停止します。

scan.incremental.snapshot.chunk.key-column

スナップショットフェーズ中のデータチャンク分割に使用する列を指定します。

STRING

備考を参照

なし

  • プライマリキーのないテーブルでは必須です。指定された列は NULL 不可でなければなりません。

  • プライマリキーのあるテーブルではオプションです。プライマリキーから 1 つの列のみがサポートされます。

scan.incremental.close-idle-reader.enabled

スナップショットフェーズが完了した後にアイドル状態のリーダーを閉じるかどうかを指定します。

BOOLEAN

いいえ

false

この設定を有効にするには、execution.checkpointing.checkpoints-after-tasks-finish.enabledtrue に設定する必要があります。

scan.incremental.snapshot.backfill.skip

スナップショット読み取りフェーズ中にバックフィルプロセスをスキップするかどうかを指定します。

BOOLEAN

いいえ

false

有効な値:

  • true:スナップショット読み取りフェーズ中にバックフィルプロセスをスキップします。

  • false (デフォルト):スナップショット読み取りフェーズ中にバックフィルプロセスをスキップしません。

バックフィルがスキップされた場合、スナップショットフェーズ中にテーブルに加えられた変更は、スナップショットにマージされるのではなく、後の増分フェーズで読み取られます。

重要

バックフィルをスキップすると、スナップショット中に発生した変更が増分フェーズで再実行されるため、データの一貫性が損なわれる可能性があります。このモードは at-least-once セマンティクスのみを保証します。

scan.parse.online.schema.changes.enabled

増分フェーズ中に RDS ロックフリー変更の DDL イベントを解析しようとするかどうかを指定します。

BOOLEAN

いいえ

false

有効な値:

  • true:RDS ロックフリー変更の DDL イベントを解析します。

  • false (デフォルト):RDS ロックフリー変更の DDL イベントを解析しません。

これは実験的な機能です。本番環境でロックフリーのスキーマ変更を実行する前に、Flink ジョブのセーブポイントを作成して、回復できるようにしてください。

scan.only.deserialize.captured.tables.changelog.enabled

増分フェーズ中に、指定されたキャプチャ対象テーブルの変更イベントのみを逆シリアル化するかどうかを指定します。

BOOLEAN

いいえ

true

有効な値:

  • true (デフォルト):対象テーブルの変更データのみを逆シリアル化し、Binlog の読み取りを高速化できます。

  • false:すべてのテーブルの変更データを逆シリアル化します。

scan.read-changelog-as-append-only.enabled

チェンジログストリームを追加専用ストリームに変換するかどうかを指定します。

BOOLEAN

いいえ

false

有効な値:

  • true:すべてのメッセージタイプ (INSERTDELETEUPDATE_BEFOREUPDATE_AFTER を含む) を INSERT メッセージに変換します。このオプションは、ソースから削除された行のレコードを保持するなど、特定のケースでのみ使用してください。

  • false (デフォルト):すべてのメッセージタイプはそのままダウンストリームに送信されます。

scan.parallel-deserialize-changelog.enabled

増分フェーズ中に複数のスレッドを使用して変更イベントを逆シリアル化するかどうかを指定します。

BOOLEAN

いいえ

false

有効な値:

  • true:複数のスレッドを使用して逆シリアル化を行い、Binlog イベントの順序を維持しながら読み取りを高速化します。

  • false (デフォルト):単一のスレッドを使用して逆シリアル化します。

scan.parallel-deserialize-changelog.handler.size

変更イベントを並行して逆シリアル化する際に使用するイベントハンドラの数。

INTEGER

いいえ

2

なし

scan.incremental.snapshot.unbounded-chunk-first.enabled

スナップショット読み取りフェーズ中に、有界でないチャンクを最初に分配するかどうかを指定します。

BOOLEAN

いいえ

false

有効な値:

  • true:スナップショット読み取りフェーズ中に、有界でないチャンクを最初に分配します。

  • false (デフォルト):スナップショット読み取りフェーズ中に、有界でないチャンクを最初に分配しません。

これは実験的な機能です。有効にすると、スナップショットの最後のチャンクを同期する際の TaskManager でのメモリ不足 (OOM) エラーのリスクを軽減できます。この設定は、ジョブの初回起動前に設定する必要があります。

polardbx.binlog.ignore.archive-events.enabled

PolarDB-X Binlog 内のアーカイブイベント (主に DELETE イベント) を無視するかどうかを指定します。

BOOLEAN

いいえ

false

polardbx.binlog.ignore.query-events.enabled

PolarDB-X Binlog 内の ROWS_QUERY_LOG_EVENT イベントを無視するかどうかを指定します。

BOOLEAN

いいえ

false

polardbx.binlog.include.tables

指定されたテーブルからのみ Binlog イベントを読み取ります。複数のテーブル名はカンマ (,) で区切ります。

STRING

いいえ

なし

polardbx.binlog.exclude.tables

指定されたテーブルからの Binlog イベントを無視します。複数のテーブル名はカンマ (,) で区切ります。

STRING

いいえ

なし

型のマッピング

PolarDB-X 型

Flink 型

TINYINT

TINYINT

SMALLINT

SMALLINT

TINYINT UNSIGNED

TINYINT UNSIGNED ZEROFILL

INT

INT

MEDIUMINT

SMALLINT UNSIGNED

SMALLINT UNSIGNED ZEROFILL

BIGINT

BIGINT

INT UNSIGNED

INT UNSIGNED ZEROFILL

MEDIUMINT UNSIGNED

MEDIUMINT UNSIGNED ZEROFILL

BIGINT UNSIGNED

DECIMAL(20, 0)

BIGINT UNSIGNED ZEROFILL

SERIAL

FLOAT [UNSIGNED] [ZEROFILL]

FLOAT

DOUBLE [UNSIGNED] [ZEROFILL]

DOUBLE

DOUBLE PRECISION [UNSIGNED] [ZEROFILL]

REAL [UNSIGNED] [ZEROFILL]

NUMERIC(p, s) [UNSIGNED] [ZEROFILL]

DECIMAL(p, s)

DECIMAL(p, s) [UNSIGNED] [ZEROFILL]

BOOLEAN

BOOLEAN

TINYINT(1)

DATE

DATE

TIME [(p)]

TIME [(p)] [WITHOUT TIME ZONE]

DATETIME [(p)]

TIMESTAMP [(p)] [WITHOUT TIME ZONE]

TIMESTAMP [(p)]

TIMESTAMP_LTZ [(p)]

TIMESTAMP [(p)] WITH LOCAL TIME ZONE

CHAR(n)

STRING

VARCHAR(n)

TEXT

BINARY

BYTES

VARBINARY

BLOB

データインジェスト

Realtime Compute for Apache Flink 11.6 は、データインジェスト YAML ジョブのデータソースとして PolarDB-X コネクタをサポートするようになりました。

構文

source:
   type: polardbx
   name: PolarDB-X Source
   hostname: localhost
   port: 3306
   username: <username>
   password: <password>
   tables: pdb.order_table
   # Binlog 内のアーカイブイベントを無視します
   polardbx.binlog.ignore.archive-events.enabled: true
   # Binlog 内のクエリエベントを無視します
   polardbx.binlog.ignore.query-events.enabled: true
   # 帯域幅を節約するために、pdb.order_table の Binlog のみをサブスクライブします。
   polardbx.binlog.include.tables: pdb.order_table

sink:
  type: values

パラメーター

パラメーター

説明

必須

デフォルト

備考

type

データソースのタイプ。

はい

STRING

N/A

値は polardbx である必要があります。

name

データソースの名前。

いいえ

STRING

N/A

N/A

hostname

PolarDB-X インスタンスの IP アドレスまたはホスト名。

はい

STRING

N/A

VPC アドレスを推奨します。

説明

ご利用の PolarDB-X インスタンスと Realtime Compute for Apache Flink ワークスペースが同じ VPC にない場合、VPC 間接続を確立するか、パブリックネットワーク経由でインスタンスにアクセスする必要があります。詳細については、「ワークスペースの管理と運用」および「フルマネージド Flink クラスターはどのようにパブリックネットワークにアクセスできますか?」をご参照ください。

username

PolarDB-X データベースに接続するためのユーザー名。

はい

STRING

N/A

N/A

password

指定されたユーザー名のパスワード。

はい

STRING

N/A

N/A

tables

同期する PolarDB-X テーブル。

はい

STRING

N/A

  • 正規表現をサポートし、複数のテーブルからデータを読み取ることができます。

  • 複数の正規表現はカンマ (,) で区切ることができます。

説明
  • 正規表現では、開始アンカー ^ と終了アンカー $ を使用しないでください。

  • ピリオド (.) はデータベース名とテーブル名を区切ります。リテラルなピリオドに一致させるには、バックスラッシュでエスケープする必要があります。例:db0.\.*db1.user_table_[0-9]+、または db[1-2].[app|web]order_\.*

tables.exclude

同期から除外するテーブル。

いいえ

STRING

N/A

  • 正規表現をサポートし、複数のテーブルを除外できます。

  • 複数の正規表現はカンマ (,) で区切ることができます。

説明

ピリオド (.) はデータベース名とテーブル名を区切ります。リテラルなピリオドに一致させるには、バックスラッシュでエスケープする必要があります。例:db0.\.*db1.user_table_[0-9]+、または db[1-2].[app|web]order_\.*

port

PolarDB-X インスタンスのポート。

いいえ

INTEGER

3306

N/A

schema-change.enabled

スキーマ変更イベントを発行するかどうかを指定します。

いいえ

BOOLEAN

true

N/A

jdbc.properties.*

JDBC URL のカスタム接続パラメーター。

いいえ

STRING

N/A

カスタム接続パラメーターを渡すことができます。例えば、SSL を無効にするには、'jdbc.properties.useSSL' = 'false' を設定します。

debezium.*

Binlog を読み取るためのカスタム Debezium パラメーター。

いいえ

STRING

N/A

カスタム Debezium パラメーターを渡すことができます。例えば、コネクタが逆シリアル化エラーをどのように処理するかを定義するために、'debezium.event.deserialization.failure.handling.mode' = 'ignore' を設定します。

scan.incremental.snapshot.chunk.size

各チャンクのサイズ (行数)。

いいえ

INTEGER

8096

PolarDB-X テーブルは読み取りのために複数のチャンクに分割されます。各チャンクのデータは、完全に読み取られるまでメモリにキャッシュされます。

チャンクサイズを小さくすると、チャンクの総数が増加し、障害復旧の粒度が細かくなりますが、OOM のリスクが高まり、全体のスループットが低下する可能性があります。これらのトレードオフのバランスを取り、適切なチャンクサイズを設定する必要があります。

scan.snapshot.fetch.size

完全なテーブルスナップショット中に一度にフェッチされる最大レコード数。

いいえ

INTEGER

1024

N/A

scan.startup.mode

データを消費するための起動モード。

いいえ

STRING

initial

有効な値:

  • initial (デフォルト):テーブルの初期スナップショットを取得し、その後 Binlog から最新の変更を読み取ります。

  • latest-offset:スナップショットフェーズをスキップし、最新の位置から Binlog を読み取り、コネクタの起動後に行われた変更のみをキャプチャします。

  • earliest-offset:スナップショットフェーズをスキップし、利用可能な最も古い Binlog の位置から読み取りを開始します。

  • specific-offset:スナップショットフェーズをスキップし、特定の位置から読み取りを開始します。位置は scan.startup.specific-offset.filescan.startup.specific-offset.pos を使用するか、scan.startup.specific-offset.gtid-set を使用して指定します。

  • timestamp:スナップショットフェーズをスキップし、scan.startup.timestamp-millis パラメーターでミリ秒単位で指定された特定のタイムスタンプから Binlog の読み取りを開始します。

重要

earliest-offsetspecific-offset、および timestamp の起動モードでは、起動時のテーブルスキーマが指定された起動位置のテーブルスキーマと異なる場合、ジョブはエラーで失敗します。つまり、これらの 3 つの起動モードを使用する場合、指定された Binlog 消費位置とジョブの起動時間の間に、対応するテーブルのスキーマが変更されないようにする必要があります。

scan.startup.specific-offset.file

起動位置の Binlog ファイル名。

いいえ

STRING

N/A

scan.startup.modespecific-offset に設定されている場合に必須です。フォーマット例:mysql-bin.000003

scan.startup.specific-offset.pos

指定された Binlog ファイル内で読み取りを開始するオフセット。

いいえ

INTEGER

N/A

scan.startup.modespecific-offset に設定されている場合に必須です。

scan.startup.specific-offset.gtid-set

起動位置の GTID セット。

いいえ

STRING

N/A

scan.startup.modespecific-offset に設定されている場合に使用されます。GTID セットのフォーマット例:24DA167-0C0C-11E8-8442-00059A3C7B00:1-19

scan.startup.timestamp-millis

起動位置のタイムスタンプ (ミリ秒)。

いいえ

LONG

N/A

scan.startup.modetimestamp に設定されている場合に必須です。

重要

タイムスタンプを指定すると、コネクタは各 Binlog ファイルの最初のイベントを読み取ってタイムスタンプを確認し、正しい開始ファイルを見つけます。指定したタイムスタンプの対象となる Binlog ファイルがデータベースからパージされておらず、読み取り可能であることを確認してください。

server-time-zone

データベースが使用するセッションタイムゾーン。

いいえ

STRING

Flink ジョブの実行環境のタイムゾーン (ご利用のワークスペースのアベイラビリティゾーン)。

例:Asia/Shanghai。このパラメーターは、PolarDB-X の TIMESTAMP 型が STRING にどのように変換されるかを制御します。詳細については、「Debezium の時間型に関するドキュメント」をご参照ください。

scan.startup.specific-offset.skip-events

特定の位置から読み取る際にスキップする Binlog イベントの数。

いいえ

INTEGER

N/A

scan.startup.modespecific-offset に設定されている場合に使用されます。

scan.startup.specific-offset.skip-rows

特定の位置から読み取る際にスキップする行変更の数。単一の Binlog イベントには複数の行変更が含まれることがあります。

いいえ

INTEGER

N/A

scan.startup.modespecific-offset に設定されている場合に使用されます。

connect.timeout

データベース接続をリトライするまでの最大待機時間。

いいえ

DURATION

30s

N/A

connect.max-retries

失敗したデータベース接続の最大リトライ回数。

いいえ

INTEGER

3

N/A

connection.pool.size

データベース接続プールのサイズ。

いいえ

INTEGER

20

接続プールは接続を再利用して、接続のオーバーヘッドを削減します。

heartbeat.interval

ソースが Binlog の位置を進めるためにハートビートイベントを送信する間隔。

いいえ

DURATION

30s

ハートビートイベントは、更新頻度の低いテーブルの Binlog の位置を進めます。これがないと、Binlog の位置が進まず、Binlog の期限切れにつながる可能性があります。期限切れの Binlog はジョブの失敗を引き起こし、回復にはステートレス再起動が必要になります。

scan.incremental.snapshot.chunk.key-column

スナップショットフェーズ中にチャンクの分割キーとして使用する列を指定します。

いいえ

STRING

N/A

プライマリキーから 1 つの列のみを選択できます。

chunk-meta.group.size

チャンクメタデータのサイズ。

いいえ

INTEGER

1000

メタデータサイズがこの値を超えると、送信のために複数の部分に分割されます。

chunk-key.even-distribution.factor.lower-bound

均等なチャンク分割を使用するかどうかを決定するためのチャンクディストリビューションファクターの下限。

いいえ

DOUBLE

0.05

ディストリビューションファクターがこの値より小さい場合、不均等なチャンク分割が使用されます。

チャンクディストリビューションファクター = (MAX(chunk-key) - MIN(chunk-key) + 1) / データ行の総数。

chunk-key.even-distribution.factor.upper-bound

均等なチャンク分割を使用するかどうかを決定するためのチャンクディストリビューションファクターの上限。

いいえ

DOUBLE

1000.0

ディストリビューションファクターがこの値より大きい場合、不均等なチャンク分割が使用されます。

チャンクディストリビューションファクター = (MAX(chunk-key) - MIN(chunk-key) + 1) / データ行の総数。

scan.incremental.close-idle-reader.enabled

スナップショットフェーズが完了した後にアイドル状態のリーダーを閉じるかどうかを指定します。

いいえ

BOOLEAN

false

この設定を有効にするには、execution.checkpointing.checkpoints-after-tasks-finish.enabledtrue に設定する必要があります。

scan.only.deserialize.captured.tables.changelog.enabled

増分フェーズ中に、指定されたテーブルの変更イベントのみを逆シリアル化するかどうかを指定します。

いいえ

BOOLEAN

true

有効な値:

  • true:対象テーブルの変更データのみを逆シリアル化し、Binlog の読み取りを高速化します。

  • false:すべてのテーブルの変更データを逆シリアル化します。

scan.parallel-deserialize-changelog.enabled

増分フェーズ中に複数のスレッドを使用して変更イベントを解析するかどうかを指定します。

いいえ

BOOLEAN

false

有効な値:

  • true:Binlog イベントの順序を維持しながら、複数のスレッドを使用して変更イベントの逆シリアル化を行い、読み取りを高速化します。

  • false (デフォルト):単一のスレッドを使用してイベントの逆シリアル化を行います。

scan.parallel-deserialize-changelog.handler.size

変更イベントを並行して解析する際に使用するイベントハンドラの数。

いいえ

INTEGER

2

N/A

metadata-column.include-list

ダウンストリームの sink に渡すメタデータ列。

いいえ

STRING

N/A

利用可能なメタデータには op_tses_tsquery_logfile、および pos が含まれます。複数のメタデータ列はカンマで区切ることができます。

説明

PolarDB-X CDC YAML コネクタは、データベース名、テーブル名、または op_type のメタデータ列の追加を要求またはサポートしません。変換式で __data_event_type__ を使用して変更データの型を取得したり、__schema_name____table_name__ を使用してデータベース名とテーブル名を取得したりできます。

重要

file 列には Binlog ファイル名 (増分フェーズ) または空の文字列 (スナップショットフェーズ) が含まれます。pos 列には Binlog オフセット (増分フェーズ) または 0 (スナップショットフェーズ) が含まれます。

scan.newly-added-table.enabled

チェックポイントまたはセーブポイントから再起動する際に、tables パターンに一致する新しく追加されたテーブルを同期し、一致しなくなったテーブルの追跡を停止するかどうかを指定します。

いいえ

BOOLEAN

false

この設定は、チェックポイントまたはセーブポイントから再起動する場合にのみ有効です。

scan.binlog.newly-added-table.enabled

増分フェーズ中に、新しく一致したテーブルのデータを送信するかどうかを指定します。

いいえ

BOOLEAN

false

scan.newly-added-table.enabled と同時に有効にすることはできません。

scan.incremental.snapshot.chunk.key-column

特定のテーブルのスナップショットフェーズ中にチャンク分割のための分割キー列を指定します。

いいえ

STRING

N/A

  • コロン (:) を使用してテーブル名と列名を接続し、ルールを定義します。テーブル名は正規表現にすることができます。複数のルールはセミコロン (;) で区切って定義できます。例:db1.user_table_[0-9]+:col1;db[1-2].[app|web]_order_\\.*:col2

  • このパラメーターはプライマリキーのないテーブルでは必須であり、選択された列は NULL 不可 (NOT NULL) 型でなければなりません。プライマリキーのあるテーブルでは、このパラメーターはオプションであり、プライマリキーから 1 つの列のみを選択できます。

scan.parse.online.schema.changes.enabled

増分フェーズ中に、ApsaraDB RDS のロックフリーのスキーマ変更に対する DDL イベントを解析するかどうかを指定します。

いいえ

BOOLEAN

false

有効な値:

  • true:ApsaraDB RDS のロックフリーのスキーマ変更に対する DDL イベントを解析します。

  • false (デフォルト):ApsaraDB RDS のロックフリーのスキーマ変更に対する DDL イベントを解析しません。

これは実験的な機能です。オンラインでロックフリーの変更を実行する前に、Flink ジョブのセーブポイントを作成して、必要に応じて復元できるようにすることを推奨します。

scan.incremental.snapshot.backfill.skip

スナップショット読み取りフェーズ中にバックフィルをスキップするかどうかを指定します。

いいえ

BOOLEAN

false

有効な値:

  • true:スナップショット読み取りフェーズ中にバックフィルをスキップします。

  • false (デフォルト):スナップショット読み取りフェーズ中にバックフィルをスキップしません。

バックフィルがスキップされた場合、スナップショットフェーズ中にテーブルに加えられた変更は、スナップショットにマージされるのではなく、後の増分フェーズで読み取られます。

重要

バックフィルをスキップすると、スナップショットフェーズからの変更が再実行される可能性があるため、データの一貫性が損なわれる可能性があります。このモードは at-least-once セマンティクスを提供します。

treat-tinyint1-as-boolean.enabled

TINYINT(1) データ型を BOOLEAN として扱うかどうかを指定します。

いいえ

BOOLEAN

true

有効な値:

  • true (デフォルト):TINYINT(1) 型を BOOLEAN として扱います。

  • falseTINYINT(1) 型を BOOLEAN として扱いません。

treat-timestamp-as-datetime-enabled

TIMESTAMP データ型を DATETIME として扱うかどうかを指定します。

いいえ

BOOLEAN

false

有効な値:

  • true:PolarDB-X の TIMESTAMP 型を DATETIME として扱い、CDC の TIMESTAMP 型にマッピングします。

  • false (デフォルト):PolarDB-X の TIMESTAMP 型を CDC の TIMESTAMP_LTZ 型にマッピングします。

PolarDB-X の TIMESTAMP 型は協定世界時 (UTC) を格納し、タイムゾーンの影響を受けます。PolarDB-X の DATETIME 型はリテラルな時間を格納し、タイムゾーンの影響を受けません。

このパラメーターを有効にすると、PolarDB-X の TIMESTAMP データは server-time-zone に基づいて DATETIME 型に変換されます。

include-comments.enabled

テーブルと列のコメントを同期するかどうかを指定します。

いいえ

BOOLEAN

false

有効な値:

  • true:テーブルと列のコメントを同期します。

  • false (デフォルト):テーブルと列のコメントを同期しません。

このパラメーターを有効にすると、ジョブのメモリ使用量が増加します。

scan.incremental.snapshot.unbounded-chunk-first.enabled

スナップショット読み取りフェーズ中に、有界でないチャンクを最初にディスパッチするかどうかを指定します。

いいえ

BOOLEAN

false

有効な値:

  • true:スナップショット読み取りフェーズ中に、有界でないチャンクを最初にディスパッチします。

  • false (デフォルト):スナップショット読み取りフェーズ中に、有界でないチャンクを最初にディスパッチしません。

これは実験的な機能です。有効にすると、最後のチャンクが同期される際の TaskManager での OOM エラーのリスクを軽減できます。ジョブの最初の起動前にこのパラメーターを追加することを推奨します。

binlog.session.network.timeout

Binlog 接続のネットワークタイムアウト。

いいえ

DURATION

10m

0s に設定すると、サーバー側のデフォルトのタイムアウトが使用されます。

scan.rate-limit.records-per-second

ソースが 1 秒あたりに発行できる最大レコード数。

いいえ

LONG

N/A

このパラメーターを使用して、データ読み取りのスループットを制限します。この制限は、スナップショットフェーズと増分フェーズの両方に適用されます。

ソースの numRecordsOutPerSecond メトリックは、データストリーム全体で 1 秒あたりに発行されたレコード数を示します。このメトリックを使用して、このパラメーターを調整できます。

スナップショット読み取りフェーズ中に、scan.incremental.snapshot.chunk.size パラメーターの値を小さくして、バッチごとに読み取る行数を減らす必要がある場合もあります。

include-binlog-meta.enable

GTID や Binlog の位置など、生の PolarDB-X Binlog メタデータを発行されるレコードに含めるかどうかを指定します。

いいえ

BOOLEAN

false

既存の Canal ベースの同期パイプラインを置き換えるなど、生の Binlog 同期シナリオに適用できます。

polardbx.binlog.ignore.archive-events.enabled

PolarDB-X Binlog 内のアーカイブイベント (主に DELETE イベント) を無視するかどうかを指定します。

いいえ

BOOLEAN

false

polardbx.binlog.ignore.query-events.enabled

PolarDB-X Binlog 内の ROWS_QUERY_LOG_EVENT イベントを無視するかどうかを指定します。

いいえ

BOOLEAN

false

polardbx.binlog.include.tables

Binlog イベントをサブスクライブするテーブルのカンマ区切りリスト。

いいえ

STRING

N/A

説明

このパラメーターは増分 (Binlog) フェーズにのみ影響し、スナップショットフェーズには適用されません。

polardbx.binlog.exclude.tables

Binlog イベントを無視するテーブルのカンマ区切りリスト。

いいえ

STRING

N/A

説明

このパラメーターは増分 (Binlog) フェーズにのみ影響し、スナップショットフェーズには適用されません。