すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:Realtime Compute for Apache Flinkを使用してApache Paimon外部テーブルを作成する

最終更新日:Jan 17, 2025

MaxComputeを使用すると、Apache Paimon外部テーブルを作成し、その外部テーブルとObject Storage Service (OSS) に格納されているApache Paimonテーブルのディレクトリとの間のマッピングを確立できます。 これにより、MaxComputeのApache Paimon外部テーブルを使用して、OSSに保存されているApache Paimonテーブルのデータにアクセスできます。 このトピックでは、Realtime Compute for Apache Flinkを使用してApache Paimon外部テーブルを作成する方法と、MaxComputeのApache Paimon外部テーブルを使用してデータをクエリする方法について説明します。

背景情報

Apache Paimonは、高スループットの書き込みと低レイテンシーのクエリをサポートする、統合されたストリーミングおよびバッチ処理レイクストレージフォーマットです。 Alibaba Cloud Realtime compute for Apache FlinkのSpark、Hive、Trino、E-MapReduceなどの一般的なコンピューティングエンジンは、Apache Paimonとシームレスに統合されています。 Apache Paimonは、OSS上に独自のデータレイクストレージサービスをすばやく構築し、そのサービスをMaxComputeに接続してデータレイク分析を実装するのに役立ちます。 Apache Paimonの詳細については、「Apache Paimon」をご参照ください。

前提条件

  • 操作の実行に使用するAlibaba Cloudアカウントには、MaxComputeテーブルを作成するためのCreateTable権限があります。 テーブル権限の詳細については、「MaxCompute権限」をご参照ください。

  • MaxComputeプロジェクトが作成されます。 詳細については、「MaxComputeプロジェクトの作成」をご参照ください。

  • OSSが有効化されています。 バケットとファイルディレクトリが作成されます。 詳細は、「バケットの作成」をご参照ください。

    説明

    MaxComputeは特定のリージョンにのみデプロイされます。 クロスリージョンデータ接続の問題を防ぐために、MaxComputeプロジェクトと同じリージョンでバケットを使用することを推奨します。

  • フルマネージドFlinkが有効化されています。 詳細については、「Realtime Compute For Apache Flinkの有効化」をご参照ください。

注意事項

  • MaxComputeは、Apache Paimon外部テーブルからのみデータを読み取ることができますが、Apache Paimon外部テーブルにデータを書き込んだり、Apache Paimon外部テーブルのスキーマ変更を自動的に同期したりすることはできません。

  • Apache Paimonは、スキーマ機能が有効になっているMaxComputeプロジェクトをサポートしていません。

  • Apache Paimon外部テーブルは、クラスタリング属性をサポートしていません。

  • Apache Paimon外部テーブルは、履歴バージョンのデータのクエリやバックトラックなどの機能をサポートしていません。

手順1: Apache PaimonプラグインをMaxComputeプロジェクトにアップロード

次のいずれかの方法を使用して、Apache PaimonプラグインをMaxComputeプロジェクトにアップロードできます。

MaxComputeクライアント (odpscmd) の使用

MaxComputeクライアント (odpscmd) でMaxComputeプロジェクトにアクセスし、次のコードを実行してpaimon_maxcompute_connector.jarパッケージをMaxComputeプロジェクトにアップロードします。

ADD JAR <path_to_paimon_maxcompute_connector.jar>;

DataWorksコンソールの使用

  1. DataWorksコンソールにログインします。 左側のナビゲーションウィンドウで、[ワークスペース] をクリックします。 [ワークスペース] ページで、目的のワークスペースを見つけ、[操作] 列の [ショートカット]> [データ開発] を選択します。

  2. DataStudioページで、[作成] をクリックし、[リソースの作成]> [JAR] を選択します。

  3. [リソースの作成] ダイアログボックスでパラメーターを設定し、paimon_maxcompute_connector.jarパッケージをアップロードし、[作成] をクリックします。 リソースの作成方法の詳細については、「手順1: リソースの作成または既存のリソースのアップロード」をご参照ください。

    image.png

  4. リソースが作成されたら、リソースの構成タブのツールバーのimage.pngアイコンをクリックして、リソースを開発環境にコミットします。

