全部產品
Search
文件中心

ApsaraDB for SelectDB:Broker Load

更新時間:Mar 18, 2025

Broker Load 是ApsaraDB for SelectDB用於非同步大量匯入資料的介面,支援從 HDFS、S3、OSS 等分布式儲存系統高效匯入大規模資料(單次支援百GB級)。本文介紹如何通過Broker Load匯入資料至ApsaraDB for SelectDB執行個體。

功能優勢

Broker Load具有以下優勢。

  • 大容量支援:單次匯入資料量可達百GB層級,適合大規模遷移離線資料。

  • 非同步高並發:非阻塞非同步寫入,提高了叢集資源的利用率。

  • 相容性強:支援讀取HDFS、S3等遠端儲存資料。

  • 操作便捷:

    • 可通過標準的MySQL協議建立匯入任務,進行資料匯入。

    • 可通過SHOW LOAD命令即時監控匯入進度與結果。

適用情境

適用於從OSS、HDFS、S3等分布式儲存系統高效匯入大規模資料。

  • 小大量匯入(如100MB)的資料時效性為10秒級。

  • 大大量匯入(如100GB)的時效性為10分鐘級。

建立匯入

該方式用於通過Broker匯入,讀取遠端儲存(如HDFS、S3)上的資料匯入到ApsaraDB for SelectDB的表中。

文法

LOAD LABEL load_label
(
data_desc1[, data_desc2, ...]
)
WITH broker_type
[broker_properties]
[load_properties];

參數說明

參數名稱

參數說明

load_label

匯入任務的唯一標識。Label是在匯入命令中自訂的名稱。通過Label,可以查看對應匯入任務的執行情況。Label也可用於防止重複匯入相同的資料,當Label對應的匯入作業狀態為CANCELLED時,該Label可以再次被使用。

格式:[database.]label_name

說明

推薦同一批次資料使用相同的Label。這樣同一批次資料的重複請求只會被接受一次,保證了At-Most-Once。

data_desc1

描述一組需要匯入的檔案。詳細參數說明,請參見data_desc1參數說明

WITH broker_type

指定需要使用的Broker類型,支援HDFS、S3兩種。S3類型的Broker Load也稱為OSS Load,詳情請參見OSS Load

broker_properties

指定Broker所需的參數讓Broker能夠訪問遠端儲存系統。例如:BOS或HDFS。

文法如下:

( "key1" = "val1", "key2" = "val2", ...)

load_properties

指定匯入的相關參數。詳細參數說明,請參見load_properties參數說明

data_desc1參數說明

[MERGE|APPEND|DELETE]
DATA INFILE
(
"file_path1"[, file_path2, ...]
)
[NEGATIVE]
INTO TABLE `table_name`
[PARTITION (p1, p2, ...)]
[COLUMNS TERMINATED BY "column_separator"]
[FORMAT AS "file_type"]
[(column_list)]
[COLUMNS FROM PATH AS (c1, c2, ...)]
[PRECEDING FILTER predicate]
[SET (column_mapping)]
[WHERE predicate]
[DELETE ON expr]
[ORDER BY source_sequence]
[PROPERTIES ("key1"="value1", ...)]

參數名稱

參數說明

[MERGE|APPEND|DELETE]

指定資料合併類型。預設為APPEND。預設值表示本次匯入是普通的追加寫操作。MERGE和DELETE類型僅適用於Unique Key模型表。其中MERGE類型需要配合[DELETE ON]語句使用,以標註Delete Flag列。而DELETE類型則表示本次匯入的所有資料皆為刪除資料。

DATA INFILE

指定需要匯入的檔案路徑。需要匯入的檔案路徑可以是多個,可以使用萬用字元。路徑最終必須匹配到檔案,如果只匹配到目錄則匯入失敗。

NEGATIVE

表示本次匯入為一批“負”匯入。這種方式僅針對具有整型SUM彙總類型的彙總資料表。該方式會將匯入資料中SUM彙總列對應的整型數值取反。用於沖抵之前置入錯誤的資料。

PARTITION(p1, p2, ...)

指定僅匯入表的某些分區,不在分區範圍內的資料將被忽略。

