すべてのプロダクト
Search
ドキュメントセンター

AnalyticDB:Realtime Compute for Apache Flink を使用した AnalyticDB for PostgreSQL へのデータ書き込み

最終更新日:Mar 29, 2026

本トピックでは、AnalyticDB for PostgreSQL コネクタを用いて、Flink ジョブから AnalyticDB for PostgreSQL インスタンスへストリーミングデータを送信する方法について説明します。

制限事項

  • Realtime Compute for Apache Flink は、サーバーレスモードの AnalyticDB for PostgreSQL からのデータ読み取りをサポートしていません。

  • AnalyticDB for PostgreSQL コネクタは、Ververica Runtime (VVR) 6.0.0 以降でのみサポートされます。

  • AnalyticDB for PostgreSQL V7.0 は、VVR 8.0.1 以降でのみサポートされます。

説明 カスタムコネクタの管理カスタムコネクタを使用する場合は、「」の手順に従ってください。

前提条件

開始する前に、以下の条件を満たしていることを確認してください。

  • Flink 完全管理ワークスペースが作成済みであること。詳細については、「Flink 完全管理の有効化」をご参照ください。

  • AnalyticDB for PostgreSQL インスタンスが作成済みであること。詳細については、「インスタンスの作成」をご参照ください。

  • AnalyticDB for PostgreSQL インスタンスと Flink 完全管理ワークスペースが、同一の仮想プライベートクラウド (VPC) 内に配置されていること。

AnalyticDB for PostgreSQL インスタンスの設定

  1. AnalyticDB for PostgreSQL コンソールにログインします。

  2. Flink 完全管理ワークスペースの CIDR ブロックを、インスタンスの IP アドレスホワイトリストに追加します。詳細については、「IP アドレスホワイトリストの設定」をご参照ください。

  3. データベースへのログイン をクリックします。接続オプションについては、「クライアント接続」をご参照ください。

  4. 送信先テーブルを作成します。

    CREATE TABLE test_adbpg_table(
      b1 int,
      b2 int,
      b3 text,
      PRIMARY KEY(b1)
    );

Flink コネクタのセットアップ

AnalyticDB for PostgreSQL コネクタは、GitHub でホストされる JAR ファイルとして配布されています。このファイルをダウンロードし、Flink ワークスペースにカスタムコネクタとしてアップロードしてください。

  1. Realtime Compute for Apache Flink コンソールにログインします。Flink 完全管理 タブで、対象のワークスペースを見つけ、コンソール操作 列からクリックします。

  2. 左側ナビゲーションウィンドウで、コネクタ をクリックします。

  3. カスタムコネクタの作成 をクリックし、JAR ファイルをアップロードします。

    説明 JAR ファイルは、「GitHub リリース」から取得してください。JAR のバージョンは、ワークスペースの Flink エンジンバージョンと一致させる必要があります。
  4. 次へ をクリックします。システムが JAR の内容を解析します。解析に失敗した場合は、コネクタコードが Apache Flink コミュニティの標準に準拠しているか確認してください。

  5. 完了 をクリックします。コネクタがコネクタ一覧に表示されます。

Flink ジョブの作成

  1. Realtime Compute for Apache Flink コンソールにログインします。Flink 完全管理 タブで、対象のワークスペースを見つけ、コンソール操作 列からクリックします。

  2. 左側ナビゲーションウィンドウで、SQL エディタ をクリックします。左上隅の 新規 をクリックします。

  3. 新規ドラフト ダイアログボックスで、SQL スクリプト タブを選択し、空白ストリームドラフト をクリックして 次へ をクリックします。

  4. ドラフトを設定し、作成 をクリックします。

    パラメーター説明
    名前現在のプロジェクト内で一意となるドラフト名adbpg-test
    場所コードファイルの保存先フォルダDraft
    エンジンバージョンFlink エンジンバージョン。サポートされるバージョンについては、「エンジンバージョン」をご参照ください。vvr-6.0.7-flink-1.15

AnalyticDB for PostgreSQL へのデータ書き込み

ソースおよび結果テーブルの定義

以下の SQL をコードエディタにコピーしてください。この SQL では、組み込みの datagen コネクタを用いてランダムデータを生成する datagen_source テーブルと、ご利用の AnalyticDB for PostgreSQL インスタンスに対応する test_adbpg_table 結果テーブルが定義されます。

-- ソース:組み込み datagen コネクタを用いたランダムデータ生成
CREATE TABLE datagen_source (
  f_sequence INT,
  f_random INT,
  f_random_str STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '5',
  'fields.f_sequence.kind' = 'sequence',
  'fields.f_sequence.start' = '1',
  'fields.f_sequence.end' = '1000',
  'fields.f_random.min' = '1',
  'fields.f_random.max' = '1000',
  'fields.f_random_str.length' = '10'
);

