AnalyticDB for PostgreSQL は、PostgreSQL の論理レプリケーション機能に基づいて完全データと増分データをサブスクライブする、自社開発の Change Data Capture (CDC) コネクタを提供します。このコネクタは Flink とシームレスに統合されます。ソーステーブルからのリアルタイムのデータ変更を効率的にキャプチャし、リアルタイムのデータ同期とストリーム処理を実現します。これにより、企業は動的なデータ要件に迅速に対応できます。このトピックでは、Realtime Compute for Apache Flink CDC を使用して AnalyticDB for PostgreSQL から完全データと増分データをリアルタイムでサブスクライブする方法について説明します。
制限事項
この機能は、マイナーエンジンバージョン 7.2.1.4 以降を実行する AnalyticDB for PostgreSQL V7.0 インスタンスでのみ利用可能です。
説明AnalyticDB for PostgreSQL コンソールのインスタンスの[基本情報]ページでマイナーバージョンを確認できます。インスタンスが必要なバージョンを満たしていない場合は、インスタンスのマイナーバージョンを更新してください。
AnalyticDB for PostgreSQL のサーバーレスモードはサポートされていません。
前提条件
AnalyticDB for PostgreSQL インスタンスと フルマネージド Flink ワークスペース は、同じ VPC にある必要があります。
AnalyticDB for PostgreSQL インスタンスの パラメーター設定 を調整する必要があります。
wal_levelパラメーターを logical に設定して、論理レプリケーションを有効にします。AnalyticDB for PostgreSQL の High-availability Edition インスタンスを使用する場合、
hot_standby、hot_standby_feedback、およびsync_replication_slotsパラメーターを on に設定する必要があります。これにより、プライマリ/セカンダリのフェールオーバーによって論理サブスクリプションが中断されないようになります。
AnalyticDB for PostgreSQL インスタンスには、初期アカウント または RDS_SUPERUSER 権限を持つ特権ユーザー を使用する必要があります。ユーザーには REPLICATION 権限が付与されている必要があります。
ALTER USER <username> WITH REPLICATION;。Flink ワークスペースの CIDR ブロックを AnalyticDB for PostgreSQL インスタンスの ホワイトリスト に追加する必要があります。
flink-sql-connector-adbpg-cdc-3.3.jar をダウンロードし、CDC コネクタを Flink ワークスペースにアップロード する必要があります。
手順
ステップ 1: テストテーブルとテストデータを準備する
AnalyticDB for PostgreSQL コンソール にログインします。管理するインスタンスを見つけて、インスタンス ID をクリックします。
[基本情報] ページの右下隅にある [データベースにログオン] をクリックします。
テストデータベースと adbpg_source_table という名前のソーステーブルを作成します。次に、ソーステーブルに 50 行のデータを挿入します。
-- テストデータベースを作成します。 CREATE DATABASE testdb; -- testdb データベースに切り替えて、スキーマを作成します。 CREATE SCHEMA testschema; -- adbpg_source_table という名前のソーステーブルを作成します。 CREATE TABLE testschema.adbpg_source_table( id int, username text, PRIMARY KEY(id) ); -- adbpg_source_table テーブルに 50 行のデータを挿入します。 INSERT INTO testschema.adbpg_source_table(id, username) SELECT i, 'username'||i::text FROM generate_series(1, 50) AS t(i);Flink が結果データを書き込むための adbpg_sink_table という名前の結果テーブルを作成します。
CREATE TABLE testschema.adbpg_sink_table( id int, username text, score int );
ステップ 2: Flink ジョブを作成する
Realtime Compute for Apache Flink コンソール にログインします。Fully Managed Flink タブで、管理するワークスペースを見つけ、[アクション] 列の [コンソール] をクリックします。
左側のナビゲーションウィンドウで、 を選択します。
トップメニューバーで [新規] をクリックします。[空白のストリームドラフト] を選択し、[次へ] をクリックします。[新しいファイルドラフト] ダイアログボックスで、ジョブパラメーターを設定します。
パラメーター
説明
例
名前
作成するドラフトの名前。
説明ドラフト名は現在のプロジェクト内で一意である必要があります。
adbpg-test
場所
ドラフトのコードファイルが保存されるフォルダ。
既存のフォルダの右側にある
アイコンをクリックしてサブフォルダを作成することもできます。ドラフト
エンジンバージョン
ドラフトで使用される Flink のエンジンバージョン。エンジンバージョン、バージョンマッピング、および各バージョンのライフサイクルにおける重要な時点の詳細については、「エンジンバージョン」をご参照ください。
vvr-6.0.7-flink-1.15
[作成] をクリックします。
ステップ 3: ジョブコードを記述してジョブをデプロイする
アナログデータを生成するための datagen_source という名前のソースと、AnalyticDB for PostgreSQL データベースからリアルタイムのデータ変更をキャプチャするための source_adbpg という名前のソースを作成します。次に、2 つのソースを結合し、結果を sink_adbpg という名前の結果テーブルに書き込みます。処理されたデータは AnalyticDB for PostgreSQL に書き込まれます。
次のジョブコードをエディターにコピーします。
---Datagen コネクタを使用してストリーミングデータを生成する Datagen ソーステーブルを作成します。 CREATE TEMPORARY TABLE datagen_source ( id INT, score INT ) WITH ( 'connector' = 'datagen', 'fields.id.kind'='sequence', 'fields.id.start'='1', 'fields.id.end'='100', 'fields.score.kind'='random', 'fields.score.min'='70', 'fields.score.max'='100' ); --adbpg-cdc コネクタを使用して、slot.name と pgoutput に基づいて adbpg_source_table テーブルのデータ変更をキャプチャする adbpg ソーステーブルを作成します。 CREATE TEMPORARY TABLE source_adbpg( id int, username varchar, PRIMARY KEY(id) NOT ENFORCED ) WITH( 'connector' = 'adbpg-cdc', 'hostname' = 'gp-bp16v8cgx46ns****-master.gpdb.rds.aliyuncs.com', 'port' = '5432', 'username' = 'account****', 'password' = 'password****', 'database-name' = 'testdb', 'schema-name' = 'testschema', 'table-name' = 'adbpg_source_table', 'slot.name' = 'flink', 'decoding.plugin.name' = 'pgoutput' ); --処理された結果をデータベースの宛先テーブル adbpg_sink_table に書き込むための adbpg 結果テーブルを作成します。 CREATE TEMPORARY TABLE sink_adbpg ( id int, username varchar, score int ) WITH ( 'connector' = 'adbpg', 'url' = 'jdbc:postgresql://gp-bp16v8cgx46ns****-master.gpdb.rds.aliyuncs.com:5432/testdb', 'tablename' = 'testschema.adbpg_sink_table', 'username' = 'account****', 'password' = 'password****', 'maxRetryTimes' = '2', 'batchsize' = '5000', 'conflictMode' = 'ignore', 'writeMode' = 'insert', 'retryWaitTime' = '200' ); -- datagen_source テーブルと source_adbpg テーブルの結合結果を adbpg 結果テーブルに書き込みます。 INSERT INTO sink_adbpg SELECT ts.id,ts.username,ds.score FROM datagen_source AS ds JOIN source_adbpg AS ts ON ds.id = ts.id;パラメーター
パラメーター
必須
データ型
説明
connector
はい
STRING
コネクタのタイプ。ソーステーブルには値を
adbpg-cdcに、結果テーブルにはadbpgに設定します。hostname
はい
STRING
AnalyticDB for PostgreSQL インスタンスの内部エンドポイント。インスタンスの [基本情報] ページで 内部エンドポイント を取得できます。
username
はい
STRING
AnalyticDB for PostgreSQL インスタンスのデータベースアカウントとパスワード。
password
はい
STRING
database-name
はい
STRING
データベースの名前。
schema-name
はい
STRING
スキーマの名前。このパラメーターは正規表現をサポートしています。一度に複数のスキーマをサブスクライブできます。
table-name
はい
STRING
テーブルの名前。このパラメーターは正規表現をサポートしています。一度に複数のテーブルをサブスクライブできます。
port
はい
INTEGER
AnalyticDB for PostgreSQL のポート。値は 5432 に固定されています。
decoding.plugin.name
はい
STRING
PostgreSQL 論理デコーディングプラグインの名前。値は pgoutput に固定されています。
slot.name
はい
STRING
論理デコーディングスロットの名前。
同じ Flink ジョブ内のソーステーブルには、
slot.nameに同じ値を使用します。異なる Flink ジョブが同じテーブルに関係する場合は、各ジョブに一意の
slot.nameを設定します。これにより、次のエラーが防止されます:PSQLException: ERROR: replication slot "debezium" is active for PID 974。
debezium.*
いいえ
STRING
Debezium クライアントの動作をより細かい粒度で制御します。たとえば、
'debezium.snapshot.mode' = 'never'と設定すると、スナップショット機能が無効になります。詳細については、「構成プロパティ」をご参照ください。scan.incremental.snapshot.enabled
いいえ
BOOLEAN
増分スナップショットを有効にするかどうかを指定します。有効な値:
false (デフォルト): 増分スナップショットは無効になります。
true: 増分スナップショットは有効になります。
scan.startup.mode
いいえ
STRING
データ消費の起動モード。有効な値:
initial (デフォルト): ジョブが初めて開始されると、すべての既存データをスキャンし、最新の先行書き込みログ (WAL) データを読み取ります。これにより、完全データと増分データの間でシームレスなトランジションが提供されます。
latest-offset: ジョブが初めて開始されると、既存データはスキャンされません。WAL の末尾、つまり最新のログ位置から読み取りを開始します。コネクタの起動後に発生したデータ変更のみをキャプチャします。
snapshot: すべての既存データをスキャンし、完全スキャン中に生成された新しい WAL エントリを読み取ります。完全スキャンが完了するとジョブは停止します。
changelog-mode
いいえ
STRING
ストリーム変更をエンコーディングするための変更ログモード。有効な値:
ALL (デフォルト):
INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTERを含むすべての操作タイプをサポートします。UPSERT:
UPSERT操作のみをサポートします。これにはINSERT、DELETE、UPDATE_AFTERが含まれます。
heartbeat.interval.ms
いいえ
DURATION
ハートビートパケットを送信する間隔。デフォルト値は 30 秒です。単位はミリ秒です。
AnalyticDB for PostgreSQL CDC コネクタは、スロットオフセットが継続的に進むことを保証するために、データベースにハートビートパケットを送信します。テーブルデータが頻繁に変更されない場合は、このパラメーターを適切な値に設定して、WAL ログを迅速にクリーンアップし、ディスク領域の浪費を回避します。
scan.incremental.snapshot.chunk.key-column
いいえ
STRING
スナップショットフェーズ中にチャンク化に使用する列を指定します。デフォルトでは、プライマリキーの最初の列が選択されます。
url
はい
STRING
フォーマットは
jdbc:postgresql://<Address>:<PortId>/<DatabaseName>です。SQL エディターページの右上隅にある [検証] をクリックして、構文チェックを実行します。
[デプロイ] をクリックし、次に [OK] をクリックします。
右上隅にある [O&M に移動] をクリックします。[ジョブ O&M] ページで、[開始] をクリックします。
ステップ 4: Flink によって書き込まれたデータを表示する
テストデータベースで次の文を実行して、Flink によって書き込まれたデータを表示します。
SELECT * FROM testschema.adbpg_sink_table; SELECT COUNT(*) FROM testschema.adbpg_sink_table;ソーステーブルにさらに 50 行のデータを挿入します。次に、Flink が結果テーブルに書き込む増分データ行の総数を確認します。
-- ソーステーブルに 50 行の増分データを挿入します。 INSERT INTO testschema.adbpg_source_table(id, username) SELECT i, 'username'||i::text FROM generate_series(51, 100) AS t(i); -- 宛先テーブルの新しいデータを確認します。 SELECT COUNT(*) FROM testschema.adbpg_sink_table where id > 50;結果は以下のとおりです。
count ------- 50 (1 row)
使用上の注意
ディスク領域の浪費を避けるために、レプリケーションスロットを迅速に管理してください。
Flink ジョブの再起動中にチェックポイントに対応する WAL ログがクリーンアップされることによるデータ損失を防ぐため、Flink はレプリケーションスロットを自動的に削除しません。したがって、Flink ジョブを再起動する必要がなくなったことを確認した場合は、対応するレプリケーションスロットを手動で削除して、占有しているリソースを解放する必要があります。さらに、レプリケーションスロットの確認済み位置が長期間進まない場合、AnalyticDB for PostgreSQL はその位置以降の WAL エントリをクリーンアップできません。これにより、未使用の WAL データが蓄積され、大量のディスク領域が消費される可能性があります。
AnalyticDB for PostgreSQL インスタンスの通常操作中は、exactly-once のデータ処理セマンティクスが保証されます。ただし、障害シナリオでは、at-least-once セマンティクスのみがサポートされます。
CDC コネクタは、データ同期の一貫性を確保するために、サブスクライブされたテーブルの REPLICA IDENTITY パラメーターを
FULLに変更します。この変更には次の効果があります。ディスク領域使用量の増加。更新または削除操作が頻繁に行われるシナリオでは、この設定により WAL ログのサイズが増加し、結果としてディスク領域の使用量が増加します。
書き込みパフォーマンスの低下。高同時実行書き込みのシナリオでは、パフォーマンスが大幅に影響を受ける可能性があります。
チェックポイントの負荷の増加。WAL ログが大きくなると、チェックポイントがより多くのデータを処理する必要があることを意味し、チェックポイントに必要な時間が長くなる可能性があります。
ベストプラクティス
Flink CDC は、Flink SQL API または DataStream API を使用したジョブ開発をサポートしています。Flink CDC を使用して、ソースデータベース内の単一または複数のテーブルの完全データ同期と増分データ同期を統合して実装できます。また、異種データソースでのテーブル結合などの計算も実行できます。Flink フレームワークは、データ処理手順全体で exactly-once のイベント処理セマンティクスを保証します。ただし、Flink CDC は PostgreSQL 互換データベース全体を同期するには適していません。これは、DDL 同期をサポートしておらず、Flink SQL で各テーブルの構造を定義する必要があり、メンテナンスが複雑になるためです。
このセクションでは、AnalyticDB for PostgreSQL から Kafka にデータを同期する例を使用して、Flink CDC SQL ジョブ開発のベストプラクティスについて説明します。Flink CDC ジョブを開発する前に、「前提条件」セクションで説明されているようにリソースを準備および構成していることを確認してください。
ステップ 1: テストテーブルを準備する
AnalyticDB for PostgreSQL インスタンスに 2 つのソーステーブルを作成します。
CREATE TABLE products (
product_id SERIAL PRIMARY KEY,
product_name VARCHAR(200) NOT NULL,
sku CHAR(12) NOT NULL,
description TEXT,
price NUMERIC(10,2) NOT NULL,
discount_price DECIMAL(10,2),
stock_quantity INTEGER DEFAULT 0,
weight REAL,
volume DOUBLE PRECISION,
dimensions BOX,
release_date DATE,
is_featured BOOLEAN DEFAULT FALSE,
rating FLOAT,
warranty_period INTERVAL,
metadata JSON,
tags TEXT[]
);
CREATE TABLE documents (
document_id UUID PRIMARY KEY,
title VARCHAR(200) NOT NULL,
content TEXT,
summary TEXT,
publication_date TIMESTAMP WITHOUT TIME ZONE,
last_updated TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
author_id BIGINT,
file_data BYTEA,
xml_content XML,
json_metadata JSON,
reading_time INTERVAL,
is_public BOOLEAN DEFAULT TRUE,
views_count INTEGER DEFAULT 0,
category VARCHAR(50),
tags TEXT[]
);ステップ 2: Kafka リソースを準備する
Flink ワークスペースの CIDR ブロックを Kafka インスタンスのホワイトリストに追加します。
ステップ 3: Flink ジョブを作成する
Realtime Compute for Apache Flink コンソール にログインします。Fully Managed Flink タブで、管理するワークスペースを見つけ、[アクション] 列の [コンソール] をクリックします。
左側のナビゲーションウィンドウで、 を選択します。
トップメニューバーで [新規] をクリックします。[空白のストリームドラフト] を選択し、[次へ] をクリックします。[新しいファイルドラフト] ダイアログボックスで、ジョブパラメーターを設定します。
パラメーター
説明
例
名前
作成するドラフトの名前。
説明ドラフト名は現在のプロジェクト内で一意である必要があります。
adbpg-test
場所
ドラフトのコードファイルが保存されるフォルダ。
既存のフォルダの右側にある
アイコンをクリックしてサブフォルダを作成することもできます。ドラフト
エンジンバージョン
ドラフトで使用される Flink のエンジンバージョン。エンジンバージョン、バージョンマッピング、および各バージョンのライフサイクルにおける重要な時点の詳細については、「エンジンバージョン」をご参照ください。
vvr-6.0.7-flink-1.15
[作成] をクリックします。
ステップ 4: ジョブコードを記述してジョブをデプロイする
Flink ワークスペースで SQL ジョブを記述します。次のジョブコードをエディターにコピーし、構成を実際の値に置き換えます。
-- 1 つのソースを使用して複数のテーブルからデータをキャプチャします CREATE TEMPORARY TABLE ADBPGSource( table_name STRING METADATA FROM 'table_name' VIRTUAL, row_kind STRING METADATA FROM 'row_kind' VIRTUAL, product_id BIGINT, product_name STRING, sku STRING, description STRING, price STRING, discount_price STRING, stock_quantity INT, weight STRING, volume STRING, dimensions STRING, release_date STRING, is_featured BOOLEAN, rating FLOAT, warranty_period STRING, metadata STRING, tags STRING, document_id STRING, title STRING, content STRING, summary STRING, publication_date STRING, last_updated STRING, author_id BIGINT, file_data STRING, xml_content STRING, json_metadata STRING, reading_time STRING, is_public BOOLEAN, views_count INT, category STRING ) WITH ( 'connector' = 'adbpg-cdc', 'hostname' = 'gp-2zev887z58390***-master.gpdb.rds.aliyuncs.com', 'port' = '5432', 'username' = 'account****', 'password' = 'password****', 'database-name' = 'testdb', 'schema-name' = 'public', 'table-name' = '(products|documents)', 'slot.name' = 'flink', 'decoding.plugin.name' = 'pgoutput', 'debezium.snapshot.mode' = 'never' ); CREATE TEMPORARY TABLE KafkaProducts ( product_id BIGINT, product_name STRING, sku STRING, description STRING, price STRING, discount_price STRING, stock_quantity INT, weight STRING, volume STRING, dimensions STRING, release_date STRING, is_featured BOOLEAN, rating FLOAT, warranty_period STRING, metadata STRING, tags STRING, PRIMARY KEY(product_id) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = '****', 'properties.bootstrap.servers' = 'alikafka-post-cn-****-1-vpc.alikafka.aliyuncs.com:9092', 'key.format'='avro', 'value.format'='avro' ); CREATE TEMPORARY TABLE KafkaDocuments ( document_id STRING, title STRING, content STRING, summary STRING, publication_date STRING, last_updated STRING, author_id BIGINT, file_data STRING, xml_content STRING, json_metadata STRING, reading_time STRING, is_public BOOLEAN, views_count INT, category STRING, tags STRING, PRIMARY KEY(document_id) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = '****', 'properties.bootstrap.servers' = 'alikafka-post-cn-****-1-vpc.alikafka.aliyuncs.com:9092', 'key.format'='avro', 'value.format'='avro' ); -- STATEMENT SET を使用して複数の文をラップします BEGIN STATEMENT SET; -- table_name METADATA を使用してデータを宛先テーブルにルーティングします INSERT INTO KafkaProducts SELECT product_id,product_name,sku,description,price,discount_price,stock_quantity,weight,volume,dimensions,release_date,is_featured,rating,warranty_period,metadata,tags FROM ADBPGSource WHERE table_name = 'products'; INSERT INTO KafkaDocuments SELECT document_id,title,content,summary,publication_date,last_updated,author_id,file_data,xml_content,json_metadata,reading_time,is_public,views_count,category,tags FROM ADBPGSource WHERE table_name = 'documents'; END;この SQL ジョブに関する次の点に注意してください。
複数テーブルの同期タスクでは、この SQL の例に示すように、1 つのソーステーブルを使用して複数のテーブルからデータをキャプチャすることをお勧めします。このソーステーブルには、すべてのソーステーブルのすべての列を定義する必要があります。列名が重複している場合は、1 つだけ保持します。宛先テーブルに書き込むときは、
METADATAtable_name を使用して、指定したテーブルにデータをルーティングします。このアプローチでは、AnalyticDB for PostgreSQL にレプリケーションスロットを 1 つだけ作成する必要があります。これにより、ソースデータベースのリソース使用量が削減され、同期パフォーマンスが向上し、将来のメンテナンスが簡素化されます。table-nameパラメーターを使用して、複数のソーステーブルを指定します。テーブル名を括弧で囲み、縦棒 (|) で区切ります。例:(table1|table2|table3)。debezium.snapshot.modeをneverに設定すると、ソーステーブルからの増分データのみが同期されることを意味します。完全データと増分データの両方を同期するには、設定をinitialに変更します。
SQL エディターページの右上隅にある [検証] をクリックして、構文チェックを実行します。
[デプロイ] をクリックし、次に [OK] をクリックします。
右上隅にある [O&M に移動] をクリックします。[ジョブ O&M] ページで、[開始] をクリックします。
ステップ 5: テストデータを挿入する
AnalyticDB for PostgreSQL インスタンスで、2 つのソーステーブルのデータを更新し、Kafka Topic の メッセージの変更を監視 します。
次の SQL 文を使用してテストデータを挿入できます。
INSERT INTO products (
product_name, sku, description, price, discount_price, stock_quantity, weight, volume, dimensions, release_date, is_featured, rating, warranty_period, metadata, tags
) VALUES (
'Test Product', 'Test-2025', 'A piece of test product data', 299.99, 279.99, 150, 50.5, 120.75, '(10,20),(30,40)', '2023-05-01', TRUE, 4.8, INTERVAL '1 year', '{"brand": "TechCo", "model": "X1"}', '{"Test1", "Test2"}'
);リファレンス
AnalyticDB for PostgreSQL の完全データをサブスクライブする方法の詳細については、「Flink を使用して完全データをリアルタイムで読み書きする」をご参照ください。