COLUMNS TERMINATED BY

指定資料行分隔符號,僅在CSV格式下有效,僅能指定單位元組分隔字元。

FORMAT AS

指定檔案類型,預設為CSV。支援CSV、PARQUET和ORC格式。

column list

指定原始檔案中的列順序。

COLUMNS FROM PATH AS

指定從匯入的檔案中抽取的列。

PRECEDING FILTER predicate

指定前置過濾條件。資料首先根據column listCOLUMNS FROM PATH AS按順序拼接成未經處理資料行。然後按照前置過濾條件進行過濾。

SET (column_mapping)

指定列的轉換函式。

WHERE predicate

指定資料的過濾條件。

DELETE ON expr

需配合MERGE匯入模式一起使用,僅針對Unique Key模型的表。用於指定匯入資料中表示Delete Flag的列和計算關係。

ORDER BY

僅針對使用Unique Key模型的表。用於指定匯入資料中表示Sequence Col的列。主要用於匯入時保證資料順序。

PROPERTIES ("key1"="value1", ...)

指定匯入的format的一些參數。如匯入的檔案是JSON格式,則可以在這裡指定json_root、jsonpaths、fuzzy_parse等參數。

load_properties參數說明

參數名稱

參數說明

timeout

匯入逾時時間,單位為秒,預設為14400,即4小時。

max_filter_ratio

最大容忍可過濾比率(資料不規範等原因)。預設0,即零容忍。取值範圍為0~1。

exec_mem_limit

匯入記憶體限制,單位為位元組,預設為2147483648,即2 GB。

strict_mode

設定匯入任務是否開啟strict 模式,預設為false。

timezone

指定某些受時區影響的函數的時區,預設為Asia/Shanghai時區。支援strftime、alignment_timestamp和from_unixtime等。

load_parallelism

匯入並發度,預設為1。調大匯入並發度會啟動多個執行計畫同時執行匯入任務,加快匯入速度。

send_batch_parallelism

用於設定發送批處理資料的並行度,如果並行度的值超過計算叢集BE配置中的max_send_batch_parallelism_per_job,那麼計算叢集將使用max_send_batch_parallelism_per_job的值。

load_to_single_tablet

是否只匯入資料到對應分區的一個tablet,預設值為false。該參數只允許在對帶有random分桶的Duplicate表匯入資料的時候設定。

