すべてのプロダクト
Search
ドキュメントセンター

AnalyticDB:Flink CDC を使用して完全データと増分データをリアルタイムでサブスクライブする (招待プレビュー)

最終更新日:Nov 09, 2025

AnalyticDB for PostgreSQL は、PostgreSQL の論理レプリケーション機能に基づいて完全データと増分データをサブスクライブする、自社開発の Change Data Capture (CDC) コネクタを提供します。このコネクタは Flink とシームレスに統合されます。ソーステーブルからのリアルタイムのデータ変更を効率的にキャプチャし、リアルタイムのデータ同期とストリーム処理を実現します。これにより、企業は動的なデータ要件に迅速に対応できます。このトピックでは、Realtime Compute for Apache Flink CDC を使用して AnalyticDB for PostgreSQL から完全データと増分データをリアルタイムでサブスクライブする方法について説明します。

制限事項

  • この機能は、マイナーエンジンバージョン 7.2.1.4 以降を実行する AnalyticDB for PostgreSQL V7.0 インスタンスでのみ利用可能です。

    説明

    AnalyticDB for PostgreSQL コンソールのインスタンスの[基本情報]ページでマイナーバージョンを確認できます。インスタンスが必要なバージョンを満たしていない場合は、インスタンスのマイナーバージョンを更新してください

  • AnalyticDB for PostgreSQL のサーバーレスモードはサポートされていません。

前提条件

  • AnalyticDB for PostgreSQL インスタンスと フルマネージド Flink ワークスペース は、同じ VPC にある必要があります。

  • AnalyticDB for PostgreSQL インスタンスの パラメーター設定 を調整する必要があります。

    • wal_level パラメーターを logical に設定して、論理レプリケーションを有効にします。

    • AnalyticDB for PostgreSQLHigh-availability Edition インスタンスを使用する場合、hot_standbyhot_standby_feedback、および sync_replication_slots パラメーターを on に設定する必要があります。これにより、プライマリ/セカンダリのフェールオーバーによって論理サブスクリプションが中断されないようになります。

  • AnalyticDB for PostgreSQL インスタンスには、初期アカウント または RDS_SUPERUSER 権限を持つ特権ユーザー を使用する必要があります。ユーザーには REPLICATION 権限が付与されている必要があります。ALTER USER <username> WITH REPLICATION;

  • Flink ワークスペースの CIDR ブロックを AnalyticDB for PostgreSQL インスタンスの ホワイトリスト に追加する必要があります。

  • flink-sql-connector-adbpg-cdc-3.3.jar をダウンロードし、CDC コネクタを Flink ワークスペースにアップロード する必要があります。

手順

ステップ 1: テストテーブルとテストデータを準備する

  1. AnalyticDB for PostgreSQL コンソール にログインします。管理するインスタンスを見つけて、インスタンス ID をクリックします。

  2. [基本情報] ページの右下隅にある [データベースにログオン] をクリックします。

  3. テストデータベースと adbpg_source_table という名前のソーステーブルを作成します。次に、ソーステーブルに 50 行のデータを挿入します。

    -- テストデータベースを作成します。
    CREATE DATABASE testdb;
    -- testdb データベースに切り替えて、スキーマを作成します。
    CREATE SCHEMA testschema;
    -- adbpg_source_table という名前のソーステーブルを作成します。
    CREATE TABLE testschema.adbpg_source_table(
      id int,
      username text,
      PRIMARY KEY(id)
    );
    -- adbpg_source_table テーブルに 50 行のデータを挿入します。
    INSERT INTO testschema.adbpg_source_table(id, username)
    SELECT i, 'username'||i::text
    FROM generate_series(1, 50) AS t(i);
  4. Flink が結果データを書き込むための adbpg_sink_table という名前の結果テーブルを作成します。

    CREATE TABLE testschema.adbpg_sink_table(
      id int,
      username text,
      score int
    );

