すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:Paimon の使用

最終更新日:Feb 05, 2026

Apache Paimon は、ストリーミング処理とバッチ処理のための統合レイクストレージフォーマットです。高スループットの書き込みと低レイテンシーのクエリをサポートします。このトピックでは、EMR Serverless Spark で Paimon テーブルからデータを読み書きする方法について説明します。

前提条件

ワークスペースが作成されていること。詳細については、「ワークスペースの作成」をご参照ください。

操作手順

ステップ 1:SQL セッションの作成

  1. セッションページに移動します。

    1. EMR コンソールにログインします。

    2. 左側のナビゲーションウィンドウで、EMR Serverless > Spark を選択します。

    3. Spark ページで、管理するワークスペースの名前をクリックします。

    4. EMR Serverless Spark ページの左側のナビゲーションウィンドウで、[オペレーションセンター] > セッション を選択します。

  2. SQL セッション タブで、SQL セッションの作成 をクリックします。

  3. Create SQL Session ページで、Spark Configuration セクションで次のパラメーターを設定し、Create をクリックします。詳細については、「SQL セッションの管理」をご参照ください。

    Spark はカタログを使用して Paimon からのデータの読み書きを行います。カタログを選択する必要があります。カタログの詳細については、「データカタログの管理」をご参照ください。

    データカタログの使用

    データカタログを使用する場合、セッションでパラメーターを設定する必要はありません。Catalogs ページで Add Catalog をクリックし、SparkSQL 開発用のデータカタログを選択します。

    説明

    この機能には、EMR エンジンバージョン esr-4.3.0 以降、esr-3.3.0 以降、または esr-2.7.0 以降が必要です。

    カスタムカタログの使用

    DLF (旧 DLF 2.5)

    spark.sql.catalog.<catalogName>                                 org.apache.paimon.spark.SparkCatalog
    spark.sql.catalog.<catalogName>.metastore                       rest
    spark.sql.catalog.<catalogName>.uri                             http://cn-hangzhou-vpc.dlf.aliyuncs.com
    spark.sql.catalog.<catalogName>.warehouse                       <catalog_name>
    spark.sql.catalog.<catalogName>.token.provider                  dlf
    spark.sql.catalog.<catalogName>.dlf.access-key-id               <access_key_id>
    spark.sql.catalog.<catalogName>.dlf.access-key-secret           <access_key_secret>

    パラメーターは次のとおりです。

    パラメーター

    説明

    spark.sql.catalog.<catalogName>

    カタログの実装。

    静的フィールド:org.apache.paimon.spark.SparkCatalog

    spark.sql.catalog.<catalogName>.metastore

    メタストアのタイプを指定します。値を rest に設定して、DLF REST API を使用します。

    静的フィールド:rest

    spark.sql.catalog.<catalogName>.uri

    DLF の URI を指定します。フォーマットは http://<endpoint>-vpc.dlf.aliyuncs.com です。

    http://cn-hangzhou-vpc.dlf.aliyuncs.com

    spark.sql.catalog.<catalogName>.warehouse

    データストレージパス (ウェアハウスパス) を指定します。DLF の場合は、カタログ名を指定します。

    <catalog_name>

    spark.sql.catalog.<catalogName>.token.provider

    認証プロバイダーを指定します。DLF は dlf を使用します。

    静的フィールド:dlf

    spark.sql.catalog.<catalogName>.dlf.access-key-id

    ご利用の Alibaba Cloud アカウントまたは Resource Access Management (RAM) ユーザーの AccessKey ID。

    <access_key_id>

    spark.sql.catalog.<catalogName>.dlf.access-key-secret

    ご利用の Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey Secret。

    <access_key_secret>

    DLF-Legacy (旧 DLF 1.0)

    メタデータは DLF-Legacy (旧 DLF 1.0) に保存されます。

    spark.sql.catalog.<catalogName>                          org.apache.paimon.spark.SparkCatalog
    spark.sql.catalog.<catalogName>.metastore                dlf
    spark.sql.catalog.<catalogName>.dlf.catalog.id           <catalog_name>
    spark.sql.catalog.<catalogName>.dlf.catalog.endpoint     dlf-vpc.cn-hangzhou.aliyuncs.com

    パラメーターは次の表のとおりです。

    パラメーター

    説明

    spark.sql.catalog.<catalogname>

    カタログの実装。

    静的フィールド:org.apache.paimon.spark.SparkCatalog

    spark.sql.catalog.<catalogname>.metastore

    メタストアのタイプを指定します。値を dlf に設定して、Alibaba Cloud DLF をメタストアとして使用します。

    静的フィールドは dlf です。

    spark.sql.catalog.<catalogName>.dlf.catalog.id

    DLF のカタログ名を指定します。

    <catalog_name>

    spark.sql.catalog.<catalogName>.dlf.catalog.endpoint

    DLF のエンドポイントを指定します。ご利用のリージョンに適した DLF エンドポイントを選択してください。

    dlf-vpc.cn-hangzhou.aliyuncs.com

    Hive メタストア

    メタデータは指定された Hive メタストアに保存されます。

    spark.sql.catalog.<catalogName>                 org.apache.paimon.spark.SparkCatalog
    spark.sql.catalog.<catalogName>.metastore       hive
    spark.sql.catalog.<catalogName>.uri             thrift://<yourHMSUri>:<port>

    パラメーターは次の表のとおりです。

    パラメーター

    説明

    spark.sql.catalog.<catalogName>

    カタログの実装。

    静的フィールド:org.apache.paimon.spark.SparkCatalog

    spark.sql.catalog.<catalogName>.metastore

    メタストアのタイプを指定します。値を hive に設定して、Hive メタストアを使用します。

    静的フィールド:hive

    spark.sql.catalog.<catalogName>.uri

    Hive メタストアの URI。フォーマットは thrift://<Hive メタストアの IP アドレス>:9083 です。

    <Hive メタストアの IP アドレス> は、HMS サービスのプライベート IP アドレスです。外部メタストアサービスを指定するには、「外部 Hive メタストアサービスへの接続」をご参照ください。

    thrift://192.168.**.**:9083

    ファイルシステム

    メタデータはファイルシステムに保存されます。

    spark.sql.catalog.<catalogName>                 org.apache.paimon.spark.SparkCatalog
    spark.sql.catalog.<catalogName>.metastore       filesystem
    spark.sql.catalog.<catalogName>.warehouse       oss://<yourBucketName>/warehouse

    パラメーターは次の表のとおりです。

    パラメーター

    説明

    spark.sql.catalog.<catalogName>

    カタログの実装。

    静的フィールド:org.apache.paimon.spark.SparkCatalog

    spark.sql.catalog.<catalogName>.metastore

    メタストアのタイプを指定します。値を filesystem に設定して、ファイルシステムをメタストアとして使用します。

    静的フィールド:filesystem

    spark.sql.catalog.<catalogName>.warehouse

    メタデータストレージパス (ウェアハウスパス) を指定します。コード内の <yourBucketName> は、OSS のバケット名です。

    oss://my-bucket/warehouse

    DLF、DLF-Legacy、Hive など、複数のカタログを同時に設定することもできます。次のコードは一例です。

    # DLF カタログの設定
    spark.sql.catalog.<catalogName>                                 org.apache.paimon.spark.SparkCatalog
    spark.sql.catalog.<catalogName>.metastore                       rest
    spark.sql.catalog.<catalogName>.uri                             http://cn-hangzhou-vpc.dlf.aliyuncs.com
    spark.sql.catalog.<catalogName>.warehouse                       <catalog_name>
    spark.sql.catalog.<catalogName>.token.provider                  dlf
    spark.sql.catalog.<catalogName>.dlf.access-key-id               <access_key_id>
    spark.sql.catalog.<catalogName>.dlf.access-key-secret           <access_key_secret>
    
    # dlf-legacy カタログの設定
    spark.sql.catalog.<catalogName>                                 org.apache.paimon.spark.SparkCatalog
    spark.sql.catalog.<catalogName>.metastore                       dlf
    spark.sql.catalog.<catalogName>.dlf.catalog.id                  <catalog_name>
    spark.sql.catalog.<catalogName>.dlf.catalog.endpoint            dlf-vpc.cn-hangzhou.aliyuncs.com
    
    
    # hive1 カタログの設定
    spark.sql.catalog.<catalogName>                                 org.apache.paimon.spark.SparkCatalog
    spark.sql.catalog.<catalogName>.metastore                       hive
    spark.sql.catalog.<catalogName>.uri                             thrift://<yourHMSUri-1>:<port>
    
    # hive2 カタログの設定
    spark.sql.catalog.<catalogName>                                 org.apache.paimon.spark.SparkCatalog
    spark.sql.catalog.<catalogName>.metastore                       hive
    spark.sql.catalog.<catalogName>.uri                             thrift://<yourHMSUri-2>:<port>

