すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Apache Paimon の基本機能の概要

最終更新日:Feb 21, 2025

このトピックでは、Realtime Compute for Apache Flink の開発コンソールで Apache Paimon の基本機能を使用する方法について説明します。基本機能を使用すると、Apache Paimon カタログの作成と削除、Apache Paimon テーブルの作成と削除、Apache Paimon テーブルへのデータの書き込み、Apache Paimon テーブルのデータの更新と使用を行うことができます。

前提条件

  • RAM ユーザーまたは RAM ロールを使用して Realtime Compute for Apache Flink の開発コンソール にアクセスする場合は、RAM ユーザーまたは RAM ロールに必要な権限が付与されていることを確認してください。詳細については、「権限管理」をご参照ください。

  • ワークスペースが作成されている。詳細については、「Realtime Compute for Apache Flink をアクティブにする」をご参照ください。

  • Object Storage Service (OSS) がアクティブ化され、ストレージタイプが標準ストレージの OSS バケットが作成されている。詳細については、「OSS コンソールを使用して開始する」をご参照ください。OSS は、データファイルやメタデータファイルなど、Apache Paimon テーブルに関連するファイルを保存するために使用されます。

  • Ververica Runtime (VVR) 8.0.5 以降を使用する Realtime Compute for Apache Flink のみ、Apache Paimon テーブルをサポートしています。

手順 1: Apache Paimon カタログを作成する

  1. スクリプト タブに移動します。

    1. Realtime Compute for Apache Flink コンソール にログインします。

    2. 管理するワークスペースを見つけ、[アクション] 列の [コンソール] をクリックします。

    3. 左側のナビゲーションウィンドウで、[開発] > [スクリプト] をクリックします。[スクリプト] タブで、[スクリプト] を作成します。

  2. スクリプトエディターで、次のコードを入力して Apache Paimon カタログを作成します。

    -- my-Catalog はカスタムカタログの名前です。
    CREATE Catalog `my-catalog` WITH (
      'type' = 'paimon',
      'metastore' = 'filesystem',
      'warehouse' = '<warehouse>',
      'fs.oss.endpoint' = '<fs.oss.endpoint>',
      'fs.oss.accessKeyId' = '<fs.oss.accessKeyId>',
      'fs.oss.accessKeySecret' = '<fs.oss.accessKeySecret>'
    );

    次の表にパラメーターを示します。

    パラメーター

    説明

    必須

    備考

    type

    カタログのタイプ。

    はい

    値を Paimon に設定します。

    metastore

    メタデータストレージタイプ。

    はい

    この例では、パラメーターは filesystem に設定されています。他のタイプについては、「Apache Paimon カタログを管理する」をご参照ください。

    warehouse

    OSS で指定されたデータウェアハウスディレクトリ。

    はい

    フォーマットは oss://<bucket>/<object> です。ディレクトリ内のパラメーター:

    • bucket: 作成した OSS バケットの名前を示します。

    • object: データが保存されているパスを示します。

    OSS コンソール でバケット名とオブジェクト名を表示できます。

    fs.oss.endpoint

    OSS のエンドポイント。

    いいえ

    warehouse パラメーターで指定された OSS バケットが Realtime Compute for Apache Flink ワークスペースと同じリージョンにない場合、または別の Alibaba Cloud アカウント内の OSS バケットを使用する場合は、このパラメーターが必要です。

    詳細については、「リージョンとエンドポイント」をご参照ください。

    説明

    Apache Paimon テーブルを OSS-HDFS に保存するには、fs.oss.endpoint、fs.oss.accessKeyId、および fs.oss.accessKeySecret パラメーターを設定する必要があります。 fs.oss.endpoint パラメーターの値は、cn-<region>.oss-dls.aliyuncs.com 形式です (例: cn-hangzhou.oss-dls.aliyuncs.com)。

    fs.oss.accessKeyId

    OSS に対する読み取りおよび書き込み権限を持つ Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID。

    いいえ

    warehouse パラメーターで指定された OSS バケットが Realtime Compute for Apache Flink ワークスペースと同じリージョンにない場合、または別の Alibaba Cloud アカウント内の OSS バケットを使用する場合は、このパラメーターが必要です。 AccessKey ペアの取得方法については、「AccessKey ペアを作成する」をご参照ください。

    fs.oss.accessKeySecret

    OSS に対する読み取りおよび書き込み権限を持つ Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey シークレット。

    いいえ

  3. Apache Paimon カタログを作成するためのコードを選択し、スクリプトエディターの左側にある [実行] をクリックします。

    次のステートメントは正常に実行されました。 メッセージが返された場合、カタログが作成されています。

手順 2: Apache Paimon テーブルを作成する

  1. [スクリプト] タブで、スクリプトエディターに次のコードを入力して、my_db という名前の Apache Paimon データベースと my_tbl という名前の Apache Paimon テーブルを作成します。

    CREATE DATABASE `my-catalog`.`my_db`;
    CREATE TABLE `my-catalog`.`my_db`.`my_tbl` (
      dt STRING,
      id BIGINT,
      content STRING,
      PRIMARY KEY (dt, id) NOT ENFORCED
    ) PARTITIONED BY (dt) WITH (
      'changelog-producer' = 'lookup'  
    );
    説明

    この例では、WITH 句の changelog-producer パラメーターを lookup に設定して、ルックアップポリシーを使用して変更ログを生成しています。これにより、ストリーミングモードで Apache Paimon テーブルからデータを使用できます。変更ログの生成については、「変更データ生成メカニズム」をご参照ください。

  2. Apache Paimon データベースと Apache Paimon テーブルを作成するためのコードを選択し、スクリプトエディターの左側にある [実行] をクリックします。

    The following statement has been executed successfully! メッセージが返された場合、my_db という名前の Apache Paimon データベースと my_tbl という名前の Apache Paimon テーブルが作成されています。

