本トピックでは、AnalyticDB for PostgreSQL コネクタを用いて、Flink ジョブから AnalyticDB for PostgreSQL インスタンスへストリーミングデータを送信する方法について説明します。
制限事項
Realtime Compute for Apache Flink は、サーバーレスモードの AnalyticDB for PostgreSQL からのデータ読み取りをサポートしていません。
AnalyticDB for PostgreSQL コネクタは、Ververica Runtime (VVR) 6.0.0 以降でのみサポートされます。
AnalyticDB for PostgreSQL V7.0 は、VVR 8.0.1 以降でのみサポートされます。
前提条件
開始する前に、以下の条件を満たしていることを確認してください。
Flink 完全管理ワークスペースが作成済みであること。詳細については、「Flink 完全管理の有効化」をご参照ください。
AnalyticDB for PostgreSQL インスタンスが作成済みであること。詳細については、「インスタンスの作成」をご参照ください。
AnalyticDB for PostgreSQL インスタンスと Flink 完全管理ワークスペースが、同一の仮想プライベートクラウド (VPC) 内に配置されていること。
AnalyticDB for PostgreSQL インスタンスの設定
AnalyticDB for PostgreSQL コンソールにログインします。
Flink 完全管理ワークスペースの CIDR ブロックを、インスタンスの IP アドレスホワイトリストに追加します。詳細については、「IP アドレスホワイトリストの設定」をご参照ください。
データベースへのログイン をクリックします。接続オプションについては、「クライアント接続」をご参照ください。
送信先テーブルを作成します。
CREATE TABLE test_adbpg_table( b1 int, b2 int, b3 text, PRIMARY KEY(b1) );
Flink コネクタのセットアップ
AnalyticDB for PostgreSQL コネクタは、GitHub でホストされる JAR ファイルとして配布されています。このファイルをダウンロードし、Flink ワークスペースにカスタムコネクタとしてアップロードしてください。
Realtime Compute for Apache Flink コンソールにログインします。Flink 完全管理 タブで、対象のワークスペースを見つけ、コンソール を 操作 列からクリックします。
左側ナビゲーションウィンドウで、コネクタ をクリックします。
カスタムコネクタの作成 をクリックし、JAR ファイルをアップロードします。
説明 JAR ファイルは、「GitHub リリース」から取得してください。JAR のバージョンは、ワークスペースの Flink エンジンバージョンと一致させる必要があります。次へ をクリックします。システムが JAR の内容を解析します。解析に失敗した場合は、コネクタコードが Apache Flink コミュニティの標準に準拠しているか確認してください。
完了 をクリックします。コネクタがコネクタ一覧に表示されます。
Flink ジョブの作成
Realtime Compute for Apache Flink コンソールにログインします。Flink 完全管理 タブで、対象のワークスペースを見つけ、コンソール を 操作 列からクリックします。
左側ナビゲーションウィンドウで、SQL エディタ をクリックします。左上隅の 新規 をクリックします。
新規ドラフト ダイアログボックスで、SQL スクリプト タブを選択し、空白ストリームドラフト をクリックして 次へ をクリックします。
ドラフトを設定し、作成 をクリックします。
パラメーター 説明 例 名前 現在のプロジェクト内で一意となるドラフト名 adbpg-test 場所 コードファイルの保存先フォルダ Draft エンジンバージョン Flink エンジンバージョン。サポートされるバージョンについては、「エンジンバージョン」をご参照ください。 vvr-6.0.7-flink-1.15
AnalyticDB for PostgreSQL へのデータ書き込み
ソースおよび結果テーブルの定義
以下の SQL をコードエディタにコピーしてください。この SQL では、組み込みの datagen コネクタを用いてランダムデータを生成する datagen_source テーブルと、ご利用の AnalyticDB for PostgreSQL インスタンスに対応する test_adbpg_table 結果テーブルが定義されます。
-- ソース:組み込み datagen コネクタを用いたランダムデータ生成
CREATE TABLE datagen_source (
f_sequence INT,
f_random INT,
f_random_str STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '5',
'fields.f_sequence.kind' = 'sequence',
'fields.f_sequence.start' = '1',
'fields.f_sequence.end' = '1000',
'fields.f_random.min' = '1',
'fields.f_random.max' = '1000',
'fields.f_random_str.length' = '10'
);
-- 結果:adbpg コネクタを経由した AnalyticDB for PostgreSQL へのデータ書き込み
CREATE TABLE test_adbpg_table (
`B1` bigint,
`B2` bigint,
`B3` VARCHAR,
`B4` VARCHAR,
PRIMARY KEY(B1) NOT ENFORCED
) WITH (
'connector' = 'adbpg-nightly-1.13',
'url' = 'jdbc:postgresql://<internal-endpoint>:5432/<database-name>',
'tablename' = 'test_adbpg_table',
'username' = '<username>',
'password' = '<password>',
'maxretrytimes' = '2',
'batchsize' = '50000',
'connectionmaxactive' = '5',
'conflictmode' = 'ignore',
'usecopy' = '0',
'targetschema' = 'public',
'exceptionmode' = 'ignore',
'casesensitive' = '0',
'writemode' = '1',
'retrywaittime' = '200'
);datagen_source のパラメーターは変更せずにそのまま使用してください。test_adbpg_table のプレースホルダー値は実際の値に置き換え、必要に応じて任意のパラメーターを調整してください。
コネクタパラメーター
必須パラメーター
| パラメーター | 説明 | 例 |
|---|---|---|
connector | コネクタ名。形式:adbpg-nightly-{version} | adbpg-nightly-1.13 |
url | Java Database Connectivity (JDBC) URL。形式:jdbc:postgresql://<internal-endpoint>:<port>/<database> | jdbc:postgresql://gp-xxxxxx.gpdb.cn-chengdu.rds.aliyuncs.com:5432/postgres |
tablename | 送信先テーブル名 | test_adbpg_table |
username | データベースアカウント名 | — |
password | データベースアカウントパスワード | — |
任意パラメーター
| パラメーター | デフォルト値 | 説明 |
|---|---|---|
conflictmode | upsert | プライマリキーまたは一意なインデックスとの競合発生時のポリシー。詳細については、「競合処理モード」をご参照ください。 |
writemode | 1 | 書き込み方式:0 = BATCH INSERT、1 = COPY API(最高速)、2 = BATCH UPSERT |
batchsize | 50000 | 1 回の書き込みバッチあたりの最大行数 |
batchwritetimeoutms | 50000 | バッチ書き込みタイムアウト(ミリ秒単位)。この期間が経過すると、バッチが満杯でなくてもフラッシュされます。 |
maxretrytimes | 3 | 書き込み失敗時の最大リトライ回数 |
retrywaittime | 100 | リトライ間隔(ミリ秒単位) |
exceptionmode | ignore | 例外処理ポリシー:ignore = 例外発生時の書き込みデータをスキップ、strict = フェールオーバーをトリガーしエラーを報告 |
targetschema | public | AnalyticDB for PostgreSQL データベース内のターゲットスキーマ |
connectionmaxactive | 5 | 接続プール内における TaskManager ごとの最大アクティブ接続数 |
casesensitive | 0 | カラムおよびテーブル名の大文字小文字の区別:0 = 区別しない、1 = 区別する |
verbose | 0 | コネクタログ出力:0 = 無効、1 = 有効 |
競合処理モード
conflictmode パラメーターは、レコードが既存のプライマリキーまたは一意なインデックスと競合した場合の動作を制御します。AnalyticDB for PostgreSQL では、INSERT ON CONFLICT 文と COPY ON CONFLICT 文を組み合わせて UPSERT 操作を実行します。
| 値 | 動作 |
|---|---|
upsert(デフォルト) | INSERT ON CONFLICT および COPY ON CONFLICT を実行し、既存の行を上書きします。パーティションテーブルの場合、インスタンスのマイナーバージョンは V6.3.6.1 以降である必要があります。詳細については、「マイナーエンジンバージョンの更新」をご参照ください。 |
update | 競合する行を更新します |
ignore | 既存の行を保持し、受信したレコードを破棄します |
strict | フェールオーバーをトリガーしエラーを報告します |
デプロイメントの開始
SQL エディタの右上隅で、デプロイ をクリックし、続いて OK をクリックします。
説明 セッションクラスターは開発およびテスト専用です。本番環境へのデプロイメントには、セッションクラスターへの公開は行わないでください。「デプロイメントのデバッグ」をご参照ください。デプロイメント ページで、対象のデプロイメントを見つけ、開始 を 操作 列からクリックします。
ジョブの開始 ダイアログボックスで、開始 をクリックします。
結果の検証
AnalyticDB for PostgreSQL データベースに接続します。詳細については、「クライアント接続」をご参照ください。
テーブルをクエリして、データが書き込まれていることを確認します。
SELECT * FROM test_adbpg_table;データは AnalyticDB for PostgreSQL データベースに書き込まれます。次の図に返された結果を示します。

次のステップ
AnalyticDB for PostgreSQL コネクタ — コネクタの完全なリファレンス(すべてのパラメーターおよびデータ型マッピングを含む)
Datagen コネクタ — この例で使用される datagen ソースのリファレンス