すべてのプロダクト
Search
ドキュメントセンター

AnalyticDB:リアルタイムで完全データを読み書きするために Realtime Compute for Apache Flink を使用する

最終更新日:Apr 15, 2025

このトピックでは、Alibaba Cloud Realtime Compute for Apache Flink を使用して AnalyticDB for PostgreSQL の完全データをサブスクライブする方法について説明します。

背景情報

AnalyticDB for PostgreSQL は、超並列処理(MPP)データウェアハウスサービスです。大量のデータのリアルタイム分析を提供します。Realtime Compute for Apache Flink は、Apache Flink をベースに構築されたリアルタイムビッグデータ分析プラットフォームです。Realtime Compute for Apache Flink は、さまざまなアップストリームおよびダウンストリームコネクタを提供して、さまざまなビジネスシナリオの要件を満たし、効率的かつ柔軟なリアルタイムコンピューティングサービスを提供します。Realtime Compute for Apache Flink は、AnalyticDB for PostgreSQL からデータを読み取ることができます。これにより、AnalyticDB for PostgreSQL の利点が最大限に活用され、データ分析の効率と精度が向上します。

制限事項

  • Realtime Compute for Apache Flink は、サーバーレスモードの AnalyticDB for PostgreSQL からデータを読み取ることはできません。

  • Ververica Runtime(VVR)6.0.0 以降を使用する Realtime Compute for Apache Flink のみ、AnalyticDB for PostgreSQL コネクタをサポートしています。

  • VVR 8.0.1 以降を使用する Realtime Compute for Apache Flink のみ、AnalyticDB for PostgreSQL V7.0 をサポートしています。

    説明

    カスタム コネクタを使用する場合は、カスタム コネクタの管理に記載されている手順に従って操作を実行してください。

前提条件

  • フルマネージド Flink ワークスペースが作成されていること。詳細については、「Realtime Compute for Apache Flink をアクティブ化する」をご参照ください。

  • AnalyticDB for PostgreSQL インスタンスが作成されていること。詳細については、「インスタンスを作成する」をご参照ください。

  • AnalyticDB for PostgreSQL インスタンスとフルマネージド Flink ワークスペースが同じ仮想プライベートクラウド(VPC)内にあること。

手順 1:AnalyticDB for PostgreSQL インスタンスを構成する

  1. AnalyticDB for PostgreSQL コンソール にログオンします。
  2. フルマネージド Flink ワークスペースの CIDR ブロックを AnalyticDB for PostgreSQL インスタンスの IP アドレスホワイトリストに追加します。詳細については、「IP アドレスホワイトリストを構成する」をご参照ください。

  3. [データベースにログオン] をクリックします。データベースへの接続方法の詳細については、「クライアント接続」をご参照ください。

  4. AnalyticDB for PostgreSQL インスタンスに adbpg_dim_table という名前のディメンションテーブルを作成し、テーブルに 50 行のデータを挿入します。

    ステートメントの例:

    -- adbpg_dim_table という名前のディメンションテーブルを作成します。
    CREATE TABLE adbpg_dim_table(
    id int,
    username text,
    PRIMARY KEY(id)
    );
    
    -- adbpg_dim_table テーブルに 50 行のデータを挿入します。 id フィールドの値は 1 から 50 までの整数で、username フィールドの値は現在の行数に username 文字列が続くテキストです。
    INSERT INTO adbpg_dim_table(id, username)
    SELECT i, 'username'||i::text
    FROM generate_series(1, 50) AS t(i);
  5. Realtime Compute for Apache Flink が結果データを書き込む adbpg_sink_table という名前の結果テーブルを作成します。

    CREATE TABLE adbpg_sink_table(
      id int,
      username text,
      score int
    );

手順 2: Realtime Compute for Apache Flink ドラフトを作成する

  1. Realtime Compute for Apache Flink コンソール にログオンします。[Fully Managed Flink] タブで、管理するワークスペースを見つけ、[アクション] 列の [コンソール] をクリックします。

  2. 左側のナビゲーションウィンドウで、[SQL エディタ] をクリックします。 [SQL エディタ] ページの左上隅にある [新規] をクリックします。 [新規ドラフト] ダイアログボックスで、[SQL スクリプト] タブの [空のストリームドラフト] をクリックし、[次へ] をクリックします。

  3. [新規ドラフト] ダイアログボックスで、次の表に示すパラメータを構成します。

    パラメータ

    説明

    名前

    作成するドラフトの名前。

    説明

    ドラフト名は、現在のプロジェクト内で一意である必要があります。

    adbpg-test

    場所

    ドラフトのコードファイルが保存されるフォルダ。

    既存のフォルダの右側にある 新建文件夹 アイコンをクリックして、サブフォルダを作成することもできます。

    ドラフト

    エンジンバージョン

    ドラフトで使用される Flink のエンジンバージョン。エンジンバージョン、バージョンマッピング、各バージョンのライフサイクルにおける重要な時点の詳細については、「エンジンバージョン」をご参照ください。

    vvr-6.0.7-flink-1.15

  4. [作成] をクリックします。

