Iceberg connector memungkinkan Anda melakukan kueri dan menulis data ke tabel Apache Iceberg menggunakan Trino pada EMR. Iceberg adalah format tabel terbuka untuk data lake yang mendukung transaksi ACID, evolusi partisi, dan perjalanan waktu berbasis snapshot.
Prasyarat
Sebelum memulai, pastikan Anda telah:
Mengaktifkan layanan Presto pada kluster DataLake atau kluster Hadoop. Lihat Buat kluster.
Batasan
Hanya kluster DataLake dan kluster Hadoop yang menjalankan EMR V3.38.0 atau versi lebih baru yang mendukung Iceberg connector.
Jika Anda memilih DLF Unified Metadata saat membuat kluster, Anda tidak dapat menulis data ke tabel Iceberg.
Konfigurasi Iceberg connector
Untuk prosedur umum mengubah konfigurasi connector, lihat Konfigurasi connector.
Konfigurasi default
Login ke Konsol EMR, buka tab Configure pada halaman layanan Trino, lalu klik iceberg.properties. Tab tersebut menampilkan item konfigurasi hive.metastore.uri, yang menentukan URI Hive Metastore yang dapat diakses melalui protokol Thrift. Ubah nilai ini agar sesuai dengan lingkungan Anda.
Tambahkan item konfigurasi
Pada tab Configure halaman layanan Trino, klik iceberg.properties, lalu klik Add Configuration Item.
| Item konfigurasi | Deskripsi | Default |
|---|---|---|
iceberg.file-format | Format file untuk menyimpan data tabel Iceberg. Nilai yang valid: ORC, PARQUET. | ORC |
iceberg.compression-codec | Kodek kompresi yang digunakan saat menulis file. Nilai yang valid: GZIP, ZSTD, LZ4, SNAPPY, NONE. | GZIP |
iceberg.max-partitions-per-writer | Jumlah maksimum partisi yang dapat diproses oleh setiap writer. | 100 |
Kueri tabel Iceberg
Langkah-langkah berikut menunjukkan cara membuat skema dan tabel, memasukkan data, serta melakukan kueri hasil menggunakan SQL Trino standar.
Prasyarat
Login ke kluster Anda dalam mode SSH. Lihat Login ke kluster.
Hubungkan ke client Trino. Lihat Gunakan CLI untuk terhubung ke Trino.
Langkah-langkah
Buat skema:
CREATE SCHEMA iceberg.testdb;Buat tabel:
CREATE TABLE iceberg.testdb.iceberg_test (id INT);Masukkan data:
INSERT INTO iceberg.testdb.iceberg_test VALUES (1), (2);Lakukan kueri pada tabel:
SELECT * FROM iceberg.testdb.iceberg_test;Output yang diharapkan:
id ---- 1 2
Sintaksis SQL
Iceberg connector mendukung pembacaan dan penulisan data serta metadata di tabel Iceberg. Selain SQL standar, connector ini juga mendukung pernyataan berikut:
| Pernyataan | Referensi |
|---|---|
| INSERT | INSERT dalam dokumentasi Trino |
| DELETE | Hapus data berdasarkan partisi dalam topik ini dan DELETE dalam dokumentasi Trino |
| Manajemen skema dan tabel | Partisi tabel dalam topik ini dan Manajemen skema dan tabel dalam dokumentasi Trino |
| Manajemen materialized view | Kelola materialized view dalam topik ini dan Manajemen materialized view dalam dokumentasi Trino |
| Manajemen Tampilan | Manajemen view dalam dokumentasi Trino |
Properti tabel
Gunakan klausa WITH untuk mengatur properti berikut saat membuat tabel Iceberg:
| Properti | Deskripsi | Default |
|---|---|---|
format | Format file untuk menyimpan data tabel. Nilai yang valid: ORC, PARQUET. | ORC |
partitioning | Kolom kunci partisi dalam bentuk array. Contoh: ARRAY['c1', 'c2']. | — |
location | URI sistem file tempat tabel disimpan. | — |
Contoh:
CREATE TABLE test_table (
c1 INTEGER,
c2 DATE,
c3 DOUBLE)
WITH (
format = 'PARQUET',
partitioning = ARRAY['c1', 'c2'],
location = '/var/my_tables/test_table');Partisi tabel
Iceberg connector mendukung partisi berbasis fungsi. Gunakan fungsi berikut dalam properti partitioning:
| Fungsi | Deskripsi |
|---|---|
year(ts) | Partisi berdasarkan tahun. Mengembalikan jumlah tahun antara ts dan 1 Januari 1970. |
month(ts) | Partisi berdasarkan bulan. Mengembalikan jumlah bulan antara ts dan 1 Januari 1970. |
day(ts) | Partisi berdasarkan hari. Mengembalikan jumlah hari antara ts dan 1 Januari 1970. |
hour(ts) | Partisi berdasarkan jam. Mengembalikan timestamp yang dipotong dengan bagian menit dan detik dihapus. |
bucket(x, nbuckets) | Partisi hash data ke jumlah bucket yang ditentukan. Mengembalikan nilai hash dari x dalam rentang [0, nbuckets - 1). |
truncate(s, nchars) | Mengembalikan nchars karakter pertama dari s. |
Contoh: Partisi tabel customer_orders berdasarkan bulan pesanan, hash nomor akun (10 bucket), dan negara:
CREATE TABLE iceberg.testdb.customer_orders (
order_id BIGINT,
order_date DATE,
account_number BIGINT,
customer VARCHAR,
country VARCHAR)
WITH (partitioning = ARRAY['month(order_date)', 'bucket(account_number, 10)', 'country']);Hapus data berdasarkan partisi
Untuk tabel partisi, jika Anda menyertakan klausa WHERE dalam pernyataan DELETE untuk memfilter partisi, Iceberg connector akan menghapus partisi yang sesuai dengan kondisi filter tersebut. Misalnya, pernyataan berikut menghapus semua partisi di mana country = 'US' dari tabel customer_orders:
DELETE FROM iceberg.testdb.customer_orders
WHERE country = 'US';Anda hanya dapat menggunakan Iceberg connector untuk menghapus data berdasarkan partisi. Pernyataan berikut gagal dieksekusi karena klausa WHERE memfilter baris tertentu dalam partisi, bukan seluruh partisi:
DELETE FROM iceberg.testdb.customer_orders
WHERE country = 'US' AND customer = 'Freds Foods';Kueri tabel sistem
Iceberg connector mengekspos tabel sistem yang menyediakan metadata tentang setiap tabel Iceberg.
Kueri partisi — mencakup nilai minimum dan maksimum untuk setiap kolom kunci partisi:
SELECT * FROM iceberg.testdb."customer_orders$partitions";Snapshot Query — mencantumkan semua Snapshot beserta stempel waktu komitnya:
SELECT * FROM iceberg.testdb."customer_orders$snapshots"
ORDER BY committed_at DESC;Rollback ke snapshot
Tabel Iceberg mendukung snapshot. Iceberg connector menyediakan tabel snapshot sistem untuk setiap tabel Iceberg. ID snapshot bertipe data BIGINT.
Untuk mengembalikan tabel ke kondisi sebelumnya, pertama-tama dapatkan ID snapshot target, lalu panggil prosedur rollback.
Dapatkan ID snapshot terbaru:
SELECT snapshot_id FROM iceberg.testdb."customer_orders$snapshots" ORDER BY committed_at DESC LIMIT 1;Lakukan rollback ke snapshot tersebut:
CALL iceberg.system.rollback_to_snapshot('testdb', 'customer_orders', 895459706749342****);Ganti
895459706749342****dengan ID snapshot aktual (nilai BIGINT).
Kelola materialized view
Materialized view dalam Iceberg connector terdiri atas definisi view dan tabel Iceberg pendukung. Nama tabel disimpan sebagai properti materialized view; data disimpan dalam tabel Iceberg.
| Pernyataan | Deskripsi |
|---|---|
| CREATE MATERIALIZED VIEW | Membuat materialized view. Gunakan klausa WITH untuk mengatur properti tabel Iceberg seperti format dan partitioning. Contoh: WITH (format = 'ORC', partitioning = ARRAY['event_date']). |
| REFRESH MATERIALIZED VIEW | Memperbarui materialized view dengan menghapus data tabel pendukung dan memasukkan kembali hasil kueri. Anda juga dapat menggunakan pernyataan ini untuk menghapus definisi dan tabel Iceberg dari materialized view. |
Ada jendela waktu kecil antara operasi penghapusan dan penyisipan selama refresh. Jika penyisipan gagal, materialized view akan kosong hingga refresh berhasil dilakukan berikutnya.
Langkah berikutnya
Ikhtisar — pelajari konsep Iceberg termasuk snapshot, partisi, dan versi format tabel.
Konfigurasi connector — panduan umum untuk mengubah konfigurasi connector di EMR.