すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:PolarDB-X CDC (パブリックプレビュー)

最終更新日:Jan 10, 2026

このトピックでは、PolarDB-X コネクタの使用方法について説明します。

背景情報

PolarDB for Xscale (PolarDB-X) は、Alibaba Cloud が提供するパフォーマンス専有型のクラウドネイティブ分散データベースサービスです。高スループット、大容量ストレージ、低レイテンシー、容易なスケーラビリティ、高可用性を提供します。

重要

このコネクタは、Ververica Runtime (VVR) 11.5 以降をサポートしており、PolarDB-X 2.0 以降と併用する必要があります。

PolarDB-X CDC コネクタは、ソーステーブルとしてのみ使用できます。PolarDB-X インスタンスのディメンションテーブルをクエリしたり、結果テーブルに書き込んだりするには、MySQL コネクタ (パブリックプレビュー) を使用してください。

カテゴリ

詳細

サポートタイプ

ソーステーブル

ランタイムモード

ストリーミングモードのみ

データフォーマット

該当なし

特定のモニタリングメトリック

  • currentFetchEventTimeLag:データが生成されてから Source Operator によってプルされるまでの間隔。

    このメトリックはバイナリログフェーズでのみ有効です。スナップショットフェーズでは、この値は常に 0 です。

  • currentEmitEventTimeLag:データが生成されてから Source Operator を離れるまでの間隔。

    このメトリックはバイナリログフェーズでのみ有効です。スナップショットフェーズでは、この値は常に 0 です。

  • sourceIdleTime:ソーステーブルがアイドル状態であった期間。

API タイプ

SQL

結果テーブルへの更新または削除のサポート

いいえ

機能

PolarDB-X CDC コネクタは、サーバーサイドでのフィルタリングと不要なバイナリログの削減をサポートすることで、バイナリログ解析フェーズのパフォーマンスを最適化します。これにより、スループットが向上し、ネットワーク帯域幅が節約されます。

オンデマンドのバイナリログサブスクリプションの例

このバージョンでは、バイナリログのサーバーサイドフィルタリングをサポートしており、必要な変更ログのみをクライアントに送信します。これにより、ネットワークトラフィックが削減され、ログ消費のスループットが向上します。

例えば、PolarDB-X サーバー上の db.table1db.table2 テーブルの変更データのみをサブスクライブするには、Flink SQL ジョブを次のように構成します。

CREATE TABLE polardbx_table_foo (
  ... -- ここでテーブルスキーマを定義
) WITH (
  'connector' = 'polardbx-cdc',
  'database-name' = 'db',
  'table-name' = '.*',
  ..., -- その他のパラメーター
  'polardbx.binlog.include.tables' = 'db.table1,db.table2' -- 対応するテーブルのデータのみをサブスクライブ
);

インスタンス全体のすべての変更ログをロードしてクライアントサイドでフィルタリングする MySQL CDC コネクタとは異なり、PolarDB-X CDC コネクタはサーバー上でバイナリログをフィルタリングできます。これにより、クライアントは必要に応じてバイナリログをサブスクライブでき、ネットワーク I/O オーバーヘッドを大幅に削減できます。

制限事項

サーバーサイドでのバイナリログフィルタリングと特定テーブルへのサブスクリプションには、PolarDB-X サーバーバージョン 2.5.0 以降、および Simple Log Service コンポーネントバージョン 5.4.20 以降が必要です。

SQL

構文

CREATE TABLE polardbx_customer_table(
  `id` STRING,
  [columnName dataType,]*
  PRIMARY KEY(`id`) NOT ENFORCED
) WITH (
  'connector' = 'polardbx-cdc',
  'hosts' = 'pxc-**************-pub.polarx.rds.aliyuncs.com',
  'username' = 'pdx_user',
  'password' = 'pdx_password',
  'database' = 'full_db',
  'collection' = 'customers'
)

WITH パラメーター

パラメーター

説明

データ型

必須

デフォルト値

注意

connector

コネクタの名前。

STRING

はい

なし

値は polardbx-cdc である必要があります。

hostname

PolarDB-X データベースの IP アドレスまたはホスト名。

STRING

はい

なし

インスタンスのクラスターエンドポイントを指定します。

port

PolarDB-X データベースのサービスポート番号。

INTEGER

いいえ

3306

なし。

username

PolarDB-X データベースサービス用のユーザー名。

STRING

はい

なし

なし。

password

PolarDB-X データベースサービス用のパスワード。

STRING

はい

なし

なし。

