Iceberg は、データレイク向けのオープンなテーブルフォーマットです。Hadoop 分散ファイルシステム (HDFS) または Alibaba Cloud Object Storage Service (OSS) 上で、独自のデータレイクストレージサービスを迅速に構築できます。本トピックでは、EMR Serverless Spark を使用して Iceberg テーブルからデータを読み取り、および Iceberg テーブルへデータを書き込む方法について説明します。
前提条件
ワークスペースが作成されました。詳細については、「ワークスペースの作成」をご参照ください。
操作手順
Iceberg テーブルからのデータ読み取りおよび Iceberg テーブルへのデータ書き込みには、Spark SQL ジョブまたはノートブックを使用できます。本トピックでは、Spark SQL ジョブを例として説明します。
ステップ 1:セッションの作成
[セッション] ページに移動します。
EMR コンソール にログインします。
左側ナビゲーションウィンドウで、 を選択します。
「[Spark] ページ」で、管理するワークスペースの名前をクリックします。
左側のナビゲーションウィンドウで、EMR Serverless Spark ページの [Operation Center] > [Sessions] を選択します。
「SQL セッション」タブで、「SQL セッションの作成」をクリックします。
[SQL セッションの作成] ページで、[Spark 設定] セクションで以下の設定を行い、Create をクリックします。 詳細については、「SQL セッションの管理」をご参照ください。
Spark はカタログを使用して Iceberg テーブルから読み取りおよび Iceberg テーブルへ書き込みます。ご使用のユースケースに応じて、適切なカタログを選択してください。カタログの詳細については、「データカタログの管理」をご参照ください。
データカタログの使用
データカタログを使用する場合、セッション内でパラメーターを設定する必要はありません。代わりに、Catalogs ページに移動し、Add Catalog をクリックします。その後、Spark SQL 開発時に直接データカタログを選択できます。
説明DLF(旧 DLF 2.5)の Iceberg テーブルにアクセスするには、エンジンバージョン esr-4.7.0、esr-3.6.0 以降を使用してください。
DLF-Legacy(旧 DLF 1.0)または Hive Metastore の Iceberg テーブルにアクセスするには、エンジンバージョン esr-4.3.0、esr-3.3.0、esr-2.7.0 以降を使用してください。
カスタムカタログの使用
DLF(旧 DLF 2.5)
説明エンジンバージョンは esr-4.7.0、esr-3.6.0 以降である必要があります。
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions spark.sql.catalog.iceberg_catalog org.apache.iceberg.spark.SparkCatalog spark.sql.catalog.iceberg_catalog.catalog-impl org.apache.iceberg.rest.RESTCatalog spark.sql.catalog.iceberg_catalog.uri http://<regionID>-vpc.dlf.aliyuncs.com spark.sql.catalog.iceberg_catalog.warehouse <catalog_name> spark.sql.catalog.iceberg_catalog.io-impl org.apache.iceberg.rest.DlfFileIO spark.sql.catalog.iceberg_catalog.rest.auth.type sigv4 spark.sql.catalog.iceberg_catalog.rest.auth.sigv4.delegate-auth-type none spark.sql.catalog.iceberg_catalog.rest.signing-region <regionID> spark.sql.catalog.iceberg_catalog.rest.signing-name DlfNext spark.sql.catalog.iceberg_catalog.rest.access-key-id <access_key_id> spark.sql.catalog.iceberg_catalog.rest.secret-access-key <access_key_secret>以下の表に、各パラメーターの説明を示します。
パラメーター
説明
例
spark.sql.extensionsIceberg Spark 拡張機能を有効化します。
固定値:
org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensionsspark.sql.catalog.iceberg_catalogiceberg_catalog という名前の Spark カタログを登録します。
固定値:
org.apache.iceberg.spark.SparkCatalogspark.sql.catalog.iceberg_catalog.catalog-implIceberg REST Catalog を基盤となるカタログ実装として指定します。
org.apache.iceberg.rest.RESTCatalogspark.sql.catalog.iceberg_catalog.uriDLF Iceberg サービスの REST API エンドポイント。形式:
http://<regionID>-vpc.dlf.aliyuncs.com。http://cn-hangzhou-vpc.dlf.aliyuncs.comspark.sql.catalog.iceberg_catalog.warehouse関連付けられた DLF カタログの名前を指定します。
説明データ共有によって作成された DLF カタログを関連付けることは推奨されません。
<catalog_name>spark.sql.catalog.iceberg_catalog.io-implDLF 固有の FileIO 実装を使用します。
固定値:
org.apache.iceberg.rest.DlfFileIOspark.sql.catalog.iceberg_catalog.rest.auth.typeREST リクエストの検証に AWS SigV4 署名認証を有効化します。
sigv4spark.sql.catalog.iceberg_catalog.rest.auth.sigv4.delegate-auth-typeデリゲート認証を無効化します。クライアントが署名用に AccessKey ID および AccessKey Secret を直接提供します。
nonespark.sql.catalog.iceberg_catalog.rest.signing-region署名に使用するリージョンを指定します。これは DLF サービスが実行されるリージョンと一致している必要があります。
cn-hangzhouspark.sql.catalog.iceberg_catalog.rest.signing-name署名に使用するサービス名を指定します。
固定値:
DlfNextspark.sql.catalog.iceberg_catalog.rest.access-key-idAlibaba Cloud アカウントまたは Resource Access Management (RAM) ユーザーの AccessKey ID。
<access_key_id>spark.sql.catalog.iceberg_catalog.rest.secret-access-keyAlibaba Cloud アカウントまたは RAM ユーザーの AccessKey Secret。
<access_key_secret>DLF-Legacy(旧 DLF 1.0)
説明エンジンバージョンは esr-4.3.0、esr-3.3.0、esr-2.7.0 以降である必要があります。
メタデータは DLF-Legacy(旧 DLF 1.0)に格納されます。
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions spark.sql.catalog.<catalogName> org.apache.iceberg.spark.SparkCatalog spark.sql.catalog.<catalogName>.catalog-impl org.apache.iceberg.aliyun.dlf.hive.DlfCatalog spark.sql.catalog.<catalogName>.dlf.catalog.id <catalog_name>以下の表に、各パラメーターの説明を示します。
パラメーター
説明
例
spark.sql.extensionsIceberg Spark 拡張機能を有効化します。
固定値:
org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensionsspark.sql.catalog.<catalogName><catalogName>という名前のカタログを登録します。固定値:
org.apache.iceberg.spark.SparkCatalogspark.sql.catalog.<catalogName>.catalog-implDLF-Legacy のグローバルメタサービスに直接接続するため、Alibaba Cloud DLF-Legacy 専用の Hive 互換実装を使用します。
固定値:
org.apache.iceberg.aliyun.dlf.hive.DlfCatalogspark.sql.catalog.<catalogName>.dlf.catalog.id関連付けられた DLF カタログの名前を指定します。
<catalog_name>Hive Metastore
メタデータは指定された Hive Metastore に格納されます。
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions spark.sql.catalog.<catalogName> org.apache.iceberg.spark.SparkCatalog spark.sql.catalog.<catalogName>.catalog-impl org.apache.iceberg.hive.HiveCatalog spark.sql.catalog.<catalogName>.uri thrift://<yourHMSUri>:<port>以下の表に、各パラメーターの説明を示します。
パラメーター
説明
例
spark.sql.extensionsIceberg Spark 拡張機能を有効化します。
固定値:
org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensionsspark.sql.catalog.<catalogName><catalogName>という名前のカタログを登録します。固定値:
org.apache.iceberg.spark.SparkCatalogspark.sql.catalog.<catalogName>.catalog-implHive Metastore を介して Iceberg テーブルのメタデータを格納および取得するために、Iceberg 公式の HiveCatalog 実装を指定します。
固定値:
org.apache.iceberg.hive.HiveCatalogspark.sql.catalog.<catalogName>.uriHive Metastore の URI。形式:
thrift://<Hive Metastore の IP アドレス>:9083。<Hive Metastore の IP アドレス>は Hive Metastore サービスの内部 IP アドレスです。外部 Metastore サービスを指定する方法については、「外部 Hive Metastore サービスへの接続」をご参照ください。thrift://192.168.**.**:9083ファイルシステム
メタデータはファイルシステムに格納されます。
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions spark.sql.catalog.<catalogName> org.apache.iceberg.spark.SparkCatalog spark.sql.catalog.<catalogName>.type hadoop spark.sql.catalog.<catalogName>.warehouse oss://<yourBucketName>/warehouse以下の表に、各パラメーターの説明を示します。
パラメーター
説明
例
spark.sql.extensionsIceberg Spark 拡張機能を有効化します。
固定値:
org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensionsspark.sql.catalog.<catalogName><catalogName>という名前のカタログを登録します。固定値:
org.apache.iceberg.spark.SparkCatalogspark.sql.catalog.<catalogName>.typeカタログタイプを
hadoopに設定します。これは、Hive Metastore を経由せず、HadoopCatalog を使用してメタデータをファイルシステムに直接格納することを意味します。hadoopspark.sql.catalog.<catalogName>.warehouseメタデータの保存パスを指定します。このコードでは、
<yourBucketName>はご利用の OSS バケット名です。oss://<yourBucketName>/warehouse
ステップ 2:Iceberg テーブルからの読み取りおよび Iceberg テーブルへの書き込み
SQL 開発ページに移動できます。
[EMR Serverless Spark] ページで、左側のナビゲーションウィンドウの Data Development をクリックします。
[開発] タブで、
アイコンをクリックします。[新規] ダイアログボックスで、users_task などの名前を入力し、タイプはデフォルトの [SparkSQL] のままにして、[OK] をクリックします。
以下のコードを新しい SparkSQL タブ(users_task)にコピーできます。
説明データベースを指定しない場合、テーブルはカタログのデフォルトデータベースに作成されます。別のデータベースを作成して指定することも可能です。
-- データベースの作成 CREATE DATABASE IF NOT EXISTS iceberg_catalog.db; -- パーティションなしテーブルの作成 CREATE TABLE iceberg_catalog.db.tbl ( id BIGINT NOT NULL COMMENT '一意の ID', data STRING ) USING iceberg; -- パーティションなしテーブルへのデータ挿入 INSERT INTO iceberg_catalog.db.tbl VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Charlie'); -- パーティションなしテーブルからの全データのクエリ SELECT * FROM iceberg_catalog.db.tbl; -- パーティションなしテーブルからの条件付きデータのクエリ SELECT * FROM iceberg_catalog.db.tbl WHERE id = 2; -- パーティションなしテーブルでのデータ更新 UPDATE iceberg_catalog.db.tbl SET data = 'David' WHERE id = 3; -- 更新の確認 SELECT * FROM iceberg_catalog.db.tbl WHERE id = 3; -- パーティションなしテーブルからのデータ削除 DELETE FROM iceberg_catalog.db.tbl WHERE id = 1; -- 削除の確認 SELECT * FROM iceberg_catalog.db.tbl; -- パーティション付きテーブルの作成 CREATE TABLE iceberg_catalog.db.part_tbl ( id BIGINT, data STRING, category STRING, ts TIMESTAMP, dt DATE ) USING iceberg PARTITIONED BY (dt, category); -- パーティション付きテーブルへのデータ挿入 INSERT INTO iceberg_catalog.db.part_tbl VALUES (1 , 'data-01', 'A', timestamp'2026-01-01 10:00:00', date'2026-01-01'), (2 , 'data-02', 'A', timestamp'2026-01-01 11:00:00', date'2026-01-01'), (3 , 'data-03', 'A', timestamp'2026-01-02 09:30:00', date'2026-01-02'), (4 , 'data-04', 'B', timestamp'2026-01-02 12:15:00', date'2026-01-02'), (5 , 'data-05', 'B', timestamp'2026-01-03 08:05:00', date'2026-01-03'), (6 , 'data-06', 'B', timestamp'2026-01-03 14:20:00', date'2026-01-03'), (7 , 'data-07', 'C', timestamp'2026-01-04 16:45:00', date'2026-01-04'), (8 , 'data-08', 'C', timestamp'2026-01-04 18:10:00', date'2026-01-04'), (9 , 'data-09', 'C', timestamp'2026-01-05 07:55:00', date'2026-01-05'), (10, 'data-10', 'A', timestamp'2026-01-05 13:35:00', date'2026-01-05'); -- パーティション付きテーブルからの全データのクエリ SELECT * FROM iceberg_catalog.db.part_tbl; -- dt='2026-01-01' のデータのクエリ SELECT * FROM iceberg_catalog.db.part_tbl WHERE dt='2026-01-01'; -- category = 'A' のデータのクエリ SELECT * FROM iceberg_catalog.db.part_tbl WHERE category = 'A'; -- 複数条件(dt + category)によるクエリ SELECT * FROM iceberg_catalog.db.part_tbl WHERE dt='2026-01-01' AND category = 'A'; -- category ごとの集計件数 SELECT category, COUNT(*) AS count FROM iceberg_catalog.db.part_tbl GROUP BY category; -- データベースの削除(注意が必要)。削除前にデータベースが空であることを確認してください。 -- DROP DATABASE iceberg_catalog.db;作成した SQL セッションをセッションのドロップダウンリストから選択し、[実行] をクリックします。正常に実行された後、下部に結果が表示されます。

参考資料
SQL タスク開発およびオーケストレーションプロセスの完全な例については、「SparkSQL 開発のクイックスタート」をご参照ください。
Iceberg の詳細については、「Apache Iceberg」をご参照ください。
SQL セッションの詳細については、「SQL セッションの管理」をご参照ください。
ノートブックセッションの詳細については、「ノートブックセッションの管理」をご参照ください。