すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:AnalyticDB for PostgreSQL コネクタ

最終更新日:Nov 06, 2025

このトピックでは、AnalyticDB for PostgreSQL コネクタの使用方法について説明します。

背景情報

AnalyticDB for PostgreSQL は、超並列処理(MPP)用のデータウェアハウスです。大量のデータに対してオンライン分析サービスを提供します。

次の表に、AnalyticDB for PostgreSQL コネクタでサポートされている機能を示します。

項目

説明

テーブルタイプ

ソース (ベータ) 、ディメンション、および結果テーブル

説明

現在、AnalyticDB for PostgreSQL ソースから読み取るには、カスタムコネクタを構成する必要があります。詳細については、「Flink CDC を使用して完全データと増分データをリアルタイムでサブスクライブする」をご参照ください。

実行モード

ストリーミングモードとバッチモード。

データ形式

該当なし

メトリック

  • シンクテーブルのメトリック

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

  • ディメンションテーブルのメトリック: なし

説明

メトリックの詳細については、「メトリック」をご参照ください。

API タイプ

SQL

シンクテーブルのデータの更新または削除

サポートされています

前提条件

  • AnalyticDB for PostgreSQL インスタンスと AnalyticDB for PostgreSQL テーブルが作成されていること。詳細については、「インスタンスの作成」および「CREATE TABLE」をご参照ください。

  • AnalyticDB for PostgreSQL インスタンスに IP アドレスのホワイトリストが構成されていること。詳細については、「IP アドレスホワイトリストの構成」をご参照ください。

制限事項

  • AnalyticDB for PostgreSQL V7.0 をサポートしているのは、VVR 8.0.1 以降のみです。

  • セルフマネージド PostgreSQL データベースはサポートされていません。

構文

CREATE TEMPORARY TABLE adbpg_table (
 id INT,
 len INT,
 content VARCHAR, 
 PRIMARY KEY(id)
) WITH (
 'connector'='adbpg',
 'url'='jdbc:postgresql://<yourAddress>:<yourPortId>/<yourDatabaseName>',
 'tableName'='<yourDatabaseTableName>',
 'userName'='<yourDatabaseUserName>',
 'password'='<yourDatabasePassword>'
);

コネクタオプション

一般

オプション

説明

データ型

必須

デフォルト値

備考

connector

使用するコネクタ。

STRING

はい

デフォルト値なし

  • ソーステーブル: adbpg-cdc に設定します。

  • ディメンションテーブルと結果テーブル: adbpg に設定します。

url

データベースの Java Database Connectivity (JDBC) URL。

STRING

はい

デフォルト値なし

URL は jdbc:postgresql://<Address>:<PortId>/<DatabaseName> 形式です。

tableName

データベース内のテーブルの名前。

STRING

はい

デフォルト値なし

該当なし。

userName

AnalyticDB for PostgreSQL データベースへのアクセスに使用するユーザー名。

STRING

はい

デフォルト値なし

該当なし。

password

AnalyticDB for PostgreSQL データベースへのアクセスに使用するパスワード。

STRING

はい

デフォルト値なし

該当なし。

maxRetryTimes

データ書き込み試行が失敗した場合に、テーブルへのデータ書き込みを許可される最大再試行回数。

INTEGER

いいえ

3

該当なし。

targetSchema

スキーマの名前。

STRING

いいえ

public

該当なし。

caseSensitive

大文字と小文字を区別するかどうかを指定します。

STRING

いいえ

false

有効な値:

  • true: 大文字と小文字の区別が有効になります。

  • false: 大文字と小文字の区別が無効になります。これがデフォルト値です。

connectionMaxActive

接続プール内の最大接続数。

INTEGER

いいえ

5

システムは、アイドル状態の接続をデータベースサービスに自動的に解放します。

重要

このオプションに過度に大きな値を設定すると、サーバー接続数が異常になる可能性があります。

ソース固有 (ベータ)

オプション

説明

データ型

必須

備考

schema-name

スキーマ名。

STRING

はい

このオプションは正規表現をサポートしています。一度に複数のスキーマをサブスクライブできます。

port

AnalyticDB for PostgreSQL インスタンスのポート。

INTEGER

はい

5432 に設定します。

