全部产品
Search
文档中心

E-MapReduce:Penggunaan Dasar

更新时间:Jul 06, 2025

Topik ini menjelaskan informasi konfigurasi Delta Lake di E-MapReduce (EMR) serta menyediakan beberapa contoh perintah umum.

Informasi Konfigurasi

Konfigurasi default Delta Lake di EMR:

  • Spark 2.X

    spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
  • Spark 3.X

    spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
    spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog

Contoh Perintah Umum

  • Membuat tabel

    CREATE TABLE delta_table (id INT) USING delta;
  • Menyisipkan data ke dalam tabel

    INSERT INTO delta_table VALUES 0,1,2,3,4;
  • Menimpa data yang ada di tabel

    INSERT OVERWRITE TABLE delta_table VALUES 5,6,7,8,9;
  • Mengkueri data

    SELECT * FROM delta_table;
  • Memperbarui data

    Tambahkan 100 ke semua ID yang merupakan angka genap.

    UPDATE delta_table SET id = id + 100 WHERE mod(id, 2) = 0;
  • Menghapus data

    Hapus catatan yang ID-nya adalah angka genap.

    DELETE FROM delta_table WHERE mod(id, 2) = 0;

Gabungkan data

  1. Buat tabel sumber untuk operasi penggabungan.

    CREATE TABLE newData(id INT) USING delta;
  2. Masukkan data ke dalam tabel.

    INSERT INTO newData VALUES 0,1,2,3,4,5,6,7,8,9;
  3. Gabungkan data dari tabel newData ke tabel delta_table. Jika ID sebuah catatan di tabel newData sama dengan ID dari sebuah catatan di tabel delta_table, tambahkan 100 ke ID catatan di tabel sumber. Jika tidak, masukkan catatan tersebut secara langsung.

    MERGE INTO delta_table AS target
      USING newData AS source
      ON target.id = source.id
      WHEN MATCHED THEN UPDATE SET target.id = source.id + 100
      WHEN NOT MATCHED THEN INSERT *;
  4. Baca data dari tabel Delta dalam mode streaming.

    1. Buat tabel tujuan untuk menulis data dalam mode streaming.

      CREATE TABLE stream_debug_table (id INT) USING delta;
    2. Buat aliran.

      CREATE SCAN stream_delta_table on delta_table USING STREAM;
      Catatan

      Dalam contoh ini, delta_table adalah tabel Delta yang sudah ada.

    3. Tulis data ke tabel tujuan dalam mode streaming.

      CREATE STREAM job
      options (
        triggerType='ProcessingTime',
        checkpointLocation = '/tmp/streaming_read_cp'
      )
      INSERT INTO stream_debug_table
      SELECT * FROM stream_delta_table;

Contoh

  1. Masuk ke kluster Anda melalui SSH. Untuk informasi lebih lanjut, lihat Masuk ke kluster.

  2. Jalankan perintah berikut untuk memulai streaming-sql:

    streaming-sql
    Catatan

    Jika layanan Delta Lake telah ditambahkan ke kluster Anda, Anda dapat langsung menjalankan perintah streaming-sql. Jika belum, jalankan perintah berikut untuk menggunakan Delta Lake:

    streaming-sql --jars /path/to/delta-core_2.11-0.6.1.jar --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
  3. Jalankan perintah berikut untuk membuat tabel tujuan:

    CREATE TABLE stream_debug_table (id INT) USING DELTA;
  4. Jalankan perintah berikut untuk membuat aliran:

    CREATE SCAN stream_delta_table on delta_table USING STREAM;
  5. Jalankan perintah berikut untuk menulis data dari tabel sumber bernama delta_table ke tabel tujuan dalam mode streaming:

    CREATE STREAM job
    options (
      triggerType='ProcessingTime',
      checkpointLocation = '/tmp/streaming_read_cp'
    )
    INSERT INTO stream_debug_table
    SELECT * FROM stream_delta_table;
  6. Mulai klien streaming-sql lain dan lakukan operasi berikut untuk menyisipkan data ke tabel sumber serta mengkueri data di tabel tujuan:

    1. Jalankan perintah berikut untuk mengkueri data yang ada di tabel tujuan:

      SELECT * FROM stream_debug_table;
    2. Jalankan perintah berikut untuk menyisipkan data ke tabel sumber:

      INSERT INTO delta_table VALUES 801, 802;
    3. Jalankan perintah berikut untuk memeriksa apakah data yang dimasukkan ke tabel sumber telah ditulis ke tabel tujuan dalam mode streaming:

      SELECT * FROM stream_debug_table;
    4. Jalankan perintah berikut untuk menyisipkan data ke tabel sumber:

      INSERT INTO delta_table VALUES 901, 902;
    5. Jalankan perintah berikut untuk memeriksa apakah data yang dimasukkan ke tabel sumber telah ditulis ke tabel tujuan dalam mode streaming:

      SELECT * FROM stream_debug_table;