すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:PostgreSQL CDC (パブリックプレビュー)

最終更新日:Dec 04, 2025

PostgreSQL 変更データキャプチャ (CDC) コネクタは、PostgreSQL データベースから完全なスナップショットと変更データを読み取ります。このコネクタは、各データレコードが 1 回だけ読み取られることを保証し、障害回復中に 1 回限りのセマンティクスを維持します。このトピックでは、PostgreSQL CDC コネクタの使用方法について説明します。

背景情報

Postgres CDC コネクタには、次の機能があります。

項目

説明

サポートされるタイプ

ソーステーブル

説明

JDBC コネクタを使用して、結果テーブルまたはディメンションテーブルを作成できます。

実行モード

ストリーミングモードのみ

データフォーマット

該当なし

特定の監視メトリック

  • currentFetchEventTimeLag:データが生成されてから Source オペレーターにプルされるまでの間隔。

  • currentEmitEventTimeLag:データが生成されてから Source オペレーターを出るまでの間隔。

  • sourceIdleTime:ソースが新しいデータを生成していない期間。

説明
  • currentFetchEventTimeLag と currentEmitEventTimeLag メトリックは、増分フェーズでのみ有効です。完全スナップショットフェーズでは、値は常に 0 です。

  • メトリックの詳細については、「メトリックの説明」をご参照ください。

API タイプ

SQL およびデータインジェスト YAML

結果テーブルのデータの更新または削除

該当なし

特徴

PostgreSQL CDC コネクタは、増分スナップショットフレームワークを使用します。これは、Ververica Runtime (VVR) 8.0.6 以降を使用する Realtime Compute for Apache Flink で利用できます。コネクタはまず完全な既存データを読み取り、その後自動的に先行書き込みログ (WAL) の変更ログの読み取りに切り替わります。このプロセスにより、データの欠落や重複がないことが保証されます。障害が発生した場合でも、データは 1 回限りのセマンティクスで処理されます。PostgreSQL CDC ソーステーブルは、完全データの同時読み取り、ロックフリーの読み取り、および再開可能なデータ転送をサポートします。

ソーステーブルとして、次の特徴と利点があります:

  • ストリーム処理とバッチ処理を統合します。完全データと増分データの読み取りをサポートするため、2 つの別々のプロセスを維持する必要がなくなります。

  • 完全データの同時読み取りをサポートし、水平方向のパフォーマンススケーリングを実現します。

  • 完全データの読み取りから増分データの読み取りにシームレスに切り替え、自動的にスケールインして計算リソースを節約します。

  • 完全データ読み取りフェーズ中の再開可能なデータ転送をサポートし、安定性を向上させます。

  • オンラインビジネス運用に影響を与えないように、ロックなしで完全データを読み取ります。

前提条件

PostgreSQL CDC コネクタは、PostgreSQL データベースの論理レプリケーション機能を使用して CDC データストリームを読み取ります。このコネクタは Alibaba Cloud RDS for PostgreSQLAmazon RDS for PostgreSQL、および自己管理 PostgreSQL をサポートします。

重要

Alibaba Cloud RDS for PostgreSQLAmazon RDS for PostgreSQL、および自己管理 PostgreSQL の構成は異なります。開始する前に、「PostgreSQL の構成」ドキュメントで説明されている必要な構成を完了してください

構成を完了した後、次の条件が満たされていることを確認してください:

  • wal_level パラメーターの値が logical に設定されていること。これにより、論理エンコーディングに必要な情報が先行書き込みログ (WAL) に追加されます。

  • サブスクライブするテーブルの REPLICA IDENTITY が FULL に設定されていること。これにより、INSERT および UPDATE イベントにテーブルのすべての列の以前の値が含まれるようになり、データ同期の整合性が保証されます。

    説明

    REPLICA IDENTITY は、PostgreSQL 固有のテーブルレベルの設定です。これにより、論理デコーディングプラグインが INSERT および UPDATE イベント中に、関連するテーブル列の以前の値を含めるかどうかが決まります。REPLICA IDENTITY の値の詳細については、「REPLICA IDENTITY」をご参照ください。

  • max_wal_senders および max_replication_slots パラメーターの値が、データベースで現在使用中のレプリケーションスロットの合計と Flink ジョブで必要なスロット数の合計よりも大きいこと。

  • アカウントに SUPERUSER システム権限、または LOGIN と REPLICATION の両方の権限があること。また、完全データをクエリするために、サブスクライブするテーブルに対する SELECT 権限も必要です。

