すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:DLF の使用

最終更新日:Nov 09, 2025

この Topic では、Data Lake Formation (DLF) を使用して、Paimon テーブルにデータを書き込む EMR Serverless Spark タスクを実行する方法について説明します。テストファイルをアップロードし、タスクを作成して実行し、ログまたはコンソールで結果を表示して、データが正しく書き込まれ、クエリできることを確認します。

前提条件

  • DLF データカタログを作成済みであること。 詳細については、「データカタログ」をご参照ください。

  • DLF を使用するワークスペースを作成済みであること。 詳細については、「ワークスペースの作成」をご参照ください。

手順

ステップ 1: テストファイルの準備

単純な DataFrame を作成し、pyspark_test という名前の Paimon テーブルに書き込みます。次に、Spark SQL を使用してテーブルデータをクエリし、データが正しく書き込まれていることを確認します。

Java

次のセクションでは、Java コードの例を示します。SparkExample-1.0-SNAPSHOT.jar をクリックして、テスト JAR パッケージをダウンロードします。

Maven 依存関係

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.5.2</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.5.2</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>3.5.2</version>
    <scope>provided</scope>
</dependency>

コード例

package org.example;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.Arrays;
import java.util.List;

public class DlfAccess {
    public static void main(String[] args) {
        // SparkSession インスタンスを作成します。
        SparkSession spark = SparkSession.builder()
                .appName("DLF Example")
                .enableHiveSupport()
                .getOrCreate();

        // DataFrame を構築します。
        List<Row> data = Arrays.asList(
                RowFactory.create(1, "Alice"),
                RowFactory.create(2, "Bob"),
                RowFactory.create(3, "Charlie")
        );

        StructType schema = DataTypes.createStructType(new StructField[] {
                DataTypes.createStructField("id",   DataTypes.IntegerType, false),
                DataTypes.createStructField("name", DataTypes.StringType,  false)
        });
        // DataFrame を作成し、テーブルに書き込みます。

        Dataset<Row> df = spark.createDataFrame(data, schema);

        spark.sql("drop table if exists pyspark_test");
        df.write().format("paimon").mode("overwrite").saveAsTable("pyspark_test");

        // クエリを実行して、最初の 10 レコードを取得します。
        Dataset<Row> tableDf = spark.sql("select * from pyspark_test limit 10");

        tableDf.show();

        spark.stop();
    }
}

Python

ワークスペースが DLF 1.0 (レガシー) にアタッチされている場合は、Spark 構成に spark.sql.extensions org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions パラメーターを追加します。

コード例

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("DLF test") \
    .enableHiveSupport() \
    .getOrCreate()

# テストデータ
data = [
    (1, "Alice"),
    (2, "Bob"),
    (3, "Charlie")
]

spark.sql("drop table if exists pyspark_test")
# DataFrame を作成し、テーブルに書き込みます。
df = spark.createDataFrame(data, schema='id int, name string')
df.write.format('paimon').mode("overwrite").saveAsTable("pyspark_test")

# テーブルをクエリして検証します。
spark.sql("select * from pyspark_test").show()

ステップ 2: テストファイルのアップロード

  1. リソースアップロードページに移動します。

    1. EMR コンソールにログインします。

    2. 左側のナビゲーションウィンドウで、[EMR Serverless] > [Spark] を選択します。

    3. [Spark] ページで、ターゲットワークスペースの名前をクリックします。

    4. 左側のナビゲーションウィンドウで、[ファイル管理] をクリックします。

  2. [ファイル管理] ページで、[ファイルのアップロード] をクリックします。

  3. [ファイルのアップロード] ダイアログボックスで、アップロードエリアをクリックして Python ファイルまたは JAR パッケージを選択するか、ファイルをアップロードエリアにドラッグします。

ステップ 3: バッチタスクの作成と実行

  1. 左側のナビゲーションウィンドウで、[データ開発] をクリックします。

  2. [開発フォルダー] タブで、image (新規) アイコンをクリックします。

  3. ダイアログボックスで、[名前] を入力します。[バッチタスク] セクションで、PySpark または JAR を選択します。次に、[OK] をクリックします。

  4. 右上隅で、ターゲットキューを選択します。

    キューの追加方法の詳細については、「リソースキューの管理」をご参照ください。

  5. 新しい開発タブで、次のパラメーターを構成し、他のパラメーターはデフォルト設定のままにして、[実行] をクリックします。

    JAR

    パラメーター

    説明

    メイン JAR リソース

    アップロードした JAR パッケージを選択します。

    メインクラス

    Spark タスクを送信するときに指定するメインクラス。この例では、org.example.DlfAccess と入力します。

    PySpark

    パラメーター

    説明

    メイン Python リソース

    [ワークスペースリソース] を選択し、[リソースアップロード] ページでアップロードした Python ファイルを選択します。

ステップ 4: 結果の表示

タスクが完了したら、次のいずれかの方法で結果を表示できます。

方法 1: ログクエリで結果を直接表示する

  1. タスクの実行後、[実行履歴] セクションで、タスクのアクション列にある [ログクエリ] をクリックします。

  2. [ログクエリ] タブで、ログ情報を表示できます。

    image

方法 2: Data Lake Formation コンソールで結果を表示する

  1. DLF コンソールにログインします

  2. 対応するカタログとデータベースに移動して、テストテーブルを表示します。

方法 3: SQL 開発を使用して結果を表示する

[データ開発] で、SQL 開発タスクを作成し、クエリを実行してデータが正しく書き込まれたことを確認します。詳細については、「SparkSQL 開発」をご参照ください。

image