すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:Spark と Paimon の統合

最終更新日:Feb 18, 2025

Apache Paimon を使用して、データレイクストレージサービスを Hadoop 分散ファイルシステム (HDFS) または Alibaba Cloud Object Storage Service (OSS) に効率的にデプロイし、Spark コンピュートエンジンを使用してデータレイク分析を実行できます。このトピックでは、Spark SQL を使用して EMR の Paimon からデータを読み書きする方法について説明します。

前提条件

DataLake またはカスタムクラスタが作成され、クラスタの作成時に Spark および Paimon サービスが選択されています。詳細については、「クラスタを作成する」をご参照ください。

制限事項

  • EMR V3.46.0、EMR V5.12.0、または EMR V3.46.0 または EMR V5.12.0 より後のマイナーバージョンのクラスタのみで、Spark SQL を使用して Paimon からデータを読み書きできます。

  • Spark 3 の Spark SQL のみで、カタログを使用して Paimon からデータを読み書きできます。

手順

手順 1:カタログを構成する

Spark は、カタログを使用して Paimon テーブルからデータを読み書きできます。Paimon カタログと spark_catalog がサポートされています。ビジネスシナリオに基づいて、Paimon カタログまたは spark_catalog を使用するかどうかを決定します。

  • Paimon カタログ:Paimon 形式のメタデータを管理するために使用されます。Paimon テーブルからのデータのクエリとデータの書き込みにのみ使用できます。

  • spark_catalog:Spark のデフォルトの組み込みカタログです。通常、Spark SQL 内部テーブルのメタデータを管理するために使用されます。Paimon テーブルまたは Paimon 以外のテーブルからのデータのクエリとデータの書き込みに使用できます。

Paimon カタログを使用する

HDFS などのファイルシステム、または OSS などのオブジェクトストレージシステムにメタデータを保存できます。また、他のサービスによる Paimon へのアクセスを容易にするために、メタデータを DLF および Hive に同期することもできます。

ストレージに使用されるルートパスは、spark.sql.catalog.paimon.warehouse パラメーターによって指定されます。指定されたルートパスが存在しない場合は、自動的に作成されます。指定されたルートパスが存在する場合は、カタログを使用してパス内の既存のテーブルにアクセスできます。

  1. SSH モードでクラスタのマスターノードにログオンします。詳細については、「クラスタにログオンする」をご参照ください。

  2. メタデータストレージタイプに基づいて構成するカタログのタイプを選択し、指定されたコマンドを実行して Spark SQL を起動します。

    ファイルシステムカタログを構成する

    ファイルシステムカタログは、ファイルシステムまたはオブジェクトストレージシステムにメタデータを保存します。

    spark-sql --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.paimon.metastore=filesystem \
    --conf spark.sql.catalog.paimon.warehouse=oss://<yourBucketName>/warehouse \
    --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
    説明
    • spark.sql.catalog.paimon:paimon という名前のカタログを定義します。

    • spark.sql.catalog.paimon.metastore:カタログで使用されるメタデータストレージタイプを指定します。このパラメーターを filesystem に設定すると、メタデータはオンプレミスのファイルシステムに保存されます。

    • spark.sql.catalog.paimon.warehouse:データウェアハウスの実際の場所を指定します。ビジネス要件に基づいてこのパラメーターを構成します。<yourBucketName> を OSS バケットの名前に置き換えます。OSS バケットの作成方法の詳細については、「バケットを作成する」をご参照ください。

    DLF カタログを構成する

    DLF カタログは、メタデータを DLF に同期できます。

    重要

    EMR クラスタを作成するときは、[メタデータ][DLF 統合メタデータ] を選択する必要があります。

    spark-sql --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.paimon.metastore=dlf \
    --conf spark.sql.catalog.paimon.warehouse=oss://<yourBucketName>/warehouse \
    --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
    説明
    • spark.sql.catalog.paimon:paimon という名前のカタログを定義します。

    • spark.sql.catalog.paimon.metastore:カタログで使用されるメタデータストレージタイプを指定します。このパラメーターを dlf に設定すると、メタデータは DLF に同期されます。

    • spark.sql.catalog.paimon.warehouse:データウェアハウスの実際の場所を指定します。ビジネス要件に基づいてこのパラメーターを構成します。<yourBucketName> を OSS バケットの名前に置き換えます。OSS バケットの作成方法の詳細については、「バケットを作成する」をご参照ください。

    Hive カタログを構成する

    Hive カタログは、メタデータを Hive Metastore に同期できます。Hive を使用すると、Hive カタログで作成されたテーブルのデータをクエリできます。Hive で Paimon のデータをクエリする方法については、「Hive と Paimon の統合」をご参照ください。

    spark-sql --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.paimon.metastore=hive \
    --conf spark.sql.catalog.paimon.uri=thrift://master-1-1:9083 \
    --conf spark.sql.catalog.paimon.warehouse=oss://<yourBucketName>/warehouse \
    --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
    説明
    • spark.sql.catalog.paimon:paimon という名前のカタログを定義します。

    • spark.sql.catalog.paimon.metastore:カタログで使用されるメタデータストレージタイプを指定します。このパラメーターを hive に設定すると、メタデータは Hive Metastore に同期されます。

    • spark.sql.catalog.paimon.uri:Hive Metastore のアドレスとポート番号を指定します。このパラメーターを thrift://master-1-1:9083 に設定すると、Spark SQL クライアントは master-1-1 ノードで実行され、リッスンポートが 9083 である Hive Metastore に接続してメタデータ情報を取得します。

    • spark.sql.catalog.paimon.warehouse:データウェアハウスの実際の場所を指定します。ビジネス要件に基づいてこのパラメーターを構成します。<yourBucketName> を OSS バケットの名前に置き換えます。OSS バケットの作成方法の詳細については、「バケットを作成する」をご参照ください。

