Iceberg は、データレイク向けのオープンなテーブル形式です。 Iceberg を使用すると、Hadoop 分散ファイルシステム(HDFS)または Alibaba Cloud Object Storage Service(OSS)上に独自のデータレイクストレージサービスを迅速に構築できます。 このトピックでは、EMR Serverless Spark で Iceberg テーブルからデータを読み書きする方法について説明します。
前提条件
ワークスペースが作成されていること。 詳細については、「ワークスペースを作成する」をご参照ください。
手順
Iceberg テーブルとのデータの読み書きには、Spark SQL ジョブまたはノートブックのみを使用できます。 このトピックでは、Spark SQL ジョブを使用します。
ステップ 1:セッションを作成する
セッションページに移動します。
EMR コンソール にログインします。
左側のナビゲーションウィンドウで、 を選択します。
[Spark] ページで、管理するワークスペースの名前をクリックします。
[EMR Serverless Spark] ページの左側のナビゲーションウィンドウで、[オペレーションセンター] > [セッション] を選択します。
[SQL セッション] タブで、[SQL セッションの作成] をクリックします。
[SQL セッションの作成] ページで、使用するカタログに基づいて [Spark 構成] セクションに次のコードを追加し、[作成] をクリックします。 詳細については、「SQL セッションを管理する」をご参照ください。
EMR Serverless Spark で Iceberg からデータを読み書きする場合、カタログが必要です。 ビジネス要件に基づいてカタログを指定できます。
カタログタイプ
タイプ
説明
Iceberg カタログ
Iceberg 形式のメタデータを管理するために使用されるカタログ。 Iceberg カタログは、Iceberg テーブルのデータのクエリと書き込みにのみ使用できます。
Data Lake Formation(DLF)1.0 カタログ、Hive Metastore カタログ、およびファイルシステムカタログがサポートされています。 ビジネス要件に基づいてカタログを指定できます。
Iceberg テーブルにアクセスするには、
<カタログ名>.<データベース名>.<テーブル名>形式でテーブル名を指定する必要があります。重要<カタログ名>はカタログの名前を指定します。 ビジネス要件に基づいてカタログ名を指定できます。 デフォルトのカタログ名icebergを使用することをお勧めします。
spark_catalog
ワークスペースのデフォルトカタログ。Iceberg テーブルと非 Iceberg テーブルのデータのクエリに使用できます。
ワークスペースのデフォルトカタログが使用されます。
外部 Hive Metastore をカタログとして使用する方法については、「EMR Serverless Spark を使用して外部 Hive Metastore に接続する」をご参照ください。
Iceberg テーブルまたは非 Iceberg テーブルにアクセスするには、
<データベース名>.<テーブル名>形式でテーブル名を指定する必要があります。
カタログ構成
Iceberg カタログを使用する
DLF 1.0
メタデータは DLF 1.0 に保存されます。
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions spark.sql.catalog.<catalogName> org.apache.iceberg.spark.SparkCatalog spark.sql.catalog.<catalogName>.catalog-impl org.apache.iceberg.aliyun.dlf.hive.DlfCatalogHive Metastore
メタデータは特定の Hive Metastore に保存されます。
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions spark.sql.catalog.<catalogName> org.apache.iceberg.spark.SparkCatalog spark.sql.catalog.<catalogName>.catalog-impl org.apache.iceberg.hive.HiveCatalog spark.sql.catalog.<catalogName>.uri thrift://<yourHMSUri>:<port>パラメーター
説明
thrift://<yourHMSUri>:<port>Hive Metastore の Uniform Resource Identifier(URI)。 このパラメーターは、
thrift://<Hive Metastore の IP アドレス>:9083形式で構成します。<Hive Metastore の IP アドレス>は、Hive Metastore の内部 IP アドレスを指定します。 外部 Hive Metastore を指定する方法については、「外部 Hive Metastore に接続する」をご参照ください。ファイルシステム
メタデータはファイルシステムに保存されます。
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions spark.sql.catalog.<catalogName> org.apache.iceberg.spark.SparkCatalog spark.sql.catalog.<catalogName>.type hadoop spark.sql.catalog.<catalogName>.warehouse oss://<yourBucketName>/warehouseSpark カタログを使用する
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions spark.sql.catalog.spark_catalog org.apache.iceberg.spark.SparkSessionCatalog
作成されたセッションを見つけ、[アクション] 列の [開始] をクリックします。
ステップ 2:Iceberg テーブルからデータを読み書きする
EMR Serverless Spark のデータ開発ページに移動します。
[EMR Serverless Spark] ページの左側のナビゲーションウィンドウで、[データ開発] をクリックします。
[開発] タブで、
アイコンをクリックします。[作成] ダイアログボックスで、[名前] パラメーターを users_task に設定し、[タイプ] パラメーターにデフォルト値 SparkSQL を使用して、[OK] をクリックします。
users_task タブで、次のコードをコードエディターにコピーします。
CREATE DATABASE IF NOT EXISTS iceberg.ss_iceberg_db; CREATE TABLE iceberg.ss_iceberg_db.iceberg_tbl (id INT, name STRING) USING iceberg; INSERT INTO iceberg.ss_iceberg_db.iceberg_tbl VALUES (1, "a"), (2, "b"); SELECT id, name FROM iceberg.ss_iceberg_db.iceberg_tbl ORDER BY id;テーブルとデータベースが不要になった場合は、次のコマンドを実行して削除できます。
DROP TABLE iceberg.ss_iceberg_db.iceberg_tbl; DROP DATABASE iceberg.ss_iceberg_db;[デフォルトデータベース] ドロップダウンリストからデータベースを選択し、[SQL セッション] ドロップダウンリストから作成した SQL セッションを選択します。
[実行] をクリックします。 次の図は出力を示しています。

関連情報
SQL ジョブの開発とオーケストレーションの方法については、「Spark SQL ジョブ開発の開始」をご参照ください。
Iceberg の詳細については、「Apache Iceberg」を参照してください。
SQL セッションの詳細については、「SQL セッションを管理する」をご参照ください。
ノートブックセッションの詳細については、「ノートブックセッションを管理する」をご参照ください。