使用樣本

  1. 建立待匯入的SelectDB資料表,樣本如下。

    CREATE TABLE test_table
    (
        id int,
        name varchar(50),
        age int,
        address varchar(50)
    )
    UNIQUE KEY(`id`)
    DISTRIBUTED BY HASH(id) BUCKETS 4
    PROPERTIES("replication_num" = "1");
    
    CREATE TABLE test_table2
    (
        id int,
        name varchar(50),
        age int,
        address varchar(50)
    )
    DISTRIBUTED BY HASH(id) BUCKETS 4
    PROPERTIES("replication_num" = "1");
  2. 建立待匯入的檔案。

    • 檔案file1.txt,檔案內容如下。

      1,tomori,32,shanghai
      2,anon,22,beijing
      3,taki,23,shenzhen
      4,rana,45,hangzhou
      5,soyo,14,shanghai
      6,saki,25,hangzhou
      7,mutsumi,45,shanghai
      8,uika,26,shanghai
      9,umiri,27,shenzhen
      10,nyamu,37,shanghai
    • 檔案file2.csv,檔案內容如下。

      1,saki,25,hangzhou
      2,mutsumi,45,shanghai
      3,uika,26,shanghai
      4,umiri,27,shenzhen
      5,nyamu,37,shanghai
  3. 將檔案資料匯入到表中。

    • 從HDFS匯入文字檔file1.txt,樣本如下。

      LOAD LABEL example_db.label1
      (
          DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt")
          INTO TABLE `my_table`
          COLUMNS TERMINATED BY ","
      )
      WITH HDFS
      (
          "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
      );

      匯入檔案file1.txt,按逗號分隔,匯入到表test_table。當從HDFS匯入資料時,broker_properties中需要指定fs.defaultFS屬性,以確保可以正確的串連到HDFS叢集並找到相應的資料檔案。

    • 從HDFS匯入資料,同時匯入兩個檔案到兩個表中,樣本如下。

      LOAD LABEL test_db.test_02
      (
          DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file2.csv")
          INTO TABLE `test_table`
          COLUMNS TERMINATED BY ","
          (id,name,temp_age,address)    
          SET (
              age = temp_age + 1
          ),
          DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt")
          INTO TABLE `test_table2`
          COLUMNS TERMINATED BY ","
      )
      WITH HDFS
      (
          "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
      );

      匯入兩個檔案file1.txtfile2.csv,分別匯入到test_tabletest_table2兩張表中,並且將file2.csv中age的值加1後匯入。

    • 從HA模式部署的HDFS叢集中,匯入一批資料,樣本如下。

      LOAD LABEL test_db.test_03
      (
          DATA INFILE("hdfs://hdfs_host:hdfs_port/example/*")
          INTO TABLE `test_table`
          COLUMNS TERMINATED BY "\\x01"
      )
      WITH HDFS
      (
          "hadoop.username" = "hive",
          "fs.defaultFS" = "hdfs://my_ha",
          "dfs.nameservices" = "my_ha",
          "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
          "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
          "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
          "dfs.client.failover.proxy.provider.my_ha" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
      );

      指定分隔字元為Hive的預設分隔符號\\x01,並使用萬用字元*指定data目錄下所有目錄的所有檔案。

    • 對匯入資料file1.txt進行過濾處理,合格資料才可匯入,樣本如下。

      LOAD LABEL test_db.test_04
      (
          DATA INFILE("hdfs://host:port/example/file1.txt")
          INTO TABLE `test_table2`
          COLUMNS TERMINATED BY ","
          (id,name,age,address)   
          WHERE age < 20
      )
      WITH HDFS
      (
          "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
      );
      

      只有未經處理資料中age < 20的行才會被匯入。

    • 從HDFS匯入一批資料file1.txt,指定逾時時間和過濾比例,並且將原有資料中與匯入資料中age<20的列相匹配的行刪除,其他行正常匯入,樣本如下。

      LOAD LABEL test_db.test_05
      (
          MERGE DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt")
          INTO TABLE `test_table`
          COLUMNS TERMINATED BY ","
          (id,name,age,address)   
          DELETE ON age < 20
      )
      WITH HDFS
      (
          "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
      )
      PROPERTIES
      (
          "timeout" = "3600",
          "max_filter_ratio" = "0.1"
      );

      使用MERGE方式匯入。test_table必須是一張Unique Key的表。當匯入資料中的age列的值小於20時,該行會被認為是一個刪除行。匯入任務的逾時時間是3600秒,並且允許錯誤率在10%以內。

取消匯入

當Broker Load作業狀態不為CANCELLED或FINISHED時,可以手動取消匯入。取消時需要指定待取消匯入任務的Label。匯入任務取消後,已寫入的資料也會復原,不會生效。

文法

CANCEL LOAD
[FROM db_name]
WHERE [LABEL = "load_label" | LABEL like "label_pattern"];

參數說明

參數名稱

參數說明

db_name

資料庫名稱。不指定的時使用當前預設資料庫。

load_label

匯入任務的Label名稱,精確匹配。如果使用LABEL LIKE,則會匹配匯入任務的Label包含label_pattern的匯入任務。

使用樣本

  • 撤銷資料庫example_db上,Label為example_db_test_load_label的匯入作業。

    CANCEL LOAD
    FROM example_db
    WHERE LABEL = "example_db_test_load_label";
  • 撤銷資料庫example_db上,所有包含example_的匯入作業。

    CANCEL LOAD
    FROM example_db
    WHERE LABEL like "example_";

查看匯入

Broker Load是一個非同步匯入處理程序,語句執行成功僅代表匯入任務提交成功,並不代表資料匯入成功。匯入狀態需要通過SHOW LOAD命令查看。

文法

SHOW LOAD
[FROM db_name]
[
   WHERE
   [LABEL [ = "your_label" | LIKE "label_matcher"]]
   [STATE = ["PENDING"|"ETL"|"LOADING"|"FINISHED"|"CANCELLED"|]]
]
[ORDER BY ...]
[LIMIT limit][OFFSET offset];

