全部產品
Search
文件中心

E-MapReduce:Broker Load

更新時間:Jul 01, 2024

Broker Load是一個非同步匯入方式,支援的資料來源取決於Broker進程支援的資料來源。本文為您介紹Broker Load匯入的基本原理、基本操作、系統配置以及最佳實務。

背景資訊

因為Doris表裡的資料是有序的,所以Broker Load在匯入資料時需要利用Doris叢集資源對資料進行排序,相對於Spark Load來完成海量歷史資料移轉,Broker Load對Doris叢集資源佔用較大。Broker Load方式是在沒有Spark計算資源的情況下使用,如果有Spark計算資源建議使用Spark Load

適用情境

  • 來源資料在Broker可以訪問的儲存系統中,例如HDFS。

  • 資料量在幾十到百GB層級。

基本原理

提交匯入任務後,FE會產生對應的Plan並根據目前BE的個數和檔案的大小,將Plan分給多個BE執行,每個BE執行一部分匯入資料。BE在執行的過程中會從Broker拉取資料,在對資料transform之後將資料匯入系統。所有BE均完成匯入,由FE最終決定匯入是否成功。

+
                 | 1. user create broker load
                 v
            +----+----+
            |         |
            |   FE    |
            |         |
            +----+----+
                 |
                 | 2. BE etl and load the data
    +--------------------------+
    |            |             |
+---v---+     +--v----+    +---v---+
|       |     |       |    |       |
|  BE   |     |  BE   |    |   BE  |
|       |     |       |    |       |
+---+-^-+     +---+-^-+    +--+-^--+
    | |           | |         | |
    | |           | |         | | 3. pull data from broker
+---v-+-+     +---v-+-+    +--v-+--+
|       |     |       |    |       |
|Broker |     |Broker |    |Broker |
|       |     |       |    |       |
+---+-^-+     +---+-^-+    +---+-^-+
    | |           | |          | |
+---v-+-----------v-+----------v-+-+
|       HDFS/BOS/AFS cluster       |
|                                  |
+----------------------------------+

開始匯入

Hive分區表的資料匯入

  1. 建立Hive表。

    ##資料格式是:預設,分區欄位是:day
    CREATE TABLE `ods_demo_detail`(
      `id` string,
      `store_id` string,
      `company_id` string,
      `tower_id` string,
      `commodity_id` string,
      `commodity_name` string,
      `commodity_price` double,
      `member_price` double,
      `cost_price` double,
      `unit` string,
      `quantity` double,
      `actual_price` double
    )
    PARTITIONED BY (day string)
    row format delimited fields terminated by ','
    lines terminated by '\n'
  2. 使用Hive的Load命令將您的資料匯入到Hive表中。

    load data local inpath '/opt/custorm' into table ods_demo_detail;
  3. 建立Doris表。

    CREATE TABLE `doris_ods_test_detail` (
      `rq` date NULL,
      `id` varchar(32) NOT NULL,
      `store_id` varchar(32) NULL,
      `company_id` varchar(32) NULL,
      `tower_id` varchar(32) NULL,
      `commodity_id` varchar(32) NULL,
      `commodity_name` varchar(500) NULL,
      `commodity_price` decimal(10, 2) NULL,
      `member_price` decimal(10, 2) NULL,
      `cost_price` decimal(10, 2) NULL,
      `unit` varchar(50) NULL,
      `quantity` int(11) NULL,
      `actual_price` decimal(10, 2) NULL
    ) ENGINE=OLAP
    UNIQUE KEY(`rq`, `id`, `store_id`)
    PARTITION BY RANGE(`rq`)
    (
    PARTITION P_202204 VALUES [('2022-04-01'), ('2022-05-01')))
    DISTRIBUTED BY HASH(`store_id`) BUCKETS 1
    PROPERTIES (
    "replication_allocation" = "tag.location.default: 3",
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "MONTH",
    "dynamic_partition.start" = "-2147483648",
    "dynamic_partition.end" = "2",
    "dynamic_partition.prefix" = "P_",
    "dynamic_partition.buckets" = "1",
    "in_memory" = "false",
    "storage_format" = "V2"
    );
  4. 開始匯入資料。

    LOAD LABEL broker_load_2022_03_23
    (
        DATA INFILE("hdfs://192.168.**.**:8020/user/hive/warehouse/ods.db/ods_demo_detail/*/*")
        INTO TABLE doris_ods_test_detail
        COLUMNS TERMINATED BY ","
      (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price)
        COLUMNS FROM PATH AS (`day`)
       SET
       (rq = str_to_date(`day`,'%Y-%m-%d'),id=id,store_id=store_id,company_id=company_id,tower_id=tower_id,commodity_id=commodity_id,commodity_name=commodity_name,commodity_price=commodity_price,member_price=member_price,cost_price=cost_price,unit=unit,quantity=quantity,actual_price=actual_price)
        )
    WITH BROKER "broker_name_1"
        (
          "username" = "hdfs",
          "password" = ""
        )
    PROPERTIES
    (
        "timeout"="1200",
        "max_filter_ratio"="0.1"
    );

