In Broker Load mode, StarRocks uses broker processes to read data from data sources, such as Apache Hadoop Distributed File System (HDFS) and Alibaba Cloud Object Storage Service (OSS), and uses its computing resources to preprocess and import the data. This topic describes how to import data in Broker Load mode.
Background information
Broker Load is an asynchronous import method. You can create an import job over the MySQL protocol and execute the SHOW LOAD statement to query the import result. StarRocks supports importing data from external storage systems in various file formats, including CSV, ORC, and Parquet. We recommend that you import tens to hundreds of GB of data in each import job.
Import data in Broker Load mode
Query brokers
SHOW PROC "/brokers"\G
*************************** 1. row ***************************
Name: broker
IP: 10.0.1.151
Port: 8000
Alive: true
LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
ErrMsg:
*************************** 2. row ***************************
Name: broker
IP: 10.0.1.154
Port: 8000
Alive: true
LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
ErrMsg:
*************************** 3. row ***************************
Name: broker
IP: 10.0.1.153
Port: 8000
Alive: true
LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
ErrMsg:
*************************** 4. row ***************************
Name: broker
IP: 10.0.1.152
Port: 8000
Alive: true
LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
ErrMsg:
4 rows in set (0.00 sec)
Create an import job
- Syntax
LOAD LABEL db_name.label_name
    (data_desc, ...)
WITH BROKER broker_name broker_properties
[PROPERTIES (key1=value1, ... )]
- Parameters
You can execute the HELP BROKER LOAD statement to view the syntax for creating an import job.
- Label
The identifier of the import job. Each import job has a unique label. You can define a custom label. Otherwise, the system generates a label. The label can be used to query the status of the import job and prevent duplicate data from being imported. After the state of an import job changes to FINISHED, its label cannot be reused. If the state of an import job changes to CANCELLED, you can reuse the label to submit the import job again.
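For example, you can check whether a label can be reused by querying the state of the import job that used it. The following sketch assumes a hypothetical label named label_2022:
-- Query the state of the import job that uses the hypothetical label label_2022.
SHOW LOAD WHERE LABEL = "label_2022";
-- If the state is CANCELLED, you can reuse label_2022 to submit the import job again.
-- If the state is FINISHED, choose a new label for the next import job.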
- data_desc
A string that describes the data to be imported. You can specify multiple data_desc strings, each with information such as the data source address, extract, transform, and load (ETL) functions, destination table, and partitions.
An import job in Broker Load mode allows you to import data to multiple tables at a time. Specify a data_desc string for each destination table. In each data_desc string, specify the data source address, which can contain multiple file_path strings that point to multiple source files. The Broker Load mode ensures the atomicity of importing data to multiple tables at a time: either all tables are imported successfully or the entire import job fails. The following code shows the parameters that are usually specified in a data_desc string:
data_desc:
    DATA INFILE ('file_path', ...)
    [NEGATIVE]
    INTO TABLE tbl_name
    [PARTITION (p1, p2)]
    [COLUMNS TERMINATED BY column_separator]
    [FORMAT AS file_type]
    [(col1, ...)]
    [COLUMNS FROM PATH AS (colx, ...)]
    [SET (k1=f1(xx), k2=f2(xx))]
    [WHERE predicate]
The following table describes the parameters.
Parameter | Description |
---|---|
file_path | Either a specific file path that points to a file, or a file path that contains asterisks (*) as wildcards and points to all files in a directory. The parent directories of the destination directory can also contain wildcards. The following special characters are supported as wildcards: ? * [] {} ^. For more information about how to use wildcards, see FileSystem. For example, if you set this parameter to hdfs://hdfs_host:hdfs_port/user/data/tablename/*/*, all files in all partitions of the tablename directory are imported. If you set this parameter to hdfs://hdfs_host:hdfs_port/user/data/tablename/dt=202104/*, only files in the 202104 partition of the tablename directory are imported. |
negative | A flag that indicates that this import job offsets a batch of data that has already been imported. This parameter is applicable if you want to cancel a batch of data that has been imported to aggregate columns of the SUM type in the destination table. After the import job is complete, the batch of data is deducted from the aggregate columns. |
partition | The partitions of the destination table to which data is imported. Only source data that belongs to the specified partitions is imported. Source data that does not belong to the specified partitions is counted as erroneous data. If you do not want such data to be counted as erroneous data, use a WHERE predicate to filter it out. |
column_separator | The column delimiter that separates data in the source files into columns. Default value: \t. If you want to specify an invisible character as the column delimiter, add \x as the prefix and set the delimiter in hexadecimal. For example, if the delimiter used in a source Hive file is \x01, set the column delimiter to \\x01. |
file_type | The format of the source files. Valid values: parquet, orc, and csv. Default value: csv. Parquet files have the file name extension .parquet or .parq. |
COLUMNS FROM PATH AS | The partition fields that are extracted from the path of the source files. For example, the path of a source file is /path/col_name=col_value/dt=20210101/file1, and col_name and dt are table columns. The values col_value and 20210101 are imported to the destination columns that correspond to col_name and dt, as shown in the following code: (col1, col2) COLUMNS FROM PATH AS (col_name, dt) |
set column mapping | The SET statement that contains functions for column type conversion. If source columns and destination columns are of different types, you must specify a SET statement for column type conversion. |
where predicate | The WHERE predicate that filters data after column type conversion. Data that is filtered out is not counted toward the calculation of the maximum filter ratio. If data_desc strings altogether contain multiple WHERE predicates for the same table, the predicates are combined by the AND operator. |
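The following sketch shows how several of these parameters can be combined in a single data_desc string. The HDFS path, destination table, partition, and column names are hypothetical:
-- The path, table, partition, and column names below are hypothetical.
DATA INFILE("hdfs://hdfs_host:hdfs_port/user/data/tablename/dt=202104/*")
INTO TABLE example_tbl
PARTITION (p202104)
COLUMNS TERMINATED BY ","
(tmp_id, tmp_city)
COLUMNS FROM PATH AS (dt)
SET (id=tmp_id, city=tmp_city)
WHERE id > 0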
- broker_properties
In the broker_properties string, specify the information that the broker requires to access the data source, such as authentication credentials. For examples, see the sample code for Alibaba Cloud OSS and HDFS in this topic.
broker_properties: (key2=value2, ...)
- opt_properties
In the PROPERTIES clause (the opt_properties string), configure optional property parameters. These property parameters apply to the entire import job.
opt_properties: [PROPERTIES (key1=value1, ...)]
The following table describes some of the property parameters in the opt_properties string.
Parameter | Description |
---|---|
timeout | The timeout period of the import job. Unit: seconds. You can specify a timeout period for each import job in the opt_properties string. If the import job is not complete within the specified timeout period, the state of the import job changes to CANCELLED. The default timeout period for import jobs in Broker Load mode is 4 hours. Important You do not need to specify a timeout period unless you estimate that the import job will take longer than the default timeout period. We recommend that you calculate the minimum timeout period in seconds by using the following formula: (Total file size in MB × Number of source tables and related rollup tables)/(10 × Concurrency of the import job). The number 10 in the formula indicates 10 MB/s, which is the average import speed on backends. For example, if the total file size of the source data is 1 GB, the source table has two rollup tables, and the concurrency of the import job is 3, the minimum timeout period is (1 × 1,024 × 3)/(10 × 3) = 102 seconds. StarRocks clusters have varied machine environments and concurrent query jobs, so estimate the slowest import speed of your StarRocks cluster based on the speed of historical import jobs. |
max_filter_ratio | The maximum filter ratio of the import job. Valid values: 0 to 1. Default value: 0. If the error rate of the import job exceeds the maximum filter ratio, the import job fails. If you want erroneous data rows to be ignored, set this parameter to a value greater than 0 so that the import job can succeed. Calculate an appropriate maximum filter ratio by using the following formula: max_filter_ratio = dpp.abnorm.ALL/(dpp.abnorm.ALL + dpp.norm.ALL). dpp.abnorm.ALL indicates the number of data rows that cannot be imported due to reasons such as type mismatch, column quantity mismatch, and length mismatch. dpp.norm.ALL indicates the number of data rows that can be imported. Number of data rows in the source files = dpp.abnorm.ALL + dpp.norm.ALL. You can execute the SHOW LOAD statement to query the amount of data imported by the import job. |
load_mem_limit | The limit on the memory allocated to the import job. Default value: 0. A value of 0 specifies that the memory allocated to the import job is not limited. |
strict_mode | Specifies whether to enable the strict mode for the import job. To enable the strict mode, set this parameter to true: properties ("strict_mode" = "true"). By default, the strict mode is disabled. If the strict mode is enabled, erroneous data is filtered out after column type conversion. Erroneous data refers to values that are not null in the source files but are converted to null values after column type conversion. Take note of the following items: The strict mode does not apply to source columns whose values are generated by functions. If a destination column restricts values to a range and a value in a source column can be converted but the result is out of the range, the strict mode does not apply to the source column. For example, a value in a source column is 10 and the destination column is of the DECIMAL(1,0) type. The value 10 can be converted, but the result is out of the range. Therefore, the strict mode does not apply to the source column. |
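For example, the following PROPERTIES clause, appended to a LOAD statement, sets a 1-hour timeout, tolerates an error rate of up to 10%, and enables the strict mode. The values are hypothetical:
PROPERTIES
(
    "timeout" = "3600",           -- timeout period in seconds (hypothetical value)
    "max_filter_ratio" = "0.1",   -- tolerate at most 10% erroneous data rows
    "strict_mode" = "true"        -- filter out rows that become null after column type conversion
)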
- Sample code for creating an import job to import data from Alibaba Cloud OSS
Important In StarRocks clusters, you can use broker as the broker name.
LOAD LABEL tpch.lineitem
(
    DATA INFILE("oss://bucket/tpc_h/sf1/lineitem.tbl")
    INTO TABLE `lineitem`
    COLUMNS TERMINATED BY '|'
    (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment)
)
WITH BROKER broker
(
    "fs.oss.accessKeyId" = "xxx",
    "fs.oss.accessKeySecret" = "xxx",
    "fs.oss.endpoint" = "oss-cn-beijing-internal.aliyuncs.com"
);
Query the status of an import job
To query the status of an import job, specify its label in the SHOW LOAD statement and then execute the statement. To view the syntax of this statement, execute the HELP SHOW LOAD statement. The SHOW LOAD statement supports only asynchronous import jobs. For synchronous import jobs, such as those in Stream Load mode, you cannot use the SHOW LOAD statement to query the status.
show load where label = 'label1'\G
*************************** 1. row ***************************
JobId: 76391
Label: label1
State: FINISHED
Progress: ETL:N/A; LOAD:100%
Type: BROKER
EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
TaskInfo: cluster:N/A; timeout(s):10800; max_filter_ratio:5.0E-5
ErrorMsg: N/A
CreateTime: 2019-07-27 11:46:42
EtlStartTime: 2019-07-27 11:46:44
EtlFinishTime: 2019-07-27 11:46:44
LoadStartTime: 2019-07-27 11:46:44
LoadFinishTime: 2019-07-27 11:50:16
URL: http://192.168.**.**:8040/api/_load_error_log?file=__shard_4/error_log_insert_stmt_4bb00753932c491a-a6da6e2725415317_4bb00753932c491a_a6da6e2725415317
JobDetails: {"Unfinished backends":{"9c3441027ff948a0-8287923329a2b6a7":[10002]},"ScannedRows":2390016,"TaskNumber":1,"All backends":{"9c3441027ff948a0-8287923329a2b6a7":[10002]},"FileNumber":1,"FileSize":1073741824}
Parameter | Description |
---|---|
JobId | The unique ID of the import job. The job ID is automatically generated by the system. The ID of an import job can never be reused, whereas the label of an import job can be reused if the import job fails. |
Label | The identifier of the import job. |
State | The state of the import job. Valid values: PENDING, LOADING, FINISHED, and CANCELLED. |
Progress | The progress information of the import job, which covers the two import phases: ETL and LOADING. Data import in Broker Load mode involves only the LOADING phase, so the ETL progress is displayed as N/A and the LOADING progress ranges from 0 to 100%. The LOADING progress is calculated by using the following formula: LOADING progress = Number of tables that have been imported/Total number of tables to be imported by the import job × 100%. When all the tables have been imported and the import job is ready to be complete, the LOADING progress reaches 99%. The LOADING progress reaches 100% only after the import job is complete. Important The import progress is not linear. If the progress remains unchanged within a period of time, the import job may still be in progress. |
Type | The type of the import job. The type of the import job in Broker Load mode is BROKER. |
EtlInfo | The data amount metrics of the import job: unselected.rows, dpp.norm.ALL, and dpp.abnorm.ALL. unselected.rows indicates the number of data rows filtered out by the WHERE predicate. dpp.norm.ALL and dpp.abnorm.ALL help determine whether the error rate of the import job exceeds the maximum filter ratio. The sum of the three metrics equals the total number of data rows in the source files. |
TaskInfo | The parameters that you specified when you created the import job, including the cluster, timeout period, and maximum filter ratio. |
ErrorMsg | The message returned by the import job. If the import job is in the CANCELLED state, the value of this parameter indicates the cause of the import failure and contains two parts: type and msg. If the import job is in the FINISHED state, the value of this parameter is N/A. The type part has the following valid values: USER_CANCEL, ETL_RUN_FAIL, ETL_QUALITY_UNSATISFIED, LOAD_RUN_FAIL, TIMEOUT, and UNKNOWN. |
CreateTime | The time when the import job was created. |
EtlStartTime | The time when the ETL phase started. |
EtlFinishTime | The time when the ETL phase ended. |
LoadStartTime | The time when the LOADING phase started. |
LoadFinishTime | The time when the import job was complete. |
URL | The URL of sample erroneous data during the import job. If the import job involves no erroneous data, the value of this parameter is N/A. |
JobDetails | The details of the import job, including the number of imported files and their total size in bytes, the number of tasks, the number of processed source data rows, the IDs of the backends that run the tasks, and the IDs of the backends on which the waiting tasks reside. The number of processed source data rows is updated every 5 seconds. This number indicates only the current progress and does not mean the total number of data rows that are processed after the import job is complete. The latter is shown by the EtlInfo parameter. |
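For example, in the preceding SHOW LOAD output, you can derive the error rate of the import job from the EtlInfo metrics:
Error rate = dpp.abnorm.ALL/(dpp.abnorm.ALL + dpp.norm.ALL) = 15/(15 + 28,133,376) ≈ 0.00000053
This value is lower than the max_filter_ratio of 5.0E-5 that is shown in TaskInfo. Therefore, the 15 erroneous rows are ignored and the import job reaches the FINISHED state.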
Cancel an import job
Import jobs in Broker Load mode can be canceled when they are not in the CANCELLED
or FINISHED state. To cancel an import job, specify its label in the CANCEL LOAD statement
and then execute the statement. You can execute the HELP CANCEL LOAD
statement to view the syntax for canceling an import job.
CANCEL LOAD
[FROM db_name]
WHERE [LABEL = "load_label" | LABEL like "label_pattern"];
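For example, the following statement cancels the import job with the label lineitem in the tpch database from the preceding OSS sample, provided that the job is not in the FINISHED or CANCELLED state:
CANCEL LOAD
FROM tpch
WHERE LABEL = "lineitem";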
Import data from HDFS
- Syntax
LOAD LABEL db1.label1
(
    DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/hive/test.db/ml/file1")
    INTO TABLE tbl1
    COLUMNS TERMINATED BY ","
    (tmp_c1, tmp_c2)
    SET
    (
        id=tmp_c2,
        name=tmp_c1
    ),
    DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/hive/test.db/ml/file2")
    INTO TABLE tbl2
    COLUMNS TERMINATED BY ","
    (col1, col2)
    WHERE col1 > 1
)
WITH BROKER 'broker1'
(
    "username" = "hdfs_username",
    "password" = "hdfs_password"
)
PROPERTIES
(
    "timeout" = "3600"
);
- HDFS authentication
The community edition of HDFS supports two authentication modes: simple authentication and Kerberos authentication.
- Simple authentication: User identities are determined by the operating system of the
client that is connected to HDFS.
The following table describes the parameters.
Parameter | Description |
---|---|
hadoop.security.authentication | The authentication mode. In this example, this parameter is set to simple. |
username | The username that is used to log on to HDFS. |
password | The password that is used to log on to HDFS. |
- Kerberos authentication: The user identity of a client is determined by its Kerberos credentials.
The following table describes the parameters.
Parameter | Description |
---|---|
hadoop.security.authentication | The authentication mode. In this example, this parameter is set to kerberos. |
kerberos_principal | The principal for Kerberos authentication. |
kerberos_keytab | The path of the Kerberos keytab file. The file must reside on the same server as the broker processes. |
kerberos_keytab_content | The Base64-encoded content of the Kerberos keytab file. Important You must configure either this parameter or the kerberos_keytab parameter. |
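The following sketch shows a broker_properties string for Kerberos authentication. The principal and keytab path are hypothetical and must be replaced with the values of your HDFS cluster:
(
    "hadoop.security.authentication" = "kerberos",
    "kerberos_principal" = "starrocks@EXAMPLE.COM",        -- hypothetical principal
    "kerberos_keytab" = "/etc/security/starrocks.keytab"   -- hypothetical keytab path on the broker server
)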
- HA configurations of HDFS
After you configure high availability (HA) for the NameNodes in an HDFS cluster, the new active NameNode can be automatically identified if a failover occurs. To access an HDFS cluster deployed in HA mode, configure the parameters that are described in the following table.
Parameter | Description |
---|---|
dfs.nameservices | The name of the HDFS service. You can set a custom name. For example, set the dfs.nameservices parameter to my_ha. |
dfs.ha.namenodes.xxx | The custom names of the NameNodes. Separate multiple names with commas (,). Replace xxx in this parameter name with the custom name that you set for the dfs.nameservices parameter. For example, set the dfs.ha.namenodes.my_ha parameter to my_nn. |
dfs.namenode.rpc-address.xxx.nn | The address that the NameNode uses for remote procedure calls (RPCs). Replace nn in this parameter name with a NameNode name that you set for the dfs.ha.namenodes.xxx parameter. For example, set the dfs.namenode.rpc-address.my_ha.my_nn parameter to a value in the Hostname:Port number format. |
dfs.client.failover.proxy.provider | The provider that the client uses to connect to the NameNode. Default value: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider. |
The following sample code provides an example:
(
    "dfs.nameservices" = "my-ha",
    "dfs.ha.namenodes.my-ha" = "my-namenode1,my-namenode2",
    "dfs.namenode.rpc-address.my-ha.my-namenode1" = "nn1-host:rpc_port",
    "dfs.namenode.rpc-address.my-ha.my-namenode2" = "nn2-host:rpc_port",
    "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
)
You can use simple authentication or Kerberos authentication to access HDFS clusters deployed in HA mode. The following sample code provides an example on how to access an HA HDFS cluster by using simple authentication:
(
    "username" = "user",
    "password" = "passwd",
    "dfs.nameservices" = "my-ha",
    "dfs.ha.namenodes.my-ha" = "my-namenode1,my-namenode2",
    "dfs.namenode.rpc-address.my-ha.my-namenode1" = "nn1-host:rpc_port",
    "dfs.namenode.rpc-address.my-ha.my-namenode2" = "nn2-host:rpc_port",
    "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
)
The HA configurations of an HDFS cluster can also be written to the hdfs-site.xml file that is used by the broker processes. In this case, when you use brokers to read data from the HDFS cluster, you only need to specify the file path and authentication information of the cluster.
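For example, if the HA parameters are already configured in the hdfs-site.xml file used by the brokers, a broker_properties string for simple authentication might contain only the authentication information. This is a sketch under that assumption, with hypothetical credentials:
(
    "username" = "user",
    "password" = "passwd"
)
In this case, the file path in DATA INFILE can reference the nameservice name, for example, hdfs://my_ha/user/data/file1 (a hypothetical path).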
Concurrency of import jobs
An import job in Broker Load mode is split into one or more tasks that run concurrently:
- If an import job has multiple data_desc strings that specify source data from different data source addresses, the import job is split into multiple tasks, one for each data_desc string.
- If an import job has multiple data_desc strings that specify different partitions of the destination table, the import job is also split into multiple tasks, one for each data_desc string.
Each task is further split into instances that run in parallel on the backends. The following parameters control the number of instances:
- min_bytes_per_broker_scanner: the minimum amount of data processed by each instance. Default value: 64 MB.
- max_broker_concurrency: the maximum number of parallel instances for each task. Default value: 100.
- load_parallel_instance_num: the number of parallel instances on each backend. Default value: 1.
The total number of instances equals the minimum of the following values: the total
size of imported files divided by the value of min_bytes_per_broker_scanner
, the value of max_broker_concurrency, and the value of load_parallel_instance_num
multiplied by the number of backends.
In most cases, an import job has only one data_desc string and therefore contains only one task. The task is split into instances. The number of instances equals that of the backends. Each instance is assigned to a different backend for parallel running.
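For example, assume that an import job imports a single 3 GB file (a hypothetical size), min_bytes_per_broker_scanner is 64 MB, max_broker_concurrency is 100, load_parallel_instance_num is 1, and the cluster has three backends. The number of instances is calculated as follows:
Number of instances = min(3 × 1,024/64, 100, 1 × 3) = min(48, 100, 3) = 3
In this case, the task is split into three instances, and each instance is assigned to a different backend.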
FAQ
- What do I do if the ETL_QUALITY_UNSATISFIED; msg:quality not good enough to cancel error is reported?
For more information about how to troubleshoot the error, see FAQ.
- What do I do if the "failed to send batch" or "TabletWriter add batch with unknown id" error is reported?
Modify the query_timeout and streaming_load_rpc_max_alive_time_sec parameters. For more information, see the backend configurations described in General system configurations.
- What do I do if the LOAD-RUN-FAIL; msg:OrcScannerAdapter::init_include_columns. col name = xxx not found error is reported?
If data is imported from a Parquet file or an ORC file, make sure that the column names in the file header are consistent with those in the destination StarRocks table. The following sample code provides an example:
(tmp_c1,tmp_c2) SET ( id=tmp_c2, name=tmp_c1 )
As shown in the code, the columns tmp_c1 and tmp_c2 in the Parquet or ORC file are mapped to the columns id and name in the StarRocks table. If you do not specify a SET statement, the source columns are mapped to the destination columns that have the same names, and such columns may not exist in the StarRocks table.
Important If you import an ORC file that is generated by specific versions of Hive, the table header in the ORC file may not contain Hive metadata but column names such as (_col0, _col1, _col2, ...). In this case, the "Invalid Column Name" error may be reported, and you must specify a SET statement for correct mapping.
- How do I configure ViewFs in HDFS Federation?
Copy the ViewFs configuration files core-site.xml and hdfs-site.xml to the broker/conf directory. If a custom file system is used, copy the JAR file of the file system to the broker/lib directory.
- What do I do if the Can't get Kerberos realm error is reported when I access a cluster with Kerberos authentication?
First, make sure that the /etc/krb5.conf file is configured on all physical machines on which the brokers are deployed, and then try again. If the error persists, add -Djava.security.krb5.conf=/etc/krb5.conf to the JAVA_OPTS variable in the broker startup script.
- What do I do if an import job stays in the PENDING state for an extended period of time?
Search for the keyword "error" in the log/be.INFO file on the backend to identify the cause.