ステップ 2:Paimon カタログに基づくテーブルの読み書き

  1. SQL 開発ページに移動します。

    EMR Serverless Spark ページの左側のナビゲーションウィンドウで Data Development をクリックします。

  2. 開発 タブで、image アイコンをクリックします。

  3. 新規作成 ダイアログボックスで、users_task などの名前を入力し、タイプはデフォルトの SparkSQL のままにして、OK をクリックします。

  4. 新しい Spark SQL タブ (users_task) に次のコードをコピーします。

    Paimon カタログの使用

    -- データベースを作成します。
    CREATE DATABASE IF NOT EXISTS paimon.ss_paimon_db;             
    
    -- Paimon テーブルを作成します。
    CREATE TABLE paimon.ss_paimon_db.paimon_tbl (id INT, name STRING) USING paimon;
    
    -- Paimon テーブルにデータを書き込みます。
    INSERT INTO paimon.ss_paimon_db.paimon_tbl VALUES (1, "a"), (2, "b"), (3, "c");
    
    -- Paimon テーブルから書き込み結果をクエリします。
    SELECT * FROM paimon.ss_paimon_db.paimon_tbl ORDER BY id;
    
    -- データベースを削除します。
    DROP DATABASE paimon.ss_paimon_db CASCADE;
  5. ドロップダウンリストから、データベースと作成した SQL セッションを選択します。

  6. Run をクリックしてタスクを実行します。次の情報が返されます。

    image

