Topik ini menjelaskan informasi konfigurasi Delta Lake di E-MapReduce (EMR) serta menyediakan beberapa contoh perintah umum.
Informasi Konfigurasi
Konfigurasi default Delta Lake di EMR:
Spark 2.X
spark.sql.extensions io.delta.sql.DeltaSparkSessionExtensionSpark 3.X
spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog
Contoh Perintah Umum
Membuat tabel
CREATE TABLE delta_table (id INT) USING delta;Menyisipkan data ke dalam tabel
INSERT INTO delta_table VALUES 0,1,2,3,4;Menimpa data yang ada di tabel
INSERT OVERWRITE TABLE delta_table VALUES 5,6,7,8,9;Mengkueri data
SELECT * FROM delta_table;Memperbarui data
Tambahkan 100 ke semua
IDyang merupakan angka genap.UPDATE delta_table SET id = id + 100 WHERE mod(id, 2) = 0;Menghapus data
Hapus catatan yang
ID-nya adalah angka genap.DELETE FROM delta_table WHERE mod(id, 2) = 0;
Gabungkan data
Buat tabel sumber untuk operasi penggabungan.
CREATE TABLE newData(id INT) USING delta;Masukkan data ke dalam tabel.
INSERT INTO newData VALUES 0,1,2,3,4,5,6,7,8,9;Gabungkan data dari tabel newData ke tabel delta_table. Jika
IDsebuah catatan di tabel newData sama dengan ID dari sebuah catatan di tabel delta_table, tambahkan 100 keIDcatatan di tabel sumber. Jika tidak, masukkan catatan tersebut secara langsung.MERGE INTO delta_table AS target USING newData AS source ON target.id = source.id WHEN MATCHED THEN UPDATE SET target.id = source.id + 100 WHEN NOT MATCHED THEN INSERT *;Baca data dari tabel Delta dalam mode streaming.
Buat tabel tujuan untuk menulis data dalam mode streaming.
CREATE TABLE stream_debug_table (id INT) USING delta;Buat aliran.
CREATE SCAN stream_delta_table on delta_table USING STREAM;CatatanDalam contoh ini, delta_table adalah tabel Delta yang sudah ada.
Tulis data ke tabel tujuan dalam mode streaming.
CREATE STREAM job options ( triggerType='ProcessingTime', checkpointLocation = '/tmp/streaming_read_cp' ) INSERT INTO stream_debug_table SELECT * FROM stream_delta_table;
Contoh
Masuk ke kluster Anda melalui SSH. Untuk informasi lebih lanjut, lihat Masuk ke kluster.
Jalankan perintah berikut untuk memulai streaming-sql:
streaming-sqlCatatanJika layanan Delta Lake telah ditambahkan ke kluster Anda, Anda dapat langsung menjalankan perintah
streaming-sql. Jika belum, jalankan perintah berikut untuk menggunakan Delta Lake:streaming-sql --jars /path/to/delta-core_2.11-0.6.1.jar --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtensionJalankan perintah berikut untuk membuat tabel tujuan:
CREATE TABLE stream_debug_table (id INT) USING DELTA;Jalankan perintah berikut untuk membuat aliran:
CREATE SCAN stream_delta_table on delta_table USING STREAM;Jalankan perintah berikut untuk menulis data dari tabel sumber bernama delta_table ke tabel tujuan dalam mode streaming:
CREATE STREAM job options ( triggerType='ProcessingTime', checkpointLocation = '/tmp/streaming_read_cp' ) INSERT INTO stream_debug_table SELECT * FROM stream_delta_table;Mulai klien streaming-sql lain dan lakukan operasi berikut untuk menyisipkan data ke tabel sumber serta mengkueri data di tabel tujuan:
Jalankan perintah berikut untuk mengkueri data yang ada di tabel tujuan:
SELECT * FROM stream_debug_table;Jalankan perintah berikut untuk menyisipkan data ke tabel sumber:
INSERT INTO delta_table VALUES 801, 802;Jalankan perintah berikut untuk memeriksa apakah data yang dimasukkan ke tabel sumber telah ditulis ke tabel tujuan dalam mode streaming:
SELECT * FROM stream_debug_table;Jalankan perintah berikut untuk menyisipkan data ke tabel sumber:
INSERT INTO delta_table VALUES 901, 902;Jalankan perintah berikut untuk memeriksa apakah data yang dimasukkan ke tabel sumber telah ditulis ke tabel tujuan dalam mode streaming:
SELECT * FROM stream_debug_table;