全部產品
Search
文件中心

:通過Kafka寫入資料

更新時間:Jan 09, 2025

Lindorm流引擎提供了100%相容Flink SQL的能力。您可以將未經處理資料儲存在Kafka Topic,並通過Flink SQL在流引擎中建立Realtime Compute任務,對未經處理資料進行高效計算和處理。本文介紹如何使用Flink SQL提交流引擎計算任務將Kafka Topic中的資料匯入至Lindorm寬表。

前提條件

  • 已開通Lindorm流引擎。如何開通,請參見開通流引擎

  • 已將用戶端IP地址添加至Lindorm執行個體的白名單中。具體操作,請參見設定白名單

注意事項

如果您的應用部署在ECS執行個體,且想要通過專用網路訪問Lindorm執行個體,則需要確保Lindorm執行個體和ECS執行個體滿足以下條件,以保證網路的連通性。

  • 所在地區相同,並建議所在可用性區域相同(以減少網路延時)。

  • ECS執行個體與Lindorm執行個體屬於同一專用網路。

操作步驟

步驟一:資料準備

  1. 通過Kafka API將來源資料寫入Kafka Topic。共支援以下兩種寫入方式:

    以通過開源Kafka指令碼工具寫入資料為例。

    #建立Topic
    ./kafka-topics.sh --bootstrap-server <Lindorm Stream Kafka地址> --topic log_topic --create
    
    #寫入資料
    ./kafka-console-producer.sh --bootstrap-server <Lindorm Stream Kafka地址> --topic log_topic
    {"loglevel": "INFO", "thread":"thread-1", "class": "com.alibaba.stream.test", "detail":"thread-1 info detail", "timestamp": "1675840911549"}
    {"loglevel": "ERROR", "thread":"thread-2", "class": "com.alibaba.stream.test", "detail":"thread-2 error detail", "timestamp": "1675840911549"}
    {"loglevel": "WARN", "thread":"thread-3", "class": "com.alibaba.stream.test", "detail":"thread-3 warn detail", "timestamp": "1675840911549"}
    {"loglevel": "ERROR", "thread":"thread-4", "class": "com.alibaba.stream.test", "detail":"thread-4 error detail", "timestamp": "1675840911549"}

    Lindorm Stream Kafka地址的擷取方式請參見查看串連地址

  2. 在寬表引擎中建立結果表,用於儲存計算任務的處理結果。

    1. 通過Lindorm-cli串連寬表引擎。如何串連,請參見通過Lindorm-cli串連並使用寬表引擎

    2. 建立結果表log

      CREATE TABLE IF NOT EXISTS log (
        loglevel VARCHAR,
        thread VARCHAR,
        class VARCHAR,
        detail VARCHAR,
        timestamp BIGINT,
      primary key (loglevel, thread) );

步驟二:安裝流引擎用戶端

  1. 在ECS上執行以下命令,下載流引擎用戶端壓縮包。

    wget https://hbaseuepublic.oss-cn-beijing.aliyuncs.com/lindorm-sqlline-2.0.2.tar.gz
  2. 執行以下命令,解壓壓縮包。

    tar zxvf lindorm-sqlline-2.0.2.tar.gz
  3. 進入lindorm-sqlline-2.0.2/bin目錄,執行以下命令串連至Lindorm流引擎。

    ./lindorm-sqlline -url <Lindorm Stream SQL地址>

    Lindorm Stream SQL地址的擷取方式,請參見查看串連地址

步驟三:在流引擎中提交計算任務

樣本計算任務的具體實現如下:

  1. 建立名為log_to_lindorm的Flink Job,並在Flink Job中建立兩張表:originalData(Source表)和lindorm_log_table(Sink表),分別關聯已建立的Kafka Topic和結果表log。

  2. 建立流任務,過濾掉loglevel為ERROR的日誌,將過濾結果寫入結果表log中。

具體代碼如下:

CREATE FJOB log_to_lindorm(
    --Kafka Source表
    CREATE TABLE originalData(
        `loglevel` VARCHAR,
        `thread` VARCHAR,
        `class` VARCHAR,
        `detail` VARCHAR,
        `timestamp` BIGINT
    )WITH(
        'connector'='kafka',
        'topic'='log_topic',
        'scan.startup.mode'='earliest-offset',
        'properties.bootstrap.servers'='Lindorm Stream Kafka地址',
        'format'='json'
    );
    --Lindorm寬表 Sink表
    CREATE TABLE lindorm_log_table(
        `loglevel` VARCHAR,
        `thread` VARCHAR,
        `class` VARCHAR,
        `detail` VARCHAR,
        `timestamp` BIGINT,
        PRIMARY KEY (`loglevel`, `thread`) NOT ENFORCED
    )WITH(
        'connector'='lindorm',
        'seedServer'='Lindorm寬表引擎的HBase相容地址',
        'userName'='root',
        'password'='test',
        'tableName'='log',
        'namespace'='default'
    );
    --過濾Kafka中的ERROR日誌,將結果寫入Lindorm寬表
    INSERT INTO lindorm_log_table SELECT * FROM originalData WHERE loglevel = 'ERROR';
);
說明

步驟四:查詢流引擎處理結果

支援以下兩種查詢方式:

  • 通過Lindorm-cli串連寬表引擎並執行以下命令查詢處理結果。

    SELECT * FROM log LIMIT 5;

    返回結果:

    +----------+----------+-------------------------+-----------------------+---------------+
    | loglevel |  thread  |          class          |        detail         |   timestamp   |
    +----------+----------+-------------------------+-----------------------+---------------+
    | ERROR    | thread-2 | com.alibaba.stream.test | thread-2 error detail | 1675840911549 |
    | ERROR    | thread-4 | com.alibaba.stream.test | thread-4 error detail | 1675840911549 |
    +----------+----------+-------------------------+-----------------------+---------------+                                     
  • 通過寬表引擎的叢集管理系統查詢處理結果,具體操作請參見資料查詢