注意事項

  • PostgreSQL CDC の増分スナップショット機能は、Realtime Compute for Apache Flink V8.0.6 以降でのみサポートされます。

Flink PostgreSQL CDC ジョブは、レプリケーションスロットに依存して、先行書き込みログ (WAL) が早期にパージされないようにし、データ整合性を保証します。ただし、不適切な管理はディスク領域の浪費データ読み取りレイテンシなどの問題を引き起こす可能性があります。以下の推奨事項に従ってください:

  • 使用されなくなったスロットを速やかにパージする

    • Flink は、特にステートレス再起動シナリオでは、ジョブが停止した後でもレプリケーションスロットを自動的に削除しません。この動作は、WAL がパージされた場合に発生する可能性のあるデータ損失を防ぎます。

    • ジョブが再起動されないことを確認した場合、関連するレプリケーションスロットを手動で削除してディスク領域を解放する必要があります。

      重要

      ライフサイクル管理:レプリケーションスロットをジョブのリソースの一部として扱います。ジョブの開始および停止操作と同期して管理します。

  • 古いスロットの再利用を避ける

    • 新しいジョブは、古いスロット名を再利用するのではなく、新しいスロット名を使用する必要があります。スロットを再利用すると、ジョブが起動時に大量の履歴 WAL データを読み取ることになり、最新データの読み取りが遅延する原因となります。

    • PostgreSQL の論理レプリケーションでは、1 つのスロットは 1 つの接続でのみ使用できます。したがって、異なるジョブは異なるスロット名を使用する必要があります。

      重要

      命名規則:`slot.name` をカスタマイズする場合、一時スロットとの競合を避けるために、`my_slot_1` のような数値サフィックスを持つ名前の使用は避けてください。

  • 増分スナップショットが有効な場合のスロットの動作

    • 前提条件:チェックポイントを有効にし、ソーステーブルにプライマリキーが必要です。

    • スロット作成ルール:

      • 増分スナップショットが無効な場合:単一の同時実行のみがサポートされ、1 つのグローバルスロットを使用します。

      • 増分スナップショットが有効な場合

        • 完全フェーズ各同時ソースサブタスクは、${slot.name}_${task_id} という形式の名前で一時スロットを作成します。

        • 増分フェーズ:すべての一時スロットは自動的に回収されます。1 つのグローバルスロットのみが保持されます。

    • 最大スロット数:ソースの同時実行数 + 1 (完全スナップショットフェーズ中)

  • リソースとパフォーマンス

    • PostgreSQL のスロット数またはディスク領域が限られている場合は、完全スナップショットフェーズの同時実行数を減らして、一時スロットの数を減らすことができます。これにより、完全データの読み取り速度が低下します。

    • ダウンストリームシステムがべき等書き込みをサポートしている場合は、scan.incremental.snapshot.backfill.skip = true を設定して、完全フェーズ中の WAL バックフィルをスキップし、起動速度を向上させることができます。

      この構成は at-least-once セマンティクスのみを提供します。集約やディメンションテーブル結合などのステートフル計算を持つジョブには適していません。中間状態に必要な履歴の変更が失われる可能性があるためです。

  • 増分スナップショット機能が有効でない場合、PostgreSQL CDC コネクタは全表スキャンフェーズ中のチェックポイントの実行をサポートしません。

    増分スナップショットが有効でない場合、全表スキャンフェーズ中にチェックポイントがトリガーされると、チェックポイントタイムアウトによりジョブがフェールオーバーする可能性があります。したがって、[その他の構成] セクションで次のパラメーターを構成して、完全同期フェーズ中のチェックポイントタイムアウトによるフェールオーバーを防ぐことができます。詳細については、「ジョブのカスタム実行時パラメーターを構成する方法」をご参照ください。

    execution.checkpointing.interval: 10min
    execution.checkpointing.tolerable-failed-checkpoints: 100
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 2147483647

    次の表にパラメーターを示します。

    パラメーター

    説明

    注意事項

    execution.checkpointing.interval

    チェックポイントがトリガーされる間隔。

    データ型:Duration。例:10 min または 30 s。

    execution.checkpointing.tolerable-failed-checkpoints

    許容されるチェックポイントの失敗回数。

    このパラメーターの値とチェックポイントのスケジューリング間隔の積によって、許容されるスナップショットの読み取り時間が決まります。

    説明

    テーブルが非常に大きい場合は、このパラメーターをより大きな値に設定してください。

    restart-strategy

    再起動ポリシー。

    有効値:

    • fixed-delay:固定遅延再起動ポリシー。

    • failure-rate:失敗率再起動ポリシー。

    • exponential-delay:指数遅延再起動ポリシー。

    詳細については、「再起動戦略」をご参照ください。

    restart-strategy.fixed-delay.attempts

    固定遅延再起動ポリシーの最大再試行回数。

    なし。

