All Products
Search
Document Center

E-MapReduce:Gunakan cluster Dataflow untuk membaca data dari dan menulis data ke tabel Hudi berdasarkan DLF

Last Updated:Jul 02, 2025

Anda dapat menggunakan cluster Dataflow E-MapReduce (EMR) untuk mengakses tabel Hudi di cluster DataLake atau cluster kustom melalui layanan metadata terpadu Data Lake Formation (DLF). Topik ini menjelaskan cara menghubungkan cluster EMR Dataflow ke DLF dan membaca data penuh dari tabel Hudi.

Prasyarat

  • Cluster Dataflow dan cluster DataLake dibuat di Konsol EMR dan termasuk dalam virtual private cloud (VPC) yang sama. Untuk informasi lebih lanjut, lihat Buat sebuah kluster.

    Penting

    Pilih DLF Unified Metadata untuk parameter Metadata saat membuat cluster DataLake.

  • DLF telah diaktifkan. Untuk informasi lebih lanjut, lihat Memulai.

Batasan

Versi cluster Dataflow harus EMR V3.38.3 atau lebih baru, tetapi tidak boleh melebihi EMR V3.50.x atau EMR V5.16.x.

Prosedur

  1. Langkah 1: Persiapkan

  2. Langkah 2: Mulai Flink SQL

  3. Langkah 3: Buat katalog

  4. Langkah 4: Tulis data ke tabel Hudi menggunakan Flink SQL

  5. Langkah 5: Kueri data dari tabel Hudi di cluster DataLake

Langkah 1: Persiapkan

Salin file konfigurasi hive-site.xml dari direktori yang ditentukan oleh parameter ${HIVE_CONF_DIR} pada cluster DataLake ke cluster Dataflow.

Sebagai contoh, direktori yang ditentukan oleh parameter ${HIVE_CONF_DIR} adalah /etc/taihao-apps/hive-conf/.

mkdir /etc/taihao-apps/hive-conf
scp root@<Alamat IP internal node master-1-1>:/etc/taihao-apps/hive-conf/hive-site.xml /etc/taihao-apps/hive-conf/

Langkah 2: Mulai Flink SQL

Penting
  • Pastikan dependensi DLF ditempatkan sebelum dependensi Hive. Dependensi Hudi disertakan dalam dependensi DLF.

  • Tidak perlu mencatat versi Hive yang diinstal di cluster DataLake. Semua dependensi Hive menggunakan versi 2.3.6.

  1. Jalankan perintah berikut untuk memulai sesi Flink YARN:

    yarn-session.sh -d -qu default
  2. Jalankan perintah berikut untuk memulai Flink SQL:

    sql-client.sh \
    -j /opt/apps/FLINK/flink-current/opt/catalogs/dlf/ververica-connector-dlf-1.15-vvr-6.0.4-SNAPSHOT-jar-with-dependencies.jar \
    -j /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/ververica-connector-hive-2.3.6-1.15-vvr-6.0.4-SNAPSHOT-jar-with-dependencies.jar
    Catatan

    Ganti nomor versi paket JAR di atas sesuai dengan situasi aktual.

  3. Buat konfigurasi berikut selama pengujian:

    -- Aktifkan keluaran log rinci.
    set sql-client.verbose=true;
    -- Atur mode tampilan hasil ke format tabel.
    set sql-client.execution.result-mode=tableau;
    -- Atur interval checkpoint menjadi 1 detik untuk memastikan bahwa data hanya terlihat setelah checkpoint dipicu. Konfigurasi ini terutama digunakan untuk menghasilkan data sumber di Langkah 4.
    set execution.checkpointing.interval=1000;

Langkah 3: Buat katalog

Setelah memulai Flink SQL, jalankan perintah berikut untuk membuat katalog DLF guna membaca data dari tabel Hudi:

