全部产品
Search
文档中心

:Gunakan Kafka untuk menulis data ke mesin streaming Lindorm

更新时间:Nov 09, 2025

Mesin streaming Lindorm sepenuhnya kompatibel dengan Flink SQL. Anda dapat menggunakan Flink SQL untuk membuat tugas komputasi real-time di mesin streaming Lindorm guna memproses data mentah yang disimpan dalam topik Apache Kafka secara efisien. Topik ini menjelaskan cara menggunakan Flink SQL untuk mengirimkan tugas komputasi guna mengimpor data dari topik Apache Kafka ke tabel lebar Lindorm.

Prasyarat

  • Mesin streaming Lindorm telah diaktifkan untuk instance Lindorm Anda. Untuk informasi lebih lanjut, lihat Aktifkan Mesin Streaming.

  • Alamat IP klien Anda telah ditambahkan ke daftar putih instance Lindorm. Untuk informasi lebih lanjut, lihat Konfigurasi Daftar Putih.

Catatan penggunaan

Jika aplikasi Anda diterapkan pada instance Elastic Compute Service (ECS) dan perlu terhubung ke instance Lindorm melalui VPC, pastikan bahwa instance Lindorm dan instance ECS memenuhi persyaratan berikut untuk memastikan konektivitas jaringan:
  • Instance Lindorm dan ECS Anda diterapkan di wilayah yang sama. Kami sarankan Anda menerapkan kedua instance di zona yang sama untuk mengurangi latensi jaringan.
  • Instance Lindorm dan ECS Anda diterapkan di VPC yang sama.

Prosedur

Langkah 1: Persiapkan data

  1. Gunakan API Kafka untuk menulis data yang ingin diproses ke topik Kafka. Anda dapat menggunakan salah satu metode berikut untuk menulis data:

    Dalam topik ini, alat skrip Kafka sumber terbuka digunakan untuk menulis data sebagai contoh.

    # Buat topik.
    ./kafka-topics.sh --bootstrap-server <Lindorm Stream Kafka Endpoint> --topic log_topic --create
    
    # Tulis data ke topik.
    ./kafka-console-producer.sh --bootstrap-server <Lindorm Stream Kafka Endpoint> --topic log_topic
    {"loglevel": "INFO", "thread":"thread-1", "class": "com.alibaba.stream.test", "detail":"thread-1 info detail", "timestamp": "1675840911549"}
    {"loglevel": "ERROR", "thread":"thread-2", "class": "com.alibaba.stream.test", "detail":"thread-2 error detail", "timestamp": "1675840911549"}
    {"loglevel": "WARN", "thread":"thread-3", "class": "com.alibaba.stream.test", "detail":"thread-3 warn detail", "timestamp": "1675840911549"}
    {"loglevel": "ERROR", "thread":"thread-4", "class": "com.alibaba.stream.test", "detail":"thread-4 error detail", "timestamp": "1675840911549"}

    Untuk informasi lebih lanjut tentang cara melihat Lindorm Stream Kafka endpoint, lihat Lihat Titik Akhir.

  2. Buat tabel hasil di LindormTable untuk menyimpan hasil pemrosesan.

    1. Gunakan Lindorm-cli untuk terhubung ke LindormTable. Untuk informasi lebih lanjut, lihat Gunakan Lindorm-cli untuk Terhubung dan Menggunakan LindormTable.

    2. Buat tabel hasil bernama log.

      CREATE TABLE IF NOT EXISTS log (
        loglevel VARCHAR,
        thread VARCHAR,
        class VARCHAR,
        detail VARCHAR,
        timestamp BIGINT,
      primary key (loglevel, thread) );

Langkah 2: Instal klien mesin streaming Lindorm

  1. Jalankan perintah berikut pada instance ECS untuk mengunduh paket klien mesin streaming Lindorm:

    wget https://hbaseuepublic.oss-cn-beijing.aliyuncs.com/lindorm-sqlline-2.0.2.tar.gz
  2. Jalankan perintah berikut untuk mengekstrak paket:

    tar zxvf lindorm-sqlline-2.0.2.tar.gz
  3. Pergi ke jalur lindorm-sqlline-2.0.2/bin, lalu jalankan perintah berikut untuk terhubung ke mesin streaming Lindorm:

    ./lindorm-sqlline -url <Lindorm Stream SQL Endpoint>

    Untuk informasi lebih lanjut tentang cara melihat Lindorm Stream SQL endpoint, lihat Lihat Titik Akhir.

Langkah 3: Kirim tugas komputasi di mesin streaming Lindorm

Dalam contoh yang dijelaskan pada langkah ini, operasi berikut dilakukan:

  1. Buat pekerjaan Flink bernama log_to_lindorm dan buat dua tabel bernama originalData dan lindorm_log_table. Tabel originalData adalah tabel sumber yang terkait dengan topik Kafka. Tabel lindorm_log_table adalah tabel sink yang menyimpan log hasil.

  2. Buat tugas stream untuk menyaring log dengan loglevel ERROR dan tulis log tersebut ke tabel hasil.

Contoh kode:

CREATE FJOB log_to_lindorm(
    --Buat tabel sumber Kafka.
    CREATE TABLE originalData(
        `loglevel` VARCHAR,
        `thread` VARCHAR,
        `class` VARCHAR,
        `detail` VARCHAR,
        `timestamp` BIGINT
    )WITH(
        'connector'='kafka',
        'topic'='log_topic',
        'scan.startup.mode'='earliest-offset',
        'properties.bootstrap.servers'='Lindorm Stream Kafka Endpoint',
        'format'='json'
    );
    -- Buat tabel lebar Lindorm.
    CREATE TABLE lindorm_log_table(
        `loglevel` VARCHAR,
        `thread` VARCHAR,
        `class` VARCHAR,
        `detail` VARCHAR,
        `timestamp` BIGINT,
        PRIMARY KEY (`loglevel`, `thread`) NOT ENFORCED
    )WITH(
        'connector'='lindorm',
        'seedServer'='Titik akhir LindormTable untuk API HBase',
        'userName'='root',
        'password'='test',
        'tableName'='log',
        'namespace'='default'
    );
    --Filter log ERROR dari data di topik Kafka dan tulis log tersebut ke tabel lebar hasil.
    INSERT INTO lindorm_log_table SELECT * FROM originalData WHERE loglevel = 'ERROR';
);
Catatan

Langkah 4: Kueri hasil pemrosesan

Anda dapat menggunakan salah satu metode berikut untuk mengkueri hasil pemrosesan:

  • Gunakan Lindorm-cli untuk terhubung ke LindormTable dan jalankan perintah berikut untuk mengkueri hasil pemrosesan:

    SELECT * FROM log LIMIT 5;

    Hasil berikut dikembalikan:

    +----------+----------+-------------------------+-----------------------+---------------+
    | loglevel |  thread  |          class          |        detail         |   timestamp   |
    +----------+----------+-------------------------+-----------------------+---------------+
    | ERROR    | thread-2 | com.alibaba.stream.test | thread-2 error detail | 1675840911549 |
    | ERROR    | thread-4 | com.alibaba.stream.test | thread-4 error detail | 1675840911549 |
    +----------+----------+-------------------------+-----------------------+---------------+                                     
  • Gunakan sistem manajemen kluster LindormTable untuk mengkueri hasil pemrosesan. Untuk informasi lebih lanjut, lihat Kueri Data.