このトピックでは、Alibaba Cloud Realtime Compute for Apache Flink を使用して AnalyticDB for PostgreSQL の完全データをサブスクライブする方法について説明します。
背景情報
AnalyticDB for PostgreSQL は、超並列処理(MPP)データウェアハウスサービスです。大量のデータのリアルタイム分析を提供します。Realtime Compute for Apache Flink は、Apache Flink をベースに構築されたリアルタイムビッグデータ分析プラットフォームです。Realtime Compute for Apache Flink は、さまざまなアップストリームおよびダウンストリームコネクタを提供して、さまざまなビジネスシナリオの要件を満たし、効率的かつ柔軟なリアルタイムコンピューティングサービスを提供します。Realtime Compute for Apache Flink は、AnalyticDB for PostgreSQL からデータを読み取ることができます。これにより、AnalyticDB for PostgreSQL の利点が最大限に活用され、データ分析の効率と精度が向上します。
制限事項
Realtime Compute for Apache Flink は、サーバーレスモードの AnalyticDB for PostgreSQL からデータを読み取ることはできません。
Ververica Runtime(VVR)6.0.0 以降を使用する Realtime Compute for Apache Flink のみ、AnalyticDB for PostgreSQL コネクタをサポートしています。
VVR 8.0.1 以降を使用する Realtime Compute for Apache Flink のみ、AnalyticDB for PostgreSQL V7.0 をサポートしています。
説明カスタム コネクタを使用する場合は、カスタム コネクタの管理に記載されている手順に従って操作を実行してください。
前提条件
フルマネージド Flink ワークスペースが作成されていること。詳細については、「Realtime Compute for Apache Flink をアクティブ化する」をご参照ください。
AnalyticDB for PostgreSQL インスタンスが作成されていること。詳細については、「インスタンスを作成する」をご参照ください。
AnalyticDB for PostgreSQL インスタンスとフルマネージド Flink ワークスペースが同じ仮想プライベートクラウド(VPC)内にあること。
手順 1:AnalyticDB for PostgreSQL インスタンスを構成する
- AnalyticDB for PostgreSQL コンソール にログオンします。
フルマネージド Flink ワークスペースの CIDR ブロックを AnalyticDB for PostgreSQL インスタンスの IP アドレスホワイトリストに追加します。詳細については、「IP アドレスホワイトリストを構成する」をご参照ください。
[データベースにログオン] をクリックします。データベースへの接続方法の詳細については、「クライアント接続」をご参照ください。
AnalyticDB for PostgreSQL インスタンスに adbpg_dim_table という名前のディメンションテーブルを作成し、テーブルに 50 行のデータを挿入します。
ステートメントの例:
-- adbpg_dim_table という名前のディメンションテーブルを作成します。 CREATE TABLE adbpg_dim_table( id int, username text, PRIMARY KEY(id) ); -- adbpg_dim_table テーブルに 50 行のデータを挿入します。 id フィールドの値は 1 から 50 までの整数で、username フィールドの値は現在の行数に username 文字列が続くテキストです。 INSERT INTO adbpg_dim_table(id, username) SELECT i, 'username'||i::text FROM generate_series(1, 50) AS t(i);Realtime Compute for Apache Flink が結果データを書き込む adbpg_sink_table という名前の結果テーブルを作成します。
CREATE TABLE adbpg_sink_table( id int, username text, score int );
手順 2: Realtime Compute for Apache Flink ドラフトを作成する
Realtime Compute for Apache Flink コンソール にログオンします。[Fully Managed Flink] タブで、管理するワークスペースを見つけ、[アクション] 列の [コンソール] をクリックします。
左側のナビゲーションウィンドウで、[SQL エディタ] をクリックします。 [SQL エディタ] ページの左上隅にある [新規] をクリックします。 [新規ドラフト] ダイアログボックスで、[SQL スクリプト] タブの [空のストリームドラフト] をクリックし、[次へ] をクリックします。
[新規ドラフト] ダイアログボックスで、次の表に示すパラメータを構成します。
パラメータ
説明
例
名前
作成するドラフトの名前。
説明ドラフト名は、現在のプロジェクト内で一意である必要があります。
adbpg-test
場所
ドラフトのコードファイルが保存されるフォルダ。
既存のフォルダの右側にある
アイコンをクリックして、サブフォルダを作成することもできます。ドラフト
エンジンバージョン
ドラフトで使用される Flink のエンジンバージョン。エンジンバージョン、バージョンマッピング、各バージョンのライフサイクルにおける重要な時点の詳細については、「エンジンバージョン」をご参照ください。
vvr-6.0.7-flink-1.15
[作成] をクリックします。
ステップ 3: ドラフトコードを記述し、ドラフトをデプロイする
ドラフトの次のコードをコードエディタにコピーします。
--- Datagen ソーステーブルを作成します。 CREATE TEMPORARY TABLE datagen_source ( id INT, score INT ) WITH ( 'connector' = 'datagen', 'fields.id.kind'='sequence', 'fields.id.start'='1', 'fields.id.end'='50', 'fields.score.kind'='random', 'fields.score.min'='70', 'fields.score.max'='100' ); -- AnalyticDB for PostgreSQL ディメンションテーブルを作成します。 CREATE TEMPORARY TABLE dim_adbpg( id int, username varchar, PRIMARY KEY(id) not ENFORCED ) WITH( 'connector' = 'adbpg', 'url' = 'jdbc:postgresql://gp-2ze****3tysk255b5-master.gpdb.rds.aliyuncs.com:5432/flinktest', 'tablename' = 'adbpg_dim_table', 'username' = 'flink****test', 'password' = '*******', 'maxJoinRows'='100', 'maxRetryTimes'='1', 'cache'='lru', 'cacheSize'='1000' ); -- AnalyticDB for PostgreSQL 結果テーブルを作成します。 CREATE TEMPORARY TABLE sink_adbpg ( id int, username varchar, score int ) WITH ( 'connector' = 'adbpg', 'url' = 'jdbc:postgresql://gp-2ze****3tysk255b5-master.gpdb.rds.aliyuncs.com:5432/flinktest', 'tablename' = 'adbpg_sink_table', 'username' = 'flink****test', 'password' = '******', 'maxRetryTimes' = '2', 'batchsize' = '5000', 'conflictMode' = 'ignore', 'writeMode' = 'insert', 'retryWaitTime' = '200' ); -- ディメンションテーブルとソーステーブルの結合結果を AnalyticDB for PostgreSQL 結果テーブルに挿入します。 INSERT INTO sink_adbpg SELECT ts.id,ts.username,ds.score FROM datagen_source AS ds join dim_adbpg FOR SYSTEM_TIME AS OF PROCTIME() AS ts on ds.id = ts.id;ビジネス要件に基づいて、次の表に示すパラメータを変更します。
パラメータ
必須
説明
URL
はい
AnalyticDB for PostgreSQL インスタンスへの接続に使用する Java Database Connectivity(JDBC)URL。JDBC URL の形式は、
jdbc:postgresql://<内部エンドポイント>:<ポート番号>/<データベース名>です。例:jdbc:postgresql://gp-xxxxxx.gpdb.cn-chengdu.rds.aliyuncs.com:3432/postgres。tablename
はい
AnalyticDB for PostgreSQL データベースのテーブル名。
username
はい
AnalyticDB for PostgreSQL データベースへの接続に使用するデータベースアカウント名。
password
はい
AnalyticDB for PostgreSQL データベースアカウントのパスワード。
説明パラメータとデータ型マッピングの詳細については、「AnalyticDB for PostgreSQL コネクタ」をご参照ください。
[SQL エディタ] ページの右上隅にある [検証] をクリックして、構文チェックを実行します。
[SQL エディタ] ページの右上隅にある [デプロイ] をクリックします。
[デプロイメント] ページで、目的のデプロイメントを見つけ、[アクション] 列の [開始] をクリックします。
手順 4:Realtime Compute for Apache Flink が結果テーブルに書き込んだデータをクエリする手順 4: Realtime Compute for Apache Flink が結果テーブルに書き込むデータをクエリする
- AnalyticDB for PostgreSQL コンソール にログオンします。
[データベースにログオン] をクリックします。データベースへの接続方法の詳細については、「クライアント接続」をご参照ください。
次のステートメントを実行して、Realtime Compute for Apache Flink が結果テーブルに書き込んだデータをクエリします。
SELECT * FROM adbpg_sink_table;