
E-MapReduce: Broker Load

Last Updated: Dec 07, 2023

In Broker Load mode, StarRocks uses broker processes to read data from data sources, such as Apache Hadoop Distributed File System (HDFS) and Alibaba Cloud Object Storage Service (OSS), and uses its computing resources to preprocess and import the data. This topic describes how to import data in Broker Load mode.

Background information

Broker Load is an asynchronous import method. You can create an import job by using the MySQL protocol and execute the SHOW LOAD statement to query the import result. StarRocks supports importing data from external storage systems in various file formats, including CSV, ORC, and Parquet. We recommend that you import tens to hundreds of GB of data in each import job.

Import data in Broker Load mode

Query brokers

When you create a StarRocks cluster in E-MapReduce (EMR), brokers are automatically deployed and started on each core node. You can execute the SHOW PROC statement to query the brokers of a cluster. The following code shows the syntax:

SHOW PROC "/brokers"\G

The following output is returned:

*************************** 1. row ***************************
          Name: broker
            IP: 10.0.**.**
          Port: 8000
         Alive: true
 LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
        ErrMsg:
*************************** 2. row ***************************
          Name: broker
            IP: 10.0.**.**
          Port: 8000
         Alive: true
 LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
        ErrMsg:
*************************** 3. row ***************************
          Name: broker
            IP: 10.0.**.**
          Port: 8000
         Alive: true
 LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
        ErrMsg:
*************************** 4. row ***************************
          Name: broker
            IP: 10.0.**.**
          Port: 8000
         Alive: true
 LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
        ErrMsg:
4 rows in set (0.00 sec)

Create an import job

  • Syntax

    StarRocks version earlier than 2.5.8

    LOAD LABEL db_name.label_name
        (data_desc, ...)
    WITH BROKER broker_name broker_properties
        [PROPERTIES (key1=value1, ... )]

    StarRocks version 2.5.8 or later

    LOAD LABEL db_name.label_name
        (data_desc, ...)
    WITH BROKER broker_properties
        [PROPERTIES (key1=value1, ... )]
  • Parameters

    You can execute the HELP BROKER LOAD statement to view the syntax for creating an import job.

    • Label

      The identifier of the import job. Each import job has a unique label. You can define a custom label. Otherwise, the system generates a label for you. You can use the label to query the status of the import job and prevent duplicate data from being imported. After the state of the import job changes to FINISHED, the label cannot be reused. If the state of the import job changes to CANCELLED, you can reuse the label to submit the import job again.

    • data_desc

      A string that describes the data to be imported. You can specify multiple data_desc strings, each with information such as the data source address, extract, transform, and load (ETL) functions, destination table, and partitions.

      An import job in Broker Load mode allows you to import data into multiple tables at a time. You can specify a data_desc string for each destination table. In each data_desc string, you can specify the data source address, which can contain multiple file_path strings that point to multiple source files. Broker Load ensures the atomicity of an import job that imports data into multiple tables: either all of the data is imported or none of it is. The following code shows the parameters that are usually configured in a data_desc string:

      data_desc:
          DATA INFILE ('file_path', ...)
          [NEGATIVE]
          INTO TABLE tbl_name
          [PARTITION (p1, p2)]
          [COLUMNS TERMINATED BY column_separator ]
          [FORMAT AS file_type]
          [(col1, ...)]
          [COLUMNS FROM PATH AS (colx, ...)]
          [SET (k1=f1(xx), k2=f2(xx))]
          [WHERE predicate]

      The parameters in a data_desc string are described below.

      file_path

      Either a specific file path that points to a file, or a file path that contains wildcards such as asterisks (*) and points to all matched files in a directory. Intermediate directories in the path can also be matched by using wildcards.

      The following special characters are supported as wildcards: ? * [] {} ^. For more information about how to use wildcards, see FileSystem.

      For example, if you set this parameter to hdfs://hdfs_host:hdfs_port/user/data/tablename/*/*, all files in the partitions of the /tablename directory are imported. If you set this parameter to hdfs://hdfs_host:hdfs_port/user/data/tablename/dt=202104/*, only files in the 202104 partition of the /tablename directory are imported.

      negative

      A flag used to indicate that the specified source data has been imported and will be deleted by this import job.

      This parameter is applicable if you want to cancel a batch of data that has been imported to aggregate columns of the SUM type in the destination table. After the import job is complete, the batch of data will be deleted from the aggregate columns.

      partition

      The partitions in the source files to be imported to the destination table.

      Only source data that belongs to the specified partitions is imported. Source data that does not belong to the specified partitions is counted as erroneous data. If you do not want such data to be counted as erroneous data, use a WHERE predicate to filter it out.

      column_separator

      The column delimiter that you want to use to separate data in the source files into columns. Default value: \t.

      If you want to specify an invisible character as the column delimiter, add \x as the prefix and set the delimiter in hexadecimal. For example, if the delimiter used in a source Hive file is \x01, set the column delimiter to \\x01.

      file_type

      The format of the source files. Valid values: parquet, orc, and csv. Default value: csv.

      Parquet files have the file name extension .parquet or .parq.

      COLUMNS FROM PATH AS

      The partition fields in the path of the source files.

      For example, the path of a source file is /path/col_name=col_value/dt=20210101/file1, and col_name and dt are table columns. The values col_value and 20210101 are imported to the destination columns that correspond to col_name and dt, as shown in the following code:

      (col1, col2)
      COLUMNS FROM PATH AS (col_name, dt)

      set column mapping

      The SET statement that contains functions for column type conversion.

      If source columns and destination columns are of different types, you must specify a SET statement for column type conversion.

      where predicate

      The WHERE predicate that you want to use for data filtering after column type conversion.

      Data that is filtered out is not counted in the calculation of the maximum filter ratio. If multiple WHERE predicates are specified for the same table, they are combined by using the AND operator.
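
      The following sample data_desc string combines several of these parameters. It is only a sketch: the table name example_tbl, the column names, and the OSS path are hypothetical and must be replaced with your own values.

      DATA INFILE("oss://bucket/user/data/example_tbl/dt=202104/*")
      INTO TABLE example_tbl
      COLUMNS TERMINATED BY ','
      (id, name, raw_score)
      COLUMNS FROM PATH AS (dt)
      SET (score = raw_score * 100)
      WHERE id > 0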

    • opt_properties

      The optional properties of the import job, which are specified in the PROPERTIES clause. These properties apply to the entire import job.

      opt_properties:
          [PROPERTIES (key1=value1, ...)]

      Some commonly used properties are described below.

      timeout

      The timeout period for the import job. Unit: seconds.

      You can specify a timeout period for each import job in the opt_properties string. If the import job is not complete within the specified timeout period, the state of the import job changes to CANCELLED. The default timeout period for import jobs in Broker Load mode is 4 hours.

      Important

      In most cases, you do not need to specify a timeout period for an import job unless you estimate that the import job will take longer than the default timeout period.

      We recommend that you calculate the minimum timeout period in seconds by using the following formula: (Total file size in MB × Number of source tables and related rollup tables)/(30 × Concurrency of the import job).

      The number 30 in the formula indicates 30 MB/s, which is the average import speed on backends. For example, if the total size of the source data is 1 GB, the source table has two related rollup tables, and the concurrency of the import job is 3, the minimum timeout period is (1 × 1,024 × 3)/(30 × 3) ≈ 34 seconds.

      StarRocks clusters have varied machine environments and concurrent query jobs. You must estimate the slowest import speed of your StarRocks cluster based on the speed of historical import jobs.

      max_filter_ratio

      The maximum filter ratio of the import job. Valid values: 0 to 1. Default value: 0. If the error rate of the import job exceeds the maximum filter ratio, the import job fails. If you want erroneous data rows to be ignored, set this parameter to a value greater than 0 to ensure that the import job can succeed.

      Calculate an appropriate maximum filter ratio by using the following formula: max_filter_ratio = dpp.abnorm.ALL/(dpp.abnorm.ALL + dpp.norm.ALL).

      dpp.abnorm.ALL indicates the number of data rows that cannot be imported due to various reasons such as type mismatch, column quantity mismatch, and length mismatch. dpp.norm.ALL indicates the number of data rows that can be imported. You can execute the SHOW LOAD statement to query the amount of data imported by the import job.

      Number of data rows in the source files = dpp.abnorm.ALL + dpp.norm.ALL
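
      For example, in the sample SHOW LOAD output that is provided later in this topic, dpp.abnorm.ALL is 15 and dpp.norm.ALL is 28133376. In this case, the error rate of the import job is 15/(15 + 28133376) ≈ 5.3 × 10^-7, so any max_filter_ratio value that is greater than or equal to this error rate, such as the 5.0E-5 that is shown in the sample output, allows the import job to succeed.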

      load_mem_limit

      The limit on the memory allocated to the import job. Default value: 0. The value 0 indicates that the memory allocated to the import job is not limited.

      strict_mode

      Specifies whether to enable the strict mode for the import job. To enable the strict mode, set this parameter to true: properties ("strict_mode" = "true").

      By default, the strict mode is disabled.

      If the strict mode is enabled, erroneous data is filtered out after column type conversion. Erroneous data refers to the values that are not null in the source files but are converted to null values after column type conversion. Take note of the following items:

      • The strict mode does not apply to source columns whose values are generated based on functions.

      • If a destination column restricts values to a range and a value in the source column can be converted but the result value is out of the range, the strict mode does not apply to the source column. For example, a value in the source column is 10 and the destination column is of the DECIMAL(1,0) type. The value 10 can be converted but the result value is out of the range. The strict mode does not apply to the source column.
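
      The following sample PROPERTIES clause combines several of these properties. It is only a sketch: the values are examples and must be adjusted based on your data and cluster.

      PROPERTIES
      (
          "timeout" = "7200",
          "max_filter_ratio" = "0.001",
          "strict_mode" = "true"
      )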

  • Sample code for creating an import job to import data from Alibaba Cloud OSS

    Important
    • In StarRocks clusters, you can use broker as the broker name.

    • If the StarRocks version is earlier than 2.5.8, refer to the first of the following samples to create an import job. If the StarRocks version is 2.5.8 or later, remove the broker name (broker) that follows WITH BROKER and keep the broker properties, as shown in the second sample.

    StarRocks version earlier than 2.5.8

    LOAD LABEL tpch.lineitem
    (
        DATA INFILE("oss://bucket/tpc_h/sf1/lineitem.tbl")
        INTO TABLE `lineitem`
        COLUMNS TERMINATED BY '|'
        (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment)
    )
    WITH BROKER broker
    (
        "fs.oss.accessKeyId" = "xxx",
        "fs.oss.accessKeySecret" = "xxx",
        "fs.oss.endpoint" = "oss-cn-beijing-internal.aliyuncs.com"
    );

    StarRocks version 2.5.8 or later

    LOAD LABEL tpch.lineitem
    (
        DATA INFILE("oss://bucket/tpc_h/sf1/lineitem.tbl")
        INTO TABLE `lineitem`
        COLUMNS TERMINATED BY '|'
        (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment)
    )
    WITH BROKER
    (
        "fs.oss.accessKeyId" = "xxx",
        "fs.oss.accessKeySecret" = "xxx",
        "fs.oss.endpoint" = "oss-cn-beijing-internal.aliyuncs.com"
    );

Query the status of an import job

Data import in Broker Load mode is asynchronous. To query the status of an import job, specify the label of the job in the SHOW LOAD statement and then execute the statement. To view the syntax of this statement, execute the HELP SHOW LOAD statement.

Important

The SHOW LOAD statement supports only asynchronous import jobs. For synchronous import jobs such as those in Stream Load mode, you cannot use the SHOW LOAD statement to query the status.

The following sample code provides an example on how to query the status of an import job:

show load where label = 'label1'\G
*************************** 1. row ***************************
         JobId: 7****
         Label: label1
         State: FINISHED
      Progress: ETL:N/A; LOAD:100%
          Type: BROKER
       EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
      TaskInfo: cluster:N/A; timeout(s):10800; max_filter_ratio:5.0E-5
      ErrorMsg: N/A
    CreateTime: 2019-07-27 11:46:42
  EtlStartTime: 2019-07-27 11:46:44
 EtlFinishTime: 2019-07-27 11:46:44
 LoadStartTime: 2019-07-27 11:46:44
LoadFinishTime: 2019-07-27 11:50:16
           URL: http://192.168.**.**:8040/api/_load_error_log?file=__shard_4/error_log_insert_stmt_4bb00753932c491a-a6da6e2725415317_4bb00753932c491a_a6da6e272541****
    JobDetails: {"Unfinished backends":{"9c3441027ff948a0-8287923329a2****":[10002]},"ScannedRows":2390016,"TaskNumber":1,"All backends":{"9c3441027ff948a0-8287923329a2****":[10002]},"FileNumber":1,"FileSize":1073741824}

The parameters in the output are described below.

JobId

The unique ID of the import job. The job ID is automatically generated by the system. The ID of an import job can never be reused, whereas the label of an import job can be reused if the import job fails.

Label

The identifier of the import job.

State

The state of the import job. Valid values:

  • PENDING: The import job is waiting to be run.

  • LOADING: The import job is in progress.

  • CANCELLED: The import job fails.

  • FINISHED: The import job is successful.

Progress

The progress information of the import job. The progress information describes the two import phases: ETL and LOADING. Data import in Broker Load mode involves only the LOADING phase. The ETL progress is displayed as N/A, and the LOADING progress can be 0 to 100%.

The LOADING progress is calculated by using the following formula: LOADING progress = Number of tables that have been imported/Total number of source tables in this import job × 100%.

When all the source tables have been imported and the import job is about to finish, the LOADING progress reaches 99%. The LOADING progress changes to 100% only after the import job is complete.

Important

The import progress is not linear. If the progress remains unchanged within a period of time, the import job may still be in progress.

Type

The type of the import job. The type of the import job in Broker Load mode is BROKER.

EtlInfo

The data amount metrics of the import job: unselected.rows, dpp.norm.ALL, and dpp.abnorm.ALL.

unselected.rows indicates the number of data rows filtered out by the WHERE predicate. dpp.norm.ALL and dpp.abnorm.ALL help determine whether the error rate of the import job exceeds the maximum filter ratio. The sum of the values of the three metrics equals the total number of data rows in the source files.

TaskInfo

The parameters that you configured when you created the import job, including the cluster, timeout period, and maximum filter ratio.

ErrorMsg

The message returned by the import job. If the import job is in the CANCELLED state, the value of this parameter indicates the cause of the import failure and contains two parts: type and msg. If the import job is in the FINISHED state, the value of this parameter is N/A. The type part has the following valid values:

  • USER-CANCEL: The import job is canceled.

  • ETL-RUN-FAIL: The import job fails in the ETL phase.

  • ETL-QUALITY-UNSATISFIED: The error rate exceeds the maximum filter ratio.

  • LOAD-RUN-FAIL: The import job fails in the LOADING phase.

  • TIMEOUT: The import job is not complete within the timeout period.

  • UNKNOWN: An unknown error occurs during the running of the import job.

CreateTime, EtlStartTime, EtlFinishTime, LoadStartTime, and LoadFinishTime

The time when the import job is created, the time when the ETL phase starts, the time when the ETL phase ends, the time when the LOADING phase starts, and the time when the import job is complete.

  • Import jobs in Broker Load mode do not have the ETL phase. Therefore, the values of the EtlStartTime, EtlFinishTime, and LoadStartTime parameters are the same.

  • If, within an extended period of time, only the value of the CreateTime parameter is displayed and the value of the LoadStartTime parameter remains N/A, a large number of import jobs are waiting to be run. We recommend that you submit no more import jobs for now.

    LoadFinishTime - CreateTime = Duration of the import job
    
    LoadFinishTime - LoadStartTime = Duration of the LOADING phase for the import job = Duration of the import job - Wait time of the import job

URL

The URL of sample erroneous data during the import job. If the import job involves no erroneous data, the value of this parameter is N/A.

JobDetails

The details of the import job, including the number of imported files and their total size in bytes, the number of tasks, the number of processed source data rows, the ID of the backend that runs the tasks, and the ID of the backend on which the waiting tasks reside.

{"Unfinished backends":{"9c3441027ff948a0-8287923329a2****":[10002]},"ScannedRows":2390016,"TaskNumber":1,"All backends":{"9c3441027ff948a0-8287923329a2****":[10002]},"FileNumber":1,"FileSize":1073741824}

The number of processed source data rows is updated every 5 seconds. This number indicates only the current progress and does not mean the total number of data rows that are processed after the import job is complete. The latter is shown by the EtlInfo parameter.

Cancel an import job

Import jobs in Broker Load mode can be canceled when they are not in the CANCELLED or FINISHED state. To cancel an import job, specify its label in the CANCEL LOAD statement and then execute the statement. You can execute the HELP CANCEL LOAD statement to view the syntax for canceling an import job.

CANCEL LOAD
[FROM db_name]
WHERE [LABEL = "load_label" | LABEL like "label_pattern"];
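
For example, the following statement cancels the import job whose label is lineitem in the tpch database. The database and label names are taken from the example in this topic; replace them with your own values.

CANCEL LOAD
FROM tpch
WHERE LABEL = "lineitem";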

Import data from HDFS

  • Syntax

    LOAD LABEL db1.label1
    (
        DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/hive/test.db/ml/file1")
        INTO TABLE tbl1
        COLUMNS TERMINATED BY ","
        (tmp_c1, tmp_c2)
        SET
        (
            id=tmp_c2,
            name=tmp_c1
        ),
    
        DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/hive/test.db/ml/file2")
        INTO TABLE tbl2
        COLUMNS TERMINATED BY ","
        (col1, col2)
        where col1 > 1
    )
    WITH BROKER 'broker1'
    (
        "username" = "hdfs_username",
        "password" = "hdfs_password"
    )
    PROPERTIES
    (
        "timeout" = "3600"
    );
  • HDFS authentication

    The community edition of HDFS supports two authentication modes: simple authentication and Kerberos authentication.

    • Simple authentication: User identities are determined by the operating system of the client that is connected to HDFS.

      The following parameters are used for simple authentication:

      • hadoop.security.authentication: the authentication mode. Set this parameter to simple.

      • username: the username that is used to log on to HDFS.

      • password: the password that is used to log on to HDFS.
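
      The following sample code shows the broker properties for simple authentication. The username and password are placeholders.

      (
          "hadoop.security.authentication" = "simple",
          "username" = "hdfs_username",
          "password" = "hdfs_password"
      )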

    • Kerberos authentication: The user identity of a client is determined by its Kerberos credentials.

      The following parameters are used for Kerberos authentication:

      • hadoop.security.authentication: the authentication mode. Set this parameter to kerberos.

      • kerberos_principal: the principal for Kerberos authentication.

      • kerberos_keytab: the path of the Kerberos keytab file. The file must reside on the same server as the broker processes.

      • kerberos_keytab_content: the Base64-encoded content of the Kerberos keytab file. You must configure either this parameter or the kerberos_keytab parameter.
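
      The following sample code shows the broker properties for Kerberos authentication. The principal and keytab path are placeholders.

      (
          "hadoop.security.authentication" = "kerberos",
          "kerberos_principal" = "starrocks@YOUR.REALM.COM",
          "kerberos_keytab" = "/etc/security/keytabs/starrocks.keytab"
      )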

  • HA configurations of HDFS

    After you configure high availability (HA) for the NameNodes in an HDFS cluster, the new active NameNode can be automatically identified if a NameNode failover occurs. To access an HDFS cluster that is deployed in HA mode, configure the following parameters:

    • dfs.nameservices: the name of the HDFS service. You can set a custom name. For example, set the dfs.nameservices parameter to my_ha.

    • dfs.ha.namenodes.xxx: the custom names of the NameNodes. Separate multiple names with commas (,). Replace xxx in this parameter name with the custom name that you set for the dfs.nameservices parameter. For example, set the dfs.ha.namenodes.my_ha parameter to my_nn.

    • dfs.namenode.rpc-address.xxx.nn: the address used by the NameNode for remote procedure calls (RPCs). Replace nn in this parameter name with the name of a NameNode that you set for the dfs.ha.namenodes.xxx parameter. For example, set the dfs.namenode.rpc-address.my_ha.my_nn parameter to a value in the Hostname:Port number format.

    • dfs.client.failover.proxy.provider: the provider that the client uses to connect to the active NameNode. Default value: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.

    The following sample code provides an example:

    (
        "dfs.nameservices" = "my-ha",
        "dfs.ha.namenodes.my-ha" = "my-namenode1,my-namenode2",
        "dfs.namenode.rpc-address.my-ha.my-namenode1" = "nn1-host:rpc_port",
        "dfs.namenode.rpc-address.my-ha.my-namenode2" = "nn2-host:rpc_port",
        "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
    )

    You can use simple authentication or Kerberos authentication to access HDFS clusters deployed in HA mode. The following sample code provides an example on how to access an HA HDFS cluster by using simple authentication:

    (
        "username"="user",
        "password"="passwd",
        "dfs.nameservices" = "my-ha",
        "dfs.ha.namenodes.my-ha" = "my_namenode1,my_namenode2",
        "dfs.namenode.rpc-address.my-ha.my-namenode1" = "nn1-host:rpc_port",
        "dfs.namenode.rpc-address.my-ha.my-namenode2" = "nn2-host:rpc_port",
        "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
    )

    The HA configurations of an HDFS cluster can also be written to the hdfs-site.xml file. If you use broker processes to read data from the HDFS cluster, you only need to specify the file path and authentication information of the cluster.

Example

  1. Create a test table. The following sample code creates a table named lineitem in the tpch database:

    CREATE TABLE lineitem (
      l_orderkey bigint,
      l_partkey bigint,
      l_suppkey bigint,
      l_linenumber int,
      l_quantity double,
      l_extendedprice double,
      l_discount double,
      l_tax double,
      l_returnflag string,
      l_linestatus string,
      l_shipdate date,
      l_commitdate date,
      l_receiptdate date,
      l_shipinstruct string,
      l_shipmode string,
      l_comment string
    )
    ENGINE=OLAP
    DUPLICATE KEY(l_orderkey)
    DISTRIBUTED BY HASH(l_orderkey) BUCKETS 96
    PROPERTIES(
      "replication_num" = "1"
    );
  2. Create an import job.

    Important

    If the StarRocks version is earlier than 2.5.8, refer to the first of the following samples to create an import job. If the StarRocks version is 2.5.8 or later, remove the broker name (broker) that follows WITH BROKER and keep the broker properties, as shown in the second sample.

    StarRocks version earlier than 2.5.8

    LOAD LABEL tpch.lineitem
    (
        DATA INFILE("oss://xxx/tpc_h/sf1/lineitem.tbl")
        INTO TABLE `lineitem`
        COLUMNS TERMINATED BY '|'
        (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment)
    )
    WITH BROKER broker
    (
        "fs.oss.accessKeyId" = "xxx",
        "fs.oss.accessKeySecret" = "xxx",
        "fs.oss.endpoint" = "xxx"
    );

    StarRocks version 2.5.8 or later

    LOAD LABEL tpch.lineitem
    (
        DATA INFILE("oss://xxx/tpc_h/sf1/lineitem.tbl")
        INTO TABLE `lineitem`
        COLUMNS TERMINATED BY '|'
        (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment)
    )
    WITH BROKER
    (
        "fs.oss.accessKeyId" = "xxx",
        "fs.oss.accessKeySecret" = "xxx",
        "fs.oss.endpoint" = "xxx"
    );
  3. Query the status of the import job.

    show load where label = 'lineitem'\G
    
    *************************** 1. row ***************************
             JobId: 1****
             Label: lineitem
             State: FINISHED
          Progress: ETL:100%; LOAD:100%
              Type: BROKER
           EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=6001215
          TaskInfo: cluster:N/A; timeout(s):14400; max_filter_ratio:0.0
          ErrorMsg: NULL
        CreateTime: 2022-04-13 15:07:53
      EtlStartTime: 2022-04-13 15:07:56
     EtlFinishTime: 2022-04-13 15:07:56
     LoadStartTime: 2022-04-13 15:07:56
    LoadFinishTime: 2022-04-13 15:08:06
               URL: NULL
        JobDetails: {"Unfinished backends":{"97f1acd1-6e70-4699-9199-b1722020****":[]},"ScannedRows":6001215,"TaskNumber":1,"All backends":{"97f1acd1-6e70-4699-9199-b1722020****":[10002,10003,10004,10005]},"FileNumber":1,"FileSize":753862072}
    2 rows in set (0.00 sec)
  4. After the import job is complete, query data based on your business requirements.

    • Query the number of data rows in the lineitem table.

      select count(*) from lineitem;

      The following output is returned:

      +----------+
      | count(*) |
      +----------+
      |  6001215 |
      +----------+
      1 row in set (0.03 sec)
    • Query data in the first two rows of the lineitem table.

      select * from lineitem limit 2;

      The following output is returned:

      +------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+--------------------------------------------+
      | l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment                                  |
      +------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+--------------------------------------------+
      |         69 |    115209 |      7721 |            1 |         48 |         58761.6 |       0.01 |  0.07 | A            | F            | 1994-08-17 | 1994-08-11   | 1994-09-08    | NONE           | TRUCK      | regular epitaphs. carefully even ideas hag |
      |         69 |    104180 |      9201 |            2 |         32 |        37893.76 |       0.08 |  0.06 | A            | F            | 1994-08-24 | 1994-08-17   | 1994-08-31    | NONE           | REG AIR    | s sleep carefully bold,                    |
      +------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+--------------------------------------------+
      2 rows in set (0.01 sec)

Concurrency of import jobs

An import job consists of one or more tasks. Tasks are run in parallel. An import job can be split into tasks based on data_desc strings in the LOAD statement. Examples:

  • If an import job has multiple data_desc strings that specify different destination tables, the import job is split into multiple tasks, one for each data_desc string.

  • If an import job has multiple data_desc strings that specify different partitions of the same destination table, the import job is also split into multiple tasks, one for each data_desc string.

Each task can contain one or more instances, which are evenly distributed to backends for parallel running. A task is split into instances based on the following frontend configurations:

  • min_bytes_per_broker_scanner: the minimum amount of data processed by each instance. By default, the minimum data amount is 64 MB.

  • max_broker_concurrency: the maximum number of parallel instances for each task. Default value: 100.

  • load_parallel_instance_num: the number of parallel instances on each backend. Default value: 1.

The total number of instances equals the minimum of the following values: the total size of imported files divided by the value of min_bytes_per_broker_scanner, the value of max_broker_concurrency, and the value of load_parallel_instance_num multiplied by the number of backends.
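
For example, assume that the total size of the imported files is 1 GB (1,024 MB), that the default values of the preceding frontend configurations are used, and that the cluster has three backends. In this case, the total number of instances is min(1,024/64, 100, 1 × 3) = min(16, 100, 3) = 3.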

In most cases, an import job has only one data_desc string and therefore contains only one task. The task is split into instances. The number of instances equals that of the backends. Each instance is assigned to a different backend for parallel running.
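
You can adjust the concurrency by modifying the preceding frontend configurations. The following statement is only a sketch that assumes you have administrative privileges on the cluster; whether a specific configuration item can be modified dynamically depends on your StarRocks version.

ADMIN SET FRONTEND CONFIG ("max_broker_concurrency" = "10");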