Routine Load支援提交一個常駐的匯入作業,不斷地從指定的資料來源讀取資料,將資料持續地匯入至ApsaraDB for SelectDB中。本文介紹如何通過Routine Load將Kafka中的資料匯入至ApsaraDB for SelectDB執行個體。
前提條件
支援的資料來源:目前僅支援Kafka資料來源,可通過無認證方式或PLAIN/SSL/Kerberos等認證方式串連Kafka。
支援的訊息格式:
CSV或JSON格式。CSV的格式,每一個Message為一行,且行尾不包含分行符號。
注意事項
預設支援Kafka 0.10.0.0(含)以上版本。如果使用Kafka 0.10.0.0以下版本(0.9.0,0.8.2,0.8.1,0.8.0),需要Kafka相容舊版本,具體操作有以下兩種方式:
將BE的配置
kafka_broker_version_fallback的值設定為要相容的舊版本。建立Routine Load的時候直接設定
property.broker.version.fallback的值為要相容的舊版本。
使用相容舊版本的代價在於,Routine Load的部分新特性可能無法使用,例如根據時間設定Kafka分區的offset。
建立匯入作業
使用Routine Load功能時,首先需建立一個Routine Load作業。該作業將通過例行調度持續發送任務,每個任務會消耗一定數量的Kafka訊息。
文法
CREATE ROUTINE LOAD [db.]job_name ON tbl_name
[merge_type]
[load_properties]
[job_properties]
FROM data_source [data_source_properties]參數說明
參數名稱 | 參數說明 |
| 匯入作業的名稱。在同一個Database內,相同名稱的job只能運行一個。 |
| 指定匯入的表的名稱。 |
| 指定資料合併類型。預設為 |
| 指定匯入資料處理相關參數。詳細參數說明,請參見load_properties。 |
| 指定匯入作業相關參數。詳細參數說明,請參見job_properties參數說明。 |
| 指定資料來源的類型。詳細參數說明,請參見data_source_properties參數說明。 |
load_properties參數說明
[column_separator],
[columns_mapping],
[preceding_filter],
[where_predicates],
[partitions],
[DELETE ON],
[ORDER BY]參數名稱 | 樣本值 | 參數說明 |
| COLUMNS TERMINATED BY "," | 指定資料行分隔符號,預設為 |
| (k1,k2,tmpk1,k3=tmpk1+1) | 指定檔案列和表中列的映射關係,以及各種列轉換等。詳細說明請參見資料轉化。 |
| 無 | 指定過濾未經處理資料條件。詳細說明請參見資料轉化。 |
| WHERE k1>100 and k2=1000 | 指定條件對匯入的資料進行過濾。詳細說明請參見資料轉化。 |
| PARTITION(p1,p2,p3) | 指定匯入目的表的哪些Partition中。如果不指定,則會自動匯入到對應的Partition中。 |
| DELETE ON v3>100 | 用於指定匯入資料中表示Delete Flag的列和計算關係。 說明 需配合MEREGE匯入模式一起使用,僅適用於Unique Key模型的表。 |
| 無 | 用於指定匯入資料中表示Sequence Col的列。其功能為匯入資料時保證資料順序。 說明 僅適用於對Unique Key模型的表。 |
job_properties參數說明
PROPERTIES (
"key1" = "val1",
"key2" = "val2"
)這三個參數max_batch_interval、max_batch_rows和max_batch_size用於控制子任務的執行時間和處理量。當任意一個參數達到設定閾值時,任務將終止。
參數名稱 | 樣本值 | 參數說明 |
| "desired_concurrent_number" = "3" | 指定期望並發度。大於0,預設為 說明
|
| "max_batch_interval" = "20" | 指定每個子任務最大執行時間,單位是秒,預設為 |
| "max_batch_rows" = "300000" | 指定每個子任務最多讀取的行數。預設是 |
| "max_batch_size" = "209715200" | 指定每個子任務最多讀取的位元組數。單位是位元組,預設為 |
| "max_error_number"="3" | 指定採樣視窗內,允許的最大錯誤行數。預設為 採樣視窗為 說明 被 |
| "strict_mode"="true" | 指定是否開啟strict 模式,預設為
|
| "timezone" = "Africa/Abidjan" | 指定匯入作業所使用的時區。預設為使用Session的時區作為參數。 說明 該參數會影響所有匯入涉及的和時區有關的函數結果。 |
| "format" = "json" | 指定匯入資料格式,預設為 |
| -H "jsonpaths:[\"$.k2\",\"$.k1\"]" | 當匯入資料格式為 |
| -H "strip_outer_array:true" | 當匯入資料格式為 |
| -H "json_root:$.RECORDS" | 當匯入資料格式為JSON時,可以通過 |
| 無 | 指定設定發送批處理資料的並行度,如果並行度的值超過BE配置中的 |
| 無 | 指定是否只匯入資料到對應分區的一個tablet,預設值為false。該參數只允許向對帶有random分區的Duplicate表匯入資料的時設定。 |
strict 模式(strict mode)與未經處理資料(source data)的匯入關係
例如列類型為TinyInt。當表中的列允許匯入空值時匯入關係如下。
source data | source data example | string to int | strict_mode | result |
空值 | \N | N/A | true or false | NULL |
not null | aaa or 2000 | NULL | true | invalid data(filtered) |
not null | aaa | NULL | false | NULL |
not null | 1 | 1 | true or false | correct data |
例如列類型為Decimal(1,0)。當表中的列允許匯入空值時匯入關係如下。
source data | source data example | string to int | strict_mode | result |
空值 | \N | N/A | true or false | NULL |
not null | aaa | NULL | true | invalid data(filtered) |
not null | aaa | NULL | false | NULL |
not null | 1 or 10 | 1 | true or false | correct data |
10雖然是一個超過範圍的值,但是因為其類型符合Decimal的要求,所以strict mode對其不產生影響。10最後會在其他ETL處理流程中被過濾。但不會被strict mode過濾。
data_source_properties參數說明
FROM KAFKA
(
"key1" = "val1",
"key2" = "val2"
)參數名稱 | 參數說明 |
| 指定Kafka的Broker串連資訊。格式為 格式: |
| 指定訂閱的Kafka的Topic。 格式: |
| 指定訂閱的Kafka Partition,以及對應的每個Partition的起始Offset。如果指定時間,則會從大於等於該時間的最近一個Offset處開始消費。 Offset可以指定從大於等於0的具體Offset,或者:
如果沒有指定,則預設從 樣本如下。 重要 時間格式不能和OFFSET格式混用。 |
| 指定自訂Kafka參數。功能等同於Kafka shell中"--property"參數。 當參數的value為一個檔案時,需要在value前加上關鍵詞:" |
Property參數說明
使用SSL串連Kafka時,需要指定以下參數:
"property.security.protocol" = "ssl", "property.ssl.ca.location" = "FILE:ca.pem", "property.ssl.certificate.location" = "FILE:client.pem", "property.ssl.key.location" = "FILE:client.key", "property.ssl.key.password" = "abcdefg"其中
property.security.protocol和property.ssl.ca.location為必選項,用於指明串連方式為SSL,以及CA認證的位置。如果Kafka Server端開啟了Client認證,則還需設定以下參數。
"property.ssl.certificate.location" "property.ssl.key.location" "property.ssl.key.password"分別用於指定Client的public key、private key、private key的密碼。
指定Kafka Partition的預設起始offset。
沒有指定
kafka_partitions/kafka_offsets,預設消費所有分區。此時可以通過kafka_default_offsets指定起始offset。預設為OFFSET_END,即從末尾開始訂閱。"property.kafka_default_offsets" = "OFFSET_BEGINNING"
更多支援的自訂參數,請參閱librdkafka的官方CONFIGURATION文檔中,Client端的配置項。例如以下參數。
"property.client.id" = "12345",
"property.ssl.ca.location" = "FILE:ca.pem"使用樣本
建立Routine Load簡單作業
建立待匯入的SelectDB資料表,樣本如下。
CREATE TABLE test_table ( id int, name varchar(50), age int, address varchar(50), url varchar(500) ) UNIQUE KEY(`id`, `name`) DISTRIBUTED BY HASH(id) BUCKETS 4 PROPERTIES("replication_num" = "1");分別設定不同的參數匯入資料,樣本如下。
為example_db的test_table建立一個名為test1的Kafka Routine Load任務。指定資料行分隔符號的group.id和client.id,設定自動預設消費所有分區,且從有資料的位置(OFFSET_BEGINNING)開始訂閱,樣本如下。
CREATE ROUTINE LOAD example_db.test1 ON test_table COLUMNS TERMINATED BY ",", COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100) PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "false" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "property.kafka_default_offsets" = "OFFSET_BEGINNING" );為example_db的test_table建立一個名為test2的Kafka Routine Load任務。匯入任務為strict 模式,樣本如下。
CREATE ROUTINE LOAD example_db.test2 ON test_table COLUMNS TERMINATED BY ",", COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100) PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "true" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "property.kafka_default_offsets" = "OFFSET_BEGINNING" );從指定的時間點開始消費,樣本如下。
CREATE ROUTINE LOAD example_db.test4 ON test_table PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "30", "max_batch_rows" = "300000", "max_batch_size" = "209715200" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092", "kafka_topic" = "my_topic", "property.kafka_default_offset" = "2024-01-21 10:00:00" );
匯入JSON格式資料
Routine Load匯入的JSON格式僅支援以下兩種。
只有一條記錄,且為JSON對象。
當使用單表匯入(即通過ON TABLE_NAME指定表名)時,JSON資料格式如下。
{"key1":"value1","key2":"value2","key3":"value3"}當使用動態或多表匯入Routine Load(即不指定具體的表名)時,JSON資料格式如下。
table_name|{"key1":"value1","key2":"value2","key3":"value3"}JSON數組,數組中可含多條記錄。
當使用單表匯入(即通過ON TABLE_NAME指定表名)時,JSON資料格式如下。
[ { "key1":"value11", "key2":"value12", "key3":"value13", "key4":14 }, { "key1":"value21", "key2":"value22", "key3":"value23", "key4":24 }, { "key1":"value31", "key2":"value32", "key3":"value33", "key4":34 } ]當使用動態或多表匯入(即不指定具體的表名)時,JSON資料格式如下。
table_name|[ { "key1":"value11", "key2":"value12", "key3":"value13", "key4":14 }, { "key1":"value21", "key2":"value22", "key3":"value23", "key4":24 }, { "key1":"value31", "key2":"value32", "key3":"value33", "key4":34 } ]
匯入JSON格式資料,樣本如下。
建立待匯入的SelectDB資料表,樣本如下。
CREATE TABLE `example_tbl` ( `category` varchar(24) NULL COMMENT "", `author` varchar(24) NULL COMMENT "", `timestamp` bigint(20) NULL COMMENT "", `dt` int(11) NULL COMMENT "", `price` double REPLACE ) ENGINE=OLAP AGGREGATE KEY(`category`,`author`,`timestamp`,`dt`) COMMENT "OLAP" PARTITION BY RANGE(`dt`) ( PARTITION p0 VALUES [("-2147483648"), ("20230509")), PARTITION p20200509 VALUES [("20230509"), ("20231010")), PARTITION p20200510 VALUES [("20231010"), ("20231211")), PARTITION p20200511 VALUES [("20231211"), ("20240512")) ) DISTRIBUTED BY HASH(`category`,`author`,`timestamp`) BUCKETS 4;分別使用兩個類型的JSON資料記錄到Topic裡:
{ "category":"value1331", "author":"value1233", "timestamp":1700346050, "price":1413 }[ { "category":"value13z2", "author":"vaelue13", "timestamp":1705645251, "price":14330 }, { "category":"lvalue211", "author":"lvalue122", "timestamp":1684448450, "price":24440 } ]以不同模式匯入JSON資料,樣本如下。
以簡單模式匯入JSON資料。
CREATE ROUTINE LOAD example_db.test_json_label_1 ON example_tbl COLUMNS(category,price,author) PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "false", "format" = "json" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "kafka_partitions" = "0,1,2", "kafka_offsets" = "0,0,0" );精準匯入JSON格式資料。
CREATE ROUTINE LOAD example_db.test_json_label_3 ON example_tbl COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d')) PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "false", "format" = "json", "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]", "strip_outer_array" = "true" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "kafka_partitions" = "0,1,2", "kafka_offsets" = "0,0,0" );說明表裡的分區欄位
dt在樣本資料裡並沒有,而是在Routine Load語句裡通過dt=from_unixtime(timestamp,'%Y%m%d')轉換出來的。
訪問不同驗證方式的Kafka叢集
根據Kafka叢集驗證方式的不同,訪問的方式樣本如下。
訪問SSL認證的Kafka叢集。
訪問SSL認證的Kafka叢集需要您提供用於認證Kafka Broker公開金鑰的認證檔案(ca.pem)。如果Kafka叢集同時開啟了用戶端認證,則還需提供用戶端的公開金鑰(client.pem)、密鑰檔案(client.key),以及密鑰密碼。這裡所需的檔案需要先通過
CREATE FILE命令上傳到SelectDB中,並且Catalog名稱為kafka。上傳檔案,樣本如下。
CREATE FILE "ca.pem" PROPERTIES("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka"); CREATE FILE "client.key" PROPERTIES("url" = "https://example_urlkafka-key/client.key", "catalog" = "kafka"); CREATE FILE "client.pem" PROPERTIES("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");建立Routine Load作業,樣本如下。
CREATE ROUTINE LOAD db1.job1 on tbl1 PROPERTIES ( "desired_concurrent_number"="1" ) FROM KAFKA ( "kafka_broker_list"= "broker1:9091,broker2:9091", "kafka_topic" = "my_topic", "property.security.protocol" = "ssl", "property.ssl.ca.location" = "FILE:ca.pem", "property.ssl.certificate.location" = "FILE:client.pem", "property.ssl.key.location" = "FILE:client.key", "property.ssl.key.password" = "abcdefg" );說明SelectDB通過Kafka的C++ API
librdkafka來訪問Kafka叢集。librdkafka所支援的參數可以參閱Configuration properties。
訪問PLAIN認證的Kafka叢集。
訪問開啟PLAIN認證的Kafka叢集,需要增加配置如下。
property.security.protocol=SASL_PLAINTEXT:使用SASL plaintext。
property.sasl.mechanism=PLAIN:設定SASL的認證方式為PLAIN。
property.sasl.username=admin:設定SASL的使用者名稱。
property.sasl.password=admin:設定SASL的密碼。
建立Routine Load作業,樣本如下。
CREATE ROUTINE LOAD db1.job1 on tbl1 PROPERTIES ( "desired_concurrent_number"="1", ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092", "kafka_topic" = "my_topic", "property.security.protocol"="SASL_PLAINTEXT", "property.sasl.mechanism"="PLAIN", "property.sasl.username"="admin", "property.sasl.password"="admin" );訪問Kerberos認證的Kafka叢集。
訪問開啟kerberos認證的Kafka叢集,需要增加配置如下。
security.protocol=SASL_PLAINTEXT:使用SASL plaintext。
sasl.kerberos.service.name=$SERVICENAME:設定broker servicename。
sasl.kerberos.keytab=/etc/security/keytabs/${CLIENT_NAME}.keytab:設定Keytab本地檔案路徑。
sasl.kerberos.principal=${CLIENT_NAME}/${CLIENT_HOST}:設定SelectDB串連Kafka時使用的Kerberos主體。
建立Routine Load作業,樣本如下。
CREATE ROUTINE LOAD db1.job1 on tbl1 PROPERTIES ( "desired_concurrent_number"="1", ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092", "kafka_topic" = "my_topic", "property.security.protocol" = "SASL_PLAINTEXT", "property.sasl.kerberos.service.name" = "kafka", "property.sasl.kerberos.keytab" = "/etc/krb5.keytab", "property.sasl.kerberos.principal" = "id@your.com" );說明若要使SelectDB訪問開啟kerberos認證方式的Kafka叢集,需要在SelectDB叢集所有運行節點上部署Kerberos用戶端kinit,並配置krb5.conf,填寫KDC服務資訊等。
配置
property.sasl.kerberos.keytab的值需要指定Keytab本地檔案的絕對路徑,並允許SelectDB進程訪問該本地檔案。
修改匯入作業
修改已經建立的例行匯入作業。只能修改處於PAUSED狀態的作業。
文法
ALTER ROUTINE LOAD FOR <job_name>
[job_properties]
FROM <data_source>
[data_source_properties]參數說明
參數名稱 | 參數說明 |
[db.]job_name | 指定要修改的作業名稱。 |
tbl_name | 指定需要匯入的表的名稱。 |
job_properties | 指定需要修改的作業參數。目前僅支援修改的參數如下。
|
data_source | 指定資料來源的類型。當前支援: |
data_source_properties | 指定資料來源的相關屬性。目前僅支援的屬性如下。
說明
|
使用樣本
將
desired_concurrent_number修改為1,樣本如下。ALTER ROUTINE LOAD FOR db1.label1 PROPERTIES ( "desired_concurrent_number" = "1" );將
desired_concurrent_number修改為10,修改partition的offset,修改group id,樣本如下。ALTER ROUTINE LOAD FOR db1.label1 PROPERTIES ( "desired_concurrent_number" = "10" ) FROM kafka ( "kafka_partitions" = "0, 1, 2", "kafka_offsets" = "100, 200, 100", "property.group.id" = "new_group" );
暫停匯入作業
暫停一個Routine Load作業。被暫停作業可以通過RESUME命令重新運行。
文法
PAUSE [ALL] ROUTINE LOAD FOR <job_name>;參數說明
參數名稱 | 參數說明 |
[db.]job_name | 指定要暫停作業名稱。 |
使用樣本
暫停名稱為test1的Routine Load作業,樣本如下。
PAUSE ROUTINE LOAD FOR test1;暫停所有Routine Load作業,樣本如下。
PAUSE ALL ROUTINE LOAD;
恢複匯入作業
恢複一個被暫停Routine Load作業。恢複的作業將繼續從之前已消費的offset繼續消費。
文法
RESUME [ALL] ROUTINE LOAD FOR <job_name>參數說明
參數名稱 | 參數說明 |
[db.]job_name | 指定要恢複的作業名稱。 |
使用樣本
恢複名稱為test1的Routine Load作業,樣本如下。
RESUME ROUTINE LOAD FOR test1;恢複所有Routine Load作業,樣本如下。
RESUME ALL ROUTINE LOAD;
停止匯入作業
停止一個Routine Load作業。被停止的作業無法再重新運行。停止匯入後,已匯入資料不會復原。
文法
STOP ROUTINE LOAD FOR <job_name>;參數說明
參數名稱 | 參數說明 |
[db.]job_name | 指定要停止的作業名稱。 |
使用樣本
停止名稱為test1的Routine Load作業,樣本如下。
STOP ROUTINE LOAD FOR test1;查看匯入作業
Routine Load作業運行狀態需要通過SHOW ROUTINE LOAD命令查看。
文法
SHOW [ALL] ROUTINE LOAD [FOR job_name];參數說明
參數名稱 | 參數說明 |
[db.]job_name | 指定要查看的作業名稱。 |
如果匯入資料格式不合法,詳細的錯誤資訊會記錄在ErrorLogUrls中。注意其中包含多個連結,拷貝其中任意一個連結在瀏覽器中查看即可。
使用樣本
展示名稱為test1的所有Routine Load作業(包括已停止或取消的作業)。結果為一行或多行。
SHOW ALL ROUTINE LOAD FOR test1;展示名稱為test1的當前正在啟動並執行Routine Load作業。
SHOW ROUTINE LOAD FOR test1;顯示example_db下,所有的Routine Load作業(包括已停止或取消的作業)。結果為一行或多行。
use example_db; SHOW ALL ROUTINE LOAD;顯示example_db下,所有正在啟動並執行Routine Load作業。
use example_db; SHOW ROUTINE LOAD;顯示example_db下,名稱為test1的當前正在啟動並執行Routine Load作業。
SHOW ROUTINE LOAD FOR example_db.test1;顯示example_db下,名稱為test1的所有Routine Load作業(包括已停止或取消的作業)。結果為一行或多行。
SHOW ALL ROUTINE LOAD FOR example_db.test1;
相關係統配置
相關係統配置參數會影響Routine Load的使用。
max_routine_load_task_concurrent_numFE配置項,預設為5,可以運行時修改。該參數限制了一個例行匯入作業最大的子任務並發數。建議維持預設值。設定過大,可能導致同時並發的任務數過多,佔用叢集資源。
max_routine_load_task_num_per_beFE配置項,預設為5,可以運行時修改。該參數限制了每個BE節點最多並發執行的子任務個數。建議維持預設值。如果設定過大,可能導致並發任務數過多,佔用叢集資源。
max_routine_load_job_numFE配置項,預設為100,可以運行時修改。該參數限制Routine Load作業的總數,包括NEED_SCHEDULED,RUNNING,PAUSE這些狀態。超過後,不能再提交新的作業。
max_consumer_num_per_groupBE 配置項,預設為3。該參數表示一個子任務中最多產生幾個Consumer進行資料消費。對於Kafka資料來源,一個Consumer可能消費一個或多個Kafka Partition。假設一個任務需要消費6個Kafka Partition,則會產生3個Consumer,每個Consumer消費2個Partition。如果只有2個Partition,則只會產生2個Consumer,每個Consumer消費1個Partition。
max_tolerable_backend_down_numFE配置項,預設值是0。在滿足某些條件下,SelectDB可令PAUSED的任務重新調度,變成RUNNING。該參數為0代表只有所有BE節點是alive狀態才允許重新調度。
period_of_auto_resume_minFE配置項,預設是5分鐘。該項意味著當SelectDB重新調度任務時,只會在5分鐘這個周期內最多嘗試3次。如果3次都失敗則鎖定當前任務,後續不再進行調度,但可通過人為幹預,進行手動恢複。
其他說明
Routine Load作業和ALTER TABLE操作的關係。
Routine Load不會阻塞Schema變更和ROLLUP操作。但Schema變更完成後,列映射關係無法匹配,則會導致作業的錯誤資料激增,最終導致作業暫停。建議通過在Routine Load作業中顯式指定列映射關係,並且通過增加Nullable列或帶Default值的列來減少這些問題。
刪除表的Partition可能會導致匯入資料無法找到對應的Partition,導致作業進入暫停狀態。
Routine Load作業和其他匯入作業的關係(LOAD,DELETE,INSERT)。
Routine Load和其他LOAD作業以及INSERT操作沒有衝突。
當執行DELETE操作時,對應表分區不能有任何正在執行的匯入任務。因此在執行DELETE操作前,需要暫停Routine Load作業,並等待已下發的任務全部完成後,方可以執行DELETE。
Routine Load作業和DROP DATABASE或DROP TABLE操作的關係。
當Routine Load對應的database或table被刪除後,作業會自動CANCEL。
Kafka類型的Routine Load作業和Kafka topic的關係。
當建立例行匯入聲明的
kafka_topic在Kafka叢集中不存在時:Kafka叢集的Broker設定了
auto.create.topics.enable=true,則topic會先被自動建立,自動建立的Partition個數是由您的Kafka叢集中的Broker配置num.partitions決定的。例行作業會不斷讀取該topic的資料。Kafka叢集的Broker設定了
auto.create.topics.enable=false,則topic不會被自動建立,例行作業會在沒有讀取任何資料之前就被暫停,狀態置為PAUSED。
所以,如果您希望當Kafka topic不存在的時候,被例行作業自動建立的話,只需要將Kafka叢集中的Broker設定
auto.create.topics.enable=true即可。當環境中存在網段和網域名稱解析的隔離措施,因此需要注意以下問題。
建立Routine Load任務中指定的Broker list必須能夠被SelectDB服務訪問。
Kafka中如果配置了
advertised.listeners,advertised.listeners中的地址必須能夠被SelectDB服務訪問。
指定消費的Partition和Offset。
SelectDB支援指定Partition,Offset和時間點進行消費。這裡說明下對應參數的配置關係。
三個相關參數如下。
kafka_partitions:指定待消費的Partition列表,如:"0,1,2,3"。kafka_offsets:指定每個分區的起始offset,必須和kafka_partitions列表個數對應。如:"1000,1000,2000,2000"property.kafka_default_offset:指定分區預設的起始Offset。
在建立匯入作業時,這三個參數可以有以下五種組合方式。
方式
kafka_partitions
kafka_offsets
property.kafka_default_offset
行為
1
No
No
No
系統會自動尋找Topic對應的所有分區並從OFFSET_END開始消費。
2
No
No
Yes
系統會自動尋找Topic對應的所有分區並從default offset指定的位置開始消費。
3
Yes
No
No
系統會從指定分區的OFFSET_END開始消費。
4
Yes
Yes
No
系統會從指定分區的指定Offset處開始消費。
5
Yes
No
Yes
系統會從指定分區,default Offset指定的位置開始消費。
STOP和PAUSE的區別。
FE會自動定期清理STOP狀態的Routine Load,而PAUSE狀態的則可以再次被恢複啟用。