すべてのプロダクト
Search
ドキュメントセンター

AnalyticDB:Realtime Compute for Apache Flinkを使用してAnalyticDB for PostgreSQLにデータを書き込む

最終更新日:Sep 24, 2024

このトピックでは、Realtime Compute for Apache Flinkを使用してAnalyticDB for PostgreSQLにデータを書き込む方法について説明します。

制限事項

  • Realtime Compute for Apache Flinkは、サーバーレスモードのAnalyticDB for PostgreSQLからデータを読み取ることができません。

  • Ververica Runtime (VVR) 6.0.0以降を使用するRealtime Compute for Apache Flinkのみが、AnalyticDB for PostgreSQLコネクタをサポートしています。

  • VVR 8.0.1以降を使用するRealtime Compute for Apache FlinkのみがAnalyticDB for PostgreSQL V7.0をサポートしています。

    説明

    カスタムコネクタを使用する場合は、「カスタムコネクタの管理」で説明されている手順に従って操作を実行します。

前提条件

  • フルマネージドFlinkワークスペースが作成されます。 詳細については、「Realtime Compute For Apache Flinkの有効化」をご参照ください。

  • AnalyticDB for PostgreSQLインスタンスが作成されました。 詳細については、「インスタンスの作成」をご参照ください。

  • AnalyticDB for PostgreSQLインスタンスとフルマネージドFlinkワークスペースは、同じ仮想プライベートクラウド (VPC) にあります。

AnalyticDB for PostgreSQLインスタンスの設定

  1. AnalyticDB for PostgreSQLコンソールにログインします。

  2. フルマネージドFlinkワークスペースのCIDRブロックをAnalyticDB for PostgreSQLインスタンスのIPアドレスホワイトリストに追加します。 詳細については、「IPアドレスホワイトリストの設定」をご参照ください。

  3. [データベースにログイン] をクリックします。 データベースへの接続方法の詳細については、「クライアント接続」をご参照ください。

  4. にテーブルを作成します。AnalyticDB for PostgreSQLインスタンスを作成します。

    例:

    CREATE TABLE test_adbpg_table(
    b1 int,
    b2 int,
    b3 text,
    PRIMARY KEY(b1)
    );

フルマネージドFlinkワークスペースの設定

  1. Realtime Compute for Apache Flinkコンソール.

  2. 完全に管理されたフリンクタブで、管理するワークスペースを見つけて、コンソールで、アクション列を作成します。

  3. 左側のナビゲーションウィンドウで、コネクタ.

  4. コネクタページをクリックします。カスタムコネクタの作成.

  5. 作成するカスタムコネクタのJARファイルをアップロードします。

    説明
    • AnalyticDB For PostgreSQLのカスタムコネクタのJARファイルを取得する方法については、GitHubをご参照ください。

    • JARファイルは、Realtime Compute for Apache FlinkのFlinkエンジンと同じバージョンである必要があります。

  6. JARファイルをアップロードした後、次へ.

    アップロードしたJARファイルの内容が解析されます。 ファイルの解析が成功した場合は、次の手順に進みます。 ファイルの解析に失敗した場合は、カスタムコネクタのコードがApache Flinkコミュニティで定義されている標準に準拠しているかどうかを確認します。

  7. 仕上げ.

    作成したカスタムコネクタがコネクタリストに表示されます。

