Realtime Compute for Apache Flink memungkinkan Anda menggunakan dialek Hive untuk membuat pekerjaan batch, sehingga meningkatkan interoperabilitas dengan sintaks Hive SQL dan mempermudah migrasi pekerjaan Hive yang sudah ada ke Konsol Realtime Compute for Apache Flink.
Prasyarat
-
Jika Anda menggunakan pengguna Resource Access Management (RAM) atau Peran RAM untuk mengakses konsol, pastikan Anda memiliki izin yang diperlukan untuk konsol Flink. Untuk informasi selengkapnya, lihat Manajemen izin.
-
Anda telah membuat ruang kerja Flink. Untuk informasi selengkapnya, lihat Aktifkan Realtime Compute for Apache Flink.
Batasan
-
Hanya Ververica Runtime (VVR) versi 8.0.11 atau lebih baru yang mendukung dialek Hive.
-
Saat ini, hanya sintaks pernyataan INSERT dari dialek Hive yang didukung. Anda harus mendeklarasikan
USE Catalog <yourHiveCatalog>sebelum pernyataan INSERT. -
Fungsi user-defined (UDF) Hive dan Flink tidak didukung.
Langkah 1: Buat Hive Catalog
-
Konfigurasikan metadata Hive. Untuk informasi selengkapnya, lihat Konfigurasi metadata Hive.
-
Buat Hive Catalog. Untuk informasi selengkapnya, lihat Buat Hive Catalog.
Dalam contoh ini, Hive Catalog diberi nama hdfshive.
Langkah 2: Siapkan tabel data Hive contoh
-
Pada halaman , klik
New untuk membuat skrip kueri. -
Jalankan pernyataan SQL contoh berikut.
PentingTabel sumber dan tabel sink Hive harus berupa tabel permanen yang dibuat menggunakan
CREATE TABLE, bukan tabel temporary yang dibuat menggunakanCREATE TEMPORARY TABLE.-- Gunakan Hive Catalog. hdfshive adalah nama katalog yang dibuat pada Langkah 1. USE CATALOG hdfshive; -- Tabel sumber CREATE TABLE source_table ( id INT, name STRING, age INT, city STRING, salary FLOAT )WITH ('connector' = 'hive'); -- Tabel sink CREATE TABLE target_table ( city STRING, avg_salary FLOAT, user_count INT )WITH ('connector' = 'hive'); -- Tulis data uji INSERT INTO source_table VALUES (1, 'Alice', 25, 'New York', 5000.0), (2, 'Bob', 30, 'San Francisco', 6000.0), (3, 'Charlie', 35, 'New York', 7000.0), (4, 'David', 40, 'San Francisco', 8000.0), (5, 'Eva', 45, 'Los Angeles', 9000.0);
Langkah 3: Buat pekerjaan Hive SQL
-
Di panel navigasi sebelah kiri, pilih .
-
Klik New. Di kotak dialog Create Job Draft, pilih Blank Batch Job Draft (BETA) lalu klik Next.
-
Masukkan informasi pekerjaan.
Job parameters
Description
Example
File Name
Nama pekerjaan.
CatatanNama pekerjaan harus unik dalam satu Proyek.
hive-sql
Storage Location
Folder tempat file kode pekerjaan disimpan.
Anda juga dapat mengklik ikon
di samping folder yang sudah ada untuk membuat subfolder.Job Drafts
Engine Version
Versi engine Flink untuk pekerjaan ini.
Gunakan versi yang memiliki label Recommended. Versi tersebut menawarkan keandalan lebih tinggi dan performa lebih baik. Untuk informasi selengkapnya mengenai versi engine, lihat Feature release notes dan Engine versions.
vvr-8.0.11-flink-1.17
SQL Dialect
Bahasa pemrosesan data SQL.
CatatanItem konfigurasi ini hanya ditampilkan untuk versi engine yang mendukung dialek Hive.
Hive SQL
-
Klik Create.
Langkah 4: Tulis dan sebarkan pekerjaan Hive SQL
-
Tulis pernyataan SQL.
Contoh ini menghitung jumlah pengguna berusia di atas 30 tahun dan gaji rata-rata mereka di setiap kota. Salin pernyataan SQL contoh berikut ke editor SQL.
-- Gunakan Hive Catalog. hdfshive adalah nama katalog yang dibuat pada Langkah 1. USE CATALOG hdfshive; INSERT INTO TABLE target_table SELECT city, AVG(salary) AS avg_salary, -- Hitung gaji rata-rata COUNT(id) AS user_count -- Hitung jumlah pengguna FROM source_table WHERE age > 30 -- Filter pengguna berusia di atas 30 GROUP BY city; -- Kelompokkan berdasarkan kota -
Di pojok kanan atas, klik Deploy. Di kotak dialog yang muncul, konfigurasikan parameter sesuai kebutuhan lalu klik OK. Contoh ini menggunakan pengaturan default.
(Opsional) Langkah 5: Konfigurasi parameter pekerjaan yang sedang berjalan
Lakukan langkah ini jika Anda mengakses kluster Hive menggunakan JindoSDK.
-
Di panel navigasi sebelah kiri, pilih .
-
Dari daftar drop-down, pilih Batch Job lalu klik Details untuk pekerjaan target.

