Broker Load 是ApsaraDB for SelectDB用於非同步大量匯入資料的介面,支援從 HDFS、S3、OSS 等分布式儲存系統高效匯入大規模資料(單次支援百GB級)。本文介紹如何通過Broker Load匯入資料至ApsaraDB for SelectDB執行個體。
功能優勢
Broker Load具有以下優勢。
大容量支援:單次匯入資料量可達百GB層級,適合大規模遷移離線資料。
非同步高並發:非阻塞非同步寫入,提高了叢集資源的利用率。
相容性強:支援讀取HDFS、S3等遠端儲存資料。
操作便捷:
可通過標準的MySQL協議建立匯入任務,進行資料匯入。
可通過
SHOW LOAD命令即時監控匯入進度與結果。
適用情境
適用於從OSS、HDFS、S3等分布式儲存系統高效匯入大規模資料。
小大量匯入(如100MB)的資料時效性為10秒級。
大大量匯入(如100GB)的時效性為10分鐘級。
建立匯入
該方式用於通過Broker匯入,讀取遠端儲存(如HDFS、S3)上的資料匯入到ApsaraDB for SelectDB的表中。
文法
LOAD LABEL load_label
(
data_desc1[, data_desc2, ...]
)
WITH broker_type
[broker_properties]
[load_properties];參數說明
參數名稱 | 參數說明 |
| 匯入任務的唯一標識。Label是在匯入命令中自訂的名稱。通過Label,可以查看對應匯入任務的執行情況。Label也可用於防止重複匯入相同的資料,當Label對應的匯入作業狀態為CANCELLED時,該Label可以再次被使用。 格式: 說明 推薦同一批次資料使用相同的Label。這樣同一批次資料的重複請求只會被接受一次,保證了At-Most-Once。 |
| 描述一組需要匯入的檔案。詳細參數說明,請參見data_desc1參數說明。 |
| 指定需要使用的Broker類型,支援HDFS、S3兩種。S3類型的Broker Load也稱為OSS Load,詳情請參見OSS Load。 |
| 指定Broker所需的參數讓Broker能夠訪問遠端儲存系統。例如:BOS或HDFS。 文法如下: |
| 指定匯入的相關參數。詳細參數說明,請參見load_properties參數說明。 |
data_desc1參數說明
[MERGE|APPEND|DELETE]
DATA INFILE
(
"file_path1"[, file_path2, ...]
)
[NEGATIVE]
INTO TABLE `table_name`
[PARTITION (p1, p2, ...)]
[COLUMNS TERMINATED BY "column_separator"]
[FORMAT AS "file_type"]
[(column_list)]
[COLUMNS FROM PATH AS (c1, c2, ...)]
[PRECEDING FILTER predicate]
[SET (column_mapping)]
[WHERE predicate]
[DELETE ON expr]
[ORDER BY source_sequence]
[PROPERTIES ("key1"="value1", ...)]參數名稱 | 參數說明 |
| 指定資料合併類型。預設為APPEND。預設值表示本次匯入是普通的追加寫操作。MERGE和DELETE類型僅適用於Unique Key模型表。其中MERGE類型需要配合[DELETE ON]語句使用,以標註Delete Flag列。而DELETE類型則表示本次匯入的所有資料皆為刪除資料。 |
| 指定需要匯入的檔案路徑。需要匯入的檔案路徑可以是多個,可以使用萬用字元。路徑最終必須匹配到檔案,如果只匹配到目錄則匯入失敗。 |
| 表示本次匯入為一批“負”匯入。這種方式僅針對具有整型SUM彙總類型的彙總資料表。該方式會將匯入資料中SUM彙總列對應的整型數值取反。用於沖抵之前置入錯誤的資料。 |
| 指定僅匯入表的某些分區,不在分區範圍內的資料將被忽略。 |
| 指定資料行分隔符號,僅在CSV格式下有效,僅能指定單位元組分隔字元。 |
| 指定檔案類型,預設為CSV。支援CSV、PARQUET和ORC格式。 |
| 指定原始檔案中的列順序。 |
| 指定從匯入的檔案中抽取的列。 |
| 指定前置過濾條件。資料首先根據 |
| 指定列的轉換函式。 |
| 指定資料的過濾條件。 |
| 需配合MERGE匯入模式一起使用,僅針對Unique Key模型的表。用於指定匯入資料中表示Delete Flag的列和計算關係。 |
| 僅針對使用Unique Key模型的表。用於指定匯入資料中表示Sequence Col的列。主要用於匯入時保證資料順序。 |
| 指定匯入的format的一些參數。如匯入的檔案是JSON格式,則可以在這裡指定json_root、jsonpaths、fuzzy_parse等參數。 |
load_properties參數說明
參數名稱 | 參數說明 |
| 匯入逾時時間,單位為秒,預設為14400,即4小時。 |
| 最大容忍可過濾比率(資料不規範等原因)。預設0,即零容忍。取值範圍為0~1。 |
| 匯入記憶體限制,單位為位元組,預設為2147483648,即2 GB。 |
| 設定匯入任務是否開啟strict 模式,預設為false。 |
| 指定某些受時區影響的函數的時區,預設為 |
| 匯入並發度,預設為1。調大匯入並發度會啟動多個執行計畫同時執行匯入任務,加快匯入速度。 |
| 用於設定發送批處理資料的並行度,如果並行度的值超過計算叢集BE配置中的max_send_batch_parallelism_per_job,那麼計算叢集將使用max_send_batch_parallelism_per_job的值。 |
| 是否只匯入資料到對應分區的一個tablet,預設值為false。該參數只允許在對帶有random分桶的Duplicate表匯入資料的時候設定。 |
使用樣本
建立待匯入的SelectDB資料表,樣本如下。
CREATE TABLE test_table ( id int, name varchar(50), age int, address varchar(50) ) UNIQUE KEY(`id`) DISTRIBUTED BY HASH(id) BUCKETS 4 PROPERTIES("replication_num" = "1"); CREATE TABLE test_table2 ( id int, name varchar(50), age int, address varchar(50) ) DISTRIBUTED BY HASH(id) BUCKETS 4 PROPERTIES("replication_num" = "1");建立待匯入的檔案。
檔案
file1.txt,檔案內容如下。1,tomori,32,shanghai 2,anon,22,beijing 3,taki,23,shenzhen 4,rana,45,hangzhou 5,soyo,14,shanghai 6,saki,25,hangzhou 7,mutsumi,45,shanghai 8,uika,26,shanghai 9,umiri,27,shenzhen 10,nyamu,37,shanghai檔案
file2.csv,檔案內容如下。1,saki,25,hangzhou 2,mutsumi,45,shanghai 3,uika,26,shanghai 4,umiri,27,shenzhen 5,nyamu,37,shanghai
將檔案資料匯入到表中。
從HDFS匯入文字檔
file1.txt,樣本如下。LOAD LABEL example_db.label1 ( DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt") INTO TABLE `my_table` COLUMNS TERMINATED BY "," ) WITH HDFS ( "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port" );匯入檔案file1.txt,按逗號分隔,匯入到表test_table。當從HDFS匯入資料時,broker_properties中需要指定
fs.defaultFS屬性,以確保可以正確的串連到HDFS叢集並找到相應的資料檔案。從HDFS匯入資料,同時匯入兩個檔案到兩個表中,樣本如下。
LOAD LABEL test_db.test_02 ( DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file2.csv") INTO TABLE `test_table` COLUMNS TERMINATED BY "," (id,name,temp_age,address) SET ( age = temp_age + 1 ), DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt") INTO TABLE `test_table2` COLUMNS TERMINATED BY "," ) WITH HDFS ( "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port" );匯入兩個檔案
file1.txt和file2.csv,分別匯入到test_table和test_table2兩張表中,並且將file2.csv中age的值加1後匯入。從HA模式部署的HDFS叢集中,匯入一批資料,樣本如下。
LOAD LABEL test_db.test_03 ( DATA INFILE("hdfs://hdfs_host:hdfs_port/example/*") INTO TABLE `test_table` COLUMNS TERMINATED BY "\\x01" ) WITH HDFS ( "hadoop.username" = "hive", "fs.defaultFS" = "hdfs://my_ha", "dfs.nameservices" = "my_ha", "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2", "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port", "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port", "dfs.client.failover.proxy.provider.my_ha" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" );指定分隔字元為Hive的預設分隔符號
\\x01,並使用萬用字元*指定data目錄下所有目錄的所有檔案。對匯入資料file1.txt進行過濾處理,合格資料才可匯入,樣本如下。
LOAD LABEL test_db.test_04 ( DATA INFILE("hdfs://host:port/example/file1.txt") INTO TABLE `test_table2` COLUMNS TERMINATED BY "," (id,name,age,address) WHERE age < 20 ) WITH HDFS ( "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port" );只有未經處理資料中
age < 20的行才會被匯入。從HDFS匯入一批資料file1.txt,指定逾時時間和過濾比例,並且將原有資料中與匯入資料中
age<20的列相匹配的行刪除,其他行正常匯入,樣本如下。LOAD LABEL test_db.test_05 ( MERGE DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt") INTO TABLE `test_table` COLUMNS TERMINATED BY "," (id,name,age,address) DELETE ON age < 20 ) WITH HDFS ( "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port" ) PROPERTIES ( "timeout" = "3600", "max_filter_ratio" = "0.1" );使用MERGE方式匯入。
test_table必須是一張Unique Key的表。當匯入資料中的age列的值小於20時,該行會被認為是一個刪除行。匯入任務的逾時時間是3600秒,並且允許錯誤率在10%以內。
取消匯入
當Broker Load作業狀態不為CANCELLED或FINISHED時,可以手動取消匯入。取消時需要指定待取消匯入任務的Label。匯入任務取消後,已寫入的資料也會復原,不會生效。
文法
CANCEL LOAD
[FROM db_name]
WHERE [LABEL = "load_label" | LABEL like "label_pattern"];參數說明
參數名稱 | 參數說明 |
| 資料庫名稱。不指定的時使用當前預設資料庫。 |
| 匯入任務的Label名稱,精確匹配。如果使用LABEL LIKE,則會匹配匯入任務的Label包含label_pattern的匯入任務。 |
使用樣本
撤銷資料庫
example_db上,Label為example_db_test_load_label的匯入作業。CANCEL LOAD FROM example_db WHERE LABEL = "example_db_test_load_label";撤銷資料庫
example_db上,所有包含example_的匯入作業。CANCEL LOAD FROM example_db WHERE LABEL like "example_";
查看匯入
Broker Load是一個非同步匯入處理程序,語句執行成功僅代表匯入任務提交成功,並不代表資料匯入成功。匯入狀態需要通過SHOW LOAD命令查看。
文法
SHOW LOAD
[FROM db_name]
[
WHERE
[LABEL [ = "your_label" | LIKE "label_matcher"]]
[STATE = ["PENDING"|"ETL"|"LOADING"|"FINISHED"|"CANCELLED"|]]
]
[ORDER BY ...]
[LIMIT limit][OFFSET offset];參數說明
參數名稱 | 參數說明 |
| 資料庫名稱。不指定的場合使用當前預設資料庫。 |
| 匯入任務的Label名稱,精確匹配。如果使用LABEL LIKE,則會匹配匯入任務的Label包含label_matcher的匯入任務。 |
| 匯入狀態。只查看指定狀態的匯入任務。 |
| 指定排序依據。 |
| 顯示Limit條匹配記錄。不指定的場合全部顯示。 |
| 從位移量offset開始顯示查詢結果。預設情況下位移量為0。 |
使用樣本
展示資料庫
example_db的匯入任務,Label中包含字串2014_01_02,展示儲存時間最久的10個。SHOW LOAD FROM example_db WHERE LABEL LIKE "2014_01_02" LIMIT 10;展示資料庫
example_db的匯入任務,指定Label為load_example_db_20140102並按LoadStartTime降序排序。SHOW LOAD FROM example_db WHERE LABEL = "load_example_db_20140102" ORDER BY LoadStartTime DESC;展示資料庫
example_db的匯入任務,指定Label為load_example_db_20140102,state為loading。SHOW LOAD FROM example_db WHERE LABEL = "load_example_db_20140102" AND STATE = "loading";展示資料庫
example_db的匯入任務,按LoadStartTime降序排序,並從位移量5開始顯示10條查詢結果。SHOW LOAD FROM example_db ORDER BY LoadStartTime DESC limit 5,10; SHOW LOAD FROM example_db ORDER BY LoadStartTime DESC limit 10 offset 5;
最佳實務
查看匯入任務狀態
Broker Load是一個非同步匯入處理程序,語句執行成功僅代表匯入任務提交成功,並不代表資料匯入成功。匯入狀態需要通過
SHOW LOAD命令查看。取消匯入任務
已提交切尚未結束的匯入任務可以通過
CANCEL LOAD命令取消。取消後,已寫入的資料也會復原,不會生效。Label、匯入事務、多表原子性
SelectDB中所有匯入任務都是原子生效的。並且在同一個匯入任務中對多張表的匯入也能夠保證原子性。同時,SelectDB還可以通過Label的機制來保證資料匯入的不丟失和不重複。
列映射、衍生列和過濾
SelectDB可以在匯入語句中支援非常豐富的列轉換和過濾操作。支援絕大多數內建函數和UDF。詳情請參見資料轉換文檔。
錯誤資料過濾
SelectDB的匯入任務可以容忍一部分格式錯誤的資料。容忍了通過
max_filter_ratio設定。預設為0,即表示當有一條錯誤資料時,整個匯入任務將會失敗。如果您希望忽略部分有問題的資料行,可以將次參數設定為0~1之間的數值,SelectDB會自動跳過哪些資料格式不正確的行。關於容忍率的一些計算方式,詳情請參見資料轉換文檔。
strict 模式
strict_mode屬性用於設定匯入任務是否運行在strict 模式下。該格式會對列映射、轉換和過濾的結果產生影響。逾時時間
Broker Load的預設逾時時間為4小時,從任務提交開始算起。如果逾時未完成,則任務會失敗。
資料量和任務數限制
建議通過Broker Load單次匯入100 GB以內的資料。雖然理論上在一個匯入任務中匯入的資料量沒有上限,但是提交過大的匯入會導致已耗用時間較長,並且失敗後重試的代價也會較大。
同時受限於叢集規模,我們限制了匯入的最巨量資料量為節點數乘3 GB。以保證系統資源的合理利用。如果有巨量資料量需要匯入,建議分成多個匯入任務提交。
SelectDB會限制叢集內同時啟動並執行匯入任務數量,通常在3~10個之間,之後提交的匯入作業會排隊等待。隊列最大長度為100,之後的提交會直接拒絕。
說明排隊時間也會被計算到作業總時間中。如果逾時,則作業會被取消。所以建議通過監控作業運行狀態來合理控製作業提交頻率。