Iceberg コネクタを使用すると、EMR 上の Trino を通じて Apache Iceberg テーブルのデータをクエリおよび書き込みできます。Iceberg は、ACID トランザクション、パーティションの進化、スナップショットに基づくタイムトラベルをサポートする、データレイク向けのオープンなテーブルフォーマットです。
前提条件
開始する前に、以下の条件を満たしていることを確認してください。
Presto サービスが有効化された DataLake クラスターまたは Hadoop クラスターが存在すること。詳細については、「クラスターの作成」をご参照ください。
制限事項
Iceberg コネクタは、EMR V3.38.0 以降を実行する DataLake クラスターおよび Hadoop クラスターのみでサポートされます。
クラスター作成時に DLF 統合メタデータ を選択した場合、Iceberg テーブルへのデータ書き込みはできません。
Iceberg コネクタの構成
コネクタ構成の変更手順の概要については、「コネクタの構成」をご参照ください。
デフォルト構成
EMR コンソールにログインし、Trino サービスページの [構成] タブに移動して、[iceberg.properties] をクリックします。このタブには、Thrift プロトコル経由でアクセス可能な Hive メタストアの URI を指定する hive.metastore.uri 構成項目が表示されます。この値を、ご利用の環境に合わせて変更してください。
構成項目の追加
Trino サービスページの [構成] タブで、[iceberg.properties] をクリックし、その後 [構成項目の追加] をクリックします。
| 構成項目 | 説明 | デフォルト値 |
|---|---|---|
iceberg.file-format | Iceberg テーブルデータの保存に使用するファイルフォーマット。有効値: ORC、PARQUET。 | ORC |
iceberg.compression-codec | ファイル書き込み時に使用する圧縮コーデック。有効値: GZIP、ZSTD、LZ4、SNAPPY、NONE。 | GZIP |
iceberg.max-partitions-per-writer | 各ライターが処理できるパーティションの最大数。 | 100 |
Iceberg テーブルのクエリ
以下の手順では、標準的な Trino SQL を使用してスキーマおよびテーブルを作成し、データを挿入して結果をクエリする方法を示します。
前提条件
SSH モードでクラスターにログインします。詳細については、「クラスターへのログイン」をご参照ください。
Trino クライアントに接続します。詳細については、「CLI を使用した Trino への接続」をご参照ください。
操作手順
スキーマを作成します:
CREATE SCHEMA iceberg.testdb;テーブルを作成します:
CREATE TABLE iceberg.testdb.iceberg_test (id INT);データを挿入します:
INSERT INTO iceberg.testdb.iceberg_test VALUES (1), (2);テーブルをクエリします:
SELECT * FROM iceberg.testdb.iceberg_test;期待される出力:
id ---- 1 2
SQL 構文
Iceberg コネクタは、Iceberg テーブルにおけるデータおよびメタデータの読み取りと書き込みをサポートします。標準 SQL に加えて、以下の文もサポートします。
| ステートメント | 参照 |
|---|---|
| INSERT | Trino ドキュメントの INSERT |
| DELETE | パーティションごとのデータ削除 および Trino ドキュメントの DELETE |
| スキーマとテーブル管理 | テーブルのパーティション化 および Trino ドキュメントのスキーマとテーブル管理 |
| マテリアライズドビュー管理 | マテリアライズドビューの管理 および Trino ドキュメントのマテリアライズドビュー管理 |
| ビュー管理 | Trino ドキュメントのビュー管理 |
テーブルプロパティ
Iceberg テーブルを作成する際に、WITH 句を使用して以下のプロパティを設定します。
| プロパティ | 説明 | デフォルト値 |
|---|---|---|
format | テーブルデータの保存に使用するファイルフォーマット。有効値: ORC、PARQUET。 | ORC |
partitioning | パーティションキー列を配列として指定します。例: ARRAY['c1', 'c2']。 | — |
location | テーブルを格納するファイルシステムの URI。 | — |
例:
CREATE TABLE test_table (
c1 INTEGER,
c2 DATE,
c3 DOUBLE)
WITH (
format = 'PARQUET',
partitioning = ARRAY['c1', 'c2'],
location = '/var/my_tables/test_table');テーブルのパーティション化
Iceberg コネクタは、関数ベースのパーティション化をサポートします。partitioning プロパティで以下の関数を使用できます。
| 関数 | 説明 |
|---|---|
year(ts) | 年単位でパーティション化します。ts と 1970 年 1 月 1 日との間の年数を返します。 |
month(ts) | 月単位でパーティション化します。ts と 1970 年 1 月 1 日との間の月数を返します。 |
day(ts) | 日単位でパーティション化します。ts と 1970 年 1 月 1 日との間の日数を返します。 |
hour(ts) | 時間単位でパーティション化します。分および秒の部分を除去した、切り捨てられたタイムスタンプを返します。 |
bucket(x, nbuckets) | 指定されたバケット数でハッシュパーティション化します。x のハッシュ値を [0, nbuckets - 1) の範囲で返します。 |
truncate(s, nchars) | nchars 文字分の s の先頭部分を返します。 |
例: customer_orders テーブルを注文月、アカウント番号のハッシュ(10 バケット)、国でパーティション化します。
CREATE TABLE iceberg.testdb.customer_orders (
order_id BIGINT,
order_date DATE,
account_number BIGINT,
customer VARCHAR,
country VARCHAR)
WITH (partitioning = ARRAY['month(order_date)', 'bucket(account_number, 10)', 'country']);パーティション単位での削除
パーティションテーブルの場合、WHERE 句を DELETE 文に含めてパーティションをフィルターすると、Iceberg コネクタは該当するフィルター条件に一致するパーティションを削除します。たとえば、次の文は customer_orders テーブルから country = 'US' であるすべてのパーティションを削除します。
DELETE FROM iceberg.testdb.customer_orders
WHERE country = 'US';Iceberg コネクタでは、パーティション単位でのみデータを削除できます。次の文は実行に失敗します。これは、WHERE 句がパーティション内の特定の行をフィルターしており、完全なパーティションを対象としていないためです。
DELETE FROM iceberg.testdb.customer_orders
WHERE country = 'US' AND customer = 'Freds Foods';システムテーブルのクエリ
Iceberg コネクタは、各 Iceberg テーブルに関するメタデータを提供するシステムテーブルを公開しています。
パーティションのクエリ — 各パーティションキー列の最小値および最大値を含む:
SELECT * FROM iceberg.testdb."customer_orders$partitions";スナップショットのクエリ — コミットタイムスタンプとともにすべてのスナップショットを一覧表示:
SELECT * FROM iceberg.testdb."customer_orders$snapshots"
ORDER BY committed_at DESC;スナップショットへのロールバック
Iceberg テーブルはスナップショットをサポートします。Iceberg コネクタは、各 Iceberg テーブルに対してシステムスナップショットテーブルを提供します。スナップショット ID は BIGINT データ型です。
テーブルを以前の状態に戻すには、まず対象のスナップショット ID を取得し、その後ロールバックプロシージャを呼び出します。
最新のスナップショット ID を取得します:
SELECT snapshot_id FROM iceberg.testdb."customer_orders$snapshots" ORDER BY committed_at DESC LIMIT 1;スナップショットへロールバックします:
CALL iceberg.system.rollback_to_snapshot('testdb', 'customer_orders', 895459706749342****);895459706749342****を実際のスナップショット ID(BIGINT 値)に置き換えてください。
マテリアライズドビューの管理
Iceberg コネクタにおけるマテリアライズドビューは、ビュー定義とそのバックアップとなる Iceberg テーブルで構成されます。テーブル名はマテリアライズドビューのプロパティとして格納され、データは Iceberg テーブルに格納されます。
| 文 | 説明 |
|---|---|
| CREATE MATERIALIZED VIEW | マテリアライズドビューを作成します。WITH 句を使用して、format や partitioning などの Iceberg テーブルプロパティを設定します。例:WITH (format = 'ORC', partitioning = ARRAY['event_date'])。 |
| REFRESH MATERIALIZED VIEW | マテリアライズドビューを更新します。これにより、バックアップテーブルのデータが削除され、クエリ結果が再挿入されます。また、この文を使用して、マテリアライズドビューの定義および Iceberg テーブルを削除することもできます。 |
リフレッシュ中の削除および挿入操作の間には、短いタイムウィンドウが存在します。挿入操作が失敗した場合、次回の成功したリフレッシュまでマテリアライズドビューは空の状態になります。