よくある質問

テーブルに対して DELETEUPDATE、または MERGE 操作を実行するとエラーが発生する場合はどうすればよいですか?

  • 現象:DELETEUPDATE、または MERGE 操作を実行すると、次のようなエラーメッセージが表示されます。

    Caused by: org.apache.spark.sql.AnalysisException: Table does not support deletes/updates/merge: <tableName>.
        at org.apache.spark.sql.errors.QueryCompilationErrors$.tableDoesNotSupportError(QueryCompilationErrors.scala:1391)
  • 原因:テーブルのストレージフォーマットが行レベルの更新操作をサポートしていないか、必要な Spark 構成がありません。

  • 解決策:

    1. テーブルタイプを確認します。

      次のコマンドを実行して、テーブルが Paimon テーブルであるかどうかを確認します。

      SHOW CREATE TABLE <tableName>;

      出力に USING PAIMON が含まれている場合、そのテーブルは Paimon テーブルです。出力が USING hive などの別のストレージフォーマットを示している場合は、そのフォーマットが行レベルの更新操作をサポートしているかどうかを確認してください。

    2. Spark 構成を確認します。

      テーブルが Paimon テーブルの場合、Spark Configuration セクションで、Paimon のサポートを有効にするために次の構成が追加されていることを確認します。

      spark.sql.extensions org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions

      この構成が追加されていない場合は、Spark Configuration セクションに追加してください。

参考文献