すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:基本的な使い方

最終更新日:Jan 11, 2025

このトピックでは、E-MapReduce(EMR)クラスターで Iceberg を使用する方法について説明します。

背景情報

このトピックでは、メタデータは Data Lake Formation(DLF)を使用して管理されます。 メタデータ設定の詳細については、DLF メタデータの設定をご参照ください。

前提条件

EMR V5.3.0 以降のマイナーバージョンの Hadoop クラスターが作成されていること。 詳細については、クラスターの作成をご参照ください。

制限事項

Iceberg の Spark SQL 拡張機能は、Spark 2.4 ではサポートされていません。 EMR V3.38.X 以降のマイナーバージョンのクラスターを使用する場合は、Spark DataFrame API のみを使用して Iceberg 関連の操作を実行できます。 このトピックでは、EMR V5.3.0 以降のマイナーバージョンのクラスターで Spark SQL を使用して Iceberg 関連の操作を実行する方法について説明します。

手順

  1. SSH モードでクラスターにログオンします。 詳細については、クラスターへのログオンをご参照ください。

  2. 次のコマンドを実行して、Iceberg 関連のパラメーターを設定します。

    Spark SQL を使用して Iceberg 関連の操作を実行するには、カタログを設定する必要があります。 次のコマンドは、カタログを設定する方法を示しています。 カタログのデフォルト名と設定する必要のあるパラメーターは、クラスターのバージョンによって異なります。 詳細については、DLF メタデータの設定をご参照ください。 次の設定では、DLF を使用してメタデータを管理します。

    説明

    カタログ設定には、spark.sql.catalog.<catalog_name> というプレフィックスが付きます。 <catalog_name> は、カタログの名前を示します。

    • EMR V5.6.0 以降のマイナーバージョン

      spark-sql --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
       --conf spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.iceberg.catalog-impl=org.apache.iceberg.aliyun.dlf.hive.DlfCatalog \
    • EMR V5.5.X

      spark-sql --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
       --conf spark.sql.catalog.dlf=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.dlf.catalog-impl=org.apache.iceberg.aliyun.dlf.hive.DlfCatalog \
       --conf spark.sql.catalog.dlf.warehouse=<yourOSSWarehousePath> \
      説明

      spark.sql.catalog.dlf.warehouse パラメーターは空のままにすることができます。 このパラメーターを設定しない場合は、デフォルトのウェアハウスパスが使用されます。

    • EMR V5.3.X から EMR V5.4.X

      AccessKey ペアを設定する必要があります。 ALIBABA_CLOUD_ACCESS_KEY_ID 環境変数と ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境変数が設定されていることを確認してください。 詳細については、Linux、macOS、および Windows での環境変数の設定をご参照ください。

      spark-sql --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
       --conf spark.sql.catalog.dlf_catalog=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.dlf_catalog.catalog-impl=org.apache.iceberg.aliyun.dlf.DlfCatalog \
       --conf spark.sql.catalog.dlf_catalog.io-impl=org.apache.iceberg.hadoop.HadoopFileIO \
       --conf spark.sql.catalog.dlf_catalog.oss.endpoint=<yourOSSEndpoint> \
       --conf spark.sql.catalog.dlf_catalog.warehouse=<yourOSSWarehousePath> \
       --conf spark.sql.catalog.dlf_catalog.access.key.id=<ALIBABA_CLOUD_ACCESS_KEY_ID> \
       --conf spark.sql.catalog.dlf_catalog.access.key.secret=<ALIBABA_CLOUD_ACCESS_KEY_SECRET> \
       --conf spark.sql.catalog.dlf_catalog.dlf.catalog-id=<yourCatalogId> \
       --conf spark.sql.catalog.dlf_catalog.dlf.endpoint=<yourDLFEndpoint> \
       --conf spark.sql.catalog.dlf_catalog.dlf.region-id=<yourDLFRegionId>

    出力に次の情報が含まれている場合、Spark SQL CLI が起動されます。

    spark-sql>
  3. 基本的な操作を実行します。

    重要

    次の例では、<catalog_name> はカタログの名前を指定します。 EMR V5.6.0 以降のマイナーバージョンのクラスターの場合、カタログ名は iceberg です。 その他のバージョンのクラスターについては、手順 2 で説明されている情報をご参照ください。 カタログ設定には、spark.sql.catalog.<catalog_name> というプレフィックスが付きます。

    • データベースを作成する

      CREATE DATABASE IF NOT EXISTS <catalog_name>.iceberg_db;
    • テーブルを作成する

      CREATE TABLE IF NOT EXISTS <catalog_name>.iceberg_db.sample(
          id BIGINT COMMENT 'unique id', 
          data STRING
      ) 
      USING iceberg;

      Iceberg テーブルを作成するために使用されるステートメントには、COMMENT、PARTITIONED BY、LOCATION、および TBLPROPERTIES 句を含めることができます。 次のコードは、TBLPROPERTIES 句を使用してテーブルレベルのプロパティを設定する方法を示しています。

      CREATE TABLE IF NOT EXISTS <catalog_name>.iceberg_db.sample(
          id BIGINT COMMENT 'unique id', 
          data STRING
      ) 
      USING iceberg
      TBLPROPERTIES (
          'write.format.default'='parquet'
      );
    • データを挿入する

      INSERT INTO <catalog_name>.iceberg_db.sample VALUES (1, 'a'), (2, 'b'), (3, 'c');
    • データをクエリする

      SELECT * FROM <catalog_name>.iceberg_db.sample;
      SELECT count(1) AS count, data FROM <catalog_name>.iceberg_db.sample GROUP BY data;
    • データを更新する

      UPDATE <catalog_name>.iceberg_db.sample SET data = 'x' WHERE id = 3;
    • データを削除する

      DELETE FROM <catalog_name>.iceberg_db.sample WHERE id = 3;