このトピックでは、AnalyticDB for PostgreSQL コネクタの使用方法について説明します。
背景情報
AnalyticDB for PostgreSQL は、超並列処理(MPP)用のデータウェアハウスです。大量のデータに対してオンライン分析サービスを提供します。
次の表に、AnalyticDB for PostgreSQL コネクタでサポートされている機能を示します。
項目 | 説明 |
テーブルタイプ | ソース (ベータ) 、ディメンション、および結果テーブル 説明 現在、AnalyticDB for PostgreSQL ソースから読み取るには、カスタムコネクタを構成する必要があります。詳細については、「Flink CDC を使用して完全データと増分データをリアルタイムでサブスクライブする」をご参照ください。 |
実行モード | ストリーミングモードとバッチモード。 |
データ形式 | 該当なし |
メトリック |
説明 メトリックの詳細については、「メトリック」をご参照ください。 |
API タイプ | SQL |
シンクテーブルのデータの更新または削除 | サポートされています |
前提条件
AnalyticDB for PostgreSQL インスタンスと AnalyticDB for PostgreSQL テーブルが作成されていること。詳細については、「インスタンスの作成」および「CREATE TABLE」をご参照ください。
AnalyticDB for PostgreSQL インスタンスに IP アドレスのホワイトリストが構成されていること。詳細については、「IP アドレスホワイトリストの構成」をご参照ください。
制限事項
AnalyticDB for PostgreSQL V7.0 をサポートしているのは、VVR 8.0.1 以降のみです。
セルフマネージド PostgreSQL データベースはサポートされていません。
構文
CREATE TEMPORARY TABLE adbpg_table (
id INT,
len INT,
content VARCHAR,
PRIMARY KEY(id)
) WITH (
'connector'='adbpg',
'url'='jdbc:postgresql://<yourAddress>:<yourPortId>/<yourDatabaseName>',
'tableName'='<yourDatabaseTableName>',
'userName'='<yourDatabaseUserName>',
'password'='<yourDatabasePassword>'
);コネクタオプション
一般
オプション | 説明 | データ型 | 必須 | デフォルト値 | 備考 |
connector | 使用するコネクタ。 | STRING | はい | デフォルト値なし |
|
url | データベースの Java Database Connectivity (JDBC) URL。 | STRING | はい | デフォルト値なし | URL は |
tableName | データベース内のテーブルの名前。 | STRING | はい | デフォルト値なし | 該当なし。 |
userName | AnalyticDB for PostgreSQL データベースへのアクセスに使用するユーザー名。 | STRING | はい | デフォルト値なし | 該当なし。 |
password | AnalyticDB for PostgreSQL データベースへのアクセスに使用するパスワード。 | STRING | はい | デフォルト値なし | 該当なし。 |
maxRetryTimes | データ書き込み試行が失敗した場合に、テーブルへのデータ書き込みを許可される最大再試行回数。 | INTEGER | いいえ | 3 | 該当なし。 |
targetSchema | スキーマの名前。 | STRING | いいえ | public | 該当なし。 |
caseSensitive | 大文字と小文字を区別するかどうかを指定します。 | STRING | いいえ | false | 有効な値:
|
connectionMaxActive | 接続プール内の最大接続数。 | INTEGER | いいえ | 5 | システムは、アイドル状態の接続をデータベースサービスに自動的に解放します。 重要 このオプションに過度に大きな値を設定すると、サーバー接続数が異常になる可能性があります。 |
ソース固有 (ベータ)
オプション | 説明 | データ型 | 必須 | 備考 |
schema-name | スキーマ名。 | STRING | はい | このオプションは正規表現をサポートしています。一度に複数のスキーマをサブスクライブできます。 |
port | AnalyticDB for PostgreSQL インスタンスのポート。 | INTEGER | はい |
|
decoding.plugin.name | PostgreSQL 論理デコーディングプラグインの名前。 | STRING | はい |
|
slot.name | 論理デコーディングスロットの名前。 | STRING | はい |
|
debezium.* | Debezium クライアントの動作を制御します。 | STRING | はい | たとえば、 |
scan.incremental.snapshot.enabled | 増分スナップショットを有効にするかどうかを指定します。 | BOOLEAN | いいえ | 有効な値:
|
scan.startup.mode | データ消費の起動モード。 | STRING | いいえ | 有効な値:
|
changelog-mode | 変更ストリーム内で変更イベントがどのようにエンコードされるかを指定します。 | STRING | いいえ | 有効な値:
|
heartbeat.interval.ms | ハートビートパケットを送信する間隔 (ミリ秒) 。 | DURATION | いいえ | デフォルト値: 30 秒。 AnalyticDB for PostgreSQL CDC コネクタは、スロットオフセットが継続的に進むことを保証するために、データベースにハートビートパケットを積極的に送信します。テーブルデータが頻繁に変更されない場合は、このオプションを適切な値に設定して、WAL ログを定期的にクリアし、ディスクの浪費を避けてください。 |
scan.incremental.snapshot.chunk.key-column | スナップショット読み取り中にチャンクキー列を指定します。 | STRING | いいえ | デフォルトでは、プライマリキーの最初の列になります。 |
シンク固有
オプション | 説明 | データ型 | 必須 | デフォルト値 | 備考 |
retryWaitTime | リトライ間の間隔 (ミリ秒) 。 | INTEGER | いいえ | 100 | |
batchSize | 一度にテーブルに書き込むことができるデータレコードの数。 | INTEGER | いいえ | 500 | N/A。 |
flushIntervalMs | キャッシュがクリアされる間隔。 | INTEGER | いいえ | N/A。 | 指定された期間内にキャッシュされたデータレコードの数が上限に達しない場合、すべてのキャッシュされたデータが結果テーブルに書き込まれます。単位: ミリ秒。 |
writeMode | システムが初めてテーブルにデータを書き込もうとするときの書き込みモード。 | STRING | いいえ | insert | 有効な値:
|
conflictMode | データがテーブルに挿入されるときに、プライマリキーの競合またはインデックスの競合を処理するポリシー。 | STRING | いいえ | strict | 有効な値:
|
ディメンションテーブル固有
オプション | 説明 | データ型 | 必須 | デフォルト値 | 備考 |
maxJoinRows | 1 行のデータで結合する最大行数。 | INTEGER | いいえ | 1024 | N/A。 |
cache | キャッシュポリシー。 | STRING | いいえ | ALL | 有効な値:
|
cacheSize | キャッシュできるデータの最大行数。 | LONG | いいえ | 100000 | cacheSize オプションは、cache オプションを LRU に設定した場合にのみ有効になります。 |
cacheTTLMs | キャッシュのタイムアウト期間。 | LONG | いいえ | Long.MAX_VALUE | cacheTTLMs オプションの構成は、cache オプションによって異なります。
単位: ミリ秒。 |
データ型マッピング
AnalyticDB for PostgreSQL のデータ型 | Realtime Compute for Apache Flink のデータ型 |
BOOLEAN | BOOLEAN |
SMALLINT | INT |
INT | INT |
BIGINT | BIGINT |
FLOAT | DOUBLE |
VARCHAR | VARCHAR |
TEXT | VARCHAR |
TIMESTAMP | TIMESTAMP |
DATE | DATE |
サンプルコード
ソーステーブル (ベータ)
「Flink CDC を使用して完全データと増分データをリアルタイムでサブスクライブする」をご参照ください。
結果テーブル:
CREATE TEMPORARY TABLE datagen_source ( `name` VARCHAR, `age` INT ) COMMENT 'datagen ソーステーブル' WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE adbpg_sink ( name VARCHAR, age INT ) WITH ( 'connector'='adbpg', 'url'='jdbc:postgresql://<yourAddress>:<yourPortId>/<yourDatabaseName>', 'tableName'='<yourDatabaseTableName>', 'userName'='<yourDatabaseUserName>', 'password'='<yourDatabasePassword>' ); INSERT INTO adbpg_sink SELECT * FROM datagen_source;ディメンションテーブル:
CREATE TEMPORARY TABLE datagen_source( a INT, b BIGINT, c STRING, `proctime` AS PROCTIME() ) COMMENT 'datagen ソーステーブル' WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE adbpg_dim ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector'='adbpg', 'url'='jdbc:postgresql://<yourAddress>:<yourPortId>/<yourDatabaseName>', 'tableName'='<yourDatabaseTableName>', 'userName'='<yourDatabaseUserName>', 'password'='<yourDatabasePassword>' ); CREATE TEMPORARY TABLE blackhole_sink( a INT, b STRING ) COMMENT 'blackhole シンクテーブル' WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.a,H.b FROM datagen_source AS T JOIN adb_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;