PostgreSQL 変更データキャプチャ (CDC) コネクタは、PostgreSQL データベースから完全なスナップショットと変更データを読み取ります。このコネクタは、各データレコードが 1 回だけ読み取られることを保証し、障害回復中に 1 回限りのセマンティクスを維持します。このトピックでは、PostgreSQL CDC コネクタの使用方法について説明します。
背景情報
Postgres CDC コネクタには、次の機能があります。
項目 | 説明 |
サポートされるタイプ | ソーステーブル 説明 JDBC コネクタを使用して、結果テーブルまたはディメンションテーブルを作成できます。 |
実行モード | ストリーミングモードのみ |
データフォーマット | 該当なし |
特定の監視メトリック |
説明
|
API タイプ | SQL およびデータインジェスト YAML |
結果テーブルのデータの更新または削除 | 該当なし |
特徴
PostgreSQL CDC コネクタは、増分スナップショットフレームワークを使用します。これは、Ververica Runtime (VVR) 8.0.6 以降を使用する Realtime Compute for Apache Flink で利用できます。コネクタはまず完全な既存データを読み取り、その後自動的に先行書き込みログ (WAL) の変更ログの読み取りに切り替わります。このプロセスにより、データの欠落や重複がないことが保証されます。障害が発生した場合でも、データは 1 回限りのセマンティクスで処理されます。PostgreSQL CDC ソーステーブルは、完全データの同時読み取り、ロックフリーの読み取り、および再開可能なデータ転送をサポートします。
ソーステーブルとして、次の特徴と利点があります:
ストリーム処理とバッチ処理を統合します。完全データと増分データの読み取りをサポートするため、2 つの別々のプロセスを維持する必要がなくなります。
完全データの同時読み取りをサポートし、水平方向のパフォーマンススケーリングを実現します。
完全データの読み取りから増分データの読み取りにシームレスに切り替え、自動的にスケールインして計算リソースを節約します。
完全データ読み取りフェーズ中の再開可能なデータ転送をサポートし、安定性を向上させます。
オンラインビジネス運用に影響を与えないように、ロックなしで完全データを読み取ります。
前提条件
PostgreSQL CDC コネクタは、PostgreSQL データベースの論理レプリケーション機能を使用して CDC データストリームを読み取ります。このコネクタは Alibaba Cloud RDS for PostgreSQL、Amazon RDS for PostgreSQL、および自己管理 PostgreSQL をサポートします。
Alibaba Cloud RDS for PostgreSQL、Amazon RDS for PostgreSQL、および自己管理 PostgreSQL の構成は異なります。開始する前に、「PostgreSQL の構成」ドキュメントで説明されている必要な構成を完了してください。
構成を完了した後、次の条件が満たされていることを確認してください:
wal_level パラメーターの値が logical に設定されていること。これにより、論理エンコーディングに必要な情報が先行書き込みログ (WAL) に追加されます。
サブスクライブするテーブルの REPLICA IDENTITY が FULL に設定されていること。これにより、INSERT および UPDATE イベントにテーブルのすべての列の以前の値が含まれるようになり、データ同期の整合性が保証されます。
説明REPLICA IDENTITY は、PostgreSQL 固有のテーブルレベルの設定です。これにより、論理デコーディングプラグインが INSERT および UPDATE イベント中に、関連するテーブル列の以前の値を含めるかどうかが決まります。REPLICA IDENTITY の値の詳細については、「REPLICA IDENTITY」をご参照ください。
max_wal_senders および max_replication_slots パラメーターの値が、データベースで現在使用中のレプリケーションスロットの合計と Flink ジョブで必要なスロット数の合計よりも大きいこと。
アカウントに SUPERUSER システム権限、または LOGIN と REPLICATION の両方の権限があること。また、完全データをクエリするために、サブスクライブするテーブルに対する SELECT 権限も必要です。
注意事項
PostgreSQL CDC の増分スナップショット機能は、Realtime Compute for Apache Flink V8.0.6 以降でのみサポートされます。
Flink PostgreSQL CDC ジョブは、レプリケーションスロットに依存して、先行書き込みログ (WAL) が早期にパージされないようにし、データ整合性を保証します。ただし、不適切な管理はディスク領域の浪費やデータ読み取りレイテンシなどの問題を引き起こす可能性があります。以下の推奨事項に従ってください:
使用されなくなったスロットを速やかにパージする
Flink は、特にステートレス再起動シナリオでは、ジョブが停止した後でもレプリケーションスロットを自動的に削除しません。この動作は、WAL がパージされた場合に発生する可能性のあるデータ損失を防ぎます。
ジョブが再起動されないことを確認した場合、関連するレプリケーションスロットを手動で削除してディスク領域を解放する必要があります。
重要ライフサイクル管理:レプリケーションスロットをジョブのリソースの一部として扱います。ジョブの開始および停止操作と同期して管理します。
古いスロットの再利用を避ける
新しいジョブは、古いスロット名を再利用するのではなく、新しいスロット名を使用する必要があります。スロットを再利用すると、ジョブが起動時に大量の履歴 WAL データを読み取ることになり、最新データの読み取りが遅延する原因となります。
PostgreSQL の論理レプリケーションでは、1 つのスロットは 1 つの接続でのみ使用できます。したがって、異なるジョブは異なるスロット名を使用する必要があります。
重要命名規則:`slot.name` をカスタマイズする場合、一時スロットとの競合を避けるために、`my_slot_1` のような数値サフィックスを持つ名前の使用は避けてください。
増分スナップショットが有効な場合のスロットの動作
前提条件:チェックポイントを有効にし、ソーステーブルにプライマリキーが必要です。
スロット作成ルール:
増分スナップショットが無効な場合:単一の同時実行のみがサポートされ、1 つのグローバルスロットを使用します。
増分スナップショットが有効な場合:
完全フェーズ:各同時ソースサブタスクは、
${slot.name}_${task_id}という形式の名前で一時スロットを作成します。増分フェーズ:すべての一時スロットは自動的に回収されます。1 つのグローバルスロットのみが保持されます。
最大スロット数:ソースの同時実行数 + 1 (完全スナップショットフェーズ中)
リソースとパフォーマンス
PostgreSQL のスロット数またはディスク領域が限られている場合は、完全スナップショットフェーズの同時実行数を減らして、一時スロットの数を減らすことができます。これにより、完全データの読み取り速度が低下します。
ダウンストリームシステムがべき等書き込みをサポートしている場合は、
scan.incremental.snapshot.backfill.skip = trueを設定して、完全フェーズ中の WAL バックフィルをスキップし、起動速度を向上させることができます。この構成は at-least-once セマンティクスのみを提供します。集約やディメンションテーブル結合などのステートフル計算を持つジョブには適していません。中間状態に必要な履歴の変更が失われる可能性があるためです。
増分スナップショット機能が有効でない場合、PostgreSQL CDC コネクタは全表スキャンフェーズ中のチェックポイントの実行をサポートしません。
増分スナップショットが有効でない場合、全表スキャンフェーズ中にチェックポイントがトリガーされると、チェックポイントタイムアウトによりジョブがフェールオーバーする可能性があります。したがって、[その他の構成] セクションで次のパラメーターを構成して、完全同期フェーズ中のチェックポイントタイムアウトによるフェールオーバーを防ぐことができます。詳細については、「ジョブのカスタム実行時パラメーターを構成する方法」をご参照ください。
execution.checkpointing.interval: 10min execution.checkpointing.tolerable-failed-checkpoints: 100 restart-strategy: fixed-delay restart-strategy.fixed-delay.attempts: 2147483647次の表にパラメーターを示します。
パラメーター
説明
注意事項
execution.checkpointing.interval
チェックポイントがトリガーされる間隔。
データ型:Duration。例:10 min または 30 s。
execution.checkpointing.tolerable-failed-checkpoints
許容されるチェックポイントの失敗回数。
このパラメーターの値とチェックポイントのスケジューリング間隔の積によって、許容されるスナップショットの読み取り時間が決まります。
説明テーブルが非常に大きい場合は、このパラメーターをより大きな値に設定してください。
restart-strategy
再起動ポリシー。
有効値:
fixed-delay:固定遅延再起動ポリシー。
failure-rate:失敗率再起動ポリシー。
exponential-delay:指数遅延再起動ポリシー。
詳細については、「再起動戦略」をご参照ください。
restart-strategy.fixed-delay.attempts
固定遅延再起動ポリシーの最大再試行回数。
なし。
SQL
構文
CREATE TABLE postgrescdc_source (
shipment_id INT,
order_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '<yourHostname>',
'port' = '5432',
'username' = '<yourUserName>',
'password' = '<yourPassWord>',
'database-name' = '<yourDatabaseName>',
'schema-name' = '<yourSchemaName>',
'table-name' = '<yourTableName>'
);WITH 句のパラメーター
パラメーター | 説明 | データ型 | 必須 | デフォルト値 | 注意事項 |
connector | コネクタのタイプ。 | STRING | はい | なし | 値は |
hostname | PostgreSQL データベースの IP アドレスまたはホスト名。 | STRING | はい | なし | なし。 |
username | PostgreSQL データベースサービスのユーザー名。 | STRING | はい | なし | なし。 |
password | PostgreSQL データベースサービスのパスワード。 | STRING | はい | なし | なし。 |
database-name | データベース名。 | STRING | はい | なし | データベース名。 |
schema-name | PostgreSQL のスキーマ名。 | STRING | はい | なし | スキーマ名は正規表現をサポートしており、複数のスキーマからデータを読み取ることができます。 |
table-name | PostgreSQL のテーブル名。 | STRING | はい | なし | テーブル名は正規表現をサポートしており、複数のテーブルからデータを読み取ることができます。 |
port | PostgreSQL データベースサービスのポート番号。 | INTEGER | いいえ | 5432 | なし。 |
decoding.plugin.name | PostgreSQL の論理デコーディングプラグインの名前。 | STRING | いいえ | decoderbufs | これは、PostgreSQL サービスにインストールされているプラグインによって決まります。サポートされているプラグインは次のとおりです:
|
slot.name | 論理デコーディングスロットの名前。 | STRING | 8.0.1 より前のバージョンではオプション。8.0.1 以降のバージョンでは必須。 | 8.0.1 より前のバージョンのデフォルト値は flink です。8.0.1 以降のバージョンにはデフォルト値はありません。 |
|
debezium.* | Debezium プロパティパラメーター。 | STRING | いいえ | なし | Debezium クライアントの動作をより詳細に制御できます。例: |
scan.incremental.snapshot.enabled | 増分スナップショットを有効にするかどうかを指定します。 | BOOLEAN | いいえ | false | 有効値:
|
scan.startup.mode | データ消費の起動モード。 | STRING | いいえ | initial | 有効値:
|
changelog-mode | ストリーム変更をエンコーディングするための変更ログモード。 | String | いいえ | all | サポートされている変更ログモードは次のとおりです:
|
heartbeat.interval.ms | ハートビートパケットを送信する間隔。 | Duration | いいえ | 30s | 単位はミリ秒です。 PostgreSQL CDC コネクタは、データベースにハートビートパケットを積極的に送信して、スロットのオフセットを進めます。テーブルの変更が頻繁でない場合、この値を設定すると WAL ログを速やかにパージできます。 |
scan.incremental.snapshot.chunk.key-column | スナップショットフェーズでのシャーディングの分割列として列を指定します。 | STRING | いいえ | なし | デフォルトでは、プライマリキーの最初の列が選択されます。 |
scan.incremental.close-idle-reader.enabled | スナップショット終了後にアイドル状態のリーダーを閉じるかどうかを指定します。 | Boolean | いいえ | false | このパラメーターは、 |
scan.incremental.snapshot.backfill.skip | 完全スナップショットフェーズでログの読み取りをスキップするかどうかを指定します。 | Boolean | いいえ | false | 有効値:
|
データ型のマッピング
次の表に、PostgreSQL と Flink のデータ型のマッピングを示します。
PostgreSQL データ型 | Flink データ型 |
SMALLINT | SMALLINT |
INT2 | |
SMALLSERIAL | |
SERIAL2 | |
INTEGER | INT |
SERIAL | |
BIGINT | BIGINT |
BIGSERIAL | |
REAL | FLOAT |
FLOAT4 | |
FLOAT8 | DOUBLE |
DOUBLE PRECISION | |
NUMERIC(p, s) | DECIMAL(p, s) |
DECIMAL(p, s) | |
BOOLEAN | BOOLEAN |
DATE | DATE |
TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
CHAR(n) | STRING |
CHARACTER(n) | |
VARCHAR(n) | |
CHARACTER VARYING(n) | |
TEXT | |
BYTEA | BYTES |
サンプルコード
CREATE TABLE source (
id INT NOT NULL,
name STRING,
description STRING,
weight DECIMAL(10,3)
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '<host name>',
'port' = '<port>',
'username' = '<user name>',
'password' = '<password>',
'database-name' = '<database name>',
'schema-name' = '<schema name>',
'table-name' = '<table name>'
);
SELECT * FROM source;データインジェスト
Realtime Compute for Apache Flink V11.4 以降では、データインジェスト YAML ジョブで PostgreSQL コネクタをデータソースとして使用できます。
構文
source:
type: postgres
name: PostgreSQL Source
hostname: localhost
port: 5432
username: pg_username
password: pg_password
tables: db.scm.tbl
slot.name: test_slot
scan.startup.mode: initial
server-time-zone: UTC
connect.timeout: 120s
decoding.plugin.name: decoderbufs
sink:
type: ...パラメーター
パラメーター | 説明 | 必須 | データ型 | デフォルト値 | 注意事項 |
type | データソースのタイプ。 | はい | STRING | なし | 値は postgres である必要があります。 |
name | データソースの名前。 | いいえ | STRING | なし | なし。 |
hostname | PostgreSQL データベースサーバーのドメイン名または IP アドレス。 | はい | STRING | (なし) | なし。 |
port | PostgreSQL データベースサーバーが公開するポート。 | いいえ | INTEGER | 5432 | なし。 |
username | PostgreSQL データベースのユーザー名。 | はい | STRING | (なし) | なし。 |
password | PostgreSQL データベースのパスワード。 | はい | STRING | (なし) | なし。 |
tables | キャプチャする PostgreSQL データベースのテーブル名です。 正規表現を使用すると、式に一致する複数のテーブルをモニターできます。 | はい | 文字列 | (なし) | 重要 現在、キャプチャできるのは同じデータベース内のテーブルのみです。 ピリオド (.) は、データベース、スキーマ、およびテーブル名の区切り文字として使用されます。正規表現でピリオド (.) を任意の文字にマッチさせるには、バックスラッシュ (\) でエスケープする必要があります。例: |
slot.name | PostgreSQL レプリケーションスロットの名前。 | はい | STRING | (なし) | 名前は PostgreSQL レプリケーションスロットの命名規則に従う必要があり、小文字、数字、アンダースコアのみを含めることができます。 |
decoding.plugin.name | PostgreSQL サーバーにインストールされている論理デコーディングプラグインの名前。 | いいえ | STRING |
| 有効な値には |
tables.exclude | 除外する PostgreSQL データベーステーブルの名前。このパラメーターは tables パラメーターの後に有効になります。 | いいえ | STRING | (なし) | 正規表現を使用して、式に一致する複数のテーブルを除外することもできます。使用方法は tables パラメーターと同じです。 |
server-time-zone | データベースサーバーのセッションタイムゾーン (例:「Asia/Shanghai」)。 | いいえ | STRING | (なし) | このパラメーターが設定されていない場合、システムのデフォルトタイムゾーン ( |
scan.incremental.snapshot.chunk.size | 増分スナップショットフレームワークにおける各チャンクのサイズ (行数で指定)。 | いいえ | INTEGER | 8096 | 増分スナップショット読み取りを有効にすると、テーブルはチャンクに分割されます。各チャンクのデータは、完全に読み取られる前にメモリにキャッシュされます。 チャンクサイズが小さいと、チャンク数が多くなります。これにより、障害回復の粒度が向上しますが、Out-of-Memory (OOM) エラーや全体的なスループットの低下を引き起こす可能性があります。これらの要因のバランスを取るために、適切なチャンクサイズを設定してください。 |
scan.snapshot.fetch.size | テーブルの完全データを読み取る際に一度にフェッチする最大レコード数。 | いいえ | INTEGER | 1024 | なし。 |
scan.startup.mode | データ消費の起動モード。 | いいえ | STRING | initial | 有効値:
|
scan.incremental.close-idle-reader.enabled | スナップショットフェーズの終了後にアイドル状態のリーダーを閉じるかどうかを指定します。 | いいえ | BOOLEAN | false | この構成は、execution.checkpointing.checkpoints-after-tasks-finish.enabled を true に設定した場合にのみ有効になります。 |
scan.lsn-commit.checkpoints-num-delay | コネクタが LSN オフセットのコミットを開始する前に遅延させるチェックポイントの数。 | いいえ | INTEGER | 3 | チェックポイント LSN オフセットは、状態からの回復失敗を防ぐためにローリングベースでコミットされます。 |
connect.timeout | タイムアウトが発生する前に、コネクタが PostgreSQL データベースサーバーへの接続を待機する最大時間。 | いいえ | DURATION | 30s | 値は 250 ms 未満にすることはできません。 |
connect.max-retries | コネクタが PostgreSQL データベースサーバーへの接続を確立するための最大再試行回数。 | いいえ | INTEGER | 3 | なし。 |
connection.pool.size | 接続プールのサイズ。 | いいえ | INTEGER | 20 | なし。 |
jdbc.properties.* | カスタム JDBC URL プロパティを渡すことができます。 | いいえ | STRING | 20 | カスタムプロパティを渡すことができます。例: |
heartbeat.interval | 最新の利用可能な WAL オフセットを追跡するためにハートビートイベントを送信する間隔。 | いいえ | DURATION | 30s | なし。 |
debezium.* | PostgreSQL サーバーからのデータ変更をキャプチャするために使用される Debezium Embedded Engine に Debezium プロパティを渡します。 | いいえ | STRING | (なし) | Debezium PostgreSQL コネクタのプロパティの詳細については、「関連ドキュメント」をご参照ください。 |
chunk-meta.group.size | チャンクメタデータのサイズ。 | いいえ | STRING | 1000 | メタデータがこの値より大きい場合、メタデータは複数の部分に分けて渡されます。 |
metadata.list | ダウンストリームに渡され、変換モジュールで使用できる読み取り可能なメタデータのリスト。 | いいえ | STRING | false | 値を区切るにはカンマ (,) を使用します。現在利用可能なメタデータは |
scan.incremental.snapshot.unbounded-chunk-first.enabled | スナップショット読み取りフェーズ中に、境界のないチャンクを最初に配布するかどうかを指定します。 | いいえ | STRING | false | 有効値:
重要 これは実験的な機能です。この機能を有効にすると、TaskManager がスナップショットフェーズの最後のチャンクを同期する際の Out-of-Memory (OOM) エラーのリスクを軽減できます。ジョブを初めて開始する前に、この構成を追加することを推奨します。 |
関連ドキュメント
Realtime Compute for Apache Flink でサポートされているコネクタのリストについては、「サポートされているコネクタ」をご参照ください。
PolarDB for PostgreSQL (Oracle Compatible) 1.0 の結果テーブルにデータを書き込むには、「PolarDB for PostgreSQL (Oracle Compatible) 1.0」をご参照ください。
RDS for MySQL、PolarDB for MySQL、または自己管理 MySQL データベースからの読み取りまたは書き込みを行うには、MySQL コネクタを使用します。