手順 3: Apache Paimon テーブルにデータを書き込む

  1. [下書き] タブの [開発] > [ETL] ページで、[新規] をクリックします。 [新規下書き] ダイアログボックスの [SQL スクリプト] タブで、[空のストリーム下書き] をクリックします。 SQL 下書きの開発方法の詳細については、「SQL 下書きを開発する」をご参照ください。次の INSERT 文を SQL エディターにコピーします。

    -- Apache Paimon の結果テーブルは、各チェックポイント処理が完了した後にのみデータをコミットします。
    -- この例では、結果をすばやく取得できるように、チェックポイントの間隔を 10 秒に短縮しています。
    -- 実稼働環境では、チェックポイントの間隔とチェックポイントの試行間の最小一時停止は、待機時間に関するビジネス要件によって異なります。ほとんどの場合、1 ~ 10 分に設定されます。
    SET 'execution.checkpointing.interval'='10s';
    INSERT INTO `my-catalog`.`my_db`.`my_tbl` VALUES ('20240108',1,'apple'), ('20240108',2,'banana'), ('20240109',1,'cat'), ('20240109',2,'dog');
  2. SQL エディターページの右上隅にある [デプロイ] をクリックします。 [下書きのデプロイ] ダイアログボックスで、パラメーターを設定し、[確認] をクリックします。

  3. [O&M] > [デプロイメント] ページで、目的のデプロイメントを見つけ、[アクション] 列の [開始] をクリックします。 [ジョブの開始] パネルで、[初期モード] を選択し、[開始] をクリックします。

    デプロイメントステータスが [完了] に変わると、データがデプロイメントに書き込まれます。

手順 4: Apache Paimon テーブルのデータをストリーミングモードで使用する

  1. 空のストリーミングドラフトを作成し、次のコードを SQL エディターにコピーします。このコードは、Print コネクタを使用して my_tbl テーブルのすべてのデータをログにエクスポートします。

    CREATE TEMPORARY TABLE Print (
      dt STRING,
      id BIGINT,
      content STRING
    ) WITH (
      'connector' = 'print'
    );
    INSERT INTO Print SELECT * FROM `my-catalog`.`my_db`.`my_tbl`;
  2. SQL エディターページの右上隅にある [デプロイ] をクリックします。[ドラフトのデプロイ] ダイアログボックスで、パラメーターを設定し、[確認] をクリックします。

  3. [O&M] > [デプロイメント] ページで、目的のデプロイメントを見つけ、[アクション] 列の [開始] をクリックします。[ジョブの開始] パネルで、[初期モード] を選択し、[開始] をクリックします。

  4. [デプロイメント] ページで、計算結果を表示します。

    1. 左側のナビゲーションウィンドウで、[O&M] > [デプロイメント] をクリックします。[デプロイメント] ページで、管理するデプロイメントの名前をクリックします。

    2. [ログ] タブの [ログ] タブで、[実行中のタスク マネージャー] タブの [パス、ID] 列の値をクリックします。

    3. [標準出力] タブをクリックして、使用されている Apache Paimon データを表示します。

    Paimon快速开始.jpg

手順 3: Apache Paimon テーブルにデータを書き込む

  1. 空のストリーミングドラフトを作成し、次のコードを SQL エディターにコピーします。

    SET 'execution.checkpointing.interval' = '10s';
    INSERT INTO `my-catalog`.`my_db`.`my_tbl` VALUES ('20240108', 1, 'hello'), ('20240109', 2, 'world');
  2. SQL エディター ページの右上隅にある、[デプロイ] をクリックします。[デプロイ ドラフト] ダイアログ ボックスでパラメーターを構成し、[確認] をクリックします。

  3. [O&M] > [デプロイメント] ページで、目的のデプロイメントを探し、[アクション] 列の [開始] をクリックします。[ジョブの開始] パネルで、[初期モード] を選択し、[開始] をクリックします。

    デプロイメント ステータスが [完了] に変わると、データは Apache Paimon テーブルに書き込まれます。

  4. 手順 4 の説明に従って、[デプロイメント] ページから [stdout] タブに移動し、Apache Paimon テーブルで更新されたデータを表示します。

    Paimon快速开始1.jpg

(オプション) ステップ 6: ストリーミングモードでデータが消費されるデプロイメントをキャンセルし、リソースをクリアする

テストが完了したら、次の手順を実行して、ストリーミングモードでデータが消費されるデプロイメントをキャンセルし、リソースをクリアできます。

  1. [O&M] > [デプロイメント] ページで、キャンセルするデプロイメントを見つけ、[アクション] 列の [キャンセル] をクリックします。

  2. SQL エディターページで、[スクリプト] タブをクリックします。[スクリプト] タブの SQL エディターで、次のコードを入力して、Apache Paimon データファイルと Apache Paimon カタログを削除します。

    DROP DATABASE 'my-catalog'.'my_db' CASCADE; -- OSS に保存されている Apache Paimon データベースのすべてのデータファイルを削除します。
    DROP CATALOG 'my-catalog'; -- Realtime Compute for Apache Flink の開発コンソールのメタデータから Apache Paimon カタログを削除します。OSS に保存されているデータファイルは削除されません。

    The following statement has been executed successfully! メッセージが返された場合、Apache Paimon データファイルと Apache Paimon カタログは削除されています。

参照資料