例行匯入(Routine Load)功能,支援使用者提交一個常駐的匯入任務,通過不斷地從指定的資料來源讀取資料,將資料匯入到Doris中。本文主要介紹Routine Load功能的實現原理、使用方式以及最佳實務。
使用限制
當前僅支援從Kafka進行例行匯入。
支援無認證的Kafka訪問,以及通過SSL方式認證的Kafka叢集。
支援的訊息格式為CSV或JSON文字格式設定。CSV每一個message為一行,且行尾不包含分行符號。
預設支援Kafka 0.10.0.0及以上版本。如果要使用Kafka 0.10.0.0以下版本 (0.9.0、0.8.2、0.8.1、0.8.0),需要修改BE配置,將 kafka_broker_version_fallback參數值設定為要相容的舊版本,或者在建立Routine Load時直接設定property.broker.version.fallback參數值為要相容的舊版本。
說明使用舊版本可能會導致Routine Load的部分新特性無法使用,例如根據時間設定Kafka分區的offset。
基本原理
Client向FE提交一個Routine Load作業的原理如下:
+---------+
| Client |
+----+----+
|
+-----------------------------+
| FE | |
| +-----------v------------+ |
| | | |
| | Routine Load Job | |
| | | |
| +---+--------+--------+--+ |
| | | | |
| +---v--+ +---v--+ +---v--+ |
| | task | | task | | task | |
| +--+---+ +---+--+ +---+--+ |
| | | | |
+-----------------------------+
| | |
v v v
+---+--+ +--+---+ ++-----+
| BE | | BE | | BE |
+------+ +------+ +------+FE通過JobScheduler將一個匯入作業拆分成若干個Task。每個Task負責匯入指定的一部分資料。Task被TaskScheduler分配到指定的BE 上執行。
在BE上,一個Task被視為一個普通的匯入任務,通過Stream Load的匯入機制進行匯入。匯入完成後,向FE彙報。
FE中的JobScheduler根據彙報結果,繼續產生後續新的Task,或者對失敗的Task進行重試。
整個Routine Load作業通過不斷地產生新的Task,來完成資料不間斷的匯入。
Kafka例行匯入
下面詳細介紹Kafka例行匯入的使用方式和最佳實務。
建立任務
建立例行匯入任務的詳細文法可以查看CREATE ROUTINE LOAD命令手冊或執行HELP ROUTINE LOAD; 查看文法協助。下面以幾個樣本說明如何建立Routine Load任務。
為example_db的example_tbl建立一個名為test1的Kafka例行匯入任務。指定資料行分隔符號和group.id和client.id,並且自動預設消費所有分區,且從有資料的位置(OFFSET_BEGINNING)開始訂閱。
CREATE ROUTINE LOAD example_db.test1 ON example_tbl COLUMNS TERMINATED BY ",", COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100) PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "false" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "property.group.id" = "xxx", "property.client.id" = "xxx", "property.kafka_default_offsets" = "OFFSET_BEGINNING" );以strict 模式為example_db的example_tbl建立一個名為test1的Kafka例行匯入任務。
CREATE ROUTINE LOAD example_db.test1 ON example_tbl COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100), WHERE k1 > 100 and k2 like "%doris%" PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "true" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "kafka_partitions" = "0,1,2,3", "kafka_offsets" = "101,0,0,200" );匯入JSON格式資料使用樣本。
Routine Load匯入的JSON格式僅支援以下兩種:
第一種為只有一條記錄,且為JSON對象。
{"category":"a9jadhx","author":"test","price":895}第二種為JSON數組,數組中可含多條記錄。
[ { "category":"11", "author":"4avc", "price":895, "timestamp":1589191587 }, { "category":"22", "author":"2avc", "price":895, "timestamp":1589191487 }, { "category":"33", "author":"3avc", "price":342, "timestamp":1589191387 } ]建立待匯入的Doris資料表,樣本如下。
CREATE TABLE `example_tbl` ( `category` varchar(24) NULL COMMENT "", `author` varchar(24) NULL COMMENT "", `timestamp` bigint(20) NULL COMMENT "", `dt` int(11) NULL COMMENT "", `price` double REPLACE ) ENGINE=OLAP AGGREGATE KEY(`category`,`author`,`timestamp`,`dt`) COMMENT "OLAP" PARTITION BY RANGE(`dt`) ( PARTITION p0 VALUES [("-2147483648"), ("20200509")), PARTITION p20200509 VALUES [("20200509"), ("20200510")), PARTITION p20200510 VALUES [("20200510"), ("20200511")), PARTITION p20200511 VALUES [("20200511"), ("20200512")) ) DISTRIBUTED BY HASH(`category`,`author`,`timestamp`) BUCKETS 4 PROPERTIES ( "replication_num" = "1" );以簡單模式匯入JSON資料,樣本如下。
CREATE ROUTINE LOAD example_db.test_json_label_1 ON table1 COLUMNS(category,price,author) PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "false", "format" = "json" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "kafka_partitions" = "0,1,2", "kafka_offsets" = "0,0,0" );精準匯入JSON格式資料,樣本如下。
CREATE ROUTINE LOAD example_db.test1 ON example_tbl COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d')) PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "false", "format" = "json", "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]", "strip_outer_array" = "true" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "kafka_partitions" = "0,1,2", "kafka_offsets" = "0,0,0" );說明表中的分區欄位dt在樣本的資料裡並沒有,而是在Routine load語句裡通過
dt=from_unixtime(timestamp, '%Y%m%d')轉換得來的。strict mode與source data的匯入關係
列類型為TinyInt,且表中的列允許匯入空值時:
source data
source data example
string to int
strict_mode
result
空值
\N
N/A
true or false
NULL
not null
aaa or 2000
NULL
true
invalid data(filtered)
not null
aaa
NULL
true
NULL
not null
1
1
true or false
correct data
列類型為 Decimal(1,0),且表中的列允許匯入空值時:
source data
source data example
string to int
strict_mode
result
空值
\N
N/A
true or false
NULL
not null
aaa
NULL
true
invalid data(filtered)
not null
aaa
NULL
false
NULL
not null
1 or 10
1
true or false
correct data
說明10雖然是一個超過範圍的值,但是因為其類型符合decimal的要求,所以strict mode對其不產生影響。10最後會在其他ETL處理流程中被過濾,但不會被strict mode過濾。
訪問SSL認證的Kafka叢集
訪問SSL認證的Kafka叢集需要您提供用於認證Kafka Broker公開金鑰的認證檔案(ca.pem)。如果Kafka叢集同時開啟了用戶端認證,則還需提供用戶端的公開金鑰(client.pem)、密鑰檔案(client.key)、以及密鑰密碼。檔案需要先通過CREATE FILE命令上傳到Doris中,並且catalog名稱為kafka。CREATE FILE命令詳情可使用
HELP CREATE FILE;查看。樣本如下。上傳檔案。
CREATE FILE "ca.pem" PROPERTIES("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka"); CREATE FILE "client.key" PROPERTIES("url" = "https://example_urlkafka-key/client.key", "catalog" = "kafka"); CREATE FILE "client.pem" PROPERTIES("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");建立例行匯入作業。
CREATE ROUTINE LOAD db1.job1 on tbl1 PROPERTIES ( "desired_concurrent_number"="1" ) FROM KAFKA ( "kafka_broker_list"= "broker1:9091,broker2:9091", "kafka_topic" = "my_topic", "property.security.protocol" = "ssl", "property.ssl.ca.location" = "FILE:ca.pem", "property.ssl.certificate.location" = "FILE:client.pem", "property.ssl.key.location" = "FILE:client.key", "property.ssl.key.password" = "abcd***" );
Doris通過Kafka的C++ API librdkafka來訪問Kafka叢集,librdkafka所支援的參數詳情請參見librdkafka。
查看作業狀態
查看作業狀態的具體命令和樣本可以通過HELP SHOW ROUTINE LOAD;命令查看。
查看任務運行狀態的具體命令和樣本可以通過HELP SHOW ROUTINE LOAD TASK; 命令查看。
只能查看當前正在運行中的任務,已結束和未開始的任務無法查看。
修改作業屬性
您可以使用ALTER ROUTINE LOAD來修改已經建立的作業,或者通過HELP ALTER ROUTINE LOAD; 命令查看關於此命令的協助文檔。
作業控制
您可以通過STOP、PAUSE或RESUME三個命令來控製作業的停止、暫停和重啟。可以通過HELP STOP ROUTINE LOAD; 、HELP PAUSE ROUTINE LOAD;以及 HELP RESUME ROUTINE LOAD; 三個命令查看協助和樣本。
其他說明
例行匯入作業和ALTER TABLE操作的關係
例行匯入不會阻塞SCHEMA CHANGE和ROLLUP操作。
但需注意的是,如果SCHEMA CHANGE完成後,列映射關係無法匹配,則會導致作業的錯誤資料激增,最終導致作業暫停。建議通過在例行匯入作業中顯式指定列映射關係,以及通過增加Nullable列或帶Default值的列來減少這類問題。
刪除表的Partition可能會導致匯入資料無法找到對應的Partition,使得作業暫停。
例行匯入作業和其他匯入作業的關係(LOAD、DELETE、INSERT)
例行匯入和其他LOAD作業以及INSERT操作沒有衝突。
當執行DELETE操作時,對應表分區不能有任何正在執行的匯入任務。所以在執行DELETE操作前,需要先暫停例行匯入作業,並等待已下發的task全部完成後,才可以執行DELETE。
例行匯入作業和DROP DATABASE/TABLE操作的關係:當例行匯入對應的DATABASE或TABLE被刪除後,作業會自動CANCEL。
Kafka類型的例行匯入作業和Kafka topic的關係
當您在建立例行匯入聲明的kafka_topic在Kafka叢集中不存在時:
如果您的Kafka叢集的broker設定了auto.create.topics.enable = true,則kafka_topic會先被自動建立,自動建立的partition個數是由您的Kafka叢集中的broker配置num.partitions決定的。例行作業會正常的不斷讀取該topic的資料。
如果您的Kafka叢集的broker設定了auto.create.topics.enable = false, 則kafka_topic不會被自動建立,例行作業會在沒有讀取任何資料之前就被暫停,狀態為PAUSED。
因此,如果您希望當Kafka topic不存在的時候,被例行作業可以自動建立,只需要將您的kafka叢集中的broker設定auto.create.topics.enable = true即可。
在網路隔離的環境中可能出現的問題
在有些環境中存在網段和網域名稱解析的隔離措施,所以需要注意:
建立Routine load任務中指定的Broker list必須能夠被Doris服務訪問。
Kafka中如果配置了advertised.listeners,advertised.listeners中的地址必須能夠被Doris服務訪問。
關於指定消費的Partition和Offset
Doris支援指定Partition和Offset開始消費,新版中還支援了指定時間點進行消費的功能。有三個相關參數:
kafka_partitions:指定待消費的partition列表,如:"0, 1, 2, 3"。
kafka_offsets:指定每個分區的起始offset,必須和kafka_partitions列表個數對應,如:"1000, 1000, 2000, 2000"。
property.kafka_default_offset:指定分區預設的起始offset。
在建立匯入作業時,這三個參數可以有以下組合:
組合
kafka_partitions
kafka_offsets
property.kafka_default_offset
行為
1
No
No
No
系統會自動尋找topic對應的所有分區並從 OFFSET_END開始消費。
2
No
No
Yes
系統會自動尋找topic對應的所有分區並從default offset指定的位置開始消費。
3
Yes
No
No
系統會從指定分區的OFFSET_END開始消費。
4
Yes
Yes
No
系統會從指定分區的指定offset處開始消費。
5
Yes
No
Yes
系統會從指定分區,default offset指定的位置開始消費。
STOP和PAUSE的區別
FE會自動定期清理STOP狀態的ROUTINE LOAD,而PAUSE狀態的則可以再次被恢複啟用。
相關參數
一些系統配置參數會影響例行匯入的使用,具體說明如下。
配置項 | FE/BE | 預設值 | 說明 |
max_routine_load_task_concurrent_num | FE | 5 | 該參數限制了一個例行匯入作業最大的子任務並發數。可以運行時修改,建議維持預設值。設定過大,可能導致同時並發的任務數過多,佔用叢集資源。 |
max_routine_load_task_num_per_be | FE | 5 | 該參數限制了每個BE節點最多並發執行的子任務個數。可以運行時修改,建議維持預設值。如果設定過大,可能導致並發任務數過多,佔用叢集資源。 |
max_routine_load_job_num | FE | 100 | 該參數限制了例行匯入作業的總數,包括 NEED_SCHEDULED、RUNNING、PAUSE這些狀態。可以運行時修改。超過後,不能再提交新的作業。 |
max_consumer_num_per_group | BE | 3 | 該參數表示一個子任務中最多產生幾個consumer進行資料消費。對於Kafka資料來源,一個consumer可能消費一個或多個Kafka partition。假設一個任務需要消費6個Kafka partition,則會產生3個 consumer,每個consumer消費2個partition。如果只有2個partition,則只會產生2個consumer,每個 consumer消費1個partition。 |
push_write_mbytes_per_sec | BE | 10,即10MB/s | 該參數是匯入通用參數,不限於例行匯入作業。該參數限制了匯入資料寫入磁碟的速度。對於SSD等高效能存放裝置,可以適當增加這個限速。 |
max_tolerable_backend_down_num | FE | 0 | 在滿足某些條件下,Doris可PAUSED的任務重新調度,即變成RUNNING。該參數為0代表只有所有BE節點是alive狀態才允許重新調度。 |
period_of_auto_resume_min | FE | 5分鐘 | Doris重新調度,只會在5分鐘這個周期內,最多嘗試3次。如果3次都失敗則鎖定當前任務,後續不再進行調度。但可通過人為幹預,進行手動恢複。 |