ステップ 3: ドラフトコードを記述し、ドラフトをデプロイする

  1. ドラフトの次のコードをコードエディタにコピーします。

    --- Datagen ソーステーブルを作成します。
    CREATE TEMPORARY TABLE datagen_source (
     id INT,
     score INT
    ) WITH (
     'connector' = 'datagen',
     'fields.id.kind'='sequence',
     'fields.id.start'='1',
     'fields.id.end'='50',
     'fields.score.kind'='random',
     'fields.score.min'='70',
     'fields.score.max'='100'
    );
    
    -- AnalyticDB for PostgreSQL ディメンションテーブルを作成します。
    CREATE TEMPORARY TABLE dim_adbpg(
     id int,
     username varchar,
     PRIMARY KEY(id) not ENFORCED
    ) WITH(
     'connector' = 'adbpg', 
     'url' = 'jdbc:postgresql://gp-2ze****3tysk255b5-master.gpdb.rds.aliyuncs.com:5432/flinktest',
     'tablename' = 'adbpg_dim_table', 
     'username' = 'flink****test',
     'password' = '*******',
     'maxJoinRows'='100',
     'maxRetryTimes'='1', 
     'cache'='lru',
     'cacheSize'='1000'
    );
    
    -- AnalyticDB for PostgreSQL 結果テーブルを作成します。
    CREATE TEMPORARY TABLE sink_adbpg (
      id int,
      username varchar,
      score int
    ) WITH (
      'connector' = 'adbpg', 
      'url' = 'jdbc:postgresql://gp-2ze****3tysk255b5-master.gpdb.rds.aliyuncs.com:5432/flinktest',
      'tablename' = 'adbpg_sink_table',  
      'username' = 'flink****test',
      'password' = '******',
      'maxRetryTimes' = '2',
      'batchsize' = '5000',
      'conflictMode' = 'ignore',
      'writeMode' = 'insert',
      'retryWaitTime' = '200'
    );
    
    -- ディメンションテーブルとソーステーブルの結合結果を AnalyticDB for PostgreSQL 結果テーブルに挿入します。
    INSERT INTO sink_adbpg
    SELECT ts.id,ts.username,ds.score
    FROM datagen_source AS ds
     join
    dim_adbpg FOR SYSTEM_TIME AS OF PROCTIME() AS ts
    on ds.id = ts.id;
  2. ビジネス要件に基づいて、次の表に示すパラメータを変更します。

    パラメータ

    必須

    説明

    URL

    はい

    AnalyticDB for PostgreSQL インスタンスへの接続に使用する Java Database Connectivity(JDBC)URL。JDBC URL の形式は、jdbc:postgresql://<内部エンドポイント>:<ポート番号>/<データベース名> です。例:jdbc:postgresql://gp-xxxxxx.gpdb.cn-chengdu.rds.aliyuncs.com:3432/postgres

    tablename

    はい

    AnalyticDB for PostgreSQL データベースのテーブル名。

    username

    はい

    AnalyticDB for PostgreSQL データベースへの接続に使用するデータベースアカウント名。

    password

    はい

    AnalyticDB for PostgreSQL データベースアカウントのパスワード。

    説明

    パラメータとデータ型マッピングの詳細については、「AnalyticDB for PostgreSQL コネクタ」をご参照ください。

  3. [SQL エディタ] ページの右上隅にある [検証] をクリックして、構文チェックを実行します。

  4. [SQL エディタ] ページの右上隅にある [デプロイ] をクリックします。

  5. [デプロイメント] ページで、目的のデプロイメントを見つけ、[アクション] 列の [開始] をクリックします。

手順 4:Realtime Compute for Apache Flink が結果テーブルに書き込んだデータをクエリする手順 4: Realtime Compute for Apache Flink が結果テーブルに書き込むデータをクエリする

  1. AnalyticDB for PostgreSQL コンソール にログオンします。
  2. [データベースにログオン] をクリックします。データベースへの接続方法の詳細については、「クライアント接続」をご参照ください。

  3. 次のステートメントを実行して、Realtime Compute for Apache Flink が結果テーブルに書き込んだデータをクエリします。

    SELECT * FROM adbpg_sink_table;

    image.png

参照