Mesin streaming Lindorm sepenuhnya kompatibel dengan Flink SQL. Anda dapat menggunakan Flink SQL untuk membuat tugas komputasi real-time di mesin streaming Lindorm guna memproses data mentah yang disimpan dalam topik Apache Kafka secara efisien. Topik ini menjelaskan cara menggunakan Flink SQL untuk mengirimkan tugas komputasi guna mengimpor data dari topik Apache Kafka ke tabel lebar Lindorm.
Prasyarat
Mesin streaming Lindorm telah diaktifkan untuk instance Lindorm Anda. Untuk informasi lebih lanjut, lihat Aktifkan Mesin Streaming.
Alamat IP klien Anda telah ditambahkan ke daftar putih instance Lindorm. Untuk informasi lebih lanjut, lihat Konfigurasi Daftar Putih.
Catatan penggunaan
- Instance Lindorm dan ECS Anda diterapkan di wilayah yang sama. Kami sarankan Anda menerapkan kedua instance di zona yang sama untuk mengurangi latensi jaringan.
- Instance Lindorm dan ECS Anda diterapkan di VPC yang sama.
Prosedur
Langkah 1: Persiapkan data
Gunakan API Kafka untuk menulis data yang ingin diproses ke topik Kafka. Anda dapat menggunakan salah satu metode berikut untuk menulis data:
Gunakan Klien Apache Kafka Sumber Terbuka untuk Menulis Data ke Mesin Streaming Lindorm
Gunakan Alat Skrip Apache Kafka Sumber Terbuka untuk Menulis Data ke Mesin Streaming Lindorm
Dalam topik ini, alat skrip Kafka sumber terbuka digunakan untuk menulis data sebagai contoh.
# Buat topik. ./kafka-topics.sh --bootstrap-server <Lindorm Stream Kafka Endpoint> --topic log_topic --create # Tulis data ke topik. ./kafka-console-producer.sh --bootstrap-server <Lindorm Stream Kafka Endpoint> --topic log_topic {"loglevel": "INFO", "thread":"thread-1", "class": "com.alibaba.stream.test", "detail":"thread-1 info detail", "timestamp": "1675840911549"} {"loglevel": "ERROR", "thread":"thread-2", "class": "com.alibaba.stream.test", "detail":"thread-2 error detail", "timestamp": "1675840911549"} {"loglevel": "WARN", "thread":"thread-3", "class": "com.alibaba.stream.test", "detail":"thread-3 warn detail", "timestamp": "1675840911549"} {"loglevel": "ERROR", "thread":"thread-4", "class": "com.alibaba.stream.test", "detail":"thread-4 error detail", "timestamp": "1675840911549"}Untuk informasi lebih lanjut tentang cara melihat Lindorm Stream Kafka endpoint, lihat Lihat Titik Akhir.
Buat tabel hasil di LindormTable untuk menyimpan hasil pemrosesan.
Gunakan Lindorm-cli untuk terhubung ke LindormTable. Untuk informasi lebih lanjut, lihat Gunakan Lindorm-cli untuk Terhubung dan Menggunakan LindormTable.
Buat tabel hasil bernama
log.CREATE TABLE IF NOT EXISTS log ( loglevel VARCHAR, thread VARCHAR, class VARCHAR, detail VARCHAR, timestamp BIGINT, primary key (loglevel, thread) );
Langkah 2: Instal klien mesin streaming Lindorm
Jalankan perintah berikut pada instance ECS untuk mengunduh paket klien mesin streaming Lindorm:
wget https://hbaseuepublic.oss-cn-beijing.aliyuncs.com/lindorm-sqlline-2.0.2.tar.gzJalankan perintah berikut untuk mengekstrak paket:
tar zxvf lindorm-sqlline-2.0.2.tar.gzPergi ke jalur
lindorm-sqlline-2.0.2/bin, lalu jalankan perintah berikut untuk terhubung ke mesin streaming Lindorm:./lindorm-sqlline -url <Lindorm Stream SQL Endpoint>Untuk informasi lebih lanjut tentang cara melihat Lindorm Stream SQL endpoint, lihat Lihat Titik Akhir.
Langkah 3: Kirim tugas komputasi di mesin streaming Lindorm
Dalam contoh yang dijelaskan pada langkah ini, operasi berikut dilakukan:
Buat pekerjaan Flink bernama log_to_lindorm dan buat dua tabel bernama originalData dan lindorm_log_table. Tabel originalData adalah tabel sumber yang terkait dengan topik Kafka. Tabel lindorm_log_table adalah tabel sink yang menyimpan log hasil.
Buat tugas stream untuk menyaring log dengan loglevel ERROR dan tulis log tersebut ke tabel hasil.
Contoh kode:
CREATE FJOB log_to_lindorm(
--Buat tabel sumber Kafka.
CREATE TABLE originalData(
`loglevel` VARCHAR,
`thread` VARCHAR,
`class` VARCHAR,
`detail` VARCHAR,
`timestamp` BIGINT
)WITH(
'connector'='kafka',
'topic'='log_topic',
'scan.startup.mode'='earliest-offset',
'properties.bootstrap.servers'='Lindorm Stream Kafka Endpoint',
'format'='json'
);
-- Buat tabel lebar Lindorm.
CREATE TABLE lindorm_log_table(
`loglevel` VARCHAR,
`thread` VARCHAR,
`class` VARCHAR,
`detail` VARCHAR,
`timestamp` BIGINT,
PRIMARY KEY (`loglevel`, `thread`) NOT ENFORCED
)WITH(
'connector'='lindorm',
'seedServer'='Titik akhir LindormTable untuk API HBase',
'userName'='root',
'password'='test',
'tableName'='log',
'namespace'='default'
);
--Filter log ERROR dari data di topik Kafka dan tulis log tersebut ke tabel lebar hasil.
INSERT INTO lindorm_log_table SELECT * FROM originalData WHERE loglevel = 'ERROR';
);Untuk informasi lebih lanjut tentang cara melihat titik akhir LindormTable untuk API HBase, lihat Lihat Titik Akhir.
Untuk informasi lebih lanjut tentang konektor yang digunakan untuk terhubung ke tabel lebar, lihat Konfigurasikan Konektor Tabel Lebar untuk Mesin Streaming Lindorm.
Langkah 4: Kueri hasil pemrosesan
Anda dapat menggunakan salah satu metode berikut untuk mengkueri hasil pemrosesan:
Gunakan Lindorm-cli untuk terhubung ke LindormTable dan jalankan perintah berikut untuk mengkueri hasil pemrosesan:
SELECT * FROM log LIMIT 5;Hasil berikut dikembalikan:
+----------+----------+-------------------------+-----------------------+---------------+ | loglevel | thread | class | detail | timestamp | +----------+----------+-------------------------+-----------------------+---------------+ | ERROR | thread-2 | com.alibaba.stream.test | thread-2 error detail | 1675840911549 | | ERROR | thread-4 | com.alibaba.stream.test | thread-4 error detail | 1675840911549 | +----------+----------+-------------------------+-----------------------+---------------+Gunakan sistem manajemen kluster LindormTable untuk mengkueri hasil pemrosesan. Untuk informasi lebih lanjut, lihat Kueri Data.