spark_catalog を使用する

  1. SSH モードでクラスタのマスターノードにログオンします。詳細については、「クラスタにログオンする」をご参照ください。

  2. 次のコマンドを実行してカタログを構成し、Spark SQL を起動します。

    spark-sql --conf spark.sql.catalog.spark_catalog=org.apache.paimon.spark.SparkGenericCatalog \
    --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
    説明
    • spark.sql.catalog.spark_catalog:spark_catalog という名前のカタログを定義します。

    • spark_catalog でのストレージに使用されるルートパスは、spark.sql.warehouse.dir パラメーターによって指定され、ほとんどの場合変更する必要はありません。

手順 2:Paimon テーブルからデータを読み書きする

次の Spark SQL 文を実行して、構成されたカタログに Paimon テーブルを作成し、テーブルからデータを読み書きします。

Paimon カタログを使用する

Paimon テーブルにアクセスするには、paimon.<db_name>.<tbl_name> 形式でテーブル名を指定する必要があります。ここで、<db_name> はデータベース名、<tbl_name> はテーブル名を示します。

-- データベースを作成します。
CREATE DATABASE IF NOT EXISTS paimon.ss_paimon_db;

-- Paimon テーブルを作成します。
CREATE TABLE paimon.ss_paimon_db.paimon_tbl (id INT, name STRING) USING paimon;

-- Paimon テーブルにデータを書き込みます。
INSERT INTO paimon.ss_paimon_db.paimon_tbl VALUES (1, "apple"), (2, "banana"), (3, "cherry");

-- Paimon テーブルからデータをクエリします。
SELECT * FROM paimon.ss_paimon_db.paimon_tbl ORDER BY id;

-- データベースを削除します。
DROP DATABASE paimon.ss_paimon_db CASCADE;
説明

Hive カタログを構成した後にデータベースを作成するときにエラー metastore: Failed to connect to the MetaStore Server が報告された場合、Hive Metastore サービスは開始されていません。この場合、次のコマンドを実行して Hive Metastore を起動する必要があります。Hive Metastore が起動したら、コマンドを再実行して Hive カタログを構成します。

hive --service metastore &

クラスタを作成するときに [メタデータ] を [DLF 統合メタデータ] に設定した場合は、メタデータを DLF に同期し、DLF カタログを構成することをお勧めします。

spark_catalog を使用する

Paimon テーブルまたは Paimon 以外のテーブルにアクセスするかどうかに関係なく、spark_catalog.<db_name>.<tbl_name> 形式でテーブル名を指定できます。spark_catalog は Spark のデフォルトの組み込みカタログです。したがって、spark_catalog を省略し、テーブルアクセスに <db_name>.<tbl_name> 形式でテーブル名を直接指定できます。<db_name> はデータベース名、<tbl_name> はテーブル名を示します。

-- データベースを作成します。
CREATE DATABASE IF NOT EXISTS ss_paimon_db;
CREATE DATABASE IF NOT EXISTS ss_parquet_db;

-- Paimon テーブルと Parquet テーブルを作成します。
CREATE TABLE ss_paimon_db.paimon_tbl (id INT, name STRING) USING paimon;
CREATE TABLE ss_parquet_db.parquet_tbl USING parquet AS SELECT 3, "cherry";

-- Paimon テーブルにデータを書き込みます。
INSERT INTO ss_paimon_db.paimon_tbl VALUES (1, "apple"), (2, "banana");
INSERT INTO ss_paimon_db.paimon_tbl SELECT * FROM ss_parquet_db.parquet_tbl;

-- Paimon テーブルからデータをクエリします。
SELECT * FROM ss_paimon_db.paimon_tbl ORDER BY id;

-- データベースを削除します。
DROP DATABASE ss_paimon_db CASCADE;
DROP DATABASE ss_parquet_db CASCADE;

次の結果が返されます。

1       apple   
2       banana
3       cherry 

よくある質問

Paimon コンポーネントをクラスタに追加した後、パラメーター構成 spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions はクラスタに自動的に追加されますか?

はい、構成は自動的に追加されます。Paimon コンポーネントがクラスタに追加された後、次の操作を実行して構成情報を表示できます。

  1. クラスタの [サービス] タブに移動します。

  2. Spark サービスの構成を表示します。

    1. Spark サービスの [構成] をクリックします。

    2. [設定項目名]spark.sql.extensions を検索して、構成情報を表示します。

      image

Spark Shell を使用して Paimon からデータを読み書きできますか?

はい、できます。Spark Shell を使用して Paimon からデータを読み書きするには、次の手順を実行します。

  1. 次のコマンドを実行して Spark Shell を起動します。

    spark-shell
  2. Spark Shell で次の Scala コードを実行して、指定されたディレクトリに保存されている Paimon テーブルにデータを書き込み、データからクエリします。

    val dataset = spark.read.format("paimon").load("oss://<yourBucketName>/warehouse/test_db.db/test_tbl")
    dataset.createOrReplaceTempView("test_tbl")
    spark.sql("INSERT INTO test_tbl VALUES (4, 'apple1', 3.5), (5, 'banana1', 4.0), (6, 'cherry1', 20.5)")
    spark.sql("SELECT * FROM test_tbl").show()
    説明
    • paimon:固定値。この値は、データを読み書きするためのデータストレージ形式として Paimon を使用することを示します。

    • oss://<yourBucketName>/warehouse/test_db.db/test_tbl:Paimon テーブルが保存されているパス。ビジネス要件に基づいてパスを置き換えます。<yourBucketName> を OSS バケットの名前に置き換えます。

参照資料

Paimon の詳細については、「Apache Paimon」を参照してください。