SQL

構文

CREATE TABLE postgrescdc_source (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<yourHostname>',
  'port' = '5432',
  'username' = '<yourUserName>',
  'password' = '<yourPassWord>',
  'database-name' = '<yourDatabaseName>',
  'schema-name' = '<yourSchemaName>',
  'table-name' = '<yourTableName>'
);

WITH 句のパラメーター

パラメーター

説明

データ型

必須

デフォルト値

注意事項

connector

コネクタのタイプ。

STRING

はい

なし

値は postgres-cdc である必要があります。

hostname

PostgreSQL データベースの IP アドレスまたはホスト名。

STRING

はい

なし

なし。

username

PostgreSQL データベースサービスのユーザー名。

STRING

はい

なし

なし。

password

PostgreSQL データベースサービスのパスワード。

STRING

はい

なし

なし。

database-name

データベース名。

STRING

はい

なし

データベース名。

schema-name

PostgreSQL のスキーマ名。

STRING

はい

なし

スキーマ名は正規表現をサポートしており、複数のスキーマからデータを読み取ることができます。

table-name

PostgreSQL のテーブル名。

STRING

はい

なし

テーブル名は正規表現をサポートしており、複数のテーブルからデータを読み取ることができます。

port

PostgreSQL データベースサービスのポート番号。

INTEGER

いいえ

5432

なし。

decoding.plugin.name

PostgreSQL の論理デコーディングプラグインの名前。

STRING

いいえ

decoderbufs

これは、PostgreSQL サービスにインストールされているプラグインによって決まります。サポートされているプラグインは次のとおりです:

  • decoderbufs (デフォルト):PostgreSQL 9.6 以降でサポートされています。このプラグインをインストールする必要があります。

  • pgoutput (推奨):PostgreSQL 10 以降の公式組み込みプラグイン。

slot.name

論理デコーディングスロットの名前。

STRING

8.0.1 より前のバージョンではオプション。8.0.1 以降のバージョンでは必須。

8.0.1 より前のバージョンのデフォルト値は flink です。8.0.1 以降のバージョンにはデフォルト値はありません。

PSQLException: ERROR: replication slot "debezium" is active for PID 974 エラーを回避するために、各テーブルに slot.name パラメーターを設定します。

debezium.*

Debezium プロパティパラメーター。

STRING

いいえ

なし

Debezium クライアントの動作をより詳細に制御できます。例:'debezium.snapshot.mode' = 'never'。詳細については、「コネクタのプロパティ」をご参照ください。

scan.incremental.snapshot.enabled

