このトピックでは、Iceberg REST を使用して EMR Serverless Spark から Data Lake Formation (DLF) カタログにアクセスする方法について説明します。
前提条件
DLF インスタンスと同じリージョンに Serverless Spark ワークスペースを作成済みであること。詳細については、「ワークスペースの作成」をご参照ください。
Resource Access Management (RAM) ユーザーの場合、データ操作を実行する前に、必要なリソース権限を付与する必要があります。詳細については、「データ権限付与の管理」をご参照ください。
制限事項
サポートされているタスクタイプは次のとおりです:
SQL セッション:SQL セッションの管理。
Spark Thrift Server:Spark Thrift Server セッションの管理。
バッチジョブ:バッチジョブの開発。
ステップ 1:カタログ権限の付与
[Catalogs] ページで、カタログの名前をクリックして詳細ページに移動します。
カタログ全体に権限を付与するには、[Permissions] タブをクリックします。または、特定のデータベースまたはテーブルに移動し、その [Permissions] タブをクリックしてアクセス権を付与することもできます。
権限付与ページで、次の設定を行い、[OK] をクリックします。
[User/Role]:[RAM User/RAM Role] を選択します。
[認証オブジェクトの選択]: ドロップダウンリストから AliyunECSInstanceForEMRRole を選択します。
説明ドロップダウンリストに [AliyunECSInstanceForEMRRole] が表示されない場合は、ユーザー管理ページに移動して [Sync] をクリックしてください。
プリセット権限タイプ: 読み取り権限を手動で選択するか、 Data Reader や Data Editor などの事前定義されたロールを使用します。
ステップ 2:データの読み書き
カタログに接続します。
SQL セッションを作成できます。詳細については、「SQL セッションの管理」をご参照ください。エンジンバージョン esr-4.7.0、esr-3.6.0、またはそれ以降を使用できます。
データカタログの使用
データカタログを使用する場合、セッションでパラメーターを設定する必要はありません。[データカタログ] ページに移動し、[データカタログの追加] をクリックします。その後、SparkSQL 開発でデータカタログを直接選択できます。
カスタムカタログの使用
[カスタム設定] の [Spark 設定] セクションで、次の設定を追加できます。
重要以下の設定例では、
iceberg_catalogはカスタムカタログ名です。これは、Iceberg REST カタログに基づいて、Spark に Iceberg テーブル管理サービスを登録します。このカタログは、REST API を使用して Alibaba Cloud DLF に接続します。必要に応じて、カタログ名と関連パラメーターを変更できます。${regionID}:これを実際のリージョン ID (例:cn-hangzhou) に置き換えることができます。詳細については、「エンドポイント」をご参照ください。${catalogName}:これを実際の DLF カタログ名に置き換えることができます。${access_key_id}と${access_key_secret}:これらを Alibaba Cloud アカウントの AccessKey ID とシークレットに置き換えることができます。
# Iceberg Spark 拡張機能を有効化 spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions # iceberg_catalog カタログを登録 spark.sql.catalog.iceberg_catalog org.apache.iceberg.spark.SparkCatalog # 基盤となるカタログ実装は Iceberg REST カタログ spark.sql.catalog.iceberg_catalog.catalog-impl org.apache.iceberg.rest.RESTCatalog # DLF Iceberg サービスの REST API エンドポイント spark.sql.catalog.iceberg_catalog.uri http://${regionID}-vpc.dlf.aliyuncs.com/iceberg # DLF カタログ名を指定 spark.sql.catalog.iceberg_catalog.warehouse ${catalogName} # カスタム DLF FileIO 実装を使用 spark.sql.catalog.iceberg_catalog.io-impl org.apache.iceberg.rest.DlfFileIO # SigV4 署名認証を有効化 spark.sql.catalog.iceberg_catalog.rest.auth.type sigv4 spark.sql.catalog.iceberg_catalog.rest.auth.sigv4.delegate-auth-type none spark.sql.catalog.iceberg_catalog.rest.signing-region ${regionID} spark.sql.catalog.iceberg_catalog.rest.signing-name DlfNext # アクセス認証情報 spark.sql.catalog.iceberg_catalog.rest.access-key-id ${access_key_id} spark.sql.catalog.iceberg_catalog.rest.secret-access-key ${access_key_secret}データの読み書きを行います。
SQL ジョブ開発の完全な例については、「SparkSQL 開発のクイックスタート」をご参照ください。
説明データベースが指定されていない場合、テーブルはカタログの
defaultデータベースに作成されます。別のデータベースを作成して指定することもできます。-- データベースの作成 CREATE DATABASE IF NOT EXISTS db; -- 非パーティションテーブルの作成 CREATE TABLE iceberg_catalog.db.tbl ( id BIGINT NOT NULL COMMENT 'unique id', data STRING ) USING iceberg; -- 非パーティションテーブルへのデータ挿入 INSERT INTO iceberg_catalog.db.tbl VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Charlie'); -- フルテーブルクエリ SELECT * FROM iceberg_catalog.db.tbl; -- 条件付きクエリ SELECT * FROM iceberg_catalog.db.tbl WHERE id = 2; -- データの更新 UPDATE iceberg_catalog.db.tbl SET data = 'David' WHERE id = 3; -- テーブルを再度クエリして更新を確認 SELECT * FROM iceberg_catalog.db.tbl WHERE id = 3; -- データの削除 DELETE FROM iceberg_catalog.db.tbl WHERE id = 1; -- テーブルを再度クエリして削除を確認 SELECT * FROM iceberg_catalog.db.tbl; -- パーティションテーブルの作成 CREATE TABLE iceberg_catalog.db.part_tbl ( id BIGINT, data STRING, category STRING, ts TIMESTAMP ) USING iceberg PARTITIONED BY (category); -- データの挿入 INSERT INTO iceberg_catalog.db.part_tbl VALUES (100, 'Data1', 'A', to_timestamp('2025-01-01 12:00:00')), (200, 'Data2', 'B', to_timestamp('2025-01-02 14:00:00')), (300, 'Data3', 'A', to_timestamp('2025-01-01 15:00:00')), (400, 'Data4', 'C', to_timestamp('2025-01-03 10:00:00')); -- フルテーブルクエリ SELECT * FROM iceberg_catalog.db.part_tbl; -- 条件付きクエリ SELECT * FROM iceberg_catalog.db.part_tbl WHERE bucket(16, id) = 0; -- 条件付きクエリ SELECT * FROM iceberg_catalog.db.part_tbl WHERE days(ts) = '2025-01-01'; -- 条件付きクエリ SELECT * FROM iceberg_catalog.db.part_tbl WHERE category = 'A'; -- 複合条件クエリ (バケット + 日付 + カテゴリ) SELECT * FROM iceberg_catalog.db.part_tbl WHERE bucket(16, id) = 0 AND days(ts) = '2025-01-01' AND category = 'A'; -- 各カテゴリのデータエントリ数を集計してカウント SELECT category, COUNT(*) AS count FROM iceberg_catalog.db.part_tbl GROUP BY category; -- データベースの削除。内部のすべてのテーブルが空であることを確認してください。 -- DROP DATABASE iceberg_catalog.db;