このトピックでは、Alibaba Cloud Realtime Compute for Apache Flink を使用して AnalyticDB for PostgreSQL からデータを読み取り、データを書き込む方法について説明します。
背景情報
AnalyticDB for PostgreSQL は、超並列処理 (MPP) のためのデータウェアハウスです。大量のデータに対してオンライン分析サービスを提供します。Realtime Compute for Apache Flink は、Apache Flink に基づいて構築されたリアルタイムビッグデータ分析プラットフォームです。Realtime Compute for Apache Flink は、さまざまなアップストリームおよびダウンストリームのコネクタを提供して、さまざまなビジネスシナリオの要件を満たし、効率的で柔軟なリアルタイムコンピューティングサービスを提供します。Realtime Compute for Apache Flink は、AnalyticDB for PostgreSQL からデータを読み取ることができます。これにより、AnalyticDB for PostgreSQL の利点を最大限に活用し、データ分析の効率と精度を向上させることができます。
制限事項
Realtime Compute for Apache Flink は、サーバーレスモードの AnalyticDB for PostgreSQL からデータを読み取ることはできません。
Ververica Runtime (VVR) 6.0.0 以降を使用する Realtime Compute for Apache Flink のみが AnalyticDB for PostgreSQL コネクタをサポートします。
VVR 8.0.1 以降を使用する Realtime Compute for Apache Flink のみが AnalyticDB for PostgreSQL V7.0 をサポートします。
説明カスタムコネクタを使用する場合は、「カスタムコネクタの管理」に記載されている手順に従って操作を実行してください。
前提条件
作成する AnalyticDB for PostgreSQL インスタンスと作成するフルマネージド Flink ワークスペースは、同じ仮想プライベートクラウド (VPC) 内に存在します。
説明AnalyticDB for PostgreSQL インスタンスとフルマネージド Flink ワークスペースが同じ VPC にない場合は、「フルマネージド Flink はどのようにして VPC をまたいでサービスにアクセスしますか?」に記載されている手順に従って問題を処理してください。
フルマネージド Flink ワークスペースが作成されます。詳細については、「フルマネージド Flink のアクティブ化」をご参照ください。
AnalyticDB for PostgreSQL インスタンスとアカウントが作成されます。詳細については、「インスタンスの作成」および「特権アカウントの作成」をご参照ください。
ステップ 1: ホワイトリストの設定とデータの準備
- AnalyticDB for PostgreSQL コンソールにログインします。
フルマネージド Flink ワークスペースの CIDR ブロックを AnalyticDB for PostgreSQL インスタンスのホワイトリストに追加します。
フルマネージド Flink ワークスペースが属する vSwitch の CIDR ブロックを表示します。詳細については、「ホワイトリストの設定方法」をご参照ください。
フルマネージド Flink ワークスペースの CIDR ブロックをターゲット AnalyticDB for PostgreSQL インスタンスのホワイトリストに追加します。詳細については、「手順」をご参照ください。
説明インターネット経由で AnalyticDB for PostgreSQL インスタンスにアクセスする場合は、パブリック IP アドレスをホワイトリストに追加します。
インスタンス詳細ページの右上隅にある [データベースにログイン] をクリックし、ユーザー名とパスワードを入力します。データベースへのアクセス方法の詳細については、「クライアントツールを使用してインスタンスに接続する」をご参照ください。
インスタンスのターゲットデータベースに adbpg_dim_table という名前のテーブルを作成し、そのテーブルに 50 行のデータを挿入します。
ステートメントの例:
-- adbpg_dim_table という名前のディメンションテーブルを作成します。 CREATE TABLE adbpg_dim_table( id int, username text, PRIMARY KEY(id) ); -- adbpg_dim_table テーブルに 50 行のデータを挿入します。id フィールドの値は 1 から 50 までの整数で、username フィールドの値は現在の行番号のテキストに username 文字列が続くものです。 INSERT INTO adbpg_dim_table(id, username) SELECT i, 'username'||i::text FROM generate_series(1, 50) AS t(i);select * from adbpg_dim_table order by id;文を実行して、挿入されたデータを表示できます。Realtime Compute for Apache Flink が結果データを書き込む adbpg_sink_table という名前の結果テーブルを作成します。
CREATE TABLE adbpg_sink_table( id int, username text, score int );
ステップ 2: ジョブドラフトの作成
Realtime Compute for Apache Flink コンソールにログインし、ワークスペースを見つけて、[アクション] 列の [コンソール] をクリックします。
左側のナビゲーションウィンドウで、 をクリックします。SQL エディタページの左上隅にある [+] アイコンをクリックし、[新しい空白のストリームドラフト] を選択します。
[新しいドラフト] ダイアログボックスで、ドラフトのパラメーターを設定します。次の表にパラメーターを示します。
パラメーター
説明
例
名前
作成するドラフトの名前。
説明ドラフト名は現在のプロジェクト内で一意である必要があります。
adbpg-test
場所
ドラフトのコードファイルが保存されるフォルダ。
既存のフォルダの右側にある
アイコンをクリックしてサブフォルダを作成することもできます。ドラフト
エンジンバージョン
ドラフトで使用される Flink のエンジンバージョン。エンジンバージョン、バージョンマッピング、および各バージョンのライフサイクルにおける重要な時点の詳細については、「エンジンバージョン」をご参照ください。
vvr-8.0.1-flink-1.17
[作成] をクリックします。
ステップ 3: ドラフトコードの記述とデプロイ
ドラフトの次のコードをコードエディタにコピーします。
--- Datagen ソーステーブルを作成します。この例では、WITH 句のパラメーターを変更する必要はありません。 CREATE TEMPORARY TABLE datagen_source ( id INT, score INT ) WITH ( 'connector' = 'datagen', 'fields.id.kind'='sequence', 'fields.id.start'='1', 'fields.id.end'='50', 'fields.score.kind'='random', 'fields.score.min'='70', 'fields.score.max'='100' ); -- AnalyticDB for PostgreSQL ディメンションテーブルを作成します。ビジネス要件に基づいて WITH 句のパラメーターを変更する必要があります。 CREATE TEMPORARY TABLE dim_adbpg( id int, username varchar, PRIMARY KEY(id) not ENFORCED ) WITH( 'connector' = 'adbpg', 'url' = 'jdbc:postgresql://gp-2ze****3tysk255b5-master.gpdb.rds.aliyuncs.com:5432/flinktest', 'tablename' = 'adbpg_dim_table', 'username' = 'flinktest', 'password' = '${secret_values.adb_password}', 'maxRetryTimes'='2', -- テーブルへのデータ書き込みが失敗した後の最大書き込みリトライ回数を指定します。 'cache'='lru', -- キャッシュポリシーを指定します。 'cacheSize'='100' -- キャッシュサイズを指定します。 ); -- AnalyticDB for PostgreSQL 結果テーブルを作成します。ビジネス要件に基づいて WITH 句のパラメーターを変更する必要があります。 CREATE TEMPORARY TABLE sink_adbpg ( id int, username varchar, score int ) WITH ( 'connector' = 'adbpg', 'url' = 'jdbc:postgresql://gp-2ze****3tysk255b5-master.gpdb.rds.aliyuncs.com:5432/flinktest', 'tablename' = 'adbpg_sink_table', 'username' = 'flinktest', 'password' = '${secret_values.adb_password}', 'maxRetryTimes' = '2', 'conflictMode' = 'ignore',-- データ挿入中にプライマリキーの競合またはインデックスの競合が発生した場合に使用されるポリシーを指定します。 'retryWaitTime' = '200' -- リトライの間隔を指定します。 ); -- ディメンションテーブルとソーステーブルを結合した後に得られる結果を AnalyticDB for PostgreSQL 結果テーブルに挿入します。 INSERT INTO sink_adbpg SELECT ts.id,ts.username,ds.score FROM datagen_source AS ds JOIN dim_adbpg FOR SYSTEM_TIME AS OF PROCTIME() AS ts on ds.id = ts.id;ビジネス要件に基づいてパラメーターを変更します。
この例では、Datagen ソーステーブルを変更する必要はありません。ビジネス要件に基づいて、AnalyticDB for PostgreSQL ディメンションテーブルと AnalyticDB for PostgreSQL 結果テーブルのパラメーターを変更する必要があります。次の表にパラメーターを示します。関連するコネクタのパラメーターとデータ型のマッピングの詳細については、「参照」をご参照ください。
パラメーター
必須
説明
url
はい
AnalyticDB for PostgreSQL インスタンスへの接続に使用される JDBC URL。JDBC URL は
jdbc:postgresql://<内部エンドポイント>:<ポート番号>/<データベース名>形式です。AnalyticDB for PostgreSQL コンソールにログインして、インスタンスの [データベース接続] ページで URL を表示できます。tablename
はい
AnalyticDB for PostgreSQL データベース内のテーブルの名前。
username
はい
AnalyticDB for PostgreSQL データベースへのアクセスに使用されるユーザー名。
password
はい
AnalyticDB for PostgreSQL データベースへの接続に使用されるデータベースアカウントのパスワード。
targetSchema
いいえ
スキーマの名前。デフォルト値: public。データベースで別のスキーマを使用する場合は、このパラメーターを指定します。
SQL エディタページの右上隅にある [検証] をクリックして構文チェックを実行します。
SQL エディタページの右上隅にある [デプロイ] をクリックします。
ページで、目的のデプロイメントを見つけ、[アクション] 列の [開始] をクリックします。
ステップ 4: Realtime Compute for Apache Flink が結果テーブルに書き込むデータを表示する
- AnalyticDB for PostgreSQL コンソールにログインします。
[データベースにログイン] をクリックします。データベースへの接続方法の詳細については、「クライアント接続」をご参照ください。
次の文を実行して、Realtime Compute for Apache Flink が結果テーブルに書き込むデータを表示します。
SELECT * FROM adbpg_sink_table ORDER BY id;次の図に結果を示します。