増分スナップショットを有効にするかどうかを指定します。

BOOLEAN

いいえ

false

有効値:

  • false (デフォルト):増分スナップショットを無効にします。

  • true:増分スナップショットを有効にします。

説明
  • これは実験的な機能です。このパラメーターは、Realtime Compute for Apache Flink V8.0.6 以降でのみサポートされます。

  • 増分スナップショットの機能上の利点、前提条件、および制限事項の詳細については、「特徴」、「前提条件」、および「制限事項」をご参照ください。

scan.startup.mode

データ消費の起動モード。

STRING

いいえ

initial

有効値:

  • initial (デフォルト):最初の起動時に完全な既存データをスキャンし、その後最新の WAL データを読み取ります。

  • latest-offset:最初の起動時に完全な既存データをスキャンしません。WAL の末尾から読み取りを開始します。つまり、コネクタの起動後に行われた最新の変更のみを読み取ります。

  • snapshot:完全な既存データをスキャンし、完全スナップショットフェーズ中に生成された新しい WAL データを読み取り、その後ジョブを停止します。

changelog-mode

ストリーム変更をエンコーディングするための変更ログモード。

String

いいえ

all

サポートされている変更ログモードは次のとおりです:

  • ALL:INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER を含むすべてのタイプをサポートします。

  • UPSERT:INSERT、DELETE、UPDATE_AFTER を含む upsert タイプのみをサポートします。

heartbeat.interval.ms

ハートビートパケットを送信する間隔。

Duration

いいえ

30s

単位はミリ秒です。

PostgreSQL CDC コネクタは、データベースにハートビートパケットを積極的に送信して、スロットのオフセットを進めます。テーブルの変更が頻繁でない場合、この値を設定すると WAL ログを速やかにパージできます。

scan.incremental.snapshot.chunk.key-column

スナップショットフェーズでのシャーディングの分割列として列を指定します。

STRING

いいえ

なし

デフォルトでは、プライマリキーの最初の列が選択されます。

scan.incremental.close-idle-reader.enabled

スナップショット終了後にアイドル状態のリーダーを閉じるかどうかを指定します。

Boolean

いいえ

false

このパラメーターは、execution.checkpointing.checkpoints-after-tasks-finish.enabled パラメーターを true に設定した場合にのみ有効です。

scan.incremental.snapshot.backfill.skip

完全スナップショットフェーズでログの読み取りをスキップするかどうかを指定します。

Boolean

いいえ

false

有効値:

  • true:ログの読み取りをスキップします。

    増分フェーズは、低ウォーターマークからログの読み取りを開始します。

    ダウンストリームのオペレーターまたはストレージがべき等性をサポートしている場合は、完全スナップショットフェーズでログの読み取りをスキップすることを推奨します。これにより WAL スロットの数が減りますが、at-least-once セマンティクスのみが提供されます。

  • false:ログの読み取りをスキップしません。

    完全スナップショットフェーズ中、コネクタは低ウォーターマークと高ウォーターマークの間のログを読み取り、整合性を確保します。

    SQL が集約や結合などの操作を実行する場合は、完全スナップショットフェーズでログの読み取りをスキップしないことを推奨します。

データ型のマッピング

次の表に、PostgreSQL と Flink のデータ型のマッピングを示します。

PostgreSQL データ型

Flink データ型

SMALLINT

SMALLINT

INT2

SMALLSERIAL

SERIAL2

INTEGER

INT

SERIAL

BIGINT

BIGINT

BIGSERIAL

REAL

FLOAT

FLOAT4

FLOAT8

DOUBLE

DOUBLE PRECISION

NUMERIC(p, s)

DECIMAL(p, s)

DECIMAL(p, s)

BOOLEAN

BOOLEAN

DATE

DATE

TIME [(p)] [WITHOUT TIMEZONE]