decoding.plugin.name

PostgreSQL 論理デコーディングプラグインの名前。

STRING

はい

pgoutput に設定します。

slot.name

論理デコーディングスロットの名前。

STRING

はい

  • 同じ Flink ジョブ内: すべてのソーステーブルで一貫した slot.name 値を使用します。

  • 異なる Flink ジョブ間: PSQLException: ERROR: replication slot "debezium" is active for PID 974 のようなエラーを防ぐために、各ジョブに一意の slot.name 値を割り当てます。

debezium.*

Debezium クライアントの動作を制御します。

STRING

はい

たとえば、'debezium.snapshot.mode' = 'never' を設定してスナップショットを無効にします。詳細については、「コネクタのプロパティ」をご参照ください。

scan.incremental.snapshot.enabled

増分スナップショットを有効にするかどうかを指定します。

BOOLEAN

いいえ

有効な値:

  • false (デフォルト)

  • true

scan.startup.mode

データ消費の起動モード。

STRING

いいえ

有効な値:

  • initial (デフォルト): コネクタが初めて起動すると、完全な既存データスキャンを実行し、その後最新の先行書き込みログ (WAL) データの読み取りを開始します。

  • latest-offset: コネクタは、既存データをスキャンせずに WAL の末尾 (最新のログ位置) から読み取りを開始します。

  • snapshot: コネクタは、完全な既存データスキャンを実行し、このスキャン中に生成された新しい WAL データを読み取ります。完全データスキャンが完了すると、ジョブは停止します。

changelog-mode

変更ストリーム内で変更イベントがどのようにエンコードされるかを指定します。

STRING

いいえ

有効な値:

  • ALL (デフォルト): INSERTDELETEUPDATE_BEFORE、および UPDATE_AFTER を含む、すべての既存の変更イベントをキャプチャします。

  • UPSERT: INSERTDELETE、および UPDATE_AFTER を含む UPSERT イベントをキャプチャします。

heartbeat.interval.ms

ハートビートパケットを送信する間隔 (ミリ秒) 。

DURATION

いいえ

デフォルト値: 30 秒。

AnalyticDB for PostgreSQL CDC コネクタは、スロットオフセットが継続的に進むことを保証するために、データベースにハートビートパケットを積極的に送信します。テーブルデータが頻繁に変更されない場合は、このオプションを適切な値に設定して、WAL ログを定期的にクリアし、ディスクの浪費を避けてください。

scan.incremental.snapshot.chunk.key-column

スナップショット読み取り中にチャンクキー列を指定します。

STRING

いいえ

デフォルトでは、プライマリキーの最初の列になります。

シンク固有

オプション

説明

データ型

必須

デフォルト値

備考

retryWaitTime

リトライ間の間隔 (ミリ秒) 。

INTEGER

いいえ

100

batchSize

一度にテーブルに書き込むことができるデータレコードの数。

INTEGER

いいえ

500

N/A。

flushIntervalMs

キャッシュがクリアされる間隔。

INTEGER

いいえ

N/A。

指定された期間内にキャッシュされたデータレコードの数が上限に達しない場合、すべてのキャッシュされたデータが結果テーブルに書き込まれます。単位: ミリ秒。

writeMode

システムが初めてテーブルにデータを書き込もうとするときの書き込みモード。

STRING

いいえ

insert

有効な値:

  • insert: データは結果テーブルに直接挿入されます。競合が発生した場合、処理ポリシーは conflictMode オプションによって決定されます。これがデフォルト値です。

  • upsert: 競合が発生すると、テーブル内のデータが自動的に更新されます。この値は、プライマリキーを持つテーブルにのみ適しています。

  • copy: データは COPY コマンドを介して挿入されます。

    説明

    VVR 11.1 以降のみが copy をサポートします。

conflictMode

データがテーブルに挿入されるときに、プライマリキーの競合またはインデックスの競合を処理するポリシー。

STRING

いいえ

strict

有効な値:

  • strict: 競合が発生した場合、システムはエラーを報告します。これがデフォルト値です。

  • ignore: 競合が発生した場合、システムは競合を無視します。

  • update: 競合が発生した場合、システムはデータを自動的に更新します。この値は、プライマリキーを持たないテーブルに適しています。このポリシーは、データ処理効率を低下させます。

  • upsert: 競合が発生した場合、システムはテーブル内のデータを自動的に更新します。この値は、プライマリキーを持つテーブルにのみ適しています。

