Broker Load is used to asynchronously import data into an ApsaraDB for SelectDB instance at a time. You can use Broker Load to efficiently read hundreds of GB of data from distributed storage systems such as Hadoop Distributed File System (HDFS), Object Storage Service (OSS), and Amazon Simple Storage Service (Amazon S3). This topic describes how to use Broker Load to import data into an ApsaraDB for SelectDB instance.
Benefits
Broker Load provides the following benefits:
Large amounts of data: Broker Load can be used to import hundreds of GB of offline data at a time.
Asynchronously high concurrency: Broker Load can be used to import data in asynchronous mode without data blocking. This improves the cluster resource usage.
High compatibility: Broker Load can be used to read data from remote storage systems such as HDFS and Amazon S3.
Ease of use:
You can create a Broker Load job over the MySQL protocol to import data.
You can execute the
SHOW LOADstatement to monitor the data import progress and result in real time.
Scenarios
Broker Load can be used to efficiently read large amounts of data from distributed storage systems such as HDFS, OSS, and Amazon S3.
The data import timeliness for small amounts of data, such as 100 MB of data, is at the 10-second level.
The data import timeliness for large amounts of data, such as 100 GB of data, is at the 10-minute level.
Create a Broker Load job
A Broker Load job uses a broker to read and import data from a remote storage system such as HDFS or Amazon S3 into a table of an ApsaraDB for SelectDB instance.
Syntax
LOAD LABEL load_label
(
data_desc1[, data_desc2, ...]
)
WITH broker_type
[broker_properties]
[load_properties];Parameters
Parameter | Description |
| The unique identifier of the Broker Load job. You can customize a label for your Broker Load job in the import statement. After the Broker Load job is submitted, you can query the status of the job based on the label. A unique label can also prevent you from repeatedly importing the same data. If the Broker Load job that is associated with a label is in the CANCELLED state, the label can be used by another Broker Load job. Format: Note We recommend that you use the same label for the same batch of data. This way, repeated requests for importing the same batch of data are accepted only once. This ensures the at-most-once semantics. |
| The description of the files to be imported. For more information, see the Parameters of data_desc1 section of this topic. |
| The type of the broker to be used. Valid values: HDFS and S3. A Broker Load job that uses the S3 broker is also referred to as an Object Storage Service (OSS) Load job. For more information, see Import data by using OSS. |
| The parameters that are required for the broker to access a remote storage system, such as Baidu Object Storage (BOS) or HDFS. Syntax: |
| The parameters of the import. For more information, see the Parameters of load_properties section of this topic. |
Parameters of data_desc1
[MERGE|APPEND|DELETE]
DATA INFILE
(
"file_path1"[, file_path2, ...]
)
[NEGATIVE]
INTO TABLE `table_name`
[PARTITION (p1, p2, ...)]
[COLUMNS TERMINATED BY "column_separator"]
[FORMAT AS "file_type"]
[(column_list)]
[COLUMNS FROM PATH AS (c1, c2, ...)]
[PRECEDING FILTER predicate]
[SET (column_mapping)]
[WHERE predicate]
[DELETE ON expr]
[ORDER BY source_sequence]
[PROPERTIES ("key1"="value1", ...)]Parameter | Description |
| The mode in which data is merged. Default value: APPEND, which specifies that the import is a standard append operation. You can set this parameter to MERGE or DELETE only for tables that use the Unique Key model. If this parameter is set to MERGE, the DELETE ON statement must be used to specify the column that serves as the Delete Flag column. If this parameter is set to DELETE, all data involved in the import is to be deleted from the table on which you want to perform the import. |
| The paths to the files to be imported. You can enumerate multiple file paths and use wildcards to match files. Make sure that each path points to actual files, not merely to a directory. Otherwise, the import fails. |
| Specifies that the import is a negative import. This parameter is valid only for data of the INTEGER type that is aggregated by using the SUM function. If you specify this parameter, a negation operation is performed on the data of the INTEGER type that is aggregated by using the SUM function. This helps offset the error data that is imported. |
| The specific partitions of a table to which the import is limited. Data that is not located within the specified partitions is excluded from the import process. |
| The column delimiter. This parameter is valid only if the files to be imported are CSV files. You can specify only single-byte delimiters. |
| The format of the files to be imported. Default value: CSV. Valid values: CSV, PARQUET, and ORC. |
| The sequence of columns in the files to be imported. |
| The columns that are to be extracted from the files to be imported. |
| The prefilter condition. Data is first concatenated into raw data rows in sequence based on the |
| The function for column conversion. |
| The filter condition for the data. |
| The statement that is used to specify the column that serves as the Delete Flag column in the data to be imported, as well as to define the calculation relationships. The expression is required if the MERGE import mode is used. This parameter is valid only for tables that use the Unique Key model. |
| The statement that is used to specify the column that serves as the sequence column in the data to be imported. This parameter is used to maintain the correct order of data during the import. This parameter is valid only for tables that use the Unique Key model. |
| The format-related parameters for the files to be imported. For example, to import JSON files, you can specify parameters such as json_root, jsonpaths, and fuzzy_parse. |
Parameters of load_properties
Parameter | Description |
| The timeout period of the import. Unit: seconds. Default value: 14400, which specifies 4 hours. |
| The maximum allowable ratio of data that can be filtered out during the import due to reasons such as non-conformity with data standards. Default value: 0, which specifies a zero-tolerance policy that allows no data to be filtered out. Valid values: 0 to 1. |
| The maximum size of memory that can be allocated to the import job. Unit: bytes. Default value: 2147483648, which specifies 2 GB. |
| Specifies whether to enable the strict mode for the import job. Default value: false. |
| The time zone that is used for time zone-sensitive functions in the import job. Default value: |
| The degree of parallelism (DOP) of the import. Default value: 1. If you set this parameter to a value greater than 1, multiple execution plans are initiated to run multiple import jobs at the same time. This accelerates the import process. |
| The DOP for sending the data to be processed in batches. If the value of this parameter is greater than that of the max_send_batch_parallelism_per_job parameter in the backend (BE) configurations of the compute cluster, the value of the max_send_batch_parallelism_per_job parameter is used for the compute cluster. |
| Specifies whether to import data into only one tablet in the corresponding partition. Default value: false. This parameter is valid only if data is imported into tables that use the Duplicate Key model and contain random buckets. |
Examples
Create a table into which you want to import data in an ApsaraDB for SelectDB instance. Sample code:
CREATE TABLE test_table ( id int, name varchar(50), age int, address varchar(50) ) UNIQUE KEY(`id`) DISTRIBUTED BY HASH(id) BUCKETS 4 PROPERTIES("replication_num" = "1"); CREATE TABLE test_table2 ( id int, name varchar(50), age int, address varchar(50) ) DISTRIBUTED BY HASH(id) BUCKETS 4 PROPERTIES("replication_num" = "1");Create a file to import data.
Create a file named
file1.txtthat contains the following content:1,tomori,32,shanghai 2,anon,22,beijing 3,taki,23,shenzhen 4,rana,45,hangzhou 5,soyo,14,shanghai 6,saki,25,hangzhou 7,mutsumi,45,shanghai 8,uika,26,shanghai 9,umiri,27,shenzhen 10,nyamu,37,shanghaiCreate a file named
file2.csvthat contains the following content:1,saki,25,hangzhou 2,mutsumi,45,shanghai 3,uika,26,shanghai 4,umiri,27,shenzhen 5,nyamu,37,shanghai
Import the data of the files into the table.
Import the
file1.txtfile from HDFS. Sample code:LOAD LABEL example_db.label1 ( DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt") INTO TABLE `my_table` COLUMNS TERMINATED BY "," ) WITH HDFS ( "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port" );Import the file1.txt file into the test_table table. Separate columns with commas (,). When you import data from HDFS, you must specify the
fs.defaultFSproperty in the broker_properties parameter to ensure that the system can connect to an HDFS cluster as expected and find the corresponding file.Import the two files into two tables from HDFS. Sample code:
LOAD LABEL test_db.test_02 ( DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file2.csv") INTO TABLE `test_table` COLUMNS TERMINATED BY "," (id,name,temp_age,address) SET ( age = temp_age + 1 ), DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt") INTO TABLE `test_table2` COLUMNS TERMINATED BY "," ) WITH HDFS ( "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port" );Import the
file1.txtfile into thetest_tabletable. Import thefile2.csvfile into thetest_table2table, and increment the values in the age column by 1 based on the values from the temp_age column in thefile2.csvfile.Import a batch of data from an HDFS cluster that is deployed in high-availability (HA) mode. Sample code:
LOAD LABEL test_db.test_03 ( DATA INFILE("hdfs://hdfs_host:hdfs_port/example/*") INTO TABLE `test_table` COLUMNS TERMINATED BY "\\x01" ) WITH HDFS ( "hadoop.username" = "hive", "fs.defaultFS" = "hdfs://my_ha", "dfs.nameservices" = "my_ha", "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2", "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port", "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port", "dfs.client.failover.proxy.provider.my_ha" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" );Use the default delimiter
\\x01of Hive, and use a wildcard(*)to specify all files in thedatadirectory.Filter data in the file1.txt file to import only data rows that meet the filter condition. Sample code:
LOAD LABEL test_db.test_04 ( DATA INFILE("hdfs://host:port/example/file1.txt") INTO TABLE `test_table2` COLUMNS TERMINATED BY "," (id,name,age,address) WHERE age < 20 ) WITH HDFS ( "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port" );Only rows whose value of the
agecolumn is smaller than 20 are imported.Import the file1.txt file from HDFS. Specify the timeout period and filtering ratio of the import. Import all rows from the file except the rows whose value of the
agecolumn is smaller than 20. Sample code:LOAD LABEL test_db.test_05 ( MERGE DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt") INTO TABLE `test_table` COLUMNS TERMINATED BY "," (id,name,age,address) DELETE ON age < 20 ) WITH HDFS ( "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port" ) PROPERTIES ( "timeout" = "3600", "max_filter_ratio" = "0.1" );Import data in MERGE mode. The
test_tabletable must use the Unique Key model. If the value of theagecolumn is less than 20 in a row to be imported, the row is marked for deletion. The timeout period for an import job is 3,600 seconds, and the maximum filtering ratio for error data rows is 10%.
Cancel a Broker Load job
If a Broker Load job is not in the CANCELLED or FINISHED state, you can manually cancel the job. You must specify the label of the import job to be canceled. After an import job is canceled, the data written in the job is rolled back and does not take effect.
Syntax
CANCEL LOAD
[FROM db_name]
WHERE [LABEL = "load_label" | LABEL like "label_pattern"];Parameters
Parameter | Description |
| The name of the database. By default, if you do not specify this parameter, the current database is used. |
| The label of the import job. Exact match is supported. If you use the LABEL LIKE statement, the import jobs whose label contains label_pattern are matched. |
Examples
Cancel an import job whose label is
example_db_test_load_labelfrom theexample_dbdatabase.CANCEL LOAD FROM example_db WHERE LABEL = "example_db_test_load_label";Cancel import jobs whose label contains
example_from theexample_dbdatabase.CANCEL LOAD FROM example_db WHERE LABEL like "example_";
Query the status of a Broker Load job
Broker Load is an asynchronous import method. The successful execution of an import statement indicates only the successful submission of a Broker Load job, rather than the completion of the data import. To query the status of a Broker Load job, execute the SHOW LOAD statement.
Syntax
SHOW LOAD
[FROM db_name]
[
WHERE
[LABEL [ = "your_label" | LIKE "label_matcher"]]
[STATE = ["PENDING"|"ETL"|"LOADING"|"FINISHED"|"CANCELLED"|]]
]
[ORDER BY ...]
[LIMIT limit][OFFSET offset];Parameters
Parameter | Description |
| The name of the database. By default, if you do not specify this parameter, the current database is used. |
| The label of the import job. Exact match is supported. If you use the LABEL LIKE statement, the import jobs whose label contains label_matcher are matched. |
| The status of the import job. You can view only import jobs that are in the specified state. |
| The order in which the returned data records are sorted. |
| The limit on the number of data records that are displayed. If you do not specify this parameter, all data records are displayed. |
| The number of initial records to skip before the query results start to be displayed. Default value: 0. |
Examples
Query the import jobs whose label contains
2014_01_02in theexample_dbdatabase, and display 10 import jobs that are stored for the longest period of time.SHOW LOAD FROM example_db WHERE LABEL LIKE "2014_01_02" LIMIT 10;Query the import jobs whose label is
load_example_db_20140102in theexample_dbdatabase. Sort these jobs byLoadStartTimein descending order.SHOW LOAD FROM example_db WHERE LABEL = "load_example_db_20140102" ORDER BY LoadStartTime DESC;Query the import jobs whose label is
load_example_db_20140102in theexample_dbdatabase. The import jobs are in theloadingstate.SHOW LOAD FROM example_db WHERE LABEL = "load_example_db_20140102" AND STATE = "loading";Query the import jobs in the
example_dbdatabase and sort these jobs byLoadStartTimein descending order. Skip the initial five query results and display the next 10 query results.SHOW LOAD FROM example_db ORDER BY LoadStartTime DESC limit 5,10; SHOW LOAD FROM example_db ORDER BY LoadStartTime DESC limit 10 offset 5;
Best practices
Query the status of an import job
Broker Load is an asynchronous import method. The successful execution of an import statement indicates only the successful submission of a Broker Load job, rather than the completion of the data import. To query the status of an import job, execute the
SHOW LOADstatement.Cancel an import job
To cancel an import job that is submitted but is not complete, execute the
CANCEL LOADstatement. After an import job is canceled, the data written in the job is rolled back and does not take effect.Label, import transaction, and multi-table atomicity
All import jobs in ApsaraDB for SelectDB are atomic in nature. The atomicity is also ensured when data is imported into multiple tables in an import job. In addition, ApsaraDB for SelectDB uses labels to ensure that the imported data is not lost or duplicated.
Mapping, deriving, and filtering of columns
ApsaraDB for SelectDB supports various operations on column conversion and column filtering in an import statement. Most built-in functions and user-defined functions (UDFs) are supported. For more information, see Converting Source Data.
Filtering of error data rows
ApsaraDB for SelectDB allows you to skip some data rows in incorrect formats in an import job. The filtering ratio is specified by the
max_filter_ratioparameter. The default value is 0, which specifies a zero-tolerance policy that allows no data to be filtered out. The import job fails if one error data row is found. If you want to ignore some error data rows during the import, you can set this parameter to a value between 0 and 1. This way, ApsaraDB for SelectDB automatically skips the rows in incorrect data formats.For more information about how to calculate the tolerance ratio, see Converting Source Data.
Strict mode
The
strict_modeproperty is used to specify whether an import job runs in strict mode. If an import job runs in strict mode, the mapping, conversion, and filtering of columns are affected.Timeout period
The default timeout period for a Broker Load job is 4 hours. The timing starts from the time when the import job is submitted. If the import job is not complete within the timeout period, the import job fails.
Limits on the amount of data and the number of jobs
We recommend that you import data of less than 100 GB at a time in a Broker Load job. Theoretically, the amount of data that can be imported in an import job is not limited. However, if the amount of data to be imported is excessively large, the import job may run for an extended period of time, and the cost of retry is high in case that the import job fails.
In addition, the number of nodes in a cluster is limited. Therefore, a limit is set on the maximum amount of data to be imported, which is the number of nodes multiplied by 3 GB. This ensures that system resources are properly used. If you need to import a large amount of data, we recommend that you submit multiple import jobs.
ApsaraDB for SelectDB limits the number of concurrent import jobs in a cluster, which is in the range of 3 to 10. If the number of import jobs that you submitted is greater than the limit, the excess import jobs wait in a queue. The maximum length of the queue is 100. If more than 100 import jobs are waiting, the excess import jobs are directly rejected.
NoteThe waiting time also counts in the total period of time for an import job. If a timeout error occurs, the job is canceled. We recommend that you monitor the status of import jobs to properly control the job submission frequency.