TIME [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

CHAR(n)

STRING

CHARACTER(n)

VARCHAR(n)

CHARACTER VARYING(n)

TEXT

BYTEA

BYTES

サンプルコード

CREATE TABLE source (
  id INT NOT NULL,
  name STRING,
  description STRING,
  weight DECIMAL(10,3)
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<host name>',
  'port' = '<port>',
  'username' = '<user name>',
  'password' = '<password>',
  'database-name' = '<database name>',
  'schema-name' = '<schema name>',
  'table-name' = '<table name>'
);

SELECT * FROM source;

データインジェスト

Realtime Compute for Apache Flink V11.4 以降では、データインジェスト YAML ジョブで PostgreSQL コネクタをデータソースとして使用できます。

構文

source:
  type: postgres
  name: PostgreSQL Source
  hostname: localhost
  port: 5432
  username: pg_username
  password: pg_password
  tables: db.scm.tbl
  slot.name: test_slot
  scan.startup.mode: initial
  server-time-zone: UTC
  connect.timeout: 120s
  decoding.plugin.name: decoderbufs

sink:
  type: ...

パラメーター

パラメーター

説明

必須

データ型

デフォルト値

注意事項

type

データソースのタイプ。

はい

STRING

なし

値は postgres である必要があります。

name

データソースの名前。

いいえ

STRING

なし

なし。

hostname

PostgreSQL データベースサーバーのドメイン名または IP アドレス。

はい

STRING

(なし)

なし。

port

PostgreSQL データベースサーバーが公開するポート。

いいえ

INTEGER

5432

なし。

username

PostgreSQL データベースのユーザー名。

はい

STRING

(なし)

なし。

password

PostgreSQL データベースのパスワード。

はい

STRING

(なし)

なし。

tables

キャプチャする PostgreSQL データベースのテーブル名です。

正規表現を使用すると、式に一致する複数のテーブルをモニターできます。

はい

文字列

(なし)

重要

現在、キャプチャできるのは同じデータベース内のテーブルのみです。

ピリオド (.) は、データベース、スキーマ、およびテーブル名の区切り文字として使用されます。正規表現でピリオド (.) を任意の文字にマッチさせるには、バックスラッシュ (\) でエスケープする必要があります。例:bdb.schema_\.*.order_\.*

slot.name

PostgreSQL レプリケーションスロットの名前。

はい

STRING

(なし)

名前は PostgreSQL レプリケーションスロットの命名規則に従う必要があり、小文字、数字、アンダースコアのみを含めることができます。

decoding.plugin.name

PostgreSQL サーバーにインストールされている論理デコーディングプラグインの名前。

いいえ

STRING

pgoutput

有効な値には decoderbufspgoutput が含まれます。

tables.exclude

除外する PostgreSQL データベーステーブルの名前。このパラメーターは tables パラメーターの後に有効になります。

いいえ

STRING

(なし)

正規表現を使用して、式に一致する複数のテーブルを除外することもできます。使用方法は tables パラメーターと同じです。

server-time-zone

データベースサーバーのセッションタイムゾーン (例:「Asia/Shanghai」)。

いいえ

STRING

(なし)

このパラメーターが設定されていない場合、システムのデフォルトタイムゾーン (ZoneId.systemDefault()) がサーバーのタイムゾーンを決定するために使用されます。

scan.incremental.snapshot.chunk.size

増分スナップショットフレームワークにおける各チャンクのサイズ (行数で指定)。

いいえ

INTEGER

8096

増分スナップショット読み取りを有効にすると、テーブルはチャンクに分割されます。各チャンクのデータは、完全に読み取られる前にメモリにキャッシュされます。

チャンクサイズが小さいと、チャンク数が多くなります。これにより、障害回復の粒度が向上しますが、Out-of-Memory (OOM) エラーや全体的なスループットの低下を引き起こす可能性があります。これらの要因のバランスを取るために、適切なチャンクサイズを設定してください。

scan.snapshot.fetch.size

テーブルの完全データを読み取る際に一度にフェッチする最大レコード数。

いいえ

INTEGER

1024

なし。

scan.startup.mode

データ消費の起動モード。

いいえ

STRING

initial

有効値:

  • initial (デフォルト):最初の起動時に、コネクタは完全な既存データをスキャンし、その後最新の WAL データを読み取ります。

  • latest-offset:最初の起動時に、コネクタは既存データをスキャンしません。WAL の末尾から読み取りを開始します。つまり、コネクタの起動後に行われた変更のみを読み取ります。

  • committed-offset:既存データをスキャンしません。指定された位置から増分データの消費を開始します。

  • snapshot:完全な既存データのみを消費し、増分データは消費しません。

scan.incremental.close-idle-reader.enabled

スナップショットフェーズの終了後にアイドル状態のリーダーを閉じるかどうかを指定します。

いいえ

BOOLEAN

false

この構成は、execution.checkpointing.checkpoints-after-tasks-finish.enabled を true に設定した場合にのみ有効になります。

scan.lsn-commit.checkpoints-num-delay

コネクタが LSN オフセットのコミットを開始する前に遅延させるチェックポイントの数。

いいえ

INTEGER

3

チェックポイント LSN オフセットは、状態からの回復失敗を防ぐためにローリングベースでコミットされます。

connect.timeout

タイムアウトが発生する前に、コネクタが PostgreSQL データベースサーバーへの接続を待機する最大時間。

いいえ

DURATION

30s

値は 250 ms 未満にすることはできません。

connect.max-retries

コネクタが PostgreSQL データベースサーバーへの接続を確立するための最大再試行回数。

いいえ

INTEGER

3

なし。

connection.pool.size

接続プールのサイズ。

いいえ

INTEGER

20

なし。

jdbc.properties.*

カスタム JDBC URL プロパティを渡すことができます。

いいえ

STRING

20

カスタムプロパティを渡すことができます。例:'jdbc.properties.useSSL' = 'false'

heartbeat.interval

最新の利用可能な WAL オフセットを追跡するためにハートビートイベントを送信する間隔。

いいえ

DURATION

30s

なし。

debezium.*

PostgreSQL サーバーからのデータ変更をキャプチャするために使用される Debezium Embedded Engine に Debezium プロパティを渡します。

いいえ

STRING

(なし)

Debezium PostgreSQL コネクタのプロパティの詳細については、「関連ドキュメント」をご参照ください。

chunk-meta.group.size

チャンクメタデータのサイズ。

いいえ

STRING

1000

メタデータがこの値より大きい場合、メタデータは複数の部分に分けて渡されます。

metadata.list

ダウンストリームに渡され、変換モジュールで使用できる読み取り可能なメタデータのリスト。

いいえ

STRING

false

値を区切るにはカンマ (,) を使用します。現在利用可能なメタデータは op_ts です。

scan.incremental.snapshot.unbounded-chunk-first.enabled

スナップショット読み取りフェーズ中に、境界のないチャンクを最初に配布するかどうかを指定します。

いいえ

STRING

false

有効値:

  • true:スナップショット読み取りフェーズ中に、境界のないチャンクを最初に配布します。

  • false (デフォルト):スナップショット読み取りフェーズ中に、境界のないチャンクを最初に配布しません。

重要

これは実験的な機能です。この機能を有効にすると、TaskManager がスナップショットフェーズの最後のチャンクを同期する際の Out-of-Memory (OOM) エラーのリスクを軽減できます。ジョブを初めて開始する前に、この構成を追加することを推奨します。

関連ドキュメント

  • Realtime Compute for Apache Flink でサポートされているコネクタのリストについては、「サポートされているコネクタ」をご参照ください。

  • PolarDB for PostgreSQL (Oracle Compatible) 1.0 の結果テーブルにデータを書き込むには、「PolarDB for PostgreSQL (Oracle Compatible) 1.0」をご参照ください。

  • RDS for MySQL、PolarDB for MySQL、または自己管理 MySQL データベースからの読み取りまたは書き込みを行うには、MySQL コネクタを使用します。