手順2: Realtime Compute for Apache Flinkを使用したApache Paimon外部テーブルの作成

このトピックのベストプラクティスは、Realtime Compute for Apache Flinkに基づいて実行されます。 Realtime Compute for Apache Flinkは、Apache PaimonファイルのデータをOSSに書き込みます。 Realtime Compute for Apache FlinkコンソールにApache Paimonカタログが作成され、MaxComputeがOSS内のApache Paimonファイルのデータを読み取るために使用できるApache PaimonテーブルがApache Paimonカタログに作成されます。 次に、MaxComputeはApache Paimonテーブルを外部テーブルとして使用して、OSSに保存されているApache Paimonデータを読み取ります。

  1. Realtime Compute for Apache Flinkコンソールにログインし、スクリプトを作成します。 スクリプトの作成方法の詳細については、「スクリプトの作成」をご参照ください。

  2. [スクリプト] タブの [スクリプト編集] セクションで、カタログコードとパラメーター値を入力し、コードを選択して [実行] をクリックします。

    CREATE CATALOG `<catalog name>` WITH (
     'type' = 'paimon',
      'metastore' = 'maxcompute',
      'warehouse' = '<warehouse>',
      'maxcompute.endpoint' = '<maxcompute.endpoint>',
      'maxcompute.project' = '<maxcompute.project>',
      'maxcompute.accessid' = '<maxcompute.accessid>',
      'maxcompute.accesskey' = '<maxcompute.accesskey>',
      'maxcompute.oss.endpoint' = '<maxcompute.oss.endpoint>',
      'fs.oss.endpoint' = '<fs.oss.endpoint>',
      'fs.oss.accessKeyId' = '<fs.oss.accessKeyId>',
      'fs.oss.accessKeySecret' = '<fs.oss.accessKeySecret>'
    );

    次の表に、コード内のパラメーターを示します。

    パラメーター

    必須 / 任意

    説明

    カタログ名

    Apache Paimonカタログの名前。 名前には文字のみを含めることができます。 このトピックでは、カタログ名はcatalognameです。

    type

    カタログのタイプ。 値をpaimonに設定します。

    メタストア

    メタデータストレージのタイプ。 値をmaxcomputeに設定します。

    倉庫

    OSSのデータウェアハウスディレクトリ。 このパラメーターの値は、oss://<bucket>/<object> 形式です。

    • bucket: 作成したOSSバケットの名前。

    • object: データが保存されるパス。

    OSSコンソールでバケット名とオブジェクト名を確認できます。

    maxcompute.endpoint

    MaxComputeサービスのエンドポイント。

    このパラメーターは、MaxComputeプロジェクトの作成時に選択したリージョンとネットワーク接続タイプに基づいて設定する必要があります。 異なるリージョンとネットワークタイプに対応するエンドポイントの詳細については、「エンドポイント」をご参照ください。

    maxcompute.project

    MaxCompute プロジェクトの名前を設定します。

    スキーマ機能が有効になっているMaxComputeプロジェクトはサポートされていません。

    maxcompute.accessid

    MaxComputeの権限を持つAlibaba CloudアカウントまたはRAMユーザーのAccessKey ID。

    AccessKeyペアページでAccessKey IDを取得できます。

    maxcompute.accesskey

    AccessKey IDに対応するAccessKeyシークレット。

    maxcompute.oss.endpoint

    不可

    MaxComputeがアクセスするOSSエンドポイント。 このパラメーターを設定しない場合、デフォルトでfs.oss.endpointパラメーターの値が使用されます。

    重要

    OSSバケットは、MaxComputeプロジェクトと同じリージョンにあります。 maxcompute.oss.endpointパラメーターを内部エンドポイントに設定することを推奨します。 各リージョンの異なるネットワークタイプのOSSエンドポイントの詳細については、「リージョンとエンドポイント」をご参照ください。

    fs.oss.endpoint

    不可

    OSSのエンドポイント。

    このパラメーターは、warehouseパラメーターで指定されたOSSバケットがRealtime Compute for Apache Flinkワークスペースと同じリージョンにない場合、または別のAlibaba Cloudアカウント内のOSSバケットが使用されている場合に必要です。

    説明

    OSSバケットの作成時に選択したリージョンとネットワーク接続方法に基づいてエンドポイントを設定する必要があります。 異なるリージョンとネットワークタイプに対応するエンドポイントの詳細については、「リージョンとエンドポイント」をご参照ください。

    fs.oss.accessKeyId

    不可

    OSSの読み取りおよび書き込み権限を持つAlibaba CloudアカウントまたはRAMユーザーのAccessKey ID。

    このパラメーターは、warehouseパラメーターで指定されたOSSバケットがRealtime Compute for Apache Flinkワークスペースと同じリージョンにない場合、または別のAlibaba Cloudアカウント内のOSSバケットが使用されている場合に必要です。

    AccessKeyペアページでAccessKey IDを取得できます。

    fs.oss.accessKeySecret

    不可

    AccessKey IDに対応するAccessKeyシークレット。

    このパラメーターは、warehouseパラメーターで指定されたOSSバケットがRealtime Compute for Apache Flinkワークスペースと同じリージョンにない場合、または別のAlibaba Cloudアカウント内のOSSバケットが使用されている場合に必要です。

  3. Apache Paimonテーブルを作成します。

    1. test_tblという名前のテーブルを作成します。

      [スクリプト] タブのスクリプト編集セクションで、次のステートメントを実行し、実行が完了したことを示すメッセージが [結果] タブに表示されるまで待ちます。 この例では、test_tblという名前のテーブルが作成されます。

      CREATE TABLE `catalogname`.`default`.test_tbl (
       dt STRING,
       id BIGINT,
       data STRING,
       PRIMARY KEY (dt, id) NOT ENFORCED
      ) PARTITIONED BY (dt);
    2. テーブルtest_tblにデータを書き込みます。

      [SQLエディター] ページの [ドラフト] タブで、次のステートメントを含むSQLドラフトを作成します。 次に、ドラフトを展開します。 SQLドラフトの作成と配置方法の詳細については、「SQLドラフトの作成」をご参照ください。

      -- In this example, the execution.checkpointing.interval parameter is set to 10s. This increases the speed of committing data. 
      SET 'execution.checkpointing.interval' = '10s';
      
      INSERT INTO `catalogname`.`default`.test_tbl VALUES ('2023-04-21', 1, 'AAA'), ('2023-04-21', 2, 'BBB'), ('2023-04-22', 1, 'CCC'), ('2023-04-22', 2, 'DDD');
      説明
      • Apache Paimon結果テーブルは、チェックポイントが完了するたびにデータをコミットします。

      • 本番環境では、チェックポイント間隔とチェックポイント間の最小間隔は、レイテンシのビジネス要件によって異なります。 ほとんどの場合、それらは1〜10分に設定される。

      • SQLドラフトのエンジンバージョンはvvr-8.0.5-flink-1.17以降である必要があります。

ステップ3: MaxComputeを使用してApache Paimon外部テーブルからデータを読み取る

  1. MaxComputeクライアント (odpscmd) で、またはMaxCompute SQLステートメントを実行できる別のツールを使用して、次のコマンドを実行します。

    SET odps.sql.common.table.planner.ext.hive.bridge = true;
    SET odps.sql.hive.compatible = true;
  2. 次のコマンドを実行して、Apache Paimon外部テーブルtest_tblからデータを照会します。

    SELECT * FROM test_tbl WHERE dt = '2023-04-21';

    次の応答が返されます。

    +------------+------------+------------+
    | id | data | dt |
    +------------+------------+------------+
    | 1 | AAA | 2023-04-21 |
    | 2 | BBB | 2023-04-21 |
    +------------+------------+------------+