E-MapReduce (EMR) memungkinkan Anda menggunakan klien Flink SQL untuk membaca data dari dan menulis data ke Paimon. Topik ini menjelaskan cara menggunakan klien Flink SQL untuk berinteraksi dengan Paimon.
Prasyarat
Pastikan telah membuat kluster Dataflow atau kluster kustom yang mencakup layanan Flink dan Paimon. Untuk detail lebih lanjut, lihat Buat kluster.
Jika Anda ingin menggunakan katalog Hive untuk berinteraksi dengan Paimon, buat kluster kustom yang mencakup layanan Flink, Paimon, dan Hive. Selain itu, pilih Self-managed RDS atau Built-in MySQL untuk parameter Metadata.
Batasan
Kluster EMR V3.46.0 tidak mendukung penggunaan katalog DLF dan katalog Hive untuk berinteraksi dengan Paimon.
Hanya kluster dengan versi EMR mulai dari V3.46.0 hingga V3.50.X atau V5.12.0 hingga V5.16.X yang mendukung penggunaan klien Flink SQL untuk berinteraksi dengan Paimon.
CatatanUntuk kluster EMR V3.51.X atau versi minor terbaru, serta kluster EMR V5.17.X atau versi minor terbaru, Anda dapat mengonfigurasi dependensi sesuai kebutuhan bisnis. Untuk informasi lebih lanjut, lihat Memulai Cepat.
Prosedur
Langkah 1: Konfigurasikan dependensi
Topik ini menjelaskan cara menggunakan katalog sistem file, katalog Hive, dan katalog DLF pada klien Flink SQL untuk berinteraksi dengan Paimon. Pilih jenis katalog sesuai skenario dan persyaratan lingkungan Anda, lalu konfigurasikan dependensi berdasarkan tipe katalog.
Katalog Filesystem
cp /opt/apps/PAIMON/paimon-current/lib/flink/*.jar /opt/apps/FLINK/flink-current/lib/Katalog Hive
cp /opt/apps/PAIMON/paimon-current/lib/flink/*.jar /opt/apps/FLINK/flink-current/lib/
cp /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/*.jar /opt/apps/FLINK/flink-current/lib/Katalog DLF
cp /opt/apps/PAIMON/paimon-current/lib/flink/*.jar /opt/apps/FLINK/flink-current/lib/
cp /opt/apps/PAIMON/paimon-current/lib/jackson/*.jar /opt/apps/FLINK/flink-current/lib/
cp /opt/apps/METASTORE/metastore-*/hive2/*.jar /opt/apps/FLINK/flink-current/lib/
cp /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/*.jar /opt/apps/FLINK/flink-current/lib/Langkah 2: Mulai kluster EMR
Contoh ini menggunakan kluster EMR dalam mode sesi. Untuk informasi tentang mode lainnya, lihat Penggunaan Dasar.
Jalankan perintah berikut untuk memulai sesi YARN:
yarn-session.sh --detachedLangkah 3: Buat katalog
Paimon menyimpan data dan metadata dalam sistem file seperti Hadoop Distributed File System (HDFS) atau layanan penyimpanan objek seperti OSS-HDFS. Jalur root untuk penyimpanan ditentukan oleh parameter warehouse. Jika jalur root yang ditentukan tidak ada, jalur tersebut akan dibuat secara otomatis. Jika jalur root sudah ada, Anda dapat menggunakan katalog yang dibuat untuk mengakses tabel yang ada di jalur tersebut.
Anda dapat menyinkronkan metadata ke Hive atau DLF sehingga layanan lain dapat mengakses data Paimon melalui Hive atau DLF.
Kluster EMR V3.46.0 dan EMR V5.17.0 tidak mendukung penggunaan katalog DLF dan katalog Hive untuk berinteraksi dengan Paimon.
Buat katalog sistem file
Katalog sistem file menyimpan metadata dalam sistem file atau sistem penyimpanan objek.
Jalankan perintah berikut untuk memulai klien Flink SQL:
sql-client.shEksekusi pernyataan Flink SQL berikut untuk membuat katalog sistem file:
CREATE CATALOG test_catalog WITH ( 'type' = 'paimon', 'metastore' = 'filesystem', 'warehouse' = 'oss://<yourBucketName>/warehouse' );
Buat katalog Hive
Katalog Hive dapat menyinkronkan metadata ke Hive Metastore. Hive memungkinkan Anda menanyakan data dalam tabel yang dibuat di katalog Hive.
Untuk informasi tentang cara menanyakan data Paimon di Hive, lihat Integrasi Paimon dengan Hive.
Jalankan perintah berikut untuk memulai klien Flink SQL:
sql-client.shCatatanBahkan jika Anda menggunakan Hive 3, Anda tidak perlu memodifikasi perintah startup.
Eksekusi pernyataan Flink SQL berikut untuk membuat katalog Hive:
CREATE CATALOG test_catalog WITH ( 'type' = 'paimon', 'metastore' = 'hive', 'uri' = 'thrift://master-1-1:9083', -- Alamat Hive Metastore. 'warehouse' = 'oss://<yourBucketName>/warehouse' );
Buat katalog DLF
Katalog DLF dapat menyinkronkan metadata ke DLF.
Saat membuat kluster EMR, pilih DLF Unified Metadata untuk Metadata.
Jalankan perintah berikut untuk memulai klien Flink SQL:
sql-client.shCatatanBahkan jika Anda menggunakan Hive 3, Anda tidak perlu memodifikasi perintah startup.
Eksekusi pernyataan Flink SQL berikut untuk membuat katalog DLF:
CREATE CATALOG test_catalog WITH ( 'type' = 'paimon', 'metastore' = 'dlf', 'hive-conf-dir' = '/etc/taihao-apps/flink-conf', 'warehouse' = 'oss://<yourBucketName>/warehouse' );
Langkah 4: Baca data dari dan tulis data ke Paimon dalam mode streaming
Eksekusi pernyataan Flink SQL berikut untuk membuat tabel Paimon di katalog yang dibuat dan baca serta tulis data ke tabel tersebut:
-- Setel parameter execution.runtime-mode ke streaming.
SET 'execution.runtime-mode' = 'streaming';
-- Tentukan interval checkpoint untuk Paimon.
SET 'execution.checkpointing.interval' = '10s';
-- Gunakan katalog yang dibuat pada langkah sebelumnya.
USE CATALOG test_catalog;
-- Buat database uji dan gunakan database tersebut.
CREATE DATABASE test_db;
USE test_db;
-- Buat tabel sumber Datagen yang menghasilkan data acak.
CREATE TEMPORARY TABLE datagen_source (
uuid int,
kind int,
price int
) WITH (
'connector' = 'datagen',
'fields.kind.min' = '0',
'fields.kind.max' = '9',
'rows-per-second' = '10'
);
-- Buat tabel Paimon.
CREATE TABLE test_tbl (
uuid int,
kind int,
price int,
PRIMARY KEY (uuid) NOT ENFORCED
);
-- Tulis data ke tabel Paimon.
INSERT INTO test_tbl SELECT * FROM datagen_source;
-- Baca data dari tabel.
-- Operasi penulisan sedang berlangsung saat operasi pembacaan dilakukan.
-- Pastikan bahwa kluster memiliki sumber daya yang cukup (slot tugas) untuk melakukan operasi penulisan dan pembacaan secara bersamaan. Jika tidak, data gagal dibaca.
SELECT kind, SUM(price) FROM test_tbl GROUP BY kind;Langkah 5: Lakukan kueri OLAP pada Paimon
Eksekusi pernyataan Flink SQL berikut untuk melakukan kueri pemrosesan analitik online (OLAP) pada tabel Paimon:
-- Setel parameter execution.runtime-mode ke batch.
RESET 'execution.checkpointing.interval';
SET 'execution.runtime-mode' = 'batch';
-- Gunakan mode tableau untuk menampilkan hasil kueri di CLI.
SET 'sql-client.execution.result-mode' = 'tableau';
-- Kueri data dari tabel Paimon.
SELECT kind, SUM(price) FROM test_tbl GROUP BY kind;Langkah 6: Bersihkan sumber daya
Setelah pengujian selesai, hentikan operasi penulisan untuk mencegah kebocoran sumber daya.
Eksekusi pernyataan Flink SQL berikut untuk menghapus tabel Paimon:
DROP TABLE test_tbl;