Lindorm流引擎提供了100%相容Flink SQL的能力。您可以將未經處理資料儲存在Kafka Topic,並通過Flink SQL在流引擎中建立Realtime Compute任務,對未經處理資料進行高效計算和處理。本文介紹如何使用Flink SQL提交流引擎計算任務將Kafka Topic中的資料匯入至Lindorm寬表。
前提條件
注意事項
如果您的應用部署在ECS執行個體,且想要通過專用網路訪問Lindorm執行個體,則需要確保Lindorm執行個體和ECS執行個體滿足以下條件,以保證網路的連通性。
所在地區相同,並建議所在可用性區域相同(以減少網路延時)。
ECS執行個體與Lindorm執行個體屬於同一專用網路。
操作步驟
步驟一:資料準備
通過Kafka API將來源資料寫入Kafka Topic。共支援以下兩種寫入方式:
以通過開源Kafka指令碼工具寫入資料為例。
#建立Topic ./kafka-topics.sh --bootstrap-server <Lindorm Stream Kafka地址> --topic log_topic --create #寫入資料 ./kafka-console-producer.sh --bootstrap-server <Lindorm Stream Kafka地址> --topic log_topic {"loglevel": "INFO", "thread":"thread-1", "class": "com.alibaba.stream.test", "detail":"thread-1 info detail", "timestamp": "1675840911549"} {"loglevel": "ERROR", "thread":"thread-2", "class": "com.alibaba.stream.test", "detail":"thread-2 error detail", "timestamp": "1675840911549"} {"loglevel": "WARN", "thread":"thread-3", "class": "com.alibaba.stream.test", "detail":"thread-3 warn detail", "timestamp": "1675840911549"} {"loglevel": "ERROR", "thread":"thread-4", "class": "com.alibaba.stream.test", "detail":"thread-4 error detail", "timestamp": "1675840911549"}Lindorm Stream Kafka地址的擷取方式請參見查看串連地址。
在寬表引擎中建立結果表,用於儲存計算任務的處理結果。
通過Lindorm-cli串連寬表引擎。如何串連,請參見通過Lindorm-cli串連並使用寬表引擎。
建立結果表
log。CREATE TABLE IF NOT EXISTS log ( loglevel VARCHAR, thread VARCHAR, class VARCHAR, detail VARCHAR, timestamp BIGINT, primary key (loglevel, thread) );
步驟二:安裝流引擎用戶端
在ECS上執行以下命令,下載流引擎用戶端壓縮包。
wget https://hbaseuepublic.oss-cn-beijing.aliyuncs.com/lindorm-sqlline-2.0.2.tar.gz執行以下命令,解壓壓縮包。
tar zxvf lindorm-sqlline-2.0.2.tar.gz進入
lindorm-sqlline-2.0.2/bin目錄,執行以下命令串連至Lindorm流引擎。./lindorm-sqlline -url <Lindorm Stream SQL地址>Lindorm Stream SQL地址的擷取方式,請參見查看串連地址。
步驟三:在流引擎中提交計算任務
樣本計算任務的具體實現如下:
建立名為log_to_lindorm的Flink Job,並在Flink Job中建立兩張表:originalData(Source表)和lindorm_log_table(Sink表),分別關聯已建立的Kafka Topic和結果表log。
建立流任務,過濾掉loglevel為ERROR的日誌,將過濾結果寫入結果表log中。
具體代碼如下:
CREATE FJOB log_to_lindorm(
--Kafka Source表
CREATE TABLE originalData(
`loglevel` VARCHAR,
`thread` VARCHAR,
`class` VARCHAR,
`detail` VARCHAR,
`timestamp` BIGINT
)WITH(
'connector'='kafka',
'topic'='log_topic',
'scan.startup.mode'='earliest-offset',
'properties.bootstrap.servers'='Lindorm Stream Kafka地址',
'format'='json'
);
--Lindorm寬表 Sink表
CREATE TABLE lindorm_log_table(
`loglevel` VARCHAR,
`thread` VARCHAR,
`class` VARCHAR,
`detail` VARCHAR,
`timestamp` BIGINT,
PRIMARY KEY (`loglevel`, `thread`) NOT ENFORCED
)WITH(
'connector'='lindorm',
'seedServer'='Lindorm寬表引擎的HBase相容地址',
'userName'='root',
'password'='test',
'tableName'='log',
'namespace'='default'
);
--過濾Kafka中的ERROR日誌,將結果寫入Lindorm寬表
INSERT INTO lindorm_log_table SELECT * FROM originalData WHERE loglevel = 'ERROR';
);Lindorm寬表引擎的HBase相容地址的擷取方式,請參見查看串連地址。
寬表連接器(Connector)的詳細說明,請參見配置流引擎的寬表連接器。
步驟四:查詢流引擎處理結果
支援以下兩種查詢方式:
通過Lindorm-cli串連寬表引擎並執行以下命令查詢處理結果。
SELECT * FROM log LIMIT 5;返回結果:
+----------+----------+-------------------------+-----------------------+---------------+ | loglevel | thread | class | detail | timestamp | +----------+----------+-------------------------+-----------------------+---------------+ | ERROR | thread-2 | com.alibaba.stream.test | thread-2 error detail | 1675840911549 | | ERROR | thread-4 | com.alibaba.stream.test | thread-4 error detail | 1675840911549 | +----------+----------+-------------------------+-----------------------+---------------+通過寬表引擎的叢集管理系統查詢處理結果,具體操作請參見資料查詢。