ステップ 2: Flink ジョブを作成する

  1. Realtime Compute for Apache Flink コンソール にログインします。Fully Managed Flink タブで、管理するワークスペースを見つけ、[アクション] 列の [コンソール] をクリックします。

  2. 左側のナビゲーションウィンドウで、[データ開発] > [ETL] を選択します。

  3. トップメニューバーで [新規] をクリックします。[空白のストリームドラフト] を選択し、[次へ] をクリックします。[新しいファイルドラフト] ダイアログボックスで、ジョブパラメーターを設定します。

    パラメーター

    説明

    名前

    作成するドラフトの名前。

    説明

    ドラフト名は現在のプロジェクト内で一意である必要があります。

    adbpg-test

    場所

    ドラフトのコードファイルが保存されるフォルダ。

    既存のフォルダの右側にある 新建文件夹 アイコンをクリックしてサブフォルダを作成することもできます。

    ドラフト

    エンジンバージョン

    ドラフトで使用される Flink のエンジンバージョン。エンジンバージョン、バージョンマッピング、および各バージョンのライフサイクルにおける重要な時点の詳細については、「エンジンバージョン」をご参照ください。

    vvr-6.0.7-flink-1.15

  4. [作成] をクリックします。

ステップ 3: ジョブコードを記述してジョブをデプロイする

  1. アナログデータを生成するための datagen_source という名前のソースと、AnalyticDB for PostgreSQL データベースからリアルタイムのデータ変更をキャプチャするための source_adbpg という名前のソースを作成します。次に、2 つのソースを結合し、結果を sink_adbpg という名前の結果テーブルに書き込みます。処理されたデータは AnalyticDB for PostgreSQL に書き込まれます。

    次のジョブコードをエディターにコピーします。

    ---Datagen コネクタを使用してストリーミングデータを生成する Datagen ソーステーブルを作成します。
    CREATE TEMPORARY TABLE datagen_source (
     id INT,
     score INT
    ) WITH (
     'connector' = 'datagen',
     'fields.id.kind'='sequence',
     'fields.id.start'='1',
     'fields.id.end'='100',
     'fields.score.kind'='random',
     'fields.score.min'='70',
     'fields.score.max'='100'
    );
    
    --adbpg-cdc コネクタを使用して、slot.name と pgoutput に基づいて adbpg_source_table テーブルのデータ変更をキャプチャする adbpg ソーステーブルを作成します。
    CREATE TEMPORARY TABLE source_adbpg(
     id int,
     username varchar,
     PRIMARY KEY(id) NOT ENFORCED
    ) WITH(
      'connector' = 'adbpg-cdc', 
      'hostname' = 'gp-bp16v8cgx46ns****-master.gpdb.rds.aliyuncs.com',
      'port' = '5432',
      'username' = 'account****',
      'password' = 'password****',
      'database-name' = 'testdb',
      'schema-name' = 'testschema',
      'table-name' = 'adbpg_source_table',
      'slot.name' = 'flink',
      'decoding.plugin.name' = 'pgoutput'
    );
    
    --処理された結果をデータベースの宛先テーブル adbpg_sink_table に書き込むための adbpg 結果テーブルを作成します。
    CREATE TEMPORARY TABLE sink_adbpg (
      id int,
      username varchar,
      score int
    ) WITH (
      'connector' = 'adbpg', 
      'url' = 'jdbc:postgresql://gp-bp16v8cgx46ns****-master.gpdb.rds.aliyuncs.com:5432/testdb',
      'tablename' = 'testschema.adbpg_sink_table',  
      'username' = 'account****',
      'password' = 'password****',
      'maxRetryTimes' = '2',
      'batchsize' = '5000',
      'conflictMode' = 'ignore',
      'writeMode' = 'insert',
      'retryWaitTime' = '200'
    );
    
    -- datagen_source テーブルと source_adbpg テーブルの結合結果を adbpg 結果テーブルに書き込みます。
    INSERT INTO sink_adbpg
    SELECT ts.id,ts.username,ds.score FROM datagen_source AS ds
    JOIN source_adbpg AS ts ON ds.id = ts.id;

    パラメーター

    パラメーター

    必須

    データ型

    説明

    connector

    はい

    STRING

    コネクタのタイプ。ソーステーブルには値を adbpg-cdc に、結果テーブルには adbpg に設定します。

    hostname

    はい

    STRING

    AnalyticDB for PostgreSQL インスタンスの内部エンドポイント。インスタンスの [基本情報] ページで 内部エンドポイント を取得できます。

    username

    はい

    STRING

    AnalyticDB for PostgreSQL インスタンスのデータベースアカウントとパスワード。

    password

    はい

    STRING

    database-name

    はい

    STRING

    データベースの名前。

    schema-name

    はい

    STRING

    スキーマの名前。このパラメーターは正規表現をサポートしています。一度に複数のスキーマをサブスクライブできます。

    table-name

    はい

    STRING

    テーブルの名前。このパラメーターは正規表現をサポートしています。一度に複数のテーブルをサブスクライブできます。

    port

    はい

    INTEGER

    AnalyticDB for PostgreSQL のポート。値は 5432 に固定されています。

    decoding.plugin.name

    はい

    STRING

    PostgreSQL 論理デコーディングプラグインの名前。値は pgoutput に固定されています。

    slot.name

    はい

    STRING

    論理デコーディングスロットの名前。

    • 同じ Flink ジョブ内のソーステーブルには、slot.name に同じ値を使用します。

    • 異なる Flink ジョブが同じテーブルに関係する場合は、各ジョブに一意の slot.name を設定します。これにより、次のエラーが防止されます: PSQLException: ERROR: replication slot "debezium" is active for PID 974

    debezium.*

    いいえ

    STRING

    Debezium クライアントの動作をより細かい粒度で制御します。たとえば、'debezium.snapshot.mode' = 'never' と設定すると、スナップショット機能が無効になります。詳細については、「構成プロパティ」をご参照ください。

    scan.incremental.snapshot.enabled

    いいえ

    BOOLEAN

    増分スナップショットを有効にするかどうかを指定します。有効な値:

    • false (デフォルト): 増分スナップショットは無効になります。

    • true: 増分スナップショットは有効になります。

    scan.startup.mode

    いいえ

    STRING

    データ消費の起動モード。有効な値:

    • initial (デフォルト): ジョブが初めて開始されると、すべての既存データをスキャンし、最新の先行書き込みログ (WAL) データを読み取ります。これにより、完全データと増分データの間でシームレスなトランジションが提供されます。

    • latest-offset: ジョブが初めて開始されると、既存データはスキャンされません。WAL の末尾、つまり最新のログ位置から読み取りを開始します。コネクタの起動後に発生したデータ変更のみをキャプチャします。

    • snapshot: すべての既存データをスキャンし、完全スキャン中に生成された新しい WAL エントリを読み取ります。完全スキャンが完了するとジョブは停止します。

    changelog-mode

    いいえ

    STRING

    ストリーム変更をエンコーディングするための変更ログモード。有効な値:

    • ALL (デフォルト): INSERTDELETEUPDATE_BEFOREUPDATE_AFTER を含むすべての操作タイプをサポートします。

    • UPSERT: UPSERT 操作のみをサポートします。これには INSERTDELETEUPDATE_AFTER が含まれます。

    heartbeat.interval.ms

    いいえ

    DURATION

    ハートビートパケットを送信する間隔。デフォルト値は 30 秒です。単位はミリ秒です。

    AnalyticDB for PostgreSQL CDC コネクタは、スロットオフセットが継続的に進むことを保証するために、データベースにハートビートパケットを送信します。テーブルデータが頻繁に変更されない場合は、このパラメーターを適切な値に設定して、WAL ログを迅速にクリーンアップし、ディスク領域の浪費を回避します。

    scan.incremental.snapshot.chunk.key-column

    いいえ

    STRING

    スナップショットフェーズ中にチャンク化に使用する列を指定します。デフォルトでは、プライマリキーの最初の列が選択されます。

    url

    はい

    STRING

    フォーマットは jdbc:postgresql://<Address>:<PortId>/<DatabaseName> です。

  2. SQL エディターページの右上隅にある [検証] をクリックして、構文チェックを実行します。

  3. [デプロイ] をクリックし、次に [OK] をクリックします。

  4. 右上隅にある [O&M に移動] をクリックします。[ジョブ O&M] ページで、[開始] をクリックします。