Hive分區表匯入(ORC格式)

  1. 建立ORC格式的Hive分區表。

    #資料格式:ORC 分區:day
    CREATE TABLE `ods_demo_orc_detail`(
      `id` string,
      `store_id` string,
      `company_id` string,
      `tower_id` string,
      `commodity_id` string,
      `commodity_name` string,
      `commodity_price` double,
      `member_price` double,
      `cost_price` double,
      `unit` string,
      `quantity` double,
      `actual_price` double
    )
    PARTITIONED BY (day string)
    row format delimited fields terminated by ','
    lines terminated by '\n'
    STORED AS ORC
  2. 建立Doris表。

    CREATE TABLE `doris_ods_test_detail` (
      `rq` date NULL,
      `id` varchar(32) NOT NULL,
      `store_id` varchar(32) NULL,
      `company_id` varchar(32) NULL,
      `tower_id` varchar(32) NULL,
      `commodity_id` varchar(32) NULL,
      `commodity_name` varchar(500) NULL,
      `commodity_price` decimal(10, 2) NULL,
      `member_price` decimal(10, 2) NULL,
      `cost_price` decimal(10, 2) NULL,
      `unit` varchar(50) NULL,
      `quantity` int(11) NULL,
      `actual_price` decimal(10, 2) NULL
    ) ENGINE=OLAP
    UNIQUE KEY(`rq`, `id`, `store_id`)
    PARTITION BY RANGE(`rq`)
    (
    PARTITION P_202204 VALUES [('2022-04-01'), ('2022-05-01')))
    DISTRIBUTED BY HASH(`store_id`) BUCKETS 1
    PROPERTIES (
    "replication_allocation" = "tag.location.default: 3",
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "MONTH",
    "dynamic_partition.start" = "-2147483648",
    "dynamic_partition.end" = "2",
    "dynamic_partition.prefix" = "P_",
    "dynamic_partition.buckets" = "1",
    "in_memory" = "false",
    "storage_format" = "V2"
    );
  3. 使用Broker Load匯入資料。

    LOAD LABEL dish_2022_03_23
    (
        DATA INFILE("hdfs://10.220.**.**:8020/user/hive/warehouse/ods.db/ods_demo_orc_detail/*/*")
        INTO TABLE doris_ods_test_detail
        COLUMNS TERMINATED BY ","
        FORMAT AS "orc"
    (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price)
        COLUMNS FROM PATH AS (`day`)
       SET
       (rq = str_to_date(`day`,'%Y-%m-%d'),id=id,store_id=store_id,company_id=company_id,tower_id=tower_id,commodity_id=commodity_id,commodity_name=commodity_name,commodity_price=commodity_price,member_price=member_price,cost_price=cost_price,unit=unit,quantity=quantity,actual_price=actual_price)
        )
    WITH BROKER "broker_name_1"
        (
          "username" = "hdfs",
          "password" = ""
        )
    PROPERTIES
    (
        "timeout"="1200",
        "max_filter_ratio"="0.1"
    );

    其中,涉及參數:

    • FORMAT AS "orc" : 指定了要匯入的資料格式。

    • SET : 定義了Hive表和Doris表之間的欄位對應關係及欄位轉換的一些操作。