ディメンションテーブル固有

オプション

説明

データ型

必須

デフォルト値

備考

maxJoinRows

1 行のデータで結合する最大行数。

INTEGER

いいえ

1024

N/A。

cache

キャッシュポリシー。

STRING

いいえ

ALL

有効な値:

  • ALL: ディメンションテーブルのすべてのデータがキャッシュされます。これがデフォルト値です。デプロイメントが実行される前に、システムはディメンションテーブルのすべてのデータをキャッシュにロードします。これにより、ディメンションテーブル内の後続のすべてのクエリでキャッシュが検索されます。システムがキャッシュ内でデータレコードを見つけられない場合、結合キーは存在しません。キャッシュエントリの有効期限が切れた後、システムはキャッシュ内のすべてのデータを再ロードします。

  • LRU: ディメンションテーブルの一部のデータがキャッシュされます。ソーステーブルからデータレコードが読み取られるたびに、システムはキャッシュ内のデータを検索します。データが見つからない場合、システムは物理ディメンションテーブルでデータを検索します。

  • None: データはキャッシュされません。

cacheSize

キャッシュできるデータの最大行数。

LONG

いいえ

100000

cacheSize オプションは、cache オプションを LRU に設定した場合にのみ有効になります。

cacheTTLMs

キャッシュのタイムアウト期間。

LONG

いいえ

Long.MAX_VALUE

cacheTTLMs オプションの構成は、cache オプションによって異なります。

  • cache オプションが LRU に設定されている場合、cacheTTLMs オプションはキャッシュのタイムアウト期間を指定します。デフォルトでは、キャッシュエントリは有効期限切れになりません。

  • cache オプションを ALL に設定した場合、cacheTTLMs オプションはシステムがキャッシュをリフレッシュする間隔を指定します。デフォルトでは、キャッシュはリフレッシュされません。

単位: ミリ秒。

データ型マッピング

AnalyticDB for PostgreSQL のデータ型

Realtime Compute for Apache Flink のデータ型

BOOLEAN

BOOLEAN

SMALLINT

INT

INT

INT

BIGINT

BIGINT

FLOAT

DOUBLE

VARCHAR

VARCHAR

TEXT

VARCHAR

TIMESTAMP

TIMESTAMP

DATE

DATE

サンプルコード

  • ソーステーブル (ベータ)

    Flink CDC を使用して完全データと増分データをリアルタイムでサブスクライブする」をご参照ください。

  • 結果テーブル:

    CREATE TEMPORARY TABLE datagen_source (
     `name` VARCHAR,
     `age` INT
    )
    COMMENT 'datagen ソーステーブル'
    WITH (
     'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE adbpg_sink (
     name VARCHAR,
     age INT
    ) WITH (
     'connector'='adbpg',
     'url'='jdbc:postgresql://<yourAddress>:<yourPortId>/<yourDatabaseName>',
     'tableName'='<yourDatabaseTableName>',
     'userName'='<yourDatabaseUserName>',
     'password'='<yourDatabasePassword>'
    );
    
    INSERT INTO adbpg_sink
    SELECT * FROM datagen_source;
  • ディメンションテーブル:

    CREATE TEMPORARY TABLE datagen_source(
     a INT,
     b BIGINT,
     c STRING,
     `proctime` AS PROCTIME()
    ) 
    COMMENT 'datagen ソーステーブル'
    WITH (
     'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE adbpg_dim (
     a INT, 
     b VARCHAR, 
     c VARCHAR
    ) WITH (
     'connector'='adbpg',
     'url'='jdbc:postgresql://<yourAddress>:<yourPortId>/<yourDatabaseName>',
     'tableName'='<yourDatabaseTableName>',
     'userName'='<yourDatabaseUserName>',
     'password'='<yourDatabasePassword>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
     a INT,
     b STRING
    )
    COMMENT 'blackhole シンクテーブル'
    WITH (
     'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink SELECT T.a,H.b
    FROM datagen_source AS T JOIN adb_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;

参照