-- 結果:adbpg コネクタを経由した AnalyticDB for PostgreSQL へのデータ書き込み
CREATE TABLE test_adbpg_table (
  `B1` bigint,
  `B2` bigint,
  `B3` VARCHAR,
  `B4` VARCHAR,
  PRIMARY KEY(B1) NOT ENFORCED
) WITH (
  'connector' = 'adbpg-nightly-1.13',
  'url' = 'jdbc:postgresql://<internal-endpoint>:5432/<database-name>',
  'tablename' = 'test_adbpg_table',
  'username' = '<username>',
  'password' = '<password>',
  'maxretrytimes' = '2',
  'batchsize' = '50000',
  'connectionmaxactive' = '5',
  'conflictmode' = 'ignore',
  'usecopy' = '0',
  'targetschema' = 'public',
  'exceptionmode' = 'ignore',
  'casesensitive' = '0',
  'writemode' = '1',
  'retrywaittime' = '200'
);

datagen_source のパラメーターは変更せずにそのまま使用してください。test_adbpg_table のプレースホルダー値は実際の値に置き換え、必要に応じて任意のパラメーターを調整してください。

コネクタパラメーター

必須パラメーター

パラメーター説明
connectorコネクタ名。形式:adbpg-nightly-{version}adbpg-nightly-1.13
urlJava Database Connectivity (JDBC) URL。形式:jdbc:postgresql://<internal-endpoint>:<port>/<database>jdbc:postgresql://gp-xxxxxx.gpdb.cn-chengdu.rds.aliyuncs.com:5432/postgres
tablename送信先テーブル名test_adbpg_table
usernameデータベースアカウント名
passwordデータベースアカウントパスワード

任意パラメーター

パラメーターデフォルト値説明
conflictmodeupsertプライマリキーまたは一意なインデックスとの競合発生時のポリシー。詳細については、「競合処理モード」をご参照ください。
writemode1書き込み方式:0 = BATCH INSERT、1 = COPY API(最高速)、2 = BATCH UPSERT
batchsize500001 回の書き込みバッチあたりの最大行数
batchwritetimeoutms50000バッチ書き込みタイムアウト(ミリ秒単位)。この期間が経過すると、バッチが満杯でなくてもフラッシュされます。
maxretrytimes3書き込み失敗時の最大リトライ回数
retrywaittime100リトライ間隔(ミリ秒単位)
exceptionmodeignore例外処理ポリシー:ignore = 例外発生時の書き込みデータをスキップ、strict = フェールオーバーをトリガーしエラーを報告
targetschemapublicAnalyticDB for PostgreSQL データベース内のターゲットスキーマ
connectionmaxactive5接続プール内における TaskManager ごとの最大アクティブ接続数
casesensitive0カラムおよびテーブル名の大文字小文字の区別:0 = 区別しない、1 = 区別する
verbose0コネクタログ出力:0 = 無効、1 = 有効
説明 サポートされるパラメーターの完全な一覧およびデータ型マッピングについては、「AnalyticDB for PostgreSQL コネクタ」をご参照ください。

競合処理モード

conflictmode パラメーターは、レコードが既存のプライマリキーまたは一意なインデックスと競合した場合の動作を制御します。AnalyticDB for PostgreSQL では、INSERT ON CONFLICT 文と COPY ON CONFLICT 文を組み合わせて UPSERT 操作を実行します。

動作
upsert(デフォルト)INSERT ON CONFLICT および COPY ON CONFLICT を実行し、既存の行を上書きします。パーティションテーブルの場合、インスタンスのマイナーバージョンは V6.3.6.1 以降である必要があります。詳細については、「マイナーエンジンバージョンの更新」をご参照ください。
update競合する行を更新します
ignore既存の行を保持し、受信したレコードを破棄します
strictフェールオーバーをトリガーしエラーを報告します

デプロイメントの開始

  1. SQL エディタの右上隅で、デプロイ をクリックし、続いて OK をクリックします。

    説明 セッションクラスターは開発およびテスト専用です。本番環境へのデプロイメントには、セッションクラスターへの公開は行わないでください。「デプロイメントのデバッグ」をご参照ください。
  2. デプロイメント ページで、対象のデプロイメントを見つけ、開始操作 列からクリックします。

  3. ジョブの開始 ダイアログボックスで、開始 をクリックします。

結果の検証

  1. AnalyticDB for PostgreSQL データベースに接続します。詳細については、「クライアント接続」をご参照ください。

  2. テーブルをクエリして、データが書き込まれていることを確認します。

    SELECT * FROM test_adbpg_table;

    データは AnalyticDB for PostgreSQL データベースに書き込まれます。次の図に返された結果を示します。

    adbpg2.png

次のステップ

参照情報