HDFS檔案系統資料匯入

以上面建立好的Doris表為例,通過Broker Load從HDFS上匯入資料的語句如下所示。

LOAD LABEL demo.label_20220402
        (
            DATA INFILE("hdfs://10.220.**.**:8020/tmp/test_hdfs.txt")
            INTO TABLE `ods_dish_detail_test`
            COLUMNS TERMINATED BY "\t" (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price)
        )
        with HDFS (
            "fs.defaultFS"="hdfs://10.220.**.**:8020",
            "hadoop.username"="root"
        )
        PROPERTIES
        (
            "timeout"="1200",
            "max_filter_ratio"="0.1"
        );

查看匯入狀態

您可以通過下面的命令查看上面匯入任務的狀態資訊。

show load order by createtime desc limit 1\G;

返回資訊如下所示。

*************************** 1. row ***************************
         JobId: 4132****
         Label: broker_load_2022_03_23
         State: FINISHED
      Progress: ETL:100%; LOAD:100%
          Type: BROKER
       EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=27
      TaskInfo: cluster:N/A; timeout(s):1200; max_filter_ratio:0.1
      ErrorMsg: NULL
    CreateTime: 2022-04-01 18:59:06
  EtlStartTime: 2022-04-01 18:59:11
 EtlFinishTime: 2022-04-01 18:59:11
 LoadStartTime: 2022-04-01 18:59:11
LoadFinishTime: 2022-04-01 18:59:11
           URL: NULL
    JobDetails: {"Unfinished backends":{"5072bde59b74b65-8d2c0ee5b029****":[]},"ScannedRows":27,"TaskNumber":1,"All backends":{"5072bde59b74b65-8d2c0ee5b029****":[36728051]},"FileNumber":1,"FileSize":5540}
1 row in set (0.01 sec)

取消匯入

當Broker Load作業狀態不為CANCELLED或FINISHED時,您可以手動取消。取消時需要指定待取消匯入任務的Label 。

例如:撤銷資料庫demo上Label為broker_load_2022_03_23的匯入作業。

CANCEL LOAD FROM demo WHERE LABEL = "broker_load_2022_03_23";

相關係統配置

Broker參數

Broker Load需要藉助Broker 程訪問遠端儲存,不同的Broker需要提供不同的參數 。

FE配置

以下配置屬於Broker Load的系統層級配置,即會作用於所有Broker Load匯入任務的配置。主要通過修改fe.conf來調整配置值。

  • min_bytes_per_broker_scanner:限制了單個BE處理的資料量的最小值。

  • max_bytes_per_broker_scanner:限制了單個BE處理的資料量的最大值。

  • max_broker_concurrency:限制了一個作業的最大的匯入並發數。

最小處理的資料量、最大並發數、源檔案大小和當前叢集BE的個數共同決定了本次匯入的並發數。

本次匯入並發數 = Math.min(源檔案大小/最小處理量,最大並發數,當前BE節點個數)
本次匯入單個BE的處理量 = 源檔案大小/本次匯入的並發數

通常一個匯入作業支援的最巨量資料量為max_bytes_per_broker_scanner * BE節點數。如果需要匯入更巨量資料量,則需要適當調整max_bytes_per_broker_scanner參數的大小。

預設參數值如下:

  • min_bytes_per_broker_scanner:預設64 MB,單位bytes。

  • max_bytes_per_broker_scanner:預設3 GB,單位bytes。

  • max_broker_concurrency:預設10。

最佳實務

應用情境

