Routine Load是一種例行匯入方式,StarRocks通過該方式支援從Kafka持續不斷的匯入資料,並且支援通過SQL控制匯入任務的暫停、重啟和停止。本文為您介紹Routine Load匯入的基本原理、匯入樣本以及常見問題。
基本概念
- RoutineLoadJob:提交的一個例行匯入任務。
- JobScheduler:例行匯入任務調度器,用於調度和拆分一個RoutineLoadJob為多個Task。
- Task:RoutineLoadJob被JobScheduler根據規則拆分的子任務。
- TaskScheduler:任務調度器,用於調度Task的執行。
基本原理

- 使用者通過支援MySQL協議的用戶端向FE提交一個Kafka匯入任務。
- FE將一個匯入任務拆分成若干個Task,每個Task負責匯入指定的一部分資料。
- 每個Task被分配到指定的BE上執行。在BE上,一個Task被視為一個普通的匯入任務,通過Stream Load的匯入機制進行匯入。
- BE匯入完成後,向FE彙報。
- FE根據彙報結果,繼續產生後續新的Task,或者對失敗的Task進行重試。
- FE會不斷的產生新的Task,來完成資料不間斷的匯入。
匯入流程
環境要求
支援訪問無認證或使用SSL方式認證的Kafka叢集。
支援的訊息格式如下:
CSV文字格式設定,每一個message為一行,且行尾不包含分行符號。
JSON文字格式設定。
不支援Array類型。
僅支援Kafka 0.10.0.0及以上版本。
建立匯入任務
文法
CREATE ROUTINE LOAD <database>.<job_name> ON <table_name> [COLUMNS TERMINATED BY "column_separator" ,] [COLUMNS (col1, col2, ...) ,] [WHERE where_condition ,] [PARTITION (part1, part2, ...)] [PROPERTIES ("key" = "value", ...)] FROM [DATA_SOURCE] [(data_source_properties1 = 'value1', data_source_properties2 = 'value2', ...)]相關參數描述如下表所示。
參數 是否必填 描述 job_name 是 匯入任務的名稱,首碼可以攜帶匯入資料庫名稱,常見命名方式為時間戳記+表名。 一個DataBase內,任務名稱不可重複。 table_name 是 匯入的目標表的名稱。 COLUMNS TERMINATED子句 否 指定來源資料檔案中的資料行分隔符號,分隔字元預設為\t。 COLUMNS子句 否 指定來源資料中列和表中列的映射關係。 - 映射列:例如,目標表有三列col1、col2和col3,來源資料有4列,其中第1、2、4列分別對應col2、col1和col3,則書寫為
COLUMNS (col2, col1, temp, col3),其中temp列為不存在的一列,用於跳過來源資料中的第三列。 - 衍生列:除了直接讀取來源資料的列內容之外,StarRocks還提供對資料列的加工操作。例如,目標表後加入了第四列col4,其結果由col1 + col2產生,則可以書寫為
COLUMNS (col2, col1, temp, col3, col4 = col1 + col2)。
WHERE子句 否 指定過濾條件,可以過濾掉不需要的行。過濾條件可以指定映射列或衍生列。 例如,只匯入k1大於100並且k2等於1000的行,則書寫為
WHERE k1 > 100 and k2 = 1000。PARTITION子句 否 指定匯入目標表的Partition。如果不指定,則會自動匯入到對應的Partition中。 PROPERTIES子句 否 指定匯入任務的通用參數。 desired_concurrent_number 否 匯入並發度,指定一個匯入任務最多會被分成多少個子任務執行。必須大於0,預設值為3。 max_batch_interval 否 每個子任務的最大執行時間。範圍為5~60,單位是秒。預設值為10。 1.15版本後,該參數表示子任務的調度時間,即任務多久執行一次。任務的消費資料時間為fe.conf中的routine_load_task_consume_second,預設為3s。任務的執行逾時時間為fe.conf中的routine_load_task_timeout_second,預設為15s。
max_batch_rows 否 每個子任務最多讀取的行數。必須大於等於200000。預設值為200000。 1.15版本後,該參數只用於定義錯誤偵測視窗範圍,視窗的範圍是10 * max-batch-rows。
max_batch_size 否 每個子任務最多讀取的位元組數。單位為位元組,範圍是100 MB到1 GB。預設值為100 MB。 1.15版本後,廢棄該參數,任務消費資料的時間為fe.conf中的routine_load_task_consume_second,預設為3s。
max_error_number 否 採樣視窗內,允許的最大錯誤行數。必須大於等於0。預設是0,即不允許有錯誤行。 重要 被WHERE條件過濾掉的行不算錯誤行。strict_mode 否 是否開啟strict 模式,預設為開啟。 如果開啟後,非空未經處理資料的列類型變換為NULL,則會被過濾。關閉方式為設定該參數為false。
timezone 否 指定匯入任務所使用的時區。 預設為使用Session的timezone參數。該參數會影響所有匯入涉及的和時區有關的函數結果。
DATA_SOURCE 是 指定資料來源,請使用KAFKA。 data_source_properties 否 指定資料來源相關的資訊。包括以下參數: - kafka_broker_list:Kafka的Broker串連資訊,格式為
ip:host。多個Broker之間以逗號(,)分隔。 - kafka_topic:指定待訂閱的Kafka的Topic。說明 如果指定資料來源相關的資訊,則kafka_broker_list和kafka_topic必填。
- kafka_partitions和kafka_offsets:指定需要訂閱的Kafka Partition,以及對應的每個Partition的起始offset。
- property:Kafka相關的屬性,功能等同於Kafka Shell中
"--property"參數。建立匯入任務更詳細的文法可以通過執行HELP ROUTINE LOAD;命令查看。
- 映射列:例如,目標表有三列col1、col2和col3,來源資料有4列,其中第1、2、4列分別對應col2、col1和col3,則書寫為
樣本:向StarRocks提交一個無認證的Routine Load匯入任務example_tbl2_ordertest,持續消費Kafka叢集中Topic ordertest2的訊息,並匯入至example_tbl2表中,匯入任務會從此Topic所指定分區的最早位點開始消費。
CREATE ROUTINE LOAD load_test.example_tbl2_ordertest ON example_tbl COLUMNS(commodity_id, customer_name, country, pay_time, price, pay_dt=from_unixtime(pay_time, '%Y%m%d')) PROPERTIES ( "desired_concurrent_number"="5", "format" ="json", "jsonpaths" ="[\"$.commodity_id\",\"$.customer_name\",\"$.country\",\"$.pay_time\",\"$.price\"]" ) FROM KAFKA ( "kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>", "kafka_topic" = "ordertest2", "kafka_partitions" ="0,1,2,3,4", "property.kafka_default_offsets" = "OFFSET_BEGINNING" );樣本:使用SSL安全性通訊協定訪問Kafka,具體的配置樣本如下。
-- 指定安全性通訊協定為SSL。 "property.security.protocol" = "ssl", -- CA認證的位置。 "property.ssl.ca.location" = "FILE:ca-cert", -- 如果Kafka Server端開啟了Client認證,則還需設定以下三個參數: -- Client的公開金鑰位置。 "property.ssl.certificate.location" = "FILE:client.pem", -- Client的私密金鑰位置。 "property.ssl.key.location" = "FILE:client.key", -- Client私密金鑰的密碼。 "property.ssl.key.password" = "******"關於建立檔案的詳細資料,請參見CREATE FILE。
說明在使用CREATE FILE時,請使用OSS的HTTP訪問地址作為
url。具體的使用方式,請參見通過IPv6協議訪問OSS。
查看任務狀態
顯示load_test下,所有的例行匯入任務(包括已停止或取消的任務)。結果為一行或多行。
USE load_test; SHOW ALL ROUTINE LOAD;顯示load_test下,名稱為example_tbl2_ordertest的當前正在啟動並執行例行匯入任務。
SHOW ROUTINE LOAD FOR load_test.example_tbl2_ordertest;在EMR StarRocks Manager控制台,單擊中繼資料管理,單擊待查看的資料庫名稱,單擊任務,即可在Kafka匯入頁簽查看任務的執行狀態。
StarRocks只能查看當前正在運行中的任務,已結束和未開始的任務無法查看。
執行SHOW ALL ROUTINE LOAD命令,可以查看當前正在啟動並執行所有Routine Load任務,返回如下類似資訊。
*************************** 1. row ***************************
Id: 14093
Name: routine_load_wikipedia
CreateTime: 2020-05-16 16:00:48
PauseTime: N/A
EndTime: N/A
DbName: default_cluster:load_test
TableName: routine_wiki_edit
State: RUNNING
DataSourceType: KAFKA
CurrentTaskNum: 1
JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
CustomProperties: {}
Statistic: {"receivedBytes":150821770,"errorRows":122,"committedTaskNum":12,"loadedRows":2399878,"loadRowsRate":199000,"abortedTaskNum":1,"totalRows":2400000,"unselectedRows":0,"receivedBytesRate":12523000,"taskExecuteTimeMs":12043}
Progress: {"0":"13634667"}
ReasonOfStateChanged:
ErrorLogUrls: http://172.26.**.**:9122/api/_load_error_log?file=__shard_53/error_log_insert_stmt_47e8a1d107ed4932-8f1ddf7b01ad2fee_47e8a1d107ed4932_8f1ddf7b01ad2fee, http://172.26.**.**:9122/api/_load_error_log?file=__shard_54/error_log_insert_stmt_e0c0c6b040c044fd-a162b16f6bad53e6_e0c0c6b040c044fd_a162b16f6bad53e6, http://172.26.**.**:9122/api/_load_error_log?file=__shard_55/error_log_insert_stmt_ce4c95f0c72440ef-a442bb300bd743c8_ce4c95f0c72440ef_a442bb300bd743c8
OtherMsg:
1 row in set (0.00 sec)本樣本建立名為routine_load_wikipedia的匯入任務,相關參數描述如下表。
參數 | 描述 |
State | 匯入任務狀態。RUNNING表示該匯入任務處於持續運行中。 |
Statistic | 進度資訊,記錄了從建立任務開始後的匯入資訊。 |
receivedBytes | 接收到的資料大小,單位是Byte。 |
errorRows | 匯入錯誤行數。 |
committedTaskNum | FE提交的Task數。 |
loadedRows | 已匯入的行數。 |
loadRowsRate | 匯入資料速率,單位是行每秒(row/s)。 |
abortedTaskNum | BE失敗的Task數。 |
totalRows | 接收的總行數。 |
unselectedRows | 被WHERE條件過濾的行數。 |
receivedBytesRate | 接收資料速率,單位是Bytes/s。 |
taskExecuteTimeMs | 匯入耗時,單位是ms。 |
ErrorLogUrls | 錯誤資訊日誌,可以通過URL看到匯入處理程序中的錯誤資訊。 |
暫停匯入任務
使用PAUSE語句後,此時匯入任務進入PAUSED狀態,資料暫停匯入,但任務未終止,可以通過RESUME語句重啟任務。
PAUSE ROUTINE LOAD FOR <job_name>;暫停匯入任務後,任務的State變更為PAUSED,Statistic和Progress中的匯入資訊停止更新。此時,任務並未終止,通過SHOW ROUTINE LOAD語句可以看到已經暫停匯入任務。
恢複匯入任務
使用RESUME語句後,任務會短暫的進入NEED_SCHEDULE狀態,表示任務正在重新調度,一段時間後會重新恢複至RUNNING狀態,繼續匯入資料。
RESUME ROUTINE LOAD FOR <job_name>;停止匯入任務
使用STOP語句讓匯入任務進入STOP狀態,資料停止匯入,任務終止,無法恢複資料匯入。
STOP ROUTINE LOAD FOR <job_name>;停止匯入任務後,任務的State變更為STOPPED,Statistic和Progress中的匯入資訊再也不會更新。此時,通過SHOW ROUTINE LOAD語句無法看到已經停止的匯入任務。
最佳實務案例
本案例是建立了一個Routine Load匯入任務,持續不斷地消費Kafka叢集的CSV格式的資料,然後匯入至StarRocks中。
在Kafka叢集中執行以下操作。
建立測試的topic。
kafka-topics.sh --create --topic order_sr_topic --replication-factor 3 --partitions 10 --bootstrap-server "core-1-1:9092,core-1-2:9092,core-1-3:9092"執行如下命令,生產資料。
kafka-console-producer.sh --broker-list core-1-1:9092 --topic order_sr_topic輸入測試資料。
2020050802,2020-05-08,Johann Georg Faust,Deutschland,male,895 2020050802,2020-05-08,Julien Sorel,France,male,893 2020050803,2020-05-08,Dorian Grey,UK,male,1262 2020051001,2020-05-10,Tess Durbeyfield,US,female,986 2020051101,2020-05-11,Edogawa Conan,japan,male,8924
在StarRocks叢集中執行以下操作。
執行以下命令,建立目標資料庫和資料表。
根據CSV資料中需要匯入的幾列(例如除第五列性別外的其餘五列需要匯入至StarRocks), 在StarRocks叢集的目標資料庫load_test 中建立表routine_load_tbl_csv。
CREATE TABLE load_test.routine_load_tbl_csv ( `order_id` bigint NOT NULL COMMENT "訂單編號", `pay_dt` date NOT NULL COMMENT "支付日期", `customer_name` varchar(26) NULL COMMENT "顧客姓名", `nationality` varchar(26) NULL COMMENT "國籍", `price` double NULL COMMENT "支付金額" ) ENGINE=OLAP PRIMARY KEY (order_id,pay_dt) DISTRIBUTED BY HASH(`order_id`) BUCKETS 5;執行以下命令,建立匯入任務。
CREATE ROUTINE LOAD load_test.routine_load_tbl_ordertest_csv ON routine_load_tbl_csv COLUMNS TERMINATED BY ",", COLUMNS (order_id, pay_dt, customer_name, nationality, temp_gender, price) PROPERTIES ( "desired_concurrent_number" = "5" ) FROM KAFKA ( "kafka_broker_list" ="192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092", "kafka_topic" = "order_sr_topic", "kafka_partitions" ="0,1,2,3,4", "property.kafka_default_offsets" = "OFFSET_BEGINNING" )執行以下命令,查看名稱為routine_load_tbl_ordertest_csv的匯入任務的資訊。
SHOW ROUTINE LOAD FOR routine_load_tbl_ordertest_csv;如果狀態為RUNNING,則說明作業運行正常。
執行以下命令,查詢目標表中的資料,您會探索資料已經同步完成。
SELECT * FROM routine_load_tbl_csv;您還可以任務進行以下操作:
暫停匯入任務
PAUSE ROUTINE LOAD FOR routine_load_tbl_ordertest_csv;恢複匯入任務
RESUME ROUTINE LOAD FOR routine_load_tbl_ordertest_csv;修改匯入任務
說明僅支援修改狀態為PAUSED的任務。
例如:修改desired_concurrent_number為6。
ALTER ROUTINE LOAD FOR routine_load_tbl_ordertest_csv PROPERTIES ( "desired_concurrent_number" = "6" )停止匯入任務
STOP ROUTINE LOAD FOR routine_load_tbl_ordertest_csv;