database-name

PolarDB-X データベースの名前。

STRING

はい

なし

正規表現を使用して、複数のデータベースからデータを読み取ることができます。

説明

正規表現を使用する場合、文字列の先頭と末尾を照合するために ^ および $ 記号を使用しないでください。

table-name

PolarDB-X テーブルの名前。

STRING

はい

なし

正規表現を使用して、複数のテーブルからデータを読み取ることができます。

説明

正規表現を使用する場合、文字列の先頭と末尾を照合するために ^ および $ 記号を使用しないでください。

server-time-zone

データベースが使用するセッションタイムゾーン。

STRING

いいえ

ジョブが実行されるゾーンのタイムゾーン。

Asia/Shanghai などの IANA タイムゾーン識別子を指定します。このパラメーターは、ソーステーブルの TIMESTAMP 型が STRING 型にどのように変換されるかを制御します。

scan.incremental.snapshot.chunk.size

増分スナップショットからデータを読み取る際の各チャンクのサイズ (行数)。

INTEGER

いいえ

8096

PolarDB-X はテーブルを複数のチャンクに分割して読み取り、チャンクデータをメモリにキャッシュします。チャンクあたりの行数を減らすと、チャンクの総数が増加します。これにより、よりきめ細かい障害回復が可能になりますが、Out-of-Memory (OOM) エラーのリスクが高まり、スループットが低下します。パフォーマンスのバランスを取るために、適切なチャンクサイズを構成してください。

scan.snapshot.fetch.size

テーブルから完全データを読み取る際に一度にプルするレコードの最大数。

INTEGER

いいえ

1024

なし。

connect.timeout

PolarDB-X データベースサーバーへの接続がタイムアウトした後、接続をリトライするまでの最大待機時間。

DURATION

いいえ

30s

なし。

connection.pool.size

データベース接続プールのサイズ。

INTEGER

いいえ

20

データベース接続プールは接続を再利用して、データベース接続の数を減らします。

connect.max-retries

MySQL データベースサービスへの接続に失敗した後の最大リトライ回数。

INTEGER

いいえ

3

なし。

scan.startup.mode

データ消費の起動モード。

STRING

いいえ

initial

有効な値:

  • initial (デフォルト):ジョブが初めて起動するとき、すべての既存データをスキャンし、その後最新のバイナリログデータを読み取ります。

  • latest-offset:ジョブが初めて起動するとき、既存データをスキャンしません。バイナリログの末尾から読み取りを開始します。つまり、コネクタの起動後に発生した変更のみを読み取ります。

  • earliest-offset:既存データをスキャンしません。利用可能な最も古いバイナリログから読み取りを開始します。

  • specific-offset:既存データをスキャンしません。指定されたバイナリログオフセットから開始します。scan.startup.specific-offset.filescan.startup.specific-offset.pos の両方を構成して特定のバイナリログファイルと位置から開始するか、scan.startup.specific-offset.gtid-set のみを構成して特定の GTID セットから開始することができます。

  • timestamp:既存データをスキャンしません。指定されたタイムスタンプからバイナリログの読み取りを開始します。タイムスタンプは scan.startup.timestamp-millis によってミリ秒単位で指定されます。

重要

earliest-offsetspecific-offset、および timestamp 起動モードでは、起動時のテーブルスキーマが指定されたオフセットのスキーマと一致する必要があります。スキーマの不一致はジョブの失敗を引き起こします。指定されたバイナリログオフセットとジョブの起動の間にテーブルスキーマが変更されないことを確認してください。

scan.startup.specific-offset.file

特定オフセットモードを使用する場合の開始オフセットのバイナリログファイル名。

STRING

いいえ

なし

このパラメーターを使用する場合、scan.startup.modespecific-offset に設定する必要があります。ファイル名の形式例:mysql-bin.000003

scan.startup.specific-offset.pos

特定オフセットモードを使用する場合の指定されたバイナリログファイル内の開始オフセットの位置。

INTEGER

いいえ

なし

このパラメーターを使用する場合、scan.startup.modespecific-offset に設定する必要があります。

scan.startup.specific-offset.gtid-set

特定オフセットモードを使用する場合の開始オフセットの GTID セット。

STRING

いいえ

なし

このパラメーターを使用する場合、scan.startup.modespecific-offset に設定する必要があります。GTID セットの形式例:24DA167-0C0C-11E8-8442-00059A3C7B00:1-19

scan.startup.timestamp-millis

特定時間モードを使用する場合の開始オフセットのタイムスタンプ (ミリ秒)。