ステップ 4: Flink によって書き込まれたデータを表示する

  1. テストデータベースで次の文を実行して、Flink によって書き込まれたデータを表示します。

    SELECT * FROM testschema.adbpg_sink_table;
    SELECT COUNT(*) FROM testschema.adbpg_sink_table; 
  2. ソーステーブルにさらに 50 行のデータを挿入します。次に、Flink が結果テーブルに書き込む増分データ行の総数を確認します。

    -- ソーステーブルに 50 行の増分データを挿入します。
    INSERT INTO testschema.adbpg_source_table(id, username)
    SELECT i, 'username'||i::text
    FROM generate_series(51, 100) AS t(i);
    
    -- 宛先テーブルの新しいデータを確認します。
    SELECT COUNT(*) FROM testschema.adbpg_sink_table where id > 50;

    結果は以下のとおりです。

     count 
    -------
        50
    (1 row)

使用上の注意

  • ディスク領域の浪費を避けるために、レプリケーションスロットを迅速に管理してください。

    Flink ジョブの再起動中にチェックポイントに対応する WAL ログがクリーンアップされることによるデータ損失を防ぐため、Flink はレプリケーションスロットを自動的に削除しません。したがって、Flink ジョブを再起動する必要がなくなったことを確認した場合は、対応するレプリケーションスロットを手動で削除して、占有しているリソースを解放する必要があります。さらに、レプリケーションスロットの確認済み位置が長期間進まない場合、AnalyticDB for PostgreSQL はその位置以降の WAL エントリをクリーンアップできません。これにより、未使用の WAL データが蓄積され、大量のディスク領域が消費される可能性があります。

  • AnalyticDB for PostgreSQL インスタンスの通常操作中は、exactly-once のデータ処理セマンティクスが保証されます。ただし、障害シナリオでは、at-least-once セマンティクスのみがサポートされます。

  • CDC コネクタは、データ同期の一貫性を確保するために、サブスクライブされたテーブルの REPLICA IDENTITY パラメーターを FULL に変更します。この変更には次の効果があります。

    • ディスク領域使用量の増加。更新または削除操作が頻繁に行われるシナリオでは、この設定により WAL ログのサイズが増加し、結果としてディスク領域の使用量が増加します。

    • 書き込みパフォーマンスの低下。高同時実行書き込みのシナリオでは、パフォーマンスが大幅に影響を受ける可能性があります。

    • チェックポイントの負荷の増加。WAL ログが大きくなると、チェックポイントがより多くのデータを処理する必要があることを意味し、チェックポイントに必要な時間が長くなる可能性があります。

