Broker Load is an asynchronous import method in StarRocks that reads data from external storage systems using broker processes, then uses StarRocks' own compute resources to preprocess and load the data. Use Broker Load to import tens to hundreds of GB per job from Hadoop Distributed File System (HDFS) or Alibaba Cloud Object Storage Service (OSS). A single Broker Load job can target multiple destination tables atomically — either all tables succeed or all fail.
How Broker Load works
Broker Load has two operating modes depending on your StarRocks version:
- Broker-based loading (StarRocks earlier than 2.5.8): StarRocks depends on broker processes to connect to external storage. Specify the broker name explicitly with WITH BROKER "<broker_name>" in the load statement.
- Broker-free loading (StarRocks 2.5.8 and later): StarRocks connects to external storage directly. Omit the broker name but keep the WITH BROKER keyword in the load statement.
When you create a StarRocks cluster in E-MapReduce (EMR), broker processes are automatically deployed and started on every core node.
Supported sources and formats
| Dimension | Supported values |
|---|---|
| Data sources | HDFS, OSS |
| File formats | CSV (default), ORC, Parquet (.parquet or .parq) |
Prerequisites
Before you begin, ensure that you have:
- A running StarRocks cluster in EMR with at least one core node
- Access credentials for HDFS or OSS
- A destination database and table in StarRocks
Query brokers
To confirm brokers are running on your cluster, run:
SHOW PROC "/brokers"\G
The output lists each broker with its IP address, port (default: 8000), alive status, and timestamps:
*************************** 1. row ***************************
Name: broker
IP: 10.0.**.**
Port: 8000
Alive: true
LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
ErrMsg:
Create an import job
All import jobs use the LOAD LABEL statement. The syntax differs slightly between StarRocks versions.
For StarRocks versions earlier than 2.5.8
LOAD LABEL db_name.label_name
(data_desc, ...)
WITH BROKER broker_name broker_properties
[PROPERTIES (key1=value1, ... )]
For StarRocks versions 2.5.8 and later
LOAD LABEL db_name.label_name
(data_desc, ...)
WITH BROKER broker_properties
[PROPERTIES (key1=value1, ... )]
In EMR StarRocks clusters, use broker as the broker name.
To view the full syntax reference, run HELP BROKER LOAD.
Label
Each import job requires a unique label. Define a custom label or let the system generate one. The label serves two purposes:
- Track job status with SHOW LOAD WHERE label = '<label>'
- Prevent duplicate imports: StarRocks rejects a new job if a FINISHED job already has the same label
After a job reaches the FINISHED state, its label cannot be reused. After a job is CANCELLED, the same label can be reused to resubmit the job.
data_desc parameters
The data_desc clause describes the source data for one destination table. A single job can contain multiple data_desc clauses targeting different tables — StarRocks guarantees atomicity across all tables in one job.
data_desc:
DATA INFILE ('file_path', ...)
[NEGATIVE]
INTO TABLE tbl_name
[PARTITION (p1, p2)]
[COLUMNS TERMINATED BY column_separator]
[FORMAT AS file_type]
[(col1, ...)]
[COLUMNS FROM PATH AS (colx, ...)]
[SET (k1=f1(xx), k2=f2(xx))]
[WHERE predicate]
| Parameter | Description |
|---|---|
| file_path | A specific file path or a glob pattern using wildcards (?, *, [], {}, ^). For example, hdfs://hdfs_host:hdfs_port/user/data/tablename/*/ imports all files in all partitions under /tablename. See FileSystem.globStatus for wildcard syntax. |
| NEGATIVE | Marks source rows as deletions. Use this to undo a batch import to aggregate columns of the SUM type. |
| PARTITION | Restricts import to the specified partitions in the destination table. Rows that don't belong to the listed partitions are counted as errors. Use a WHERE predicate to filter them out instead. |
| column_separator | Column delimiter in source files. Default: \t. For invisible characters, use hexadecimal format (e.g., \\x01 for the Hive default \x01). |
| file_type | Source file format: csv (default), orc, or parquet. |
| COLUMNS FROM PATH AS | Extracts partition field values from the file path. For example, a path /path/col_name=col_value/dt=20210101/file1 lets you import col_value and 20210101 into the col_name and dt table columns. |
| SET | Column type conversion functions. Required when source and destination column types differ. |
| WHERE predicate | Filters rows after column type conversion. Filtered rows are excluded from the error rate calculation. Multiple WHERE predicates for the same table are combined with AND. |
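As a hedged illustration of how several data_desc options combine, the sketch below loads CSV files laid out in dt=... directories, reads two columns from each file, takes the dt value from the path, and filters rows with WHERE. The database, table, label, and column names (example_db, example_tbl, label_path_demo, k1, k2) and the path are hypothetical. The statement uses the form for versions earlier than 2.5.8 with the EMR broker name broker; on 2.5.8 and later, omit the broker name.

```sql
-- Hypothetical names: example_db, example_tbl, label_path_demo, k1, k2.
-- The glob matches every file in every dt=... directory under /tablename.
-- k1 and k2 come from the file; dt is extracted from the dt=... path segment.
-- Rows that fail the WHERE predicate are skipped, not counted as errors.
LOAD LABEL example_db.label_path_demo
(
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/data/tablename/dt=*/*")
    INTO TABLE example_tbl
    COLUMNS TERMINATED BY ","
    FORMAT AS "csv"
    (k1, k2)
    COLUMNS FROM PATH AS (dt)
    WHERE k1 > 0
)
WITH BROKER "broker"
(
    "username" = "hdfs_username",
    "password" = "hdfs_password"
);
```

The clause order follows the data_desc grammar above: separator, format, file columns, path columns, then the filter.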
broker_properties and job-level properties
The broker_properties clause passes storage credentials and connection settings. The optional PROPERTIES block controls job execution behavior.
broker_properties:
(key2=value2, ...)
These are different scopes: broker_properties (inside WITH BROKER) configures how StarRocks connects to the storage system. PROPERTIES (the outer block) controls how the job runs — timeout, filter ratio, memory, and mode.
| Parameter | Description |
|---|---|
| timeout | Job timeout in seconds. Default: 14,400 (4 hours). If the job doesn't complete within this period, it moves to CANCELLED. Estimate the minimum timeout with: (Total file size in MB × Number of source and rollup tables) / (30 × concurrency). For example, a 1 GB file with one source table, two rollup tables, and concurrency of 3 requires at least (1 × 1,024 × 3) / (30 × 3) ≈ 34 seconds. Set a timeout only if you expect the job to exceed the default. |
| max_filter_ratio | Maximum acceptable error rate, from 0 to 1. Default: 0. If the error rate exceeds this value, the job fails. Set a value greater than 0 to allow some erroneous rows to be skipped. Calculate the ratio as: max_filter_ratio = dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL). |
| load_mem_limit | Memory limit for the job in bytes. Default: 0 (no limit). |
| strict_mode | When set to true, rows where non-null source values convert to null are treated as errors and filtered out. Default: disabled. Enable with PROPERTIES ("strict_mode" = "true"). Strict mode does not apply to function-generated columns or values that are valid but out of the destination column's range. |
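As a worked instance of the timeout estimate, the query below applies the formula as stated in the table to a hypothetical job: a 100 GB source, one destination table with no rollup tables, and a concurrency of 3. The figures are illustrative assumptions, not measurements.

```sql
-- (total file size in MB × number of source and rollup tables) / (30 × concurrency)
-- 102400 / 90 is about 1137.8, so the estimate rounds up to 1138 seconds.
SELECT CEIL((100 * 1024 * 1) / (30 * 3)) AS min_timeout_seconds;
```

Because 1,138 seconds is well below the 14,400-second default, a job of this shape would not need an explicit timeout.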
Load data from OSS
The following example loads a TPC-H lineitem file from OSS into a StarRocks table.
For StarRocks versions earlier than 2.5.8
LOAD LABEL tpch.lineitem
(
DATA INFILE("oss://bucket/tpc_h/sf1/lineitem.tbl")
INTO TABLE `lineitem`
COLUMNS TERMINATED BY '|'
(l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice,
l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate,
l_receiptdate, l_shipinstruct, l_shipmode, l_comment)
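)
WITH BROKER broker
(
"fs.oss.accessKeyId" = "xxx",
"fs.oss.accessKeySecret" = "xxx",
"fs.oss.endpoint" = "oss-cn-beijing-internal.aliyuncs.com"
);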
For StarRocks versions 2.5.8 and later
LOAD LABEL tpch.lineitem
(
DATA INFILE("oss://bucket/tpc_h/sf1/lineitem.tbl")
INTO TABLE `lineitem`
COLUMNS TERMINATED BY '|'
(l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice,
l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate,
l_receiptdate, l_shipinstruct, l_shipmode, l_comment)
)
WITH BROKER
(
"fs.oss.accessKeyId" = "xxx",
"fs.oss.accessKeySecret" = "xxx",
"fs.oss.endpoint" = "oss-cn-beijing-internal.aliyuncs.com"
);
Load data from HDFS
Authentication
HDFS supports two authentication modes.
Simple authentication — user identity is determined by the OS of the connecting client:
| Parameter | Description |
|---|---|
| hadoop.security.authentication | Set to simple. |
| username | HDFS username. |
| password | HDFS password. |
Kerberos authentication — user identity is determined by Kerberos credentials:
| Parameter | Description |
|---|---|
| hadoop.security.authentication | Set to kerberos. |
| kerberos_principal | The Kerberos principal. |
| kerberos_keytab | Path to the Kerberos keytab file. The file must reside on the same server as the broker process. |
| kerberos_keytab_content | Base64-encoded content of the keytab file. Configure either this parameter or kerberos_keytab, not both. |
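For comparison with simple authentication, a Kerberos broker_properties clause might look like the sketch below. The parameter names come from the table above; the principal and keytab path are placeholders to replace with your own values.

```sql
-- broker_properties fragment for Kerberos authentication.
-- The principal and keytab path are placeholders, not real values.
(
    "hadoop.security.authentication" = "kerberos",
    "kerberos_principal" = "starrocks@EXAMPLE.COM",
    "kerberos_keytab" = "/path/to/starrocks.keytab"
)
```

To pass the keytab inline instead, replace kerberos_keytab with kerberos_keytab_content and supply the Base64-encoded file content.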
High availability configuration
For HDFS clusters with NameNode high availability (HA), configure the following parameters so StarRocks can automatically detect active NameNode failover:
| Parameter | Description |
|---|---|
| dfs.nameservices | Custom name for the HDFS service (e.g., my-ha). |
| dfs.ha.namenodes.xxx | Comma-separated NameNode names. Replace xxx with the value of dfs.nameservices. |
| dfs.namenode.rpc-address.xxx.nn | Remote procedure call (RPC) address of each NameNode (Hostname:Port). Replace nn with each NameNode name. |
| dfs.client.failover.proxy.provider.xxx | Failover proxy provider. Default: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider. |
Example broker_properties for an HA HDFS cluster with simple authentication:
(
"username" = "user",
"password" = "passwd",
"dfs.nameservices" = "my-ha",
"dfs.ha.namenodes.my-ha" = "my-namenode1,my-namenode2",
"dfs.namenode.rpc-address.my-ha.my-namenode1" = "nn1-host:rpc_port",
"dfs.namenode.rpc-address.my-ha.my-namenode2" = "nn2-host:rpc_port",
"dfs.client.failover.proxy.provider.my-ha" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
)
If the cluster's HDFS configuration is stored in hdfs-site.xml, specify only the file path and authentication parameters.
HDFS load example
LOAD LABEL db1.label1
(
DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/hive/test.db/ml/file1")
INTO TABLE tbl1
COLUMNS TERMINATED BY ","
(tmp_c1, tmp_c2)
SET
(
id = tmp_c2,
name = tmp_c1
),
DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/hive/test.db/ml/file2")
INTO TABLE tbl2
COLUMNS TERMINATED BY ","
(col1, col2)
WHERE col1 > 1
)
WITH BROKER 'broker1'
(
"username" = "hdfs_username",
"password" = "hdfs_password"
)
PROPERTIES
(
"timeout" = "3600"
);
Query job status
Broker Load is asynchronous. After submitting a job, use SHOW LOAD to track its progress:
SHOW LOAD WHERE label = 'label1'\G
To view the full SHOW LOAD syntax, run HELP SHOW LOAD.
SHOW LOAD works only with asynchronous import methods. It does not apply to synchronous methods like Stream Load.
The output looks like:
*************************** 1. row ***************************
JobId: 7****
Label: label1
State: FINISHED
Progress: ETL:N/A; LOAD:100%
Type: BROKER
EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
TaskInfo: cluster:N/A; timeout(s):10800; max_filter_ratio:5.0E-5
ErrorMsg: N/A
CreateTime: 2019-07-27 11:46:42
EtlStartTime: 2019-07-27 11:46:44
EtlFinishTime: 2019-07-27 11:46:44
LoadStartTime: 2019-07-27 11:46:44
LoadFinishTime: 2019-07-27 11:50:16
URL: http://192.168.**.**:8040/api/_load_error_log?file=__shard_4/error_log_insert_stmt_...
JobDetails: {"Unfinished backends":{"9c3441027ff948a0-8287923329a2****":[10002]},"ScannedRows":2390016,"TaskNumber":1,"All backends":{"9c3441027ff948a0-8287923329a2****":[10002]},"FileNumber":1,"FileSize":1073741824}
| Field | Description |
|---|---|
| JobId | System-generated unique ID for the job. Never reused. |
| Label | The label you specified or that was auto-generated. |
| State | Current job state: PENDING (waiting), LOADING (running), CANCELLED (failed), or FINISHED (succeeded). |
| Progress | Broker Load has no ETL phase, so ETL always shows N/A. LOADING progress is (tables imported / total source tables) × 100%. It reaches 99% when all tables are imported but the job is still finalizing, and 100% only after the job is fully complete. Progress is not linear; a stable percentage does not mean the job has stopped. |
| Type | Always BROKER for Broker Load jobs. |
| EtlInfo | Three counters: unselected.rows (rows filtered by WHERE), dpp.norm.ALL (rows successfully loaded), dpp.abnorm.ALL (rows with errors). Their sum equals the total number of rows in the source files. |
| TaskInfo | The job parameters you configured: cluster, timeout, and max_filter_ratio. |
| ErrorMsg | Error details when the job is CANCELLED. N/A when FINISHED. The type field can be: USER-CANCEL, ETL-RUN-FAIL, ETL-QUALITY-UNSATISFIED, LOAD-RUN-FAIL, TIMEOUT, or UNKNOWN. |
| CreateTime / EtlStartTime / EtlFinishTime / LoadStartTime / LoadFinishTime | Timestamps for each phase. Broker Load has no ETL phase, so EtlStartTime, EtlFinishTime, and LoadStartTime are the same value. If only CreateTime is populated and LoadStartTime remains N/A for an extended period, too many jobs are queued. Stop submitting new jobs until the queue clears. |
| URL | Link to sample erroneous rows. N/A when there are no errors. |
| JobDetails | Details about backends, scanned rows (updated every 5 seconds), task count, file count, and file size. |
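To see how EtlInfo relates to max_filter_ratio, the query below recomputes the error rate from the counters in the sample output above (dpp.abnorm.ALL=15, dpp.norm.ALL=28133376) and compares it with the max_filter_ratio of 5.0E-5 shown in TaskInfo. It is plain arithmetic for illustration and does not query any load metadata.

```sql
-- error rate = dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL)
-- With the sample counters this is roughly 5.3e-7, below the 5.0E-5 limit.
SELECT
    15 / (15 + 28133376) AS error_rate,
    15 / (15 + 28133376) <= 0.00005 AS within_limit;
```

The comparison evaluates to true, which is consistent with the sample job reaching FINISHED despite 15 erroneous rows.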
Cancel an import job
Cancel a job that is in PENDING or LOADING state:
CANCEL LOAD
[FROM db_name]
WHERE [LABEL = "load_label" | LABEL LIKE "label_pattern"];
To view the full syntax, run HELP CANCEL LOAD.
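As a concrete instance of the syntax above, the following sketch cancels the job from the HDFS load example, assuming it was submitted as db1.label1 and is still in PENDING or LOADING state.

```sql
-- Cancel the job labeled label1 in database db1.
CANCEL LOAD
FROM db1
WHERE LABEL = "label1";
```

After cancellation, SHOW LOAD reports the job as CANCELLED, and the label can be reused for a resubmission.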
End-to-end example: load from OSS
This example loads TPC-H lineitem data from OSS into StarRocks.
Step 1: Create the destination table.
CREATE TABLE lineitem (
l_orderkey bigint,
l_partkey bigint,
l_suppkey bigint,
l_linenumber int,
l_quantity double,
l_extendedprice double,
l_discount double,
l_tax double,
l_returnflag string,
l_linestatus string,
l_shipdate date,
l_commitdate date,
l_receiptdate date,
l_shipinstruct string,
l_shipmode string,
l_comment string
)
ENGINE = OLAP
DUPLICATE KEY(l_orderkey)
DISTRIBUTED BY HASH(l_orderkey) BUCKETS 96
PROPERTIES (
"replication_num" = "1"
);
Step 2: Submit the import job.
For StarRocks versions earlier than 2.5.8
Use the statement shown for 2.5.8 and later, but name the broker: replace WITH BROKER with WITH BROKER broker.
For StarRocks versions 2.5.8 and later
LOAD LABEL tpch.lineitem
(
DATA INFILE("oss://xxx/tpc_h/sf1/lineitem.tbl")
INTO TABLE `lineitem`
COLUMNS TERMINATED BY '|'
(l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice,
l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate,
l_receiptdate, l_shipinstruct, l_shipmode, l_comment)
)
WITH BROKER
(
"fs.oss.accessKeyId" = "xxx",
"fs.oss.accessKeySecret" = "xxx",
"fs.oss.endpoint" = "xxx"
);
Step 3: Monitor the job.
SHOW LOAD WHERE label = 'lineitem'\G
When the job succeeds, the output shows State: FINISHED and the Progress field reports LOAD:100%.
Step 4: Verify the data.
-- Check total row count
SELECT COUNT(*) FROM lineitem;
-- Preview the first two rows
SELECT * FROM lineitem LIMIT 2;
Concurrency
A Broker Load job is divided into tasks, then tasks are divided into instances distributed across backends in parallel.
Task splitting: A job produces one task per data_desc clause that points to a different source address or a different set of partitions.
Instance splitting: Each task is split into instances based on three frontend parameters:
| Parameter | Default | Description |
|---|---|---|
| min_bytes_per_broker_scanner | 64 MB | Minimum data volume per instance. |
| max_broker_concurrency | 100 | Maximum parallel instances per task. |
| load_parallel_instance_num | 1 | Parallel instances per backend. |
Total instances = min(file_size / min_bytes_per_broker_scanner, max_broker_concurrency, load_parallel_instance_num × number of backends)
In practice, most jobs have a single data_desc clause and therefore a single task. With the default settings and a sufficiently large source file, that task is split into as many instances as there are backends, with each instance running on a different backend.
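The instance formula can be checked with a small worked example. The query below assumes a hypothetical 1 GB (1,024 MB) source file, the default parameter values from the table, and a cluster with 3 backends.

```sql
-- min(file_size / min_bytes_per_broker_scanner,
--     max_broker_concurrency,
--     load_parallel_instance_num × number of backends)
-- = min(1024 / 64, 100, 1 × 3) = min(16, 100, 3) = 3
SELECT LEAST(FLOOR(1024 / 64), 100, 1 * 3) AS total_instances;
```

Here the backend term is the smallest, so the task runs as 3 instances, one per backend. Under these assumptions, raising load_parallel_instance_num would increase parallelism until one of the other two terms becomes the limit.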
Troubleshooting
Job enters CANCELLED state
Check the ErrorMsg field in SHOW LOAD output. The following table maps each error type to its cause and fix:
| ErrorMsg type | Cause | Fix |
|---|---|---|
| ETL-QUALITY-UNSATISFIED | Error rate exceeds max_filter_ratio. | Increase max_filter_ratio in PROPERTIES to allow more erroneous rows to be skipped, or fix the source data quality. |
| TIMEOUT | Job exceeded the timeout period. | Recalculate the minimum timeout using (Total file size in MB × source and rollup table count) / (30 × concurrency) and set a larger value in PROPERTIES ("timeout" = "..."). |
| LOAD-RUN-FAIL | A backend failed during the LOADING phase. | Check the URL field for sample erroneous rows. Inspect backend logs for hardware or network issues. |
| ETL-RUN-FAIL | Failure in the ETL phase. | Check the URL field for error details. |
| USER-CANCEL | The job was canceled manually. | Resubmit with the same label (CANCELLED labels can be reused). |
| UNKNOWN | An unexpected error occurred. | Check backend and frontend logs for details. |
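When resubmitting after a TIMEOUT or ETL-QUALITY-UNSATISFIED failure, the fix usually amounts to changing the PROPERTIES block of the original statement. The fragment below is a sketch with hypothetical values: a timeout of twice the default and a tolerance of 0.1% erroneous rows. Choose values that fit your data and cluster.

```sql
-- Hypothetical values: 28800 s is twice the 14,400 s default;
-- 0.001 tolerates up to 0.1% erroneous rows before the job fails.
PROPERTIES
(
    "timeout" = "28800",
    "max_filter_ratio" = "0.001"
)
```

Append this block after the broker_properties clause of the LOAD LABEL statement, then resubmit the job.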
LOADING progress stalls
Progress shown in SHOW LOAD is not linear — it updates only when full tables are imported. A stable percentage does not mean the job has stopped. Wait for the LoadFinishTime field to be populated before concluding the job has hung.
If LoadStartTime remains N/A for an extended period after the job is created, many jobs are queued. Stop submitting new jobs until the queue clears.
What's next
- See the StarRocks documentation for advanced LOAD LABEL options, or run HELP BROKER LOAD for the full syntax
- Monitor ongoing jobs with SHOW LOAD and review backend logs for performance tuning
- Adjust min_bytes_per_broker_scanner, max_broker_concurrency, and load_parallel_instance_num on the frontend to tune import concurrency for your cluster