Flinkジョブの作成

  1. Realtime Compute for Apache Flinkコンソールにログインします。 [完全管理Flink] タブで、管理するワークスペースを見つけ、[操作] 列の [コンソール] をクリックします。

  2. 左側のナビゲーションウィンドウで、[SQLエディター] をクリックします。 SQLエディターページの左上隅で、[新規作成] をクリックします。 [新しいドラフト] ダイアログボックスで、[SQLスクリプト] タブの [空白のストリームドラフト] をクリックし、[次へ] をクリックします。

  3. [新しいドラフト] ダイアログボックスで、次の表に示すパラメーターを設定します。

    パラメーター

    説明

    名前

    作成するドラフトの名前。

    説明

    ドラフト名は、現在のプロジェクトで一意である必要があります。

    adbpg-test

    場所

    ドラフトのコードファイルが保存されているフォルダ。

    既存のフォルダの右側にある新建文件夹アイコンをクリックして、サブフォルダを作成することもできます。

    Draft

    エンジン版

    ドラフトで使用されるFlinkのエンジンバージョン。 エンジンのバージョン、バージョンマッピング、および各バージョンのライフサイクルにおける重要な時点の詳細については、「エンジンバージョン」をご参照ください。

    vvr-6.0.7-flink-1.15

  4. [作成] をクリックします。