ベストプラクティス

Flink CDC は、Flink SQL API または DataStream API を使用したジョブ開発をサポートしています。Flink CDC を使用して、ソースデータベース内の単一または複数のテーブルの完全データ同期と増分データ同期を統合して実装できます。また、異種データソースでのテーブル結合などの計算も実行できます。Flink フレームワークは、データ処理手順全体で exactly-once のイベント処理セマンティクスを保証します。ただし、Flink CDC は PostgreSQL 互換データベース全体を同期するには適していません。これは、DDL 同期をサポートしておらず、Flink SQL で各テーブルの構造を定義する必要があり、メンテナンスが複雑になるためです。

このセクションでは、AnalyticDB for PostgreSQL から Kafka にデータを同期する例を使用して、Flink CDC SQL ジョブ開発のベストプラクティスについて説明します。Flink CDC ジョブを開発する前に、「前提条件」セクションで説明されているようにリソースを準備および構成していることを確認してください。

ステップ 1: テストテーブルを準備する

AnalyticDB for PostgreSQL インスタンスに 2 つのソーステーブルを作成します。

CREATE TABLE products (
    product_id SERIAL PRIMARY KEY,
    product_name VARCHAR(200) NOT NULL,
    sku CHAR(12) NOT NULL,
    description TEXT,
    price NUMERIC(10,2) NOT NULL,
    discount_price DECIMAL(10,2),
    stock_quantity INTEGER DEFAULT 0,
    weight REAL,
    volume DOUBLE PRECISION,
    dimensions BOX,
    release_date DATE,
    is_featured BOOLEAN DEFAULT FALSE,
    rating FLOAT,
    warranty_period INTERVAL,
    metadata JSON,
    tags TEXT[]
);