LONG

いいえ

なし

このパラメーターを使用する場合、scan.startup.modetimestamp に設定する必要があります。タイムスタンプはミリ秒単位です。

scan.startup.specific-offset.skip-events

指定されたオフセットから読み取る際にスキップするバイナリログイベントの数。

INTEGER

いいえ

なし

このパラメーターを使用する場合、scan.startup.modespecific-offset に設定する必要があります。

scan.startup.specific-offset.skip-rows

指定されたオフセットから読み取る際にスキップする行の変更数。単一のバイナリログイベントは、複数の行の変更に対応する場合があります。

INTEGER

いいえ

なし

このパラメーターを使用する場合、scan.startup.modespecific-offset に設定する必要があります。

heartbeat.interval

ソースがハートビートイベントを使用してバイナリログオフセットを進める間隔。

DURATION

いいえ

なし

ハートビートイベントは、ソース側でバイナリログオフセットを強制的に進めます。このメカニズムは、更新頻度が低いことによるバイナリログの期限切れを防ぎます。期限切れのバイナリログはジョブの失敗を引き起こし、ステートレス再起動によってのみ回復できます。

chunk-meta.group.size

チャンクメタデータのサイズ。

INTEGER

いいえ

1000

メタデータがこの値より大きい場合、複数の部分に分割されて転送されます。

chunk-key.even-distribution.factor.upper-bound

均等シャーディングのためのチャンク分散係数の上限。

DOUBLE

いいえ

1000.0

分散係数がこの値より大きい場合、不均等シャーディングが使用されます。

チャンク分散係数 = (MAX(chunk-key) - MIN(chunk-key) + 1) / 総行数。

chunk-key.even-distribution.factor.lower-bound

均等シャーディングのためのチャンク分散係数の下限。

DOUBLE

いいえ

0.05

分散係数がこの値より小さい場合、不均等シャーディングが使用されます。

チャンク分散係数 = (MAX(chunk-key) - MIN(chunk-key) + 1) / 総行数。

scan.newly-added-table.enabled

ジョブがチェックポイントから再起動する際に、新しく追加されたキャプチャ対象テーブルをスキャンするかどうかを指定します。

BOOLEAN

いいえ

false

有効にすると、システムは以前は一致しなかった新しく追加されたテーブルを同期し、一致しなくなったテーブルを状態から削除します。これは、チェックポイントまたはセーブポイントから再起動するときに有効になります。

scan.incremental.snapshot.chunk.key-column

スナップショットフェーズでデータシャーディングに使用される列を指定します。

STRING

「注意」をご参照ください

なし

  • プライマリキーのないテーブルでは必須です。選択する列は NOT NULL 型である必要があります。

  • プライマリキーのあるテーブルではオプションです。プライマリキーから 1 つの列のみを選択できます。

scan.incremental.close-idle-reader.enabled

スナップショットフェーズが終了した後、アイドル状態のリーダーをシャットダウンするかどうかを指定します。

BOOLEAN

いいえ

false

この構成を有効にするには、execution.checkpointing.checkpoints-after-tasks-finish.enabled も true に設定する必要があります。

scan.incremental.snapshot.backfill.skip

スナップショット読み取りフェーズでバックフィルをスキップするかどうかを指定します。

BOOLEAN

いいえ

false

有効な値:

  • true:スナップショット読み取りフェーズでバックフィルをスキップします。

  • false (デフォルト):スナップショット読み取りフェーズでバックフィルをスキップしません。

バックフィルをスキップすると、スナップショットフェーズ中のテーブルへの変更は、スナップショットにマージされる代わりに、後の増分フェーズで読み取られます。

重要

バックフィルをスキップすると、スナップショットフェーズ中に発生した変更が再実行される可能性があるため、データの一貫性が損なわれる可能性があります。at-least-once セマンティクスのみが保証されます。

scan.parse.online.schema.changes.enabled

増分フェーズで、RDS のロックレス変更 DDL イベントの解析を試みるかどうかを指定します。

BOOLEAN

いいえ

false

有効な値:

  • true:RDS のロックレス変更 DDL イベントを解析します。

  • false (デフォルト):RDS のロックレス変更 DDL イベントを解析しません。

これは実験的な機能です。オンラインのロックレス変更を実行する前に、回復を容易にするために Flink ジョブのセーブポイントを作成してください。

scan.only.deserialize.captured.tables.changelog.enabled

増分フェーズで、指定されたテーブルの変更イベントのみを逆シリアル化するかどうかを指定します。