AnalyticDB for PostgreSQLへのデータの書き込み

  1. 配置コードを記述します。

    datagen_sourceという名前のソーステーブルとtest_adbpg_tableという名前のAnalyticDB for PostgreSQLテーブルを作成します。 次のコードをデプロイメントのコードエディターにコピーします。

    CREATE TABLE datagen_source (
     f_sequence INT,
     f_random INT,
     f_random_str STRING
    ) WITH (
     'connector' = 'datagen',
     'rows-per-second'='5',
     'fields.f_sequence.kind'='sequence',
     'fields.f_sequence.start'='1',
     'fields.f_sequence.end'='1000',
     'fields.f_random.min'='1',
     'fields.f_random.max'='1000',
     'fields.f_random_str.length'='10'
    );
    
    CREATE TABLE test_adbpg_table (
        `B1` bigint   ,
        `B2` bigint  ,
        `B3` VARCHAR ,
        `B4` VARCHAR,
         PRIMARY KEY(B1) not ENFORCED
    ) with (
       'connector' = 'adbpg-nightly-1.13',
       'password' = 'xxx',
       'tablename' = 'test_adbpg_table',
       'username' = 'xxxx',
       'url' = 'jdbc:postgresql://url:5432/schema',
       'maxretrytimes' = '2',
       'batchsize' = '50000',
       'connectionmaxactive' = '5',
       'conflictmode' = 'ignore',
       'usecopy' = '0',
       'targetschema' = 'public',
       'exceptionmode' = 'ignore',
       'casesensitive' = '0',
       'writemode' = '1',
       'retrywaittime' = '200'
    );

    上記のコードでは、datagen_sourceテーブルに関連するパラメーターの値を保持し、ビジネス要件に基づいてtest_adbpg_tableテーブルに関連するパラメーターの値を変更します。 次の表に、test_adbpg_tableテーブルのパラメーターを示します。

    パラメーター

    必須

    説明

    connector

    継続する

    コネクタの名前。 adbpg-nightly-Version number形式です。 例: adbpg-nightly-1.13

    url

    継続する

    AnalyticDB for PostgreSQLインスタンスへの接続に使用されるJava Database Connectivity (JDBC) URL。 URLはjdbc:postgresql://<Internal endpoint >:< Port number>/<Database name> 形式です。 例: jdbc:postgresql:// gp-xxxxxx.gpdb.cn-chengdu.rds.aliyuncs.com:5432/postgres

    tablename

    継続する

    AnalyticDB for PostgreSQLテーブルの名前。

    username

    継続する

    AnalyticDB for PostgreSQLデータベースへの接続に使用されるデータベースアカウントの名前。

    password

    継続する

    AnalyticDB for PostgreSQLデータベースアカウントのパスワード。

    maxretrytimes

    継続しない

    データの書き込みが失敗した場合に、テーブルにデータを書き込むことができる再試行の最大数。 デフォルト値: 3。

    batchsize

    継続しない

    一度にテーブルに書き込むことができる行の最大数。 デフォルト値: 50000

    exceptionmode

    継続しない

    データ書き込み中に例外を処理するために使用されるポリシー。 有効な値:

    • ignore (default): システムは、例外中に書き込まれたデータを無視します。

    • strict: データの書き込み中に例外が発生した場合、システムはフェールオーバーを実行し、エラーを報告します。

    conflictmode

    継続しない

    主キーまたは一意のインデックスの競合を処理するために使用されるポリシー。 有効な値:

    • ignore: プライマリキーの競合が発生した場合、システムはプライマリキーの競合を無視し、既存のデータを保持します。

    • strict: プライマリキーの競合が発生した場合、システムはフェールオーバーを実行し、エラーを報告します。

    • update: プライマリキーの競合が発生した場合、システムはデータを更新します。

    • upsert (デフォルト): プライマリキーの競合が発生した場合、システムはUPSERT操作を実行してデータを書き込みます。

      AnalyticDB for PostgreSQLは、INSERT ON CONFLICTCOPY ON CONFLICTステートメントの組み合わせを使用してUPSERT操作を実行します。 宛先テーブルがパーティションテーブルの場合、マイナーバージョンはV6.3.6.1以降である必要があります。 マイナーバージョンの更新方法については、「インスタンスのマイナーバージョンの更新」をご参照ください。

    targetschema

    継続しない

    AnalyticDB for PostgreSQLデータベースのスキーマ。 デフォルト値 : public

    writemode

    継続しない

    データの書き込みに使用されるメソッド。 有効な値:

    • 0: BATCH INSERTステートメントは、データの書き込みに使用されます。

    • 1 (デフォルト): COPY APIはデータの書き込みに使用されます。

    • 2: BATCH UPSERTステートメントは、データの書き込みに使用されます。

    verbose

    継続しない

    コネクタログを表示するかどうかを指定します。 有効な値:

    • 0 (デフォルト): コネクタログは表示されません。

    • 1: コネクタのログを表示します。

    retrywaittime

    継続しない

    例外が発生したときの再試行の間隔。 単位:ミリ秒。 デフォルト値:100

    batchwritetimeoutms

    継続しない

    バッチデータ書き込みのタイムアウト期間。 この期間が終了すると、データバッチが書き込まれます。 単位:ミリ秒。 デフォルト値: 50000

    connectionmaxactive

    継続しない

    1つのタスクマネージャに対して同時に接続プールに割り当てることができるアクティブな接続の最大数。 既定値:5

    casesensitive

    継続しない

    列名とテーブル名が大文字と小文字を区別するかどうかを指定します。 有効な値:

    • 0 (デフォルト): 大文字と小文字を区別しません。

    • 1: 大文字と小文字を区別します。

    説明

    サポートされているパラメーターとデータ型マッピングの詳細については、「AnalyticDB For PostgreSQLコネクタ」をご参照ください。

  2. デプロイを開始します。

    1. SQLエディターページの右上隅で、[デプロイ] をクリックします。 表示されたメッセージボックスで [OK] をクリックします。

      説明

      セッションクラスターは、非本番環境の開発およびテスト環境に適しています。 セッションクラスターでデプロイをデバッグして、JobManagerのリソース使用率を改善し、デプロイの起動を高速化できます。 ビジネスの安定性を確保するために、デプロイをセッションクラスターに公開しないことをお勧めします。 詳細については、「デプロイメントのデバッグ」をご参照ください。

    2. [デプロイメント] ページで、開始するデプロイメントを見つけ、[操作] 列の [開始] をクリックします。

    3. ジョブの開始メッセージで、[開始] をクリックします。

同期結果の確認

  1. AnalyticDB for PostgreSQLデータベースに接続します。 詳細については、「クライアント接続」をご参照ください。

  2. 次のステートメントを実行して、test_adbpg_tableテーブルを照会します。

    SELECT * FROM test_adbpg_table;

    データはAnalyticDB for PostgreSQLデータベースに書き込まれます。 次の図に、返される結果を示します。

    adbpg2.png

関連ドキュメント