CREATE CATALOG dlf_catalog WITH (
     'type' = 'dlf',
     'access.key.id' = '<yourAccessKeyId>', -- ID AccessKey akun Alibaba Cloud Anda.
     'access.key.secret' = '<yourAccessKeySecret>', -- Rahasia AccessKey akun Alibaba Cloud Anda.
     'warehouse' = 'oss://<bucket>/<object>', -- bucket: nama bucket Object Storage Service (OSS) Anda. object: jalur tempat data Anda disimpan. Anda dapat melihat informasi ini di konsol OSS.
     'oss.endpoint' = '<oss.endpoint>', -- Dapatkan nilai fs.oss.endpoint dari ${HADOOP_CONF_DIR}/core-site.xml.
     'dlf.endpoint' = '<dlf.endpoint>', -- Dapatkan nilai dlf.catalog.endpoint dari /etc/taihao-apps/hive-conf/hive-site.xml.
     'dlf.region-id' = '<dlf.region-id>' -- Dapatkan nilai dlf.catalog.region dari /etc/taihao-apps/hive-conf/hive-site.xml.
 );

Setelah katalog dibuat, informasi berikut akan dikembalikan:

[INFO] Eksekusi pernyataan berhasil.

Langkah 4: Tulis data ke tabel Hudi menggunakan Flink SQL

Gunakan konektor Datagen untuk secara acak menghasilkan data sumber dan tulis data tersebut ke tabel Hudi.

-- Hasilkan data sumber.
CREATE TABLE datagen_source (
  uuid int,
  age int,
  ts bigint
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10'
);

-- Buat database Hudi dan tabel Hudi.
CREATE database dlf_catalog.testdb;
CREATE TABLE dlf_catalog.testdb.hudi_tbl1(
  id int NOT NULL,
  age int,
  ts bigint
)
WITH(
  'connector'='hudi',
  'path' = 'oss://<bucket>/<object>/testdb/hudi_tbl1', -- oss://<bucket>/<object> adalah gudang yang ditentukan saat pembuatan katalog DLF, testdb adalah nama database yang dibuat, dan hudi_tbl1 adalah nama tabel yang dibuat.
  'table.type'='COPY_ON_WRITE',
  'hoodie.datasource.write.recordkey.field'='id',
  'hive_sync.enable'='true',
  'hive_sync.table'='hudi_tbl1',    -- Diperlukan. Nama tabel Hive.
  'hive_sync.db'='testdb',            -- Diperlukan. Nama database Hive.
  'hive_sync.mode' = 'hms'          -- Diperlukan. Atur parameter hive_sync.mode ke hms. Nilai defaultnya adalah jdbc.
);

-- Tulis data ke danau data.
INSERT INTO dlf_catalog.testdb.hudi_tbl1
SELECT uuid AS id, age, ts
FROM default_catalog.default_database.datagen_source;

-- Kueri data.
SELECT * FROM dlf_catalog.testdb.hudi_tbl1;

Langkah 5: Kueri data dari tabel Hudi di cluster DataLake

Masuk ke cluster DataLake dan kueri data dari tabel Hudi. Untuk informasi lebih lanjut tentang cara masuk ke kluster, lihat Masuk ke kluster.

  • Gunakan Spark SQL

    Untuk informasi lebih lanjut, lihat Integrasikan Hudi dengan Spark SQL.

    1. Jalankan perintah berikut untuk memulai Spark SQL:

      spark-sql \
      --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
      --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

      Jika kluster Anda menggunakan Spark 3 dan versi Hudi adalah 0.11 atau lebih baru, tambahkan konfigurasi berikut:

      --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
    2. Jalankan perintah berikut untuk mengkueri data dari tabel:

      SELECT * FROM testdb.hudi_tbl1;
  • Gunakan Hive CLI

    1. Jalankan perintah berikut untuk memulai Hive CLI:

      hive
    2. Jalankan perintah berikut untuk mengkueri data dari tabel:

      SELECT * FROM testdb.hudi_tbl1;