使用Broker Load最適合的情境就是未經處理資料在檔案系統(HDFS、BOS、AFS)中的情境。其次,由於Broker Load是單次匯入中唯一的一種非同步匯入的方式,所以您想使用非同步方式匯入大檔案時,可以考慮使用Broker Load。

資料量

以下內容是針對單個BE的情況,如果您叢集有多個BE,則下面標題中的資料量應該乘以BE個數來計算。例如,您的叢集有3個BE,則3 GB以下(包含)則應該乘以3,也就是9 GB以下(包含)。

  • 3 GB以下(包含):您可以直接提交Broker Load建立匯入請求。

  • 3 GB以上:由於單個匯入BE最大的處理量為3 GB,超過3 GB的待匯入檔案就需要通過調整Broker Load的匯入參數來實現大檔案的匯入。

    1. 根據當前BE的個數和原始檔案的大小修改單個BE的最大掃描量和最大並發數。

      修改fe.conf中配置
      max_broker_concurrency = BE個數
      當前置入任務單個BE處理的資料量 = 原始檔案大小 / max_broker_concurrency
      max_bytes_per_broker_scanner >= 當前置入任務單個BE處理的資料量
      
      例如,一個100 GB的檔案,叢集的BE個數為10個
      max_broker_concurrency = 10
      max_bytes_per_broker_scanner >= 10G = 100G / 10

      修改後,所有的BE會並發的處理匯入任務,每個BE處理原始檔案的一部分。

      說明

      上述兩個FE中的配置均為系統配置,其修改是作用於所有的Broker Load任務。

    2. 建立匯入時自訂當前置入任務的timeout時間。

      當前置入任務單個BE處理的資料量 / 使用者Doris叢集最慢匯入速度(MB/s) >= 當前置入任務的timeout時間 >= 當前置入任務單個BE處理的資料量 / 10M/s
      
      例如,一個100 GB的檔案,叢集的BE個數為10個,則timeout時間如下所示。
      timeout >= 1000s = 10G / 10M/s
    3. 當您發現第二步計算出的timeout時間超過系統預設的匯入最大逾時時間(4小時),此時不推薦您將匯入最大逾時時間直接改大來解決問題。單個匯入時間如果超過預設的匯入最大逾時時間(4小時),最好通過切分待匯入檔案並且分多次匯入來解決問題。主要原因是單次匯入超過4小時的話,匯入失敗後重試的時間成本很高。您可以通過如下公式計算出Doris叢集期望最大匯入檔案資料量。

      期望最大匯入檔案資料量 = 14400s * 10M/s * BE個數
      例如,叢集的BE個數為10個
      期望最大匯入檔案資料量 = 14400s * 10M/s * 10 = 1440000M ≈ 1440G
      重要

      通常您的環境可能達不到10M/s的速度,所以建議超過500 GB的檔案都進行檔案切分,然後再匯入。

作業調度

系統會限制一個叢集內,正在啟動並執行Broker Load作業數量,以防止同時運行過多的Load作業。

desired_max_waiting_jobs:FE的配置參數,會限制一個叢集內未開始或正在運行(作業狀態為PENDING或LOADING)的Broker Load作業數量。預設為100。如果超過該閾值,新提交的作業將會被直接拒絕。

一個Broker Load作業會被分為pending task和loading task階段。其中pending task負責擷取匯入檔案的資訊,而loading task會發送給BE執行具體的匯入任務。

  • async_pending_load_task_pool_size:FE的配置參數,用於限制同時啟動並執行pending task的任務數量。也相當於控制了實際正在啟動並執行匯入任務數量。該參數預設為10。例如,您提交了100個Load作業,同時只會有10個作業會進入LOADING狀態(開始執行),而其他作業處於PENDING狀態(等待)。

  • async_loading_load_task_pool_size:FE的配置參數,用於限制同時啟動並執行loading task的任務數量。一個Broker Load作業會有1個pending task和多個loading task (等於LOAD語句中DATA INFILE子句的個數),所以async_loading_load_task_pool_size應該大於等於async_pending_load_task_pool_size