-
Di kotak dialog Deployment Details, klik Edit di sebelah kanan area Parameter Settings.
-
Di Other Configurations, tambahkan konfigurasi berikut.
fs.oss.jindo.endpoint: <YOUR_Endpoint> fs.oss.jindo.buckets: <YOUR_Buckets> fs.oss.jindo.accessKeyId: <YOUR_AccessKeyId> fs.oss.jindo.accessKeySecret: <YOUR_AccessKeySecret>Untuk informasi selengkapnya tentang parameter tersebut, lihat Tulis data ke OSS-HDFS.
-
Klik Save.
Langkah 6: Jalankan pekerjaan dan lihat hasilnya
-
Klik Start untuk pekerjaan target.

-
Setelah status pekerjaan berubah menjadi Finished, Anda dapat melihat hasilnya.
Di halaman , jalankan contoh SQL berikut untuk melihat jumlah pengguna berusia di atas 30 tahun dan gaji rata-rata mereka di setiap kota.
-- Gunakan Hive Catalog. hdfshive adalah nama katalog yang dibuat pada Langkah 1. USE CATALOG hdfshive; select * from target_table;
Pengembangan pekerjaan Hive Jar
VVR mendukung menjalankan pekerjaan dialek Hive sebagai pekerjaan JAR. Untuk menjalankan pekerjaan JAR, Anda harus menggunakan paket JAR "ververica-connector-hive-2.3.6" versi 11.2 atau lebih baru. Konfigurasi Hive dalam pekerjaan JAR dan VVR harus konsisten.
-
Konfigurasi VVP
-
Unggah paket JAR untuk pekerjaan di kolom JAR Uri.
-
Di bagian Additional Dependencies, unggah empat file konfigurasi dari kluster Hive: core-site.xml, mapred-site.xml, hdfs-site.xml, dan hive-site.xml. Anda juga harus mengunggah paket JAR "ververica-connector-hive-2.3.6".
-
Konfigurasikan parameter running berdasarkan konfigurasi kluster Hive Anda. Untuk menulis data ke OSS-HDFS, lihat konfigurasi di (Opsional) Langkah 5: Konfigurasi parameter running pekerjaan.
table.sql-dialect: HIVE classloader.parent-first-patterns.additional: org.apache.hadoop;org.antlr.runtime kubernetes.application-mode.classpath.include-user-jar: true
-
-
Contoh kode pekerjaan JAR
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); Configuration conf = new Configuration(); conf.setString("type", "hive"); conf.setString("default-database", "default"); conf.setString("hive-version", "2.3.6"); conf.setString("hive-conf-dir", "/flink/usrlib/" ); conf.setString("hadoop-conf-dir", "/flink/usrlib/"); CatalogDescriptor descriptor = CatalogDescriptor.of("hivecat", conf); tableEnv.createCatalog("hivecat", descriptor); tableEnv.loadModule("hive", new HiveModule()); tableEnv.useModules("hive"); tableEnv.useCatalog("hivecat"); tableEnv.executeSql("insert into `hivecat`.`default`.`test_write` select * from `hivecat`.`default`.`test_read`;");
Referensi
-
Untuk informasi selengkapnya tentang sintaks INSERT dialek Hive, lihat Pernyataan INSERT | Apache Flink.
-
Untuk informasi selengkapnya tentang pemrosesan data batch menggunakan Flink SQL, lihat Memulai cepat untuk pekerjaan batch Flink.