E-MapReduce: Broker Load

Last Updated: Mar 26, 2026

Broker Load is an asynchronous import method in StarRocks that reads data from external storage systems using broker processes, then uses StarRocks' own compute resources to preprocess and load the data. Use Broker Load to import tens to hundreds of GB per job from Hadoop Distributed File System (HDFS) or Alibaba Cloud Object Storage Service (OSS). A single Broker Load job can target multiple destination tables atomically — either all tables succeed or all fail.

How Broker Load works

Broker Load has two operating modes depending on your StarRocks version:

  • Broker-based loading (StarRocks earlier than 2.5.8): StarRocks depends on broker processes to connect to external storage. Specify the broker name explicitly with WITH BROKER "<broker_name>" in the load statement.

  • Broker-free loading (StarRocks 2.5.8 and later): StarRocks connects to external storage directly. Omit the broker name but keep the WITH BROKER keyword in the load statement.

When you create a StarRocks cluster in E-MapReduce (EMR), broker processes are automatically deployed and started on every core node.

Supported sources and formats

| Dimension | Supported values |
| --- | --- |
| Data sources | HDFS, OSS |
| File formats | CSV (default), ORC, Parquet (.parquet or .parq) |

Prerequisites

Before you begin, ensure that you have:

  • A running StarRocks cluster in EMR with at least one core node

  • Access credentials for HDFS or OSS

  • A destination database and table in StarRocks

Query brokers

To confirm brokers are running on your cluster, run:

SHOW PROC "/brokers"\G

The output lists each broker with its IP address, port (default: 8000), alive status, and timestamps:

*************************** 1. row ***************************
          Name: broker
            IP: 10.0.**.**
          Port: 8000
         Alive: true
 LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
        ErrMsg:

Create an import job

All import jobs use the LOAD LABEL statement. The syntax differs slightly between StarRocks versions.

For StarRocks versions earlier than 2.5.8

LOAD LABEL db_name.label_name
    (data_desc, ...)
WITH BROKER broker_name broker_properties
    [PROPERTIES (key1=value1, ... )]

For StarRocks versions 2.5.8 and later

LOAD LABEL db_name.label_name
    (data_desc, ...)
WITH BROKER broker_properties
    [PROPERTIES (key1=value1, ... )]

For versions earlier than 2.5.8, use broker as the broker name in EMR StarRocks clusters.

To view the full syntax reference, run HELP BROKER LOAD.

Label

Each import job requires a unique label. Define a custom label or let the system generate one. The label serves two purposes:

  • Track job status with SHOW LOAD WHERE label = '<label>'

  • Prevent duplicate imports — StarRocks rejects a new job if a FINISHED job already has the same label

After a job reaches the FINISHED state, its label cannot be reused. After a job is CANCELLED, the same label can be reused to resubmit the job.

data_desc parameters

The data_desc clause describes the source data for one destination table. A single job can contain multiple data_desc clauses targeting different tables — StarRocks guarantees atomicity across all tables in one job.

data_desc:
    DATA INFILE ('file_path', ...)
    [NEGATIVE]
    INTO TABLE tbl_name
    [PARTITION (p1, p2)]
    [COLUMNS TERMINATED BY column_separator]
    [FORMAT AS file_type]
    [(col1, ...)]
    [COLUMNS FROM PATH AS (colx, ...)]
    [SET (k1=f1(xx), k2=f2(xx))]
    [WHERE predicate]

| Parameter | Description |
| --- | --- |
| file_path | A specific file path or a glob pattern using wildcards (?, *, [], {}, ^). For example, hdfs://hdfs_host:hdfs_port/user/data/tablename/*/ imports all files in all partitions under /tablename. See FileSystem.globStatus for wildcard syntax. |
| NEGATIVE | Marks source rows as deletions. Use this to undo a batch import to aggregate columns of the SUM type. |
| PARTITION | Restricts import to the specified partitions in the destination table. Rows that don't belong to the listed partitions are counted as errors. Use a WHERE predicate to filter them out instead. |
| column_separator | Column delimiter in source files. Default: \t. For invisible characters, use hexadecimal format (e.g., \\x01 for the Hive default \x01). |
| file_type | Source file format: csv (default), orc, or parquet. |
| COLUMNS FROM PATH AS | Extracts partition field values from the file path. For example, a path /path/col_name=col_value/dt=20210101/file1 lets you import col_value and 20210101 into the col_name and dt table columns. |
| SET | Column type conversion functions. Required when source and destination column types differ. |
| WHERE predicate | Filters rows after column type conversion. Filtered rows are excluded from the error rate calculation. Multiple WHERE predicates for the same table are combined with AND. |
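
The clauses described above can be combined in one data_desc. The following is a minimal sketch, not a verified statement for any particular cluster: the database example_db, the table events, its columns, and the directory layout are all hypothetical, and the WITH BROKER form shown assumes StarRocks 2.5.8 or later (earlier versions need the broker name after WITH BROKER).

```sql
-- Hypothetical layout: /user/data/events/city=beijing/dt=20210101/part-0000.csv
-- The file itself holds two comma-separated fields: id and amount.
LOAD LABEL example_db.events_20210101
(
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/data/events/city=*/dt=20210101/*")
    INTO TABLE events
    COLUMNS TERMINATED BY ","
    FORMAT AS "csv"
    (id, amount)                      -- columns read from the file
    COLUMNS FROM PATH AS (city, dt)   -- columns read from the path segments city=... and dt=...
    WHERE amount > 0                  -- filtered rows do not count toward the error rate
)
WITH BROKER
(
    "username" = "hdfs_username",
    "password" = "hdfs_password"
);
```

Here city and dt never appear inside the file; they are taken from the directory names, so the destination table needs columns with those names.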

broker_properties and job-level properties

The broker_properties clause passes storage credentials and connection settings. The optional PROPERTIES block controls job execution behavior.

broker_properties:
    (key2=value2, ...)

Important: broker_properties and PROPERTIES have different scopes. broker_properties (inside WITH BROKER) configures how StarRocks connects to the storage system. PROPERTIES (the outer block) controls how the job runs: timeout, filter ratio, memory limit, and strict mode.

| Parameter | Description |
| --- | --- |
| timeout | Job timeout in seconds. Default: 14,400 (4 hours). If the job doesn't complete within this period, it moves to CANCELLED. Estimate the minimum timeout with: (Total file size in MB × Number of source and rollup tables) / (30 × concurrency). For example, a 1 GB file with one source table, two rollup tables, and concurrency of 3 requires at least (1 × 1,024 × 3) / (30 × 3) ≈ 34 seconds. Set a timeout only if you expect the job to exceed the default. |
| max_filter_ratio | Maximum acceptable error rate, from 0 to 1. Default: 0. If the error rate exceeds this value, the job fails. Set a value greater than 0 to allow some erroneous rows to be skipped. Calculate the ratio as: max_filter_ratio = dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL). |
| load_mem_limit | Memory limit for the job in bytes. Default: 0 (no limit). |
| strict_mode | When set to true, rows where non-null source values convert to null are treated as errors and filtered out. Default: disabled. Enable with PROPERTIES ("strict_mode" = "true"). Strict mode does not apply to function-generated columns or values that are valid but out of the destination column's range. |
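
As a worked illustration of the timeout formula and the job-level PROPERTIES block, the sketch below estimates a minimum timeout and then sets the properties on a job. The inputs (a 10 GB file, one destination table with no rollup tables, concurrency of 3), the names example_db and orders, the OSS path, and the chosen property values are all assumptions for illustration. The WITH BROKER form assumes StarRocks 2.5.8 or later.

```sql
-- (Total file size in MB × number of source and rollup tables) / (30 × concurrency)
-- 10 GB = 10 × 1024 MB, 1 table, concurrency 3.
SELECT CEIL((10 * 1024 * 1) / (30 * 3)) AS min_timeout_seconds;  -- about 114

-- Hypothetical job that sets the job-level properties explicitly.
LOAD LABEL example_db.orders_20240101
(
    DATA INFILE("oss://bucket/path/to/orders.csv")
    INTO TABLE orders
    COLUMNS TERMINATED BY ","
)
WITH BROKER
(
    "fs.oss.accessKeyId" = "xxx",
    "fs.oss.accessKeySecret" = "xxx",
    "fs.oss.endpoint" = "xxx"
)
PROPERTIES
(
    "timeout" = "3600",           -- seconds; well above the estimated minimum
    "max_filter_ratio" = "0.001", -- tolerate up to 0.1% erroneous rows
    "strict_mode" = "true"        -- treat non-null values that convert to null as errors
);
```

The estimate is far below the 14,400-second default, so in a case like this the timeout property could simply be omitted.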

Load data from OSS

The following example loads a TPC-H lineitem file from OSS into a StarRocks table.

For StarRocks versions earlier than 2.5.8

LOAD LABEL tpch.lineitem
(
    DATA INFILE("oss://bucket/tpc_h/sf1/lineitem.tbl")
    INTO TABLE `lineitem`
    COLUMNS TERMINATED BY '|'
    (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice,
     l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate,
     l_receiptdate, l_shipinstruct, l_shipmode, l_comment)
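)
WITH BROKER broker
(
    "fs.oss.accessKeyId" = "xxx",
    "fs.oss.accessKeySecret" = "xxx",
    "fs.oss.endpoint" = "oss-cn-beijing-internal.aliyuncs.com"
);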

For StarRocks versions 2.5.8 and later

LOAD LABEL tpch.lineitem
(
    DATA INFILE("oss://bucket/tpc_h/sf1/lineitem.tbl")
    INTO TABLE `lineitem`
    COLUMNS TERMINATED BY '|'
    (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice,
     l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate,
     l_receiptdate, l_shipinstruct, l_shipmode, l_comment)
)
WITH BROKER
(
    "fs.oss.accessKeyId" = "xxx",
    "fs.oss.accessKeySecret" = "xxx",
    "fs.oss.endpoint" = "oss-cn-beijing-internal.aliyuncs.com"
);

Load data from HDFS

Authentication

HDFS supports two authentication modes.

Simple authentication — user identity is determined by the OS of the connecting client:

| Parameter | Description |
| --- | --- |
| hadoop.security.authentication | Set to simple. |
| username | HDFS username. |
| password | HDFS password. |

Kerberos authentication — user identity is determined by Kerberos credentials:

| Parameter | Description |
| --- | --- |
| hadoop.security.authentication | Set to kerberos. |
| kerberos_principal | The Kerberos principal. |
| kerberos_keytab | Path to the Kerberos keytab file. The file must reside on the same server as the broker process. |
| kerberos_keytab_content | Base64-encoded content of the keytab file. Configure either this parameter or kerberos_keytab, not both. |
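
For comparison with the simple-authentication examples elsewhere on this page, a broker_properties block for Kerberos might look like the following sketch. It uses only the parameters listed above; the principal and the keytab path are hypothetical placeholders.

```sql
(
    "hadoop.security.authentication" = "kerberos",
    "kerberos_principal" = "starrocks@EXAMPLE.COM",    -- hypothetical principal
    "kerberos_keytab" = "/path/to/starrocks.keytab"    -- hypothetical path on the broker's server
)
```

To pass the keytab inline instead, replace the kerberos_keytab line with kerberos_keytab_content and the Base64-encoded file content.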

High availability configuration

For HDFS clusters with NameNode high availability (HA), configure the following parameters so StarRocks can automatically detect active NameNode failover:

| Parameter | Description |
| --- | --- |
| dfs.nameservices | Custom name for the HDFS service (e.g., my-ha). |
| dfs.ha.namenodes.xxx | Comma-separated NameNode names. Replace xxx with the value of dfs.nameservices. |
| dfs.namenode.rpc-address.xxx.nn | Remote procedure call (RPC) address of each NameNode (Hostname:Port). Replace nn with each NameNode name. |
| dfs.client.failover.proxy.provider.xxx | Failover proxy provider. Default: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider. |

Example broker_properties for an HA HDFS cluster with simple authentication:

(
    "username" = "user",
    "password" = "passwd",
    "dfs.nameservices" = "my-ha",
    "dfs.ha.namenodes.my-ha" = "my-namenode1,my-namenode2",
    "dfs.namenode.rpc-address.my-ha.my-namenode1" = "nn1-host:rpc_port",
    "dfs.namenode.rpc-address.my-ha.my-namenode2" = "nn2-host:rpc_port",
    "dfs.client.failover.proxy.provider.my-ha" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
)

If the cluster's HDFS configuration is stored in hdfs-site.xml, specify only the file path and authentication parameters.

HDFS load example

LOAD LABEL db1.label1
(
    DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/hive/test.db/ml/file1")
    INTO TABLE tbl1
    COLUMNS TERMINATED BY ","
    (tmp_c1, tmp_c2)
    SET
    (
        id = tmp_c2,
        name = tmp_c1
    ),

    DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/hive/test.db/ml/file2")
    INTO TABLE tbl2
    COLUMNS TERMINATED BY ","
    (col1, col2)
    WHERE col1 > 1
)
WITH BROKER 'broker'
(
    "username" = "hdfs_username",
    "password" = "hdfs_password"
)
PROPERTIES
(
    "timeout" = "3600"
);

Query job status

Broker Load is asynchronous. After submitting a job, use SHOW LOAD to track its progress:

SHOW LOAD WHERE label = 'label1'\G

To view the full SHOW LOAD syntax, run HELP SHOW LOAD.

SHOW LOAD works only with asynchronous import methods. It does not apply to synchronous methods like Stream Load.

The output looks like:

*************************** 1. row ***************************
         JobId: 7****
         Label: label1
         State: FINISHED
      Progress: ETL:N/A; LOAD:100%
          Type: BROKER
       EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
      TaskInfo: cluster:N/A; timeout(s):10800; max_filter_ratio:5.0E-5
      ErrorMsg: N/A
    CreateTime: 2019-07-27 11:46:42
  EtlStartTime: 2019-07-27 11:46:44
 EtlFinishTime: 2019-07-27 11:46:44
 LoadStartTime: 2019-07-27 11:46:44
LoadFinishTime: 2019-07-27 11:50:16
           URL: http://192.168.**.**:8040/api/_load_error_log?file=__shard_4/error_log_insert_stmt_...
    JobDetails: {"Unfinished backends":{"9c3441027ff948a0-8287923329a2****":[10002]},"ScannedRows":2390016,"TaskNumber":1,"All backends":{"9c3441027ff948a0-8287923329a2****":[10002]},"FileNumber":1,"FileSize":1073741824}

| Field | Description |
| --- | --- |
| JobId | System-generated unique ID for the job. Never reused. |
| Label | The label you specified or that was auto-generated. |
| State | Current job state: PENDING (waiting), LOADING (running), CANCELLED (failed), or FINISHED (succeeded). |
| Progress | Broker Load has no ETL phase, so ETL always shows N/A. LOADING progress is (tables imported / total source tables) × 100%. It reaches 99% when all tables are imported but the job is still finalizing, and 100% only after the job is fully complete. Progress is not linear; a stable percentage does not mean the job has stopped. |
| Type | Always BROKER for Broker Load jobs. |
| EtlInfo | Three counters: unselected.rows (rows filtered by WHERE), dpp.norm.ALL (rows successfully loaded), dpp.abnorm.ALL (rows with errors). Their sum equals the total number of rows in the source files. |
| TaskInfo | The job parameters you configured: cluster, timeout, and max_filter_ratio. |
| ErrorMsg | Error details when the job is CANCELLED. N/A when FINISHED. The type field can be: USER-CANCEL, ETL-RUN-FAIL, ETL-QUALITY-UNSATISFIED, LOAD-RUN-FAIL, TIMEOUT, or UNKNOWN. |
| CreateTime / EtlStartTime / EtlFinishTime / LoadStartTime / LoadFinishTime | Timestamps for each phase. Broker Load has no ETL phase, so EtlStartTime, EtlFinishTime, and LoadStartTime are the same value. If only CreateTime is populated and LoadStartTime remains N/A for an extended period, too many jobs are queued; stop submitting new jobs until the queue clears. |
| URL | Link to sample erroneous rows. N/A when there are no errors. |
| JobDetails | Details about backends, scanned rows (updated every 5 seconds), task count, file count, and file size. |
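
To relate the EtlInfo counters to max_filter_ratio, the error rate of the sample output above can be recomputed by hand. The query below is only a sketch of that arithmetic, using the values dpp.abnorm.ALL=15, dpp.norm.ALL=28133376, and max_filter_ratio 5.0E-5 from the sample. The 15e0 literal is there to force floating-point division.

```sql
SELECT
    15e0 / (15 + 28133376) AS error_ratio,             -- roughly 5.3E-7
    15e0 / (15 + 28133376) <= 5.0E-5 AS within_limit;  -- true: the ratio is below the limit
```

Because the ratio stays under the configured limit, the 15 erroneous rows are skipped and the job still finishes.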

Cancel an import job

Cancel a job that is in PENDING or LOADING state:

CANCEL LOAD
[FROM db_name]
WHERE [LABEL = "load_label" | LABEL LIKE "label_pattern"];

To view the full syntax, run HELP CANCEL LOAD.
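
As a concrete instance of the syntax above, the following sketch cancels the job from the HDFS example and then checks its state. It reuses the db1 database and the label1 label from that example; substitute your own names.

```sql
-- Cancel the job by its exact label.
CANCEL LOAD
FROM db1
WHERE LABEL = "label1";

-- Confirm the result: State should now be CANCELLED.
SHOW LOAD WHERE label = 'label1'\G
```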

End-to-end example: load from OSS

This example loads TPC-H lineitem data from OSS into StarRocks.

Step 1: Create the destination table.

CREATE TABLE lineitem (
  l_orderkey      bigint,
  l_partkey       bigint,
  l_suppkey       bigint,
  l_linenumber    int,
  l_quantity      double,
  l_extendedprice double,
  l_discount      double,
  l_tax           double,
  l_returnflag    string,
  l_linestatus    string,
  l_shipdate      date,
  l_commitdate    date,
  l_receiptdate   date,
  l_shipinstruct  string,
  l_shipmode      string,
  l_comment       string
)
ENGINE = OLAP
DUPLICATE KEY(l_orderkey)
DISTRIBUTED BY HASH(l_orderkey) BUCKETS 96
PROPERTIES (
  "replication_num" = "1"
);

Step 2: Submit the import job.

For StarRocks versions earlier than 2.5.8

Use the statement shown for 2.5.8 and later, but name the broker explicitly: write WITH BROKER broker instead of WITH BROKER.

For StarRocks versions 2.5.8 and later

LOAD LABEL tpch.lineitem
(
    DATA INFILE("oss://xxx/tpc_h/sf1/lineitem.tbl")
    INTO TABLE `lineitem`
    COLUMNS TERMINATED BY '|'
    (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice,
     l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate,
     l_receiptdate, l_shipinstruct, l_shipmode, l_comment)
)
WITH BROKER
(
    "fs.oss.accessKeyId" = "xxx",
    "fs.oss.accessKeySecret" = "xxx",
    "fs.oss.endpoint" = "xxx"
);

Step 3: Monitor the job.

SHOW LOAD WHERE label = 'lineitem'\G

When the job succeeds, the output shows State: FINISHED and LOAD:100% in the Progress field.

Step 4: Verify the data.

-- Check total row count
SELECT COUNT(*) FROM lineitem;

-- Preview the first two rows
SELECT * FROM lineitem LIMIT 2;

Concurrency

A Broker Load job is divided into tasks, then tasks are divided into instances distributed across backends in parallel.

Task splitting: A job produces one task per data_desc clause that points to a different source address or a different set of partitions.

Instance splitting: Each task is split into instances based on three frontend parameters:

| Parameter | Default | Description |
| --- | --- | --- |
| min_bytes_per_broker_scanner | 64 MB | Minimum data volume per instance. |
| max_broker_concurrency | 100 | Maximum parallel instances per task. |
| load_parallel_instance_num | 1 | Parallel instances per backend. |

Total instances = min(file_size / min_bytes_per_broker_scanner, max_broker_concurrency, load_parallel_instance_num × number of backends)

In practice, most jobs have a single data_desc clause and therefore a single task. That task is split into as many instances as there are backends, with each instance running on a different backend.
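
The instance formula can be worked through for one concrete case. The sketch below assumes a single 1 GB (1,024 MB) file, the default parameter values, and a hypothetical cluster of 3 backends.

```sql
SELECT LEAST(
    CEIL(1024 / 64),  -- file_size / min_bytes_per_broker_scanner = 16
    100,              -- max_broker_concurrency
    1 * 3             -- load_parallel_instance_num × number of backends = 3
) AS total_instances; -- the smallest term wins: 3 instances
```

With the defaults, the backend term is usually the smallest, which is why the task ends up with one instance per backend.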

Troubleshooting

Job enters CANCELLED state

Check the ErrorMsg field in SHOW LOAD output. The following table maps each error type to its cause and fix:

| ErrorMsg type | Cause | Fix |
| --- | --- | --- |
| ETL-QUALITY-UNSATISFIED | Error rate exceeds max_filter_ratio. | Increase max_filter_ratio in PROPERTIES to allow more erroneous rows to be skipped, or fix the source data quality. |
| TIMEOUT | Job exceeded the timeout period. | Recalculate the minimum timeout using (Total file size in MB × source and rollup table count) / (30 × concurrency) and set a larger value in PROPERTIES ("timeout" = "..."). |
| LOAD-RUN-FAIL | A backend failed during the LOADING phase. | Check the URL field for sample erroneous rows. Inspect backend logs for hardware or network issues. |
| ETL-RUN-FAIL | Failure in the ETL phase. | Check the URL field for error details. |
| USER-CANCEL | The job was canceled manually. | Resubmit with the same label (CANCELLED labels can be reused). |
| UNKNOWN | An unexpected error occurred. | Check backend and frontend logs for details. |

LOADING progress stalls

Progress shown in SHOW LOAD is not linear; it updates only when a full table is imported. A stable percentage does not mean the job has stopped. Before concluding that the job has hung, check whether State is still LOADING and whether ScannedRows in JobDetails (updated every 5 seconds) is still increasing.

If LoadStartTime remains N/A for an extended period after the job is created, many jobs are queued. Stop submitting new jobs until the queue clears.
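
One way to gauge the queue is to list jobs by state. The statements below are a sketch that assumes your StarRocks version accepts a STATE filter and an ORDER BY clause in SHOW LOAD (run HELP SHOW LOAD to confirm). The database name db1 is taken from the HDFS example.

```sql
-- Jobs that have been created but have not started loading yet.
SHOW LOAD FROM db1 WHERE STATE = "PENDING";

-- Jobs that are currently loading, newest first.
SHOW LOAD FROM db1 WHERE STATE = "LOADING" ORDER BY CreateTime DESC;
```

A long PENDING list alongside only a few LOADING jobs suggests that new submissions are simply waiting their turn.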

What's next

  • Run HELP BROKER LOAD, or consult the StarRocks documentation, for advanced LOAD LABEL options

  • Monitor ongoing jobs with SHOW LOAD and review backend logs for performance tuning

  • Adjust min_bytes_per_broker_scanner, max_broker_concurrency, and load_parallel_instance_num on the frontend to tune import concurrency for your cluster
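
The frontend parameters named in the last item can typically be inspected and changed from a SQL client. The sketch below assumes that your StarRocks version exposes them as dynamically adjustable frontend settings; if it does not, set them in the frontend configuration file instead. The value 200 is an arbitrary illustration, not a recommendation.

```sql
-- Inspect the current value on the connected frontend.
ADMIN SHOW FRONTEND CONFIG LIKE "max_broker_concurrency";

-- Raise the per-task instance ceiling (illustrative value).
ADMIN SET FRONTEND CONFIG ("max_broker_concurrency" = "200");
```

Changes made this way generally apply only to the frontend you are connected to and may not survive a restart, so verify the value afterwards.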