Postgres CDC コネクタは、PostgreSQL データベースの完全なスナップショットを読み取った後、後続の変更データをキャプチャします。このプロセスにより、各レコードが 1 回だけ処理されることが保証されます。このトピックでは、Postgres CDC コネクタの使用方法について説明します。
背景情報
Postgres CDC コネクタは、以下をサポートしています。
カテゴリ | 詳細 |
サポートされるタイプ | ソーステーブル 説明 結果テーブルとディメンションテーブルには JDBC コネクタを使用してください。 |
モード | ストリームモードのみ |
データフォーマット | 該当なし |
固有のメトリック | |
API タイプ | SQL およびデータインジェスト YAML |
結果テーブルデータの更新または削除のサポート | 該当なし |
特徴
Postgres CDC コネクタは、Ververica Runtime (VVR) 8.0.6 以降で利用可能な、変更データキャプチャ (CDC) のための増分スナップショットフレームワークと統合されています。完全な既存データを読み取った後、コネクタは自動的に先行書き込みログ (WAL) からの変更ログの読み取りに切り替わります。このプロセスは、1回限りのセマンティクスを保証します。Postgres CDC ソーステーブルは、完全データの同時かつロックフリーな読み取りをサポートし、ブレークポイントからの再開が可能です。
ソーステーブルとしての特徴と利点は次のとおりです:
ストリーム処理とバッチ処理の統合。コネクタは完全データと増分データの両方の読み取りをサポートしているため、2つの別々のデータ処理ジョブを維持する必要がありません。
完全データの同時読み取り。この機能により、水平スケーリングが可能になり、パフォーマンスが向上します。
完全読み取りから増分読み取りへのシームレスな切り替え。コネクタは自動的にスケールインし、計算リソースを節約します。
再開可能な読み取り。コネクタは完全データ読み取りフェーズ中にブレークポイントから再開できるため、ジョブの安定性が向上します。
ロックフリーな読み取り。完全データの読み取りにはロックが不要なため、オンラインビジネス運用への影響を防ぎます。
前提条件
Postgres CDC コネクタは、PostgreSQL の論理レプリケーション機能を使用して、変更データキャプチャ (CDC) ストリームを読み取ります。このコネクタは、Alibaba Cloud RDS for PostgreSQL、Amazon RDS for PostgreSQL、および自己管理 PostgreSQL をサポートしています。
必要な構成は、Alibaba Cloud RDS for PostgreSQL、Amazon RDS for PostgreSQL、および自己管理 PostgreSQL で異なる場合があります。詳細な構成手順については、「Postgres の構成」ドキュメントをご参照ください。
構成が完了したら、次の条件が満たされていることを確認してください:
wal_level パラメーターは `logical` に設定する必要があります。この設定により、論理デコーディングをサポートするために必要な情報が先行書き込みログ (WAL) に追加されます。
サブスクライブするテーブルの REPLICA IDENTITY は `FULL` に設定する必要があります。この設定により、`INSERT` および `UPDATE` イベントにテーブルのすべての列の以前の値が含まれるようになり、データ整合性が保証されます。
説明`REPLICA IDENTITY` は PostgreSQL 固有のテーブルレベルの設定です。これにより、`INSERT` および `UPDATE` イベントに影響を受ける列の以前の値が含まれるかどうかが決まります。`REPLICA IDENTITY` の値の詳細については、「REPLICA IDENTITY」をご参照ください。
`max_wal_senders` および `max_replication_slots` パラメーターの値が、現在使用中のレプリケーションスロットの合計数と Flink ジョブで必要なスロット数の合計よりも大きいことを確認してください。
アカウントに `SUPERUSER` 権限、または `LOGIN` と `REPLICATION` の両方の権限があることを確認してください。また、アカウントには、完全データをクエリするために、サブスクライブするテーブルに対する `SELECT` 権限も必要です。
注意事項
Realtime Compute for Apache Flink V8.0.6 以降のみが、Postgres CDC の増分スナップショット機能をサポートしています。
レプリケーションスロット
Flink PostgreSQL CDC ジョブはレプリケーションスロットを使用して、先行書き込みログ (WAL) が早期に消去されるのを防ぎ、データ整合性を確保します。適切に管理されない場合、レプリケーションスロットはディスク領域の浪費やデータ読み取りの遅延などの問題を引き起こす可能性があります。以下のベストプラクティスに従うことを推奨します:
未使用のスロットを速やかにクリーンアップする
Flink は、ジョブが停止した後でも、レプリケーションスロットを自動的に削除しません。これは特にステートレス再起動の場合に当てはまります。この動作は、WAL が消去された場合に発生する可能性のあるデータ損失を防ぎます。
ジョブが再起動されないことを確認した場合、関連するレプリケーションスロットを手動で削除してディスク領域を解放する必要があります。
重要ライフサイクル管理:レプリケーションスロットをジョブリソースとして扱い、ジョブの開始と停止と同期して管理してください。
古いスロットの再利用を避ける
新しいジョブは、古いスロットを再利用するのではなく、新しいスロット名を使用する必要があります。スロットを再利用すると、ジョブが起動時に大量の履歴 WAL データを読み取ることになり、最新データの処理が遅れる可能性があります。
PostgreSQL の論理レプリケーションでは、1つのスロットは1つの接続でのみ使用できます。異なるジョブは異なるスロット名を使用する必要があります。
重要命名規則:`slot.name` をカスタマイズする場合、一時スロットとの競合を避けるために、`my_slot_1` のような数値サフィックスを持つ名前の使用は避けてください。
増分スナップショットが有効な場合のスロットの動作
前提条件:チェックポイントが有効であり、ソーステーブルにプライマリキーが定義されている必要があります。
スロット作成ルール:
増分スナップショットが無効:単一の同時実行のみがサポートされます。1つのグローバルスロットが使用されます。
増分スナップショットが有効:
完全フェーズ:各同時ソースサブタスクは一時スロットを作成します。命名フォーマットは
${slot.name}_${task_id}です。増分フェーズ:すべての一時スロットは自動的に回収されます。1 つのグローバルスロットのみが保持されます。
最大スロット数:ソースの同時実行数 + 1 (完全フェーズ中)
リソースとパフォーマンス
PostgreSQL で利用可能なスロット数やディスク領域が限られている場合は、完全フェーズの同時実行数を減らして、使用する一時スロットを少なくすることができます。この操作により、完全フェーズ中の読み取り速度は低下します。
ダウンストリームシンクがべき等書き込みをサポートしている場合は、
scan.incremental.snapshot.backfill.skip = trueを設定できます。この設定は、完全フェーズ中の WAL バックフィルをスキップし、ジョブの起動を高速化します。この構成は at-least-once セマンティクスのみを提供します。集約やディメンションテーブルの結合など、ステートフルな計算を伴うジョブには適していません。なぜなら、中間状態に必要な履歴の変更が失われる可能性があるためです。
増分スナップショットが無効な場合、全表スキャンフェーズ中にチェックポイントはサポートされません。
Postgres サブスクリプションの再利用
Postgres CDC コネクタは、どのテーブルの変更がスロットにプッシュされるかを決定するためにパブリケーションに依存しています。複数のジョブが同じパブリケーションを共有する場合、それらの構成は上書きされます。
原因
publication.autocreate.mode のデフォルト値は filtered であり、コネクタ構成で指定されたテーブルのみが含まれます。このモードは、ジョブが開始されるとパブリケーション内のテーブルを変更するため、他のジョブの読み取り操作に影響を与える可能性があります。
ソリューション
PostgreSQL で、監視対象のすべてのテーブルを含むパブリケーションを手動で作成します。または、ジョブごとに個別のパブリケーションを作成します。
-- すべてのテーブル (または指定されたテーブル、ジョブごとに1つのパブリケーションを作成) を含む my_flink_pub という名前のパブリケーションを作成します CREATE PUBLICATION my_flink_pub FOR TABLE table_a, table_b; -- または、より簡単に、データベース内のすべてのテーブルを含めます CREATE PUBLICATION my_flink_pub FOR ALL TABLES;説明データベース内のすべてのテーブルをサブスクライブすることは推奨されません。データベースが大きく、多くのテーブルが含まれている場合、ネットワーク帯域幅の浪費や Flink クラスターの高い CPU 消費を引き起こす可能性があります。
以下の Flink 構成を追加します:
debezium.publication.name = 'my_flink_pub'(パブリケーション名を指定)debezium.publication.autocreate.mode = 'disabled'(Flink が起動時にパブリケーションを作成または変更しようとするのを防ぐ)
このアプローチは完全な隔離を提供し、Flink に依存するのではなく手動でパブリケーションを管理できます。これにより、新しいジョブが既存のジョブに影響を与えるのを防ぎ、より安全なアクセス制御を提供します。
SQL
構文
CREATE TABLE postgrescdc_source (
id INT NOT NULL,
name STRING,
description STRING,
weight DECIMAL(10,3)
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '<host name>',
'port' = '<port>',
'username' = '<user name>',
'password' = '<password>',
'database-name' = '<database name>',
'schema-name' = '<schema name>',
'table-name' = '<table name>',
'decoding.plugin.name'= 'pgoutput',
'scan.incremental.snapshot.enabled' = 'true',
-- バックフィルをスキップすると、読み取りが高速化され、リソース使用量が削減されますが、データが重複する可能性があります。ダウンストリームシンクがべき等である場合に有効にします。
'scan.incremental.snapshot.backfill.skip' = 'false',
-- 本番環境では、これを 'filtered' または 'disabled' に設定し、Flink を介さずにパブリケーションを手動で管理します。
'debezium-publication.autocreate.mode' = 'disabled'
-- 複数のソースがある場合は、ソースごとに異なるパブリケーションを構成します。
--'debezium.publication.name' = 'my_flink_pub'
);WITH パラメーター
パラメーター | 説明 | データ型 | 必須 | デフォルト値 | 備考 |
connector | コネクタのタイプ。 | STRING | はい | なし | 値は |
hostname | PostgreSQL データベースの IP アドレスまたはホスト名。 | STRING | はい | なし | なし。 |
username | PostgreSQL データベースサービスのユーザー名。 | STRING | はい | なし | なし。 |
password | PostgreSQL データベースサービスのパスワード。 | STRING | はい | なし | なし。 |
database-name | データベース名。 | STRING | はい | なし | データベース名。 |
schema-name | PostgreSQL のスキーマ名。 | STRING | はい | なし | スキーマ名は正規表現をサポートしており、複数のスキーマからデータを読み取ることができます。 |
table-name | PostgreSQL のテーブル名。 | STRING | はい | なし | テーブル名は正規表現をサポートしており、複数のテーブルからデータを読み取ることができます。 |
port | PostgreSQL データベースサービスのポート番号。 | INTEGER | いいえ | 5432 | なし。 |
decoding.plugin.name | PostgreSQL の論理デコーディングプラグインの名前。 | STRING | いいえ | decoderbufs | これは、PostgreSQL サービスにインストールされているプラグインによって決まります。サポートされているプラグインは次のとおりです:
|
slot.name | 論理デコーディングスロットの名前。 | STRING | VVR 8.0.1 以降では必須。以前のバージョンではオプション。 | VVR 8.0.1 以降ではなし。以前のバージョンではデフォルトは `flink` です。 |
|
debezium.* | Debezium のプロパティとパラメーター | STRING | いいえ | なし | Debezium クライアントの動作をより細かく制御します。例: |
scan.incremental.snapshot.enabled | 増分スナップショットを有効にするかどうかを指定します。 | BOOLEAN | いいえ | false | 有効値:
|
scan.startup.mode | データ消費の起動モード。 | STRING | いいえ | initial | 有効値:
|
changelog-mode | ストリーム変更をエンコーディングするための変更ログモード。 | String | いいえ | all | サポートされている変更ログモード:
|
heartbeat.interval.ms | ハートビートパケットを送信する間隔。 | Duration | いいえ | 30s | 単位はミリ秒です。 Postgres CDC コネクタは、データベースに積極的にハートビートを送信してスロットのオフセットを進めます。テーブルの変更が頻繁でない場合、この値を設定することで WAL ログのタイムリーな回収が保証されます。 |
scan.incremental.snapshot.chunk.key-column | スナップショットフェーズ中にシャードを分割するためのチャンクキーとして使用する列を指定します。 | STRING | いいえ | なし | デフォルトでは、プライマリキーの最初の列が選択されます。 |
scan.incremental.close-idle-reader.enabled | スナップショットが完了した後にアイドル状態のリーダーを閉じるかどうかを指定します。 | Boolean | いいえ | false | この構成を有効にするには、 |
scan.incremental.snapshot.backfill.skip | 完全フェーズ中にログの読み取りをスキップするかどうかを指定します。 | Boolean | いいえ | false | 有効値:
|
型マッピング
以下の表は、PostgreSQL と Flink のフィールド型のマッピングを示しています。
PostgreSQL フィールド型 | Flink フィールド型 |
SMALLINT | SMALLINT |
INT2 | |
SMALLSERIAL | |
SERIAL2 | |
INTEGER | INT |
SERIAL | |
BIGINT | BIGINT |
BIGSERIAL | |
REAL | FLOAT |
FLOAT4 | |
FLOAT8 | DOUBLE |
DOUBLE PRECISION | |
NUMERIC(p, s) | DECIMAL(p, s) |
DECIMAL(p, s) | |
BOOLEAN | BOOLEAN |
DATE | DATE |
TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
CHAR(n) | STRING |
CHARACTER(n) | |
VARCHAR(n) | |
CHARACTER VARYING(n) | |
TEXT | |
BYTEA | BYTES |
例
CREATE TABLE source (
id INT NOT NULL,
name STRING,
description STRING,
weight DECIMAL(10,3)
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '<host name>',
'port' = '<port>',
'username' = '<user name>',
'password' = '<password>',
'database-name' = '<database name>',
'schema-name' = '<schema name>',
'table-name' = '<table name>'
);
SELECT * FROM source;データインジェスト
Realtime Compute for Apache Flink V11.4 以降、データ取り込み YAML ジョブで PostgreSQL コネクタをデータソースとして使用できます。
構文
source:
type: postgres
name: PostgreSQL Source
hostname: localhost
port: 5432
username: pg_username
password: pg_password
tables: db.scm.tbl
slot.name: test_slot
scan.startup.mode: initial
server-time-zone: UTC
connect.timeout: 120s
decoding.plugin.name: decoderbufs
sink:
type: ...パラメーター
パラメーター | 説明 | 必須 | データ型 | デフォルト | 備考 |
type | データソースのタイプ。 | はい | STRING | なし | 値は `postgres` である必要があります。 |
name | データソース名。 | いいえ | STRING | なし | なし。 |
hostname | PostgreSQL データベースサーバーのドメイン名または IP アドレス。 | はい | STRING | (なし) | なし。 |
port | PostgreSQL データベースサーバーが公開するポート。 | いいえ | INTEGER | 5432 | なし。 |
username | PostgreSQL のユーザー名。 | はい | STRING | (なし) | なし。 |
password | PostgreSQL のパスワード。 | はい | STRING | (なし) | なし。 |
tables | キャプチャする PostgreSQL データベーステーブルの名前。 正規表現をサポートしており、式に一致する複数のテーブルを監視できます。 | はい | 文字列 | (なし) | 重要 現在、同じデータベース内のテーブルのみをキャプチャできます。 ピリオド (.) は、データベース、スキーマ、およびテーブル名の区切り文字として扱われます。正規表現でピリオド (.) を任意の文字に一致させるために使用するには、バックスラッシュでエスケープする必要があります。例: |
slot.name | PostgreSQL レプリケーションスロットの名前。 | はい | STRING | (なし) | 名前は PostgreSQL レプリケーションスロットの命名規則に準拠し、小文字、数字、およびアンダースコアを含めることができます。 |
decoding.plugin.name | サーバーにインストールされている PostgreSQL 論理デコーディングプラグインの名前。 | いいえ | STRING |
| オプションの値には |
tables.exclude | 除外する PostgreSQL データベーステーブルの名前。このパラメーターは `tables` パラメーターの後に有効になります。 | いいえ | STRING | (なし) | テーブル名も正規表現をサポートしており、式に一致する複数のテーブルを除外できます。使用方法は `tables` パラメーターと同じです。 |
server-time-zone | データベースサーバーのセッションタイムゾーン (例:「Asia/Shanghai」)。 | いいえ | STRING | (なし) | 設定されていない場合、システムのデフォルトタイムゾーン ( |
scan.incremental.snapshot.chunk.size | 増分スナップショットフレームワークにおける各チャンクのサイズ (行数)。 | いいえ | INTEGER | 8096 | 増分スナップショット読み取りが有効な場合、テーブルは複数のチャンクに分割されて読み取られます。チャンクのデータは、完全に読み取られるまでメモリにキャッシュされます。 チャンクあたりの行数が少ないと、テーブルのチャンク総数が多くなります。これにより、エラー回復の粒度は小さくなりますが、メモリ不足 (OOM) エラーや全体的なスループットの低下につながる可能性があります。したがって、バランスを見つけて適切なチャンクサイズを設定する必要があります。 |
scan.snapshot.fetch.size | テーブルの完全データを読み取る際に一度にフェッチする最大レコード数。 | いいえ | INTEGER | 1024 | なし。 |
scan.startup.mode | データ消費の起動モード。 | いいえ | STRING | initial | 有効値:
|
scan.incremental.close-idle-reader.enabled | スナップショットが完了した後にアイドル状態のリーダーを閉じるかどうかを指定します。 | いいえ | BOOLEAN | false | この構成を有効にするには、`execution.checkpointing.checkpoints-after-tasks-finish.enabled` を `true` に設定します。 |
scan.lsn-commit.checkpoints-num-delay | LSN オフセットのコミットを開始する前に遅延させるチェックポイントの数。 | いいえ | INTEGER | 3 | チェックポイントの LSN オフセットは、状態から回復できなくなるのを防ぐために、ローリング方式でコミットされます。 |
connect.timeout | コネクタが PostgreSQL データベースサーバーへの接続を試行してからタイムアウトするまでの最大待機時間。 | いいえ | DURATION | 30s | この値は 250 ミリ秒未満にすることはできません。 |
connect.max-retries | コネクタが PostgreSQL データベースサーバーへの接続を確立しようとする最大回数。 | いいえ | INTEGER | 3 | なし。 |
connection.pool.size | 接続プールのサイズ。 | いいえ | INTEGER | 20 | なし。 |
jdbc.properties.* | ユーザーがカスタム JDBC URL プロパティを渡すことを許可します。 | いいえ | STRING | 20 | ユーザーは、 |
heartbeat.interval | 最新の利用可能な WAL ログオフセットを追跡するためにハートビートイベントを送信する間隔。 | いいえ | DURATION | 30s | なし。 |
debezium.* | PostgreSQL サーバーからのデータ変更をキャプチャするために使用される Debezium Embedded Engine に Debezium プロパティを渡します。 | いいえ | STRING | (なし) | Debezium PostgreSQL コネクタのプロパティの詳細については、「関連ドキュメント」をご参照ください。 |
chunk-meta.group.size | チャンクメタデータのサイズ。 | いいえ | STRING | 1000 | メタデータがこの値より大きい場合、複数の部分に分けて渡されます。 |
metadata.list | ダウンストリームに渡される読み取り可能なメタデータのリスト。変換モジュールで使用できます。 | いいえ | STRING | false | 区切り文字としてカンマ (,) を使用します。現在、利用可能なメタデータは |
scan.incremental.snapshot.unbounded-chunk-first.enabled | スナップショット読み取りフェーズ中に、境界のないチャンクを最初にディスパッチするかどうかを指定します。 | いいえ | STRING | false | 有効値:
重要 これは実験的な機能です。有効にすると、スナップショットフェーズ中に TaskManager が最後のチャンクを同期する際のメモリ不足 (OOM) エラーのリスクを軽減できます。ジョブの最初の起動前に追加することを推奨します。 |
関連ドキュメント
Realtime Compute for Apache Flink でサポートされているコネクタのリストについては、「サポートされているコネクタ」をご参照ください。
PolarDB for PostgreSQL (Oracle Compatible) の結果テーブルにデータを書き込むには、「PolarDB for PostgreSQL (Oracle Compatible)」をご参照ください。
RDS for MySQL、PolarDB for MySQL、または自己管理 MySQL データベースからの読み取りまたは書き込みを行うには、MySQL コネクタを使用します。