BOOLEAN

いいえ

true

有効な値:

  • true:ターゲットテーブルの変更データのみを逆シリアル化して、バイナリログの読み取りを高速化します。

  • false (デフォルト):すべてのテーブルの変更データを逆シリアル化します。

scan.read-changelog-as-append-only.enabled

変更ログストリームを追加専用ストリームに変換するかどうかを指定します。

BOOLEAN

いいえ

false

有効な値:

  • true:INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER を含むすべてのタイプのメッセージが INSERT メッセージに変換されます。このオプションは、先祖テーブルからの削除メッセージを保存する必要がある場合など、特定のシナリオでのみ有効にしてください。

  • false (デフォルト):すべてのタイプのメッセージがそのままダウンストリームに送信されます。

scan.parallel-deserialize-changelog.enabled

増分フェーズで、複数スレッドを使用して変更イベントを解析するかどうかを指定します。

BOOLEAN

いいえ

false

有効な値:

  • true:バイナリログイベントの順序を維持しながら、変更イベントの逆シリアル化フェーズでマルチスレッド処理を使用して読み取りを高速化します。

  • false (デフォルト):イベントの逆シリアル化フェーズでシングルスレッド処理を使用します。

scan.parallel-deserialize-changelog.handler.size

複数スレッドを使用して変更イベントを解析する場合のイベントハンドラの数。

INTEGER

いいえ

2

なし。

scan.incremental.snapshot.unbounded-chunk-first.enabled

スナップショット読み取りフェーズで無制限チャンクを最初に分散するかどうかを指定します。

BOOLEAN

いいえ

false

有効な値:

  • true:スナップショット読み取りフェーズで無制限チャンクを最初に分散します。

  • false (デフォルト):スナップショット読み取りフェーズで無制限チャンクを最初に分散しません。

これは実験的な機能です。有効にすると、TaskManager がスナップショットフェーズ中に最後のチャンクを同期する際の OOM エラーのリスクを軽減できます。ジョブが初めて起動する前にこの構成を追加してください。

polardbx.binlog.ignore.archive-events.enabled

PolarDB-X バイナリログのアーカイブイベント (主に `DELETE` イベント) を無視するかどうかを指定します。

BOOLEAN

いいえ

false

polardbx.binlog.ignore.query-events.enabled

PolarDB-X バイナリログのクエリイベントを無視するかどうかを指定します。

BOOLEAN

いいえ

false

polardbx.binlog.include.tables

これらのテーブルのバイナリログのみをサブスクライブします。複数のテーブル名はカンマ (,) で区切ります。

STRING

いいえ

なし

polardbx.binlog.exclude.tables

これらのテーブルのバイナリログをサブスクライブしません。複数のテーブル名はカンマ (,) で区切ります。

STRING

いいえ

なし

型のマッピング

PolarDB-X データ型

Flink データ型

TINYINT

TINYINT

SMALLINT

SMALLINT

TINYINT UNSIGNED

TINYINT UNSIGNED ZEROFILL

INT

INT

MEDIUMINT

SMALLINT UNSIGNED

SMALLINT UNSIGNED ZEROFILL

BIGINT

BIGINT

INT UNSIGNED

INT UNSIGNED ZEROFILL

MEDIUMINT UNSIGNED

MEDIUMINT UNSIGNED ZEROFILL

BIGINT UNSIGNED

DECIMAL(20, 0)

BIGINT UNSIGNED ZEROFILL

SERIAL

FLOAT [UNSIGNED] [ZEROFILL]

FLOAT

DOUBLE [UNSIGNED] [ZEROFILL]

DOUBLE

DOUBLE PRECISION [UNSIGNED] [ZEROFILL]

REAL [UNSIGNED] [ZEROFILL]

NUMERIC(p, s) [UNSIGNED] [ZEROFILL]

DECIMAL(p, s)

DECIMAL(p, s) [UNSIGNED] [ZEROFILL]

BOOLEAN

BOOLEAN

TINYINT(1)

DATE

DATE

TIME [(p)]

TIME [(p)] [WITHOUT TIME ZONE]

DATETIME [(p)]

TIMESTAMP [(p)] [WITHOUT TIME ZONE]

TIMESTAMP [(p)]

TIMESTAMP [(p)]

TIMESTAMP [(p)] WITH LOCAL TIME ZONE

CHAR(n)

STRING

VARCHAR(n)

TEXT

BINARY

BYTES

VARBINARY

BLOB