CREATE TABLE documents (
    document_id UUID PRIMARY KEY,
    title VARCHAR(200) NOT NULL,
    content TEXT,
    summary TEXT,
    publication_date TIMESTAMP WITHOUT TIME ZONE,
    last_updated TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    author_id BIGINT,
    file_data BYTEA,
    xml_content XML,
    json_metadata JSON,
    reading_time INTERVAL,
    is_public BOOLEAN DEFAULT TRUE,
    views_count INTEGER DEFAULT 0,
    category VARCHAR(50),
    tags TEXT[]
);

ステップ 2: Kafka リソースを準備する

  1. Kafka インスタンスを購入してデプロイする

  2. Flink ワークスペースの CIDR ブロックを Kafka インスタンスのホワイトリストに追加します。

  3. Kafka インスタンスにリソースを作成する

ステップ 3: Flink ジョブを作成する

  1. Realtime Compute for Apache Flink コンソール にログインします。Fully Managed Flink タブで、管理するワークスペースを見つけ、[アクション] 列の [コンソール] をクリックします。

  2. 左側のナビゲーションウィンドウで、[データ開発] > [ETL] を選択します。

  3. トップメニューバーで [新規] をクリックします。[空白のストリームドラフト] を選択し、[次へ] をクリックします。[新しいファイルドラフト] ダイアログボックスで、ジョブパラメーターを設定します。

    パラメーター

    説明

    名前

    作成するドラフトの名前。

    説明

    ドラフト名は現在のプロジェクト内で一意である必要があります。

    adbpg-test

    場所

    ドラフトのコードファイルが保存されるフォルダ。

    既存のフォルダの右側にある 新建文件夹 アイコンをクリックしてサブフォルダを作成することもできます。

    ドラフト

    エンジンバージョン

    ドラフトで使用される Flink のエンジンバージョン。エンジンバージョン、バージョンマッピング、および各バージョンのライフサイクルにおける重要な時点の詳細については、「エンジンバージョン」をご参照ください。

    vvr-6.0.7-flink-1.15

  4. [作成] をクリックします。

ステップ 4: ジョブコードを記述してジョブをデプロイする

  1. Flink ワークスペースで SQL ジョブを記述します。次のジョブコードをエディターにコピーし、構成を実際の値に置き換えます。

    -- 1 つのソースを使用して複数のテーブルからデータをキャプチャします
    CREATE TEMPORARY TABLE ADBPGSource(
        table_name STRING METADATA FROM 'table_name' VIRTUAL,
        row_kind STRING METADATA FROM 'row_kind' VIRTUAL,
        product_id BIGINT,
        product_name STRING,
        sku STRING,
        description STRING,
        price STRING,
        discount_price STRING,
        stock_quantity INT,
        weight STRING,
        volume STRING,
        dimensions STRING,
        release_date STRING,
        is_featured BOOLEAN,
        rating FLOAT,
        warranty_period STRING,
        metadata STRING,
        tags STRING,
        document_id STRING,
        title STRING,
        content STRING,
        summary STRING,
        publication_date STRING,
        last_updated STRING,
        author_id BIGINT,
        file_data STRING,
        xml_content STRING,
        json_metadata STRING,
        reading_time STRING,
        is_public BOOLEAN,
        views_count INT,
        category STRING
    ) WITH (
      'connector' = 'adbpg-cdc',
      'hostname' = 'gp-2zev887z58390***-master.gpdb.rds.aliyuncs.com',
      'port' = '5432',
      'username' = 'account****',
      'password' = 'password****',
      'database-name' = 'testdb',
      'schema-name' = 'public',
      'table-name' = '(products|documents)',
      'slot.name' = 'flink',
      'decoding.plugin.name' = 'pgoutput',
      'debezium.snapshot.mode' = 'never'
    );
    
    CREATE TEMPORARY TABLE KafkaProducts (
        product_id BIGINT,
        product_name STRING,
        sku STRING,
        description STRING,
        price STRING,
        discount_price STRING,
        stock_quantity INT,
        weight STRING,
        volume STRING,
        dimensions STRING,
        release_date STRING,
        is_featured BOOLEAN,
        rating FLOAT,
        warranty_period STRING,
        metadata STRING,
        tags STRING,
        PRIMARY KEY(product_id) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = '****',
      'properties.bootstrap.servers' = 'alikafka-post-cn-****-1-vpc.alikafka.aliyuncs.com:9092',
      'key.format'='avro',
      'value.format'='avro'
    );
    
    CREATE TEMPORARY TABLE KafkaDocuments (
        document_id STRING,
        title STRING,
        content STRING,
        summary STRING,
        publication_date STRING,
        last_updated STRING,
        author_id BIGINT,
        file_data STRING,
        xml_content STRING,
        json_metadata STRING,
        reading_time STRING,
        is_public BOOLEAN,
        views_count INT,
        category STRING,
        tags STRING,
        PRIMARY KEY(document_id) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = '****',
      'properties.bootstrap.servers' = 'alikafka-post-cn-****-1-vpc.alikafka.aliyuncs.com:9092',
      'key.format'='avro',
      'value.format'='avro'
    );
    
    -- STATEMENT SET を使用して複数の文をラップします
    BEGIN STATEMENT SET;
    -- table_name METADATA を使用してデータを宛先テーブルにルーティングします
    INSERT INTO KafkaProducts
    SELECT product_id,product_name,sku,description,price,discount_price,stock_quantity,weight,volume,dimensions,release_date,is_featured,rating,warranty_period,metadata,tags
    FROM ADBPGSource
    WHERE table_name = 'products';
    
    INSERT INTO KafkaDocuments
    SELECT document_id,title,content,summary,publication_date,last_updated,author_id,file_data,xml_content,json_metadata,reading_time,is_public,views_count,category,tags
    FROM ADBPGSource
    WHERE table_name = 'documents';
    
    END;

    この SQL ジョブに関する次の点に注意してください。

    • 複数テーブルの同期タスクでは、この SQL の例に示すように、1 つのソーステーブルを使用して複数のテーブルからデータをキャプチャすることをお勧めします。このソーステーブルには、すべてのソーステーブルのすべての列を定義する必要があります。列名が重複している場合は、1 つだけ保持します。宛先テーブルに書き込むときは、METADATA table_name を使用して、指定したテーブルにデータをルーティングします。このアプローチでは、AnalyticDB for PostgreSQL にレプリケーションスロットを 1 つだけ作成する必要があります。これにより、ソースデータベースのリソース使用量が削減され、同期パフォーマンスが向上し、将来のメンテナンスが簡素化されます。

    • table-name パラメーターを使用して、複数のソーステーブルを指定します。テーブル名を括弧で囲み、縦棒 (|) で区切ります。例: (table1|table2|table3)

    • debezium.snapshot.modenever に設定すると、ソーステーブルからの増分データのみが同期されることを意味します。完全データと増分データの両方を同期するには、設定を initial に変更します。

  2. SQL エディターページの右上隅にある [検証] をクリックして、構文チェックを実行します。

  3. [デプロイ] をクリックし、次に [OK] をクリックします。

  4. 右上隅にある [O&M に移動] をクリックします。[ジョブ O&M] ページで、[開始] をクリックします。

ステップ 5: テストデータを挿入する

AnalyticDB for PostgreSQL インスタンスで、2 つのソーステーブルのデータを更新し、Kafka Topic の メッセージの変更を監視 します。

次の SQL 文を使用してテストデータを挿入できます。

INSERT INTO products (
    product_name, sku, description, price, discount_price, stock_quantity, weight, volume, dimensions, release_date, is_featured, rating, warranty_period, metadata, tags
) VALUES (
    'Test Product', 'Test-2025', 'A piece of test product data', 299.99, 279.99, 150, 50.5, 120.75, '(10,20),(30,40)', '2023-05-01', TRUE, 4.8, INTERVAL '1 year', '{"brand": "TechCo", "model": "X1"}', '{"Test1", "Test2"}'
);

リファレンス

AnalyticDB for PostgreSQL の完全データをサブスクライブする方法の詳細については、「Flink を使用して完全データをリアルタイムで読み書きする」をご参照ください。