このトピックでは、Realtime Compute for Apache Flinkを使用してAnalyticDB for PostgreSQLにデータを書き込む方法について説明します。
制限事項
Realtime Compute for Apache Flinkは、サーバーレスモードのAnalyticDB for PostgreSQLからデータを読み取ることができません。
Ververica Runtime (VVR) 6.0.0以降を使用するRealtime Compute for Apache Flinkのみが、AnalyticDB for PostgreSQLコネクタをサポートしています。
VVR 8.0.1以降を使用するRealtime Compute for Apache FlinkのみがAnalyticDB for PostgreSQL V7.0をサポートしています。
説明カスタムコネクタを使用する場合は、「カスタムコネクタの管理」で説明されている手順に従って操作を実行します。
前提条件
フルマネージドFlinkワークスペースが作成されます。 詳細については、「Realtime Compute For Apache Flinkの有効化」をご参照ください。
AnalyticDB for PostgreSQLインスタンスが作成されました。 詳細については、「インスタンスの作成」をご参照ください。
AnalyticDB for PostgreSQLインスタンスとフルマネージドFlinkワークスペースは、同じ仮想プライベートクラウド (VPC) にあります。
AnalyticDB for PostgreSQLインスタンスの設定
AnalyticDB for PostgreSQLコンソールにログインします。
フルマネージドFlinkワークスペースのCIDRブロックをAnalyticDB for PostgreSQLインスタンスのIPアドレスホワイトリストに追加します。 詳細については、「IPアドレスホワイトリストの設定」をご参照ください。
[データベースにログイン] をクリックします。 データベースへの接続方法の詳細については、「クライアント接続」をご参照ください。
にテーブルを作成します。AnalyticDB for PostgreSQLインスタンスを作成します。
例:
CREATE TABLE test_adbpg_table( b1 int, b2 int, b3 text, PRIMARY KEY(b1) );
フルマネージドFlinkワークスペースの設定
完全に管理されたフリンクタブで、管理するワークスペースを見つけて、コンソールで、アクション列を作成します。
左側のナビゲーションウィンドウで、コネクタ.
コネクタページをクリックします。カスタムコネクタの作成.
作成するカスタムコネクタのJARファイルをアップロードします。
説明AnalyticDB For PostgreSQLのカスタムコネクタのJARファイルを取得する方法については、GitHubをご参照ください。
JARファイルは、Realtime Compute for Apache FlinkのFlinkエンジンと同じバージョンである必要があります。
JARファイルをアップロードした後、次へ.
アップロードしたJARファイルの内容が解析されます。 ファイルの解析が成功した場合は、次の手順に進みます。 ファイルの解析に失敗した場合は、カスタムコネクタのコードがApache Flinkコミュニティで定義されている標準に準拠しているかどうかを確認します。
仕上げ.
作成したカスタムコネクタがコネクタリストに表示されます。
Flinkジョブの作成
Realtime Compute for Apache Flinkコンソールにログインします。 [完全管理Flink] タブで、管理するワークスペースを見つけ、[操作] 列の [コンソール] をクリックします。
左側のナビゲーションウィンドウで、[SQLエディター] をクリックします。 SQLエディターページの左上隅で、[新規作成] をクリックします。 [新しいドラフト] ダイアログボックスで、[SQLスクリプト] タブの [空白のストリームドラフト] をクリックし、[次へ] をクリックします。
[新しいドラフト] ダイアログボックスで、次の表に示すパラメーターを設定します。
パラメーター
説明
例
名前
作成するドラフトの名前。
説明ドラフト名は、現在のプロジェクトで一意である必要があります。
adbpg-test
場所
ドラフトのコードファイルが保存されているフォルダ。
既存のフォルダの右側にある
アイコンをクリックして、サブフォルダを作成することもできます。 Draft
エンジン版
ドラフトで使用されるFlinkのエンジンバージョン。 エンジンのバージョン、バージョンマッピング、および各バージョンのライフサイクルにおける重要な時点の詳細については、「エンジンバージョン」をご参照ください。
vvr-6.0.7-flink-1.15
[作成] をクリックします。
AnalyticDB for PostgreSQLへのデータの書き込み
配置コードを記述します。
datagen_sourceという名前のソーステーブルとtest_adbpg_tableという名前のAnalyticDB for PostgreSQLテーブルを作成します。 次のコードをデプロイメントのコードエディターにコピーします。CREATE TABLE datagen_source ( f_sequence INT, f_random INT, f_random_str STRING ) WITH ( 'connector' = 'datagen', 'rows-per-second'='5', 'fields.f_sequence.kind'='sequence', 'fields.f_sequence.start'='1', 'fields.f_sequence.end'='1000', 'fields.f_random.min'='1', 'fields.f_random.max'='1000', 'fields.f_random_str.length'='10' ); CREATE TABLE test_adbpg_table ( `B1` bigint , `B2` bigint , `B3` VARCHAR , `B4` VARCHAR, PRIMARY KEY(B1) not ENFORCED ) with ( 'connector' = 'adbpg-nightly-1.13', 'password' = 'xxx', 'tablename' = 'test_adbpg_table', 'username' = 'xxxx', 'url' = 'jdbc:postgresql://url:5432/schema', 'maxretrytimes' = '2', 'batchsize' = '50000', 'connectionmaxactive' = '5', 'conflictmode' = 'ignore', 'usecopy' = '0', 'targetschema' = 'public', 'exceptionmode' = 'ignore', 'casesensitive' = '0', 'writemode' = '1', 'retrywaittime' = '200' );上記のコードでは、
datagen_sourceテーブルに関連するパラメーターの値を保持し、ビジネス要件に基づいてtest_adbpg_tableテーブルに関連するパラメーターの値を変更します。 次の表に、test_adbpg_tableテーブルのパラメーターを示します。パラメーター
必須
説明
connector
継続する
コネクタの名前。
adbpg-nightly-Version number形式です。 例:adbpg-nightly-1.13url
継続する
AnalyticDB for PostgreSQLインスタンスへの接続に使用されるJava Database Connectivity (JDBC) URL。 URLは
jdbc:postgresql://<Internal endpoint >:< Port number>/<Database name>形式です。 例:jdbc:postgresql:// gp-xxxxxx.gpdb.cn-chengdu.rds.aliyuncs.com:5432/postgrestablename
継続する
AnalyticDB for PostgreSQLテーブルの名前。
username
継続する
AnalyticDB for PostgreSQLデータベースへの接続に使用されるデータベースアカウントの名前。
password
継続する
AnalyticDB for PostgreSQLデータベースアカウントのパスワード。
maxretrytimes
継続しない
データの書き込みが失敗した場合に、テーブルにデータを書き込むことができる再試行の最大数。 デフォルト値: 3。
batchsize
継続しない
一度にテーブルに書き込むことができる行の最大数。 デフォルト値: 50000
exceptionmode
継続しない
データ書き込み中に例外を処理するために使用されるポリシー。 有効な値:
ignore (default): システムは、例外中に書き込まれたデータを無視します。
strict: データの書き込み中に例外が発生した場合、システムはフェールオーバーを実行し、エラーを報告します。
conflictmode
継続しない
主キーまたは一意のインデックスの競合を処理するために使用されるポリシー。 有効な値:
ignore: プライマリキーの競合が発生した場合、システムはプライマリキーの競合を無視し、既存のデータを保持します。
strict: プライマリキーの競合が発生した場合、システムはフェールオーバーを実行し、エラーを報告します。
update: プライマリキーの競合が発生した場合、システムはデータを更新します。
upsert (デフォルト): プライマリキーの競合が発生した場合、システムはUPSERT操作を実行してデータを書き込みます。
AnalyticDB for PostgreSQLは、INSERT ON CONFLICTとCOPY ON CONFLICTステートメントの組み合わせを使用してUPSERT操作を実行します。 宛先テーブルがパーティションテーブルの場合、マイナーバージョンはV6.3.6.1以降である必要があります。 マイナーバージョンの更新方法については、「インスタンスのマイナーバージョンの更新」をご参照ください。
targetschema
継続しない
AnalyticDB for PostgreSQLデータベースのスキーマ。 デフォルト値 : public
writemode
継続しない
データの書き込みに使用されるメソッド。 有効な値:
0: BATCH INSERTステートメントは、データの書き込みに使用されます。
1 (デフォルト): COPY APIはデータの書き込みに使用されます。
2: BATCH UPSERTステートメントは、データの書き込みに使用されます。
verbose
継続しない
コネクタログを表示するかどうかを指定します。 有効な値:
0 (デフォルト): コネクタログは表示されません。
1: コネクタのログを表示します。
retrywaittime
継続しない
例外が発生したときの再試行の間隔。 単位:ミリ秒。 デフォルト値:100
batchwritetimeoutms
継続しない
バッチデータ書き込みのタイムアウト期間。 この期間が終了すると、データバッチが書き込まれます。 単位:ミリ秒。 デフォルト値: 50000
connectionmaxactive
継続しない
1つのタスクマネージャに対して同時に接続プールに割り当てることができるアクティブな接続の最大数。 既定値:5
casesensitive
継続しない
列名とテーブル名が大文字と小文字を区別するかどうかを指定します。 有効な値:
0 (デフォルト): 大文字と小文字を区別しません。
1: 大文字と小文字を区別します。
説明サポートされているパラメーターとデータ型マッピングの詳細については、「AnalyticDB For PostgreSQLコネクタ」をご参照ください。
デプロイを開始します。
SQLエディターページの右上隅で、[デプロイ] をクリックします。 表示されたメッセージボックスで [OK] をクリックします。
説明セッションクラスターは、非本番環境の開発およびテスト環境に適しています。 セッションクラスターでデプロイをデバッグして、JobManagerのリソース使用率を改善し、デプロイの起動を高速化できます。 ビジネスの安定性を確保するために、デプロイをセッションクラスターに公開しないことをお勧めします。 詳細については、「デプロイメントのデバッグ」をご参照ください。
[デプロイメント] ページで、開始するデプロイメントを見つけ、[操作] 列の [開始] をクリックします。
ジョブの開始メッセージで、[開始] をクリックします。
同期結果の確認
AnalyticDB for PostgreSQLデータベースに接続します。 詳細については、「クライアント接続」をご参照ください。
次のステートメントを実行して、
test_adbpg_tableテーブルを照会します。SELECT * FROM test_adbpg_table;データはAnalyticDB for PostgreSQLデータベースに書き込まれます。 次の図に、返される結果を示します。