參數說明

參數名稱

參數說明

db_name

資料庫名稱。不指定的場合使用當前預設資料庫。

your_label

匯入任務的Label名稱,精確匹配。如果使用LABEL LIKE,則會匹配匯入任務的Label包含label_matcher的匯入任務。

STATE

匯入狀態。只查看指定狀態的匯入任務。

ORDER BY

指定排序依據。

LIMIT

顯示Limit條匹配記錄。不指定的場合全部顯示。

OFFSET

從位移量offset開始顯示查詢結果。預設情況下位移量為0。

使用樣本

  • 展示資料庫example_db的匯入任務,Label中包含字串2014_01_02,展示儲存時間最久的10個。

    SHOW LOAD FROM example_db WHERE LABEL LIKE "2014_01_02" LIMIT 10;
  • 展示資料庫example_db的匯入任務,指定Label為load_example_db_20140102並按LoadStartTime降序排序。

    SHOW LOAD FROM example_db WHERE LABEL = "load_example_db_20140102" ORDER BY LoadStartTime DESC;
  • 展示資料庫example_db的匯入任務,指定Label為load_example_db_20140102,state為loading

    SHOW LOAD FROM example_db WHERE LABEL = "load_example_db_20140102" AND STATE = "loading";
  • 展示資料庫example_db的匯入任務,按LoadStartTime降序排序,並從位移量5開始顯示10條查詢結果。

    SHOW LOAD FROM example_db ORDER BY LoadStartTime DESC limit 5,10;
    SHOW LOAD FROM example_db ORDER BY LoadStartTime DESC limit 10 offset 5;

最佳實務

  • 查看匯入任務狀態

    Broker Load是一個非同步匯入處理程序,語句執行成功僅代表匯入任務提交成功,並不代表資料匯入成功。匯入狀態需要通過SHOW LOAD命令查看。

  • 取消匯入任務

    已提交切尚未結束的匯入任務可以通過CANCEL LOAD命令取消。取消後,已寫入的資料也會復原,不會生效。

  • Label、匯入事務、多表原子性

    SelectDB中所有匯入任務都是原子生效的。並且在同一個匯入任務中對多張表的匯入也能夠保證原子性。同時,SelectDB還可以通過Label的機制來保證資料匯入的不丟失和不重複。

  • 列映射、衍生列和過濾

    SelectDB可以在匯入語句中支援非常豐富的列轉換和過濾操作。支援絕大多數內建函數和UDF。詳情請參見資料轉換文檔。

  • 錯誤資料過濾

    SelectDB的匯入任務可以容忍一部分格式錯誤的資料。容忍了通過max_filter_ratio設定。預設為0,即表示當有一條錯誤資料時,整個匯入任務將會失敗。如果您希望忽略部分有問題的資料行,可以將次參數設定為0~1之間的數值,SelectDB會自動跳過哪些資料格式不正確的行。

    關於容忍率的一些計算方式,詳情請參見資料轉換文檔。

  • strict 模式

    strict_mode屬性用於設定匯入任務是否運行在strict 模式下。該格式會對列映射、轉換和過濾的結果產生影響。

  • 逾時時間

    Broker Load的預設逾時時間為4小時,從任務提交開始算起。如果逾時未完成,則任務會失敗。

  • 資料量和任務數限制

    建議通過Broker Load單次匯入100 GB以內的資料。雖然理論上在一個匯入任務中匯入的資料量沒有上限,但是提交過大的匯入會導致已耗用時間較長,並且失敗後重試的代價也會較大。

    同時受限於叢集規模,我們限制了匯入的最巨量資料量為節點數乘3 GB。以保證系統資源的合理利用。如果有巨量資料量需要匯入,建議分成多個匯入任務提交。

    SelectDB會限制叢集內同時啟動並執行匯入任務數量,通常在3~10個之間,之後提交的匯入作業會排隊等待。隊列最大長度為100,之後的提交會直接拒絕。

    說明

    排隊時間也會被計算到作業總時間中。如果逾時,則作業會被取消。所以建議通過監控作業運行狀態來合理控製作業提交頻率。