ApsaraDB for SelectDB:Use Broker Load to import data

Last Updated: Apr 27, 2025

Broker Load asynchronously imports a batch of data into an ApsaraDB for SelectDB instance. You can use Broker Load to efficiently read hundreds of GB of data from distributed storage systems such as Hadoop Distributed File System (HDFS), Object Storage Service (OSS), and Amazon Simple Storage Service (Amazon S3). This topic describes how to use Broker Load to import data into an ApsaraDB for SelectDB instance.

Benefits

Broker Load provides the following benefits:

  • Large amounts of data: Broker Load can be used to import hundreds of GB of offline data at a time.

  • High concurrency in asynchronous mode: Broker Load imports data asynchronously without blocking other operations. This improves cluster resource utilization.

  • High compatibility: Broker Load can be used to read data from remote storage systems such as HDFS and Amazon S3.

  • Ease of use:

    • You can create a Broker Load job over the MySQL protocol to import data.

    • You can execute the SHOW LOAD statement to monitor the data import progress and result in real time.

Scenarios

Broker Load can be used to efficiently read large amounts of data from distributed storage systems such as HDFS, OSS, and Amazon S3.

  • For small amounts of data, such as 100 MB, an import typically completes within tens of seconds.

  • For large amounts of data, such as 100 GB, an import typically completes within tens of minutes.

Create a Broker Load job

A Broker Load job uses a broker to read and import data from a remote storage system such as HDFS or Amazon S3 into a table of an ApsaraDB for SelectDB instance.

Syntax

LOAD LABEL load_label
(
data_desc1[, data_desc2, ...]
)
WITH broker_type
[broker_properties]
[load_properties];

Parameters

  • load_label: The unique identifier of the Broker Load job. You can customize a label for your Broker Load job in the import statement. After the Broker Load job is submitted, you can query the status of the job based on the label. A unique label also prevents the same data from being imported repeatedly. If the Broker Load job that is associated with a label is in the CANCELLED state, the label can be reused by another Broker Load job. Format: [database.]label_name.

    Note: We recommend that you use the same label for the same batch of data. This way, repeated requests for importing the same batch of data are accepted only once, which ensures at-most-once semantics.

  • data_desc1: The description of the files to be imported. For more information, see the Parameters of data_desc1 section of this topic.

  • WITH broker_type: The type of the broker to be used. Valid values: HDFS and S3. A Broker Load job that uses the S3 broker is also referred to as an Object Storage Service (OSS) Load job. For more information, see Import data by using OSS.

  • broker_properties: The parameters that are required for the broker to access the remote storage system, such as HDFS, OSS, or Amazon S3. Syntax: ( "key1" = "val1", "key2" = "val2", ...)

  • load_properties: The parameters of the import. For more information, see the Parameters of load_properties section of this topic.
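For reference, the following statement sketches the overall shape of a Broker Load job that reads from an S3-compatible object storage service such as OSS. The bucket, endpoint, and credential values are placeholders, and the exact s3.* property names can vary by version; for the authoritative parameter list, see Import data by using OSS.

LOAD LABEL example_db.label_oss_01
(
    DATA INFILE("s3://your_bucket/example/file1.txt")
    INTO TABLE `test_table`
    COLUMNS TERMINATED BY ","
)
WITH S3
(
    -- Placeholder endpoint and credentials. Replace them with the values of your object storage service.
    "s3.endpoint" = "your_endpoint",
    "s3.region" = "your_region",
    "s3.access_key" = "your_access_key",
    "s3.secret_key" = "your_secret_key"
)
PROPERTIES
(
    "timeout" = "3600"
);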

Parameters of data_desc1

[MERGE|APPEND|DELETE]
DATA INFILE
(
"file_path1"[, file_path2, ...]
)
[NEGATIVE]
INTO TABLE `table_name`
[PARTITION (p1, p2, ...)]
[COLUMNS TERMINATED BY "column_separator"]
[FORMAT AS "file_type"]
[(column_list)]
[COLUMNS FROM PATH AS (c1, c2, ...)]
[PRECEDING FILTER predicate]
[SET (column_mapping)]
[WHERE predicate]
[DELETE ON expr]
[ORDER BY source_sequence]
[PROPERTIES ("key1"="value1", ...)]

  • [MERGE|APPEND|DELETE]: The mode in which data is merged. Default value: APPEND, which specifies that the import is a standard append operation. You can set this parameter to MERGE or DELETE only for tables that use the Unique Key model. If this parameter is set to MERGE, the DELETE ON clause must be used to specify the column that serves as the Delete Flag column. If this parameter is set to DELETE, all data involved in the import is deleted from the destination table.

  • DATA INFILE: The paths to the files to be imported. You can enumerate multiple file paths and use wildcards to match files. Make sure that each path points to actual files, not merely to a directory. Otherwise, the import fails.

  • NEGATIVE: Specifies that the import is a negative import. This parameter is valid only for data of the INTEGER type that is aggregated by using the SUM function. If you specify this parameter, a negation operation is performed on the data of the INTEGER type that is aggregated by using the SUM function. This helps offset erroneous data that was previously imported.

  • PARTITION(p1, p2, ...): The specific partitions of a table to which the import is limited. Data that is not located within the specified partitions is excluded from the import.

  • COLUMNS TERMINATED BY: The column delimiter. This parameter is valid only if the files to be imported are CSV files. You can specify only single-byte delimiters.

  • FORMAT AS: The format of the files to be imported. Default value: CSV. Valid values: CSV, PARQUET, and ORC.

  • column_list: The sequence of columns in the files to be imported.

  • COLUMNS FROM PATH AS: The columns that are extracted from the file paths.

  • PRECEDING FILTER predicate: The prefilter condition. Data is first concatenated into raw data rows in sequence based on the column_list and COLUMNS FROM PATH AS parameters. Then, the raw rows are filtered based on the prefilter condition.

  • SET (column_mapping): The conversion functions that are applied to columns during the import.

  • WHERE predicate: The filter condition that is applied to the data after column conversion.

  • DELETE ON expr: The expression that specifies the column that serves as the Delete Flag column in the data to be imported, as well as the calculation relationship. This expression is required if the MERGE import mode is used. This parameter is valid only for tables that use the Unique Key model.

  • ORDER BY: The clause that specifies the column that serves as the sequence column in the data to be imported. This parameter is used to maintain the correct order of data during the import. This parameter is valid only for tables that use the Unique Key model.

  • PROPERTIES ("key1"="value1", ...): The format-related parameters for the files to be imported. For example, to import JSON files, you can specify parameters such as json_root, jsonpaths, and fuzzy_parse.
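The following sketch shows how several of these clauses combine in the data_desc portion of a load statement: the column list names the source columns, PRECEDING FILTER drops raw rows before conversion, SET derives a destination column, and WHERE filters the converted rows. The label, file path, table, and column names are placeholders that follow the examples later in this topic.

LOAD LABEL test_db.test_filter_example
(
    DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt")
    INTO TABLE `test_table`
    COLUMNS TERMINATED BY ","
    (id, name, temp_age, address)
    -- Drop raw rows before column conversion.
    PRECEDING FILTER temp_age > 0
    -- Compute the destination column from the source column.
    SET (age = temp_age + 1)
    -- Filter the converted rows.
    WHERE age < 60
)
WITH HDFS
(
    "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
);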

Parameters of load_properties

  • timeout: The timeout period of the import. Unit: seconds. Default value: 14400, which specifies 4 hours.

  • max_filter_ratio: The maximum allowable ratio of data that can be filtered out during the import due to reasons such as non-conformity with data standards. Valid values: 0 to 1. Default value: 0, which specifies a zero-tolerance policy that allows no data to be filtered out.

  • exec_mem_limit: The maximum size of memory that can be allocated to the import job. Unit: bytes. Default value: 2147483648, which specifies 2 GB.

  • strict_mode: Specifies whether to enable the strict mode for the import job. Default value: false.

  • timezone: The time zone that is used for time zone-sensitive functions in the import job. Default value: Asia/Shanghai. Affected functions include strftime, alignment_timestamp, and from_unixtime.

  • load_parallelism: The degree of parallelism (DOP) of the import. Default value: 1. If you set this parameter to a value greater than 1, multiple execution plans are initiated to run import tasks at the same time. This accelerates the import.

  • send_batch_parallelism: The DOP for sending the data to be processed in batches. If the value of this parameter is greater than the value of the max_send_batch_parallelism_per_job parameter in the backend (BE) configurations of the compute cluster, the value of max_send_batch_parallelism_per_job is used instead.

  • load_to_single_tablet: Specifies whether to import data into only one tablet in the corresponding partition. Default value: false. This parameter is valid only if data is imported into a table that uses the Duplicate Key model with random bucketing.
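The following sketch shows where these properties are placed in a load statement. The values are illustrative only and should be adjusted to your workload; the label and paths are placeholders.

LOAD LABEL example_db.label_props_01
(
    DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt")
    INTO TABLE `test_table`
    COLUMNS TERMINATED BY ","
)
WITH HDFS
(
    "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
)
PROPERTIES
(
    "timeout" = "7200",              -- fail the job if it does not finish within 2 hours
    "max_filter_ratio" = "0.05",     -- tolerate up to 5% of error rows
    "exec_mem_limit" = "4294967296", -- allow up to 4 GB of memory for the job
    "strict_mode" = "true",          -- filter rows whose column type conversion fails
    "timezone" = "Asia/Shanghai",
    "load_parallelism" = "2"
);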

Examples

  1. Create a table into which you want to import data in an ApsaraDB for SelectDB instance. Sample code:

    CREATE TABLE test_table
    (
        id int,
        name varchar(50),
        age int,
        address varchar(50)
    )
    UNIQUE KEY(`id`)
    DISTRIBUTED BY HASH(id) BUCKETS 4
    PROPERTIES("replication_num" = "1");
    
    CREATE TABLE test_table2
    (
        id int,
        name varchar(50),
        age int,
        address varchar(50)
    )
    DISTRIBUTED BY HASH(id) BUCKETS 4
    PROPERTIES("replication_num" = "1");
  2. Prepare the data files to be imported.

    • Create a file named file1.txt that contains the following content:

      1,tomori,32,shanghai
      2,anon,22,beijing
      3,taki,23,shenzhen
      4,rana,45,hangzhou
      5,soyo,14,shanghai
      6,saki,25,hangzhou
      7,mutsumi,45,shanghai
      8,uika,26,shanghai
      9,umiri,27,shenzhen
      10,nyamu,37,shanghai
    • Create a file named file2.csv that contains the following content:

      1,saki,25,hangzhou
      2,mutsumi,45,shanghai
      3,uika,26,shanghai
      4,umiri,27,shenzhen
      5,nyamu,37,shanghai
  3. Import the data of the files into the table.

    • Import the file1.txt file from HDFS. Sample code:

      LOAD LABEL example_db.label1
      (
          DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt")
          INTO TABLE `test_table`
          COLUMNS TERMINATED BY ","
      )
      WITH HDFS
      (
          "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
      );

      This statement imports the file1.txt file into the test_table table. Columns are separated by commas (,). When you import data from HDFS, you must specify the fs.defaultFS property in the broker_properties parameter so that the system can connect to the HDFS cluster and find the corresponding files.

    • Import two files from HDFS into two tables. Sample code:

      LOAD LABEL test_db.test_02
      (
          DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file2.csv")
          INTO TABLE `test_table`
          COLUMNS TERMINATED BY ","
          (id,name,temp_age,address)    
          SET (
              age = temp_age + 1
          ),
          DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt")
          INTO TABLE `test_table2`
          COLUMNS TERMINATED BY ","
      )
      WITH HDFS
      (
          "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
      );

      Import the file2.csv file into the test_table table, and compute the values of the age column by incrementing the values of the temp_age column in file2.csv by 1. Import the file1.txt file into the test_table2 table.

    • Import a batch of data from an HDFS cluster that is deployed in high-availability (HA) mode. Sample code:

      LOAD LABEL test_db.test_03
      (
          DATA INFILE("hdfs://hdfs_host:hdfs_port/example/*")
          INTO TABLE `test_table`
          COLUMNS TERMINATED BY "\\x01"
      )
      WITH HDFS
      (
          "hadoop.username" = "hive",
          "fs.defaultFS" = "hdfs://my_ha",
          "dfs.nameservices" = "my_ha",
          "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
          "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
          "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
          "dfs.client.failover.proxy.provider.my_ha" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
      );

      Use the default delimiter \\x01 of Hive, and use a wildcard (*) to specify all files in the data directory.

    • Filter data in the file1.txt file to import only data rows that meet the filter condition. Sample code:

      LOAD LABEL test_db.test_04
      (
          DATA INFILE("hdfs://host:port/example/file1.txt")
          INTO TABLE `test_table2`
          COLUMNS TERMINATED BY ","
          (id,name,age,address)   
          WHERE age < 20
      )
      WITH HDFS
      (
          "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
      );
      

      Only rows whose value of the age column is smaller than 20 are imported.

    • Import the file1.txt file from HDFS in MERGE mode, and specify the timeout period and the maximum filter ratio of the import. Rows whose value in the age column is less than 20 are marked for deletion. Sample code:

      LOAD LABEL test_db.test_05
      (
          MERGE DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt")
          INTO TABLE `test_table`
          COLUMNS TERMINATED BY ","
          (id,name,age,address)   
          DELETE ON age < 20
      )
      WITH HDFS
      (
          "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
      )
      PROPERTIES
      (
          "timeout" = "3600",
          "max_filter_ratio" = "0.1"
      );

      Import data in MERGE mode. The test_table table must use the Unique Key model. If the value of the age column is less than 20 in a row to be imported, the row is marked for deletion. The timeout period for an import job is 3,600 seconds, and the maximum filtering ratio for error data rows is 10%.
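Because Broker Load is asynchronous, each of the preceding statements only submits an import job. The following minimal sketch, which assumes the label test_db.test_02 from the examples above, shows how to verify a submitted job.

-- Check the job status by label. The job typically moves from PENDING to LOADING to FINISHED.
SHOW LOAD FROM test_db WHERE LABEL = "test_02";

-- After the job reaches the FINISHED state, query the destination table to confirm the imported data.
SELECT COUNT(*) FROM test_db.test_table;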

Cancel a Broker Load job

If a Broker Load job is not in the CANCELLED or FINISHED state, you can manually cancel the job. You must specify the label of the import job to be canceled. After an import job is canceled, the data written in the job is rolled back and does not take effect.

Syntax

CANCEL LOAD
[FROM db_name]
WHERE [LABEL = "load_label" | LABEL like "label_pattern"];

Parameters

  • db_name: The name of the database. If you do not specify this parameter, the current database is used.

  • load_label: The label of the import job. If you use LABEL =, the label is matched exactly. If you use LABEL LIKE, import jobs whose labels contain label_pattern are matched.

Examples

  • Cancel an import job whose label is example_db_test_load_label from the example_db database.

    CANCEL LOAD
    FROM example_db
    WHERE LABEL = "example_db_test_load_label";
  • Cancel import jobs whose label contains example_ from the example_db database.

    CANCEL LOAD
    FROM example_db
    WHERE LABEL like "example_";
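After you cancel an import job, you can confirm that it has entered the CANCELLED state by querying it by label, as described in the next section. A minimal check, assuming the label from the first example:

SHOW LOAD FROM example_db WHERE LABEL = "example_db_test_load_label";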

Query the status of a Broker Load job

Broker Load is an asynchronous import method. The successful execution of an import statement indicates only the successful submission of a Broker Load job, rather than the completion of the data import. To query the status of a Broker Load job, execute the SHOW LOAD statement.

Syntax

SHOW LOAD
[FROM db_name]
[
   WHERE
   [LABEL [ = "your_label" | LIKE "label_matcher"]]
   [STATE = ["PENDING"|"ETL"|"LOADING"|"FINISHED"|"CANCELLED"]]
]
[ORDER BY ...]
[LIMIT limit][OFFSET offset];

Parameters

  • db_name: The name of the database. If you do not specify this parameter, the current database is used.

  • your_label: The label of the import job. If you use LABEL =, the label is matched exactly. If you use LABEL LIKE, import jobs whose labels contain label_matcher are matched.

  • STATE: The state of the import jobs to query. Only import jobs in the specified state are displayed.

  • ORDER BY: The order in which the returned records are sorted.

  • LIMIT: The maximum number of records to display. If you do not specify this parameter, all matching records are displayed.

  • OFFSET: The number of initial records to skip before the query results are displayed. Default value: 0.

Examples

  • Query the import jobs whose labels contain 2014_01_02 in the example_db database, and display the 10 oldest matching jobs.

    SHOW LOAD FROM example_db WHERE LABEL LIKE "2014_01_02" LIMIT 10;
  • Query the import jobs whose label is load_example_db_20140102 in the example_db database. Sort these jobs by LoadStartTime in descending order.

    SHOW LOAD FROM example_db WHERE LABEL = "load_example_db_20140102" ORDER BY LoadStartTime DESC;
  • Query the import jobs whose label is load_example_db_20140102 in the example_db database. The import jobs are in the loading state.

    SHOW LOAD FROM example_db WHERE LABEL = "load_example_db_20140102" AND STATE = "loading";
  • Query the import jobs in the example_db database and sort these jobs by LoadStartTime in descending order. Skip the initial five query results and display the next 10 query results.

    SHOW LOAD FROM example_db ORDER BY LoadStartTime DESC limit 5,10;
    SHOW LOAD FROM example_db ORDER BY LoadStartTime DESC limit 10 offset 5;
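  • Query the cancelled import jobs in the example_db database and sort them by LoadStartTime in descending order to review recent failures. This sketch only combines the filters shown above; the output columns of SHOW LOAD (for example, the error message column) can vary by version.

    -- List the most recently started import jobs that were cancelled.
    SHOW LOAD FROM example_db WHERE STATE = "CANCELLED" ORDER BY LoadStartTime DESC LIMIT 10;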

Best practices

  • Query the status of an import job

    Broker Load is an asynchronous import method. The successful execution of an import statement indicates only the successful submission of a Broker Load job, rather than the completion of the data import. To query the status of an import job, execute the SHOW LOAD statement.

  • Cancel an import job

    To cancel an import job that is submitted but is not complete, execute the CANCEL LOAD statement. After an import job is canceled, the data written in the job is rolled back and does not take effect.

  • Label, import transaction, and multi-table atomicity

    All import jobs in ApsaraDB for SelectDB are atomic. Atomicity is also guaranteed when data is imported into multiple tables within a single import job. In addition, ApsaraDB for SelectDB uses labels to ensure that imported data is neither lost nor duplicated.

  • Mapping, deriving, and filtering of columns

    ApsaraDB for SelectDB supports various operations on column conversion and column filtering in an import statement. Most built-in functions and user-defined functions (UDFs) are supported. For more information, see Converting Source Data.

  • Filtering of error data rows

    ApsaraDB for SelectDB allows you to skip data rows in incorrect formats during an import job. The maximum proportion of skipped rows is specified by the max_filter_ratio parameter. The default value is 0, which specifies a zero-tolerance policy that allows no data to be filtered out. In this case, the import job fails if even one error data row is found. If you want to ignore some error data rows during the import, set this parameter to a value between 0 and 1. ApsaraDB for SelectDB then automatically skips rows in incorrect formats.

    For more information about how to calculate the tolerance ratio, see Converting Source Data.

  • Strict mode

    The strict_mode property is used to specify whether an import job runs in strict mode. If an import job runs in strict mode, the mapping, conversion, and filtering of columns are affected.

  • Timeout period

    The default timeout period for a Broker Load job is 4 hours. The timing starts from the time when the import job is submitted. If the import job is not complete within the timeout period, the import job fails.

  • Limits on the amount of data and the number of jobs

    We recommend that you import less than 100 GB of data at a time in a Broker Load job. Theoretically, the amount of data that can be imported in a single job is not limited. However, if the amount of data is excessively large, the import job may run for an extended period of time, and the cost of retrying is high if the job fails.

    In addition, the number of nodes in a cluster is limited. Therefore, a limit is set on the maximum amount of data to be imported, which is the number of nodes multiplied by 3 GB. This ensures that system resources are properly used. If you need to import a large amount of data, we recommend that you submit multiple import jobs.

    ApsaraDB for SelectDB limits the number of concurrent import jobs in a cluster, which is in the range of 3 to 10. If the number of import jobs that you submitted is greater than the limit, the excess import jobs wait in a queue. The maximum length of the queue is 100. If more than 100 import jobs are waiting, the excess import jobs are directly rejected.

    Note

    The time spent waiting in the queue is also counted toward the timeout period of an import job. If the timeout period elapses, the job is canceled. We recommend that you monitor the status of import jobs to properly control the job submission frequency.