
E-MapReduce:Routine Load

Last Updated:Mar 25, 2026

Routine Load continuously ingests data from Apache Kafka into StarRocks. Use SQL statements to create, pause, resume, and stop load jobs at any time.

How it works


Routine Load uses four internal components to manage continuous ingestion:

| Component | Role |
| --- | --- |
| RoutineLoadJob | A submitted load job |
| JobScheduler | Splits a RoutineLoadJob into tasks and schedules them |
| Task | A unit of work split from a RoutineLoadJob |
| TaskScheduler | Schedules and runs individual tasks |

The ingestion process works as follows:

  1. Submit a Kafka load job to the frontend (FE) using a MySQL-compatible client.

  2. The FE splits the job into tasks. Each task loads a portion of the data.

  3. Each task is dispatched to a backend (BE) node, which runs it using the Stream Load mechanism.

  4. The BE reports the task result back to the FE.

  5. The FE generates new tasks or retries failed ones based on the result.

  6. This cycle repeats continuously, keeping data flowing without interruption.
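
The whole lifecycle above is driven from a MySQL-compatible client with SQL statements. A minimal sketch, with placeholder database, job, table, topic, and broker names:

```sql
-- Submit a job to the FE; the JobScheduler splits it into tasks automatically.
CREATE ROUTINE LOAD mydb.myjob ON mytable
FROM KAFKA
(
    "kafka_broker_list" = "<broker_ip>:9092",
    "kafka_topic" = "mytopic"
);

-- Inspect the scheduling state and the task results reported back by the BEs.
SHOW ROUTINE LOAD FOR mydb.myjob;

-- Suspend and later resume the task-generation cycle.
PAUSE ROUTINE LOAD FOR mydb.myjob;
RESUME ROUTINE LOAD FOR mydb.myjob;

-- Terminate the job permanently.
STOP ROUTINE LOAD FOR mydb.myjob;
```

Each of these statements is covered in detail in the sections below.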

The diagram and some information in this topic are from Continuously load data from Apache Kafka in the open-source StarRocks documentation.

Prerequisites

Before you begin, ensure that you have:

  • Access to a Kafka cluster — either without authentication or via Secure Sockets Layer (SSL) authentication

  • Messages in the Kafka topic in CSV or JSON format. Each CSV message must occupy one row with no trailing line feed. The Array type is not supported.

  • Kafka 0.10.0.0 or later

Create a load job

CREATE ROUTINE LOAD <database>.<job_name> ON <table_name>
    [COLUMNS TERMINATED BY "<column_separator>",]
    [COLUMNS (col1, col2, ...),]
    [WHERE <where_condition>,]
    [PARTITION (part1, part2, ...)]
    [PROPERTIES ("key" = "value", ...)]
    FROM KAFKA
    ("data_source_property1" = "value1",
    "data_source_property2" = "value2",
    ...)

Required parameters

| Parameter | Description |
| --- | --- |
| job_name | Name of the load job. Must be unique within a database. The recommended format is <timestamp>_<table_name>. |
| table_name | Destination table. |
| kafka_broker_list | Kafka broker connection strings. Format: <ip>:<port>. Separate multiple brokers with commas. |
| kafka_topic | Kafka topic to subscribe to. |

Optional parameters

Data shaping

| Parameter | Default | Description |
| --- | --- | --- |
| COLUMNS TERMINATED BY | \t | Column delimiter in the source data. |
| COLUMNS | None | Maps source columns to destination columns. Use a placeholder name for columns to skip; use expressions for derived columns (e.g., col4 = col1 + col2). |
| WHERE | None | Filters rows before loading. Rows filtered by WHERE are not counted as error rows. |
| PARTITION | None | Loads data into specific partitions. If omitted, StarRocks routes data automatically. |
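
Taken together, these clauses might look like the following sketch, which skips one source column via a placeholder name (temp_skip), derives col4 from two other columns, filters out non-positive prices, and targets a single partition. All object names here are illustrative:

```sql
CREATE ROUTINE LOAD mydb.example_job ON example_tbl
COLUMNS TERMINATED BY ",",
COLUMNS (col1, col2, temp_skip, price, col4 = col1 + col2),
WHERE price > 0,
PARTITION (p202005)
FROM KAFKA
(
    "kafka_broker_list" = "<broker_ip>:9092",
    "kafka_topic" = "example_topic"
);
```

Rows removed by the WHERE clause are reported as unselectedRows in the job statistics, not as error rows.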

Job behavior

| Parameter | Default | Valid range | Description |
| --- | --- | --- | --- |
| desired_concurrent_number | 3 | > 0 | Maximum number of concurrent tasks the job can run. |
| max_batch_interval | 10 | 5–60 (seconds) | Maximum execution time per task. In V1.15 and later, this controls the task scheduling interval, that is, how often tasks are dispatched. |
| max_batch_rows | 200000 | ≥ 200000 | Maximum number of rows each task can read. In V1.15 and later, this defines the error-detection window size (10 × max_batch_rows). |
| max_batch_size | 100 MB | 100 MB–1 GB | Maximum data read per task. Deprecated in V1.15 and later; use routine_load_task_consume_second in fe.conf instead. |
| max_error_number | 0 | ≥ 0 | Maximum error rows allowed in a sampling window. 0 means no errors are tolerated. |
| strict_mode | enabled | true/false | When enabled, rows where a non-null source column maps to NULL are filtered out. Set to false to disable. |
| timezone | Session time zone | None | Affects all time zone-related functions in the load job. |
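
As a sketch, these properties are passed together in the PROPERTIES clause of CREATE ROUTINE LOAD. The job and table names and the chosen values are illustrative:

```sql
CREATE ROUTINE LOAD mydb.example_job ON example_tbl
PROPERTIES
(
    "desired_concurrent_number" = "3",   -- run at most 3 tasks concurrently
    "max_batch_interval" = "20",         -- dispatch tasks every 20 seconds
    "max_error_number" = "1000",         -- tolerate up to 1000 error rows per sampling window
    "strict_mode" = "false",             -- keep rows that map to NULL instead of filtering them
    "timezone" = "Asia/Shanghai"         -- time zone used by time functions in this job
)
FROM KAFKA
(
    "kafka_broker_list" = "<broker_ip>:9092",
    "kafka_topic" = "example_topic"
);
```

If error rows exceed max_error_number within a window, the job auto-pauses (see Job states below).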

Kafka source settings

| Parameter | Description |
| --- | --- |
| kafka_partitions | Partitions to subscribe to. Comma-separated partition IDs. |
| kafka_offsets | Start offset for each partition. Comma-separated. |
| property.kafka_default_offsets | Default start offset. Accepts OFFSET_BEGINNING, OFFSET_END, or a Unix timestamp. |
| property.* | Additional Kafka properties, equivalent to --property in the Kafka CLI. |
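
The following sketch shows the FROM KAFKA clause of a CREATE ROUTINE LOAD statement with these settings combined. The broker address, topic, offsets, and client ID are illustrative:

```sql
FROM KAFKA
(
    "kafka_broker_list" = "<broker_ip>:9092",
    "kafka_topic" = "example_topic",
    -- subscribe to partitions 0-2, each with its own start offset
    "kafka_partitions" = "0,1,2",
    "kafka_offsets" = "OFFSET_BEGINNING,OFFSET_END,1000",
    -- fall back to the earliest offset for partitions without an explicit offset
    "property.kafka_default_offsets" = "OFFSET_BEGINNING",
    -- pass-through Kafka client property
    "property.client.id" = "starrocks_routine_load"
);
```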

Frontend (FE) configuration (fe.conf)

| Parameter | Default | Description |
| --- | --- | --- |
| routine_load_task_consume_second | 3s | Time each task spends consuming data from Kafka. |
| routine_load_task_timeout_second | 15s | Maximum execution time per task before it times out. |
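
Both parameters are plain key-value entries in fe.conf. A sketch with illustrative values (keep the timeout comfortably larger than the consume time so tasks are not killed mid-read):

```properties
# fe.conf
routine_load_task_consume_second = 15
routine_load_task_timeout_second = 60
```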

Example: Load JSON data from Kafka (unauthenticated)

This example creates a load job named example_tbl2_ordertest that continuously consumes messages from the ordertest2 topic and loads them into the example_tbl table, starting from the earliest available offset. The pay_dt column is derived from the pay_time Unix timestamp using from_unixtime().

CREATE ROUTINE LOAD load_test.example_tbl2_ordertest ON example_tbl
COLUMNS(commodity_id, customer_name, country, pay_time, price, pay_dt=from_unixtime(pay_time, '%Y%m%d'))
PROPERTIES
(
    "desired_concurrent_number" = "5",
    "format" = "json",
    "jsonpaths" = "[\"$.commodity_id\",\"$.customer_name\",\"$.country\",\"$.pay_time\",\"$.price\"]"
)
FROM KAFKA
(
    "kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
    "kafka_topic" = "ordertest2",
    "kafka_partitions" = "0,1,2,3,4",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

Run SHOW ROUTINE LOAD FOR load_test.example_tbl2_ordertest\G and confirm State is RUNNING and loadedRows in Statistic is increasing.

Example: Load data over SSL

Add the following properties to enable SSL authentication:

-- Security protocol
"property.security.protocol" = "ssl",

-- CA certificate path
"property.ssl.ca.location" = "FILE:ca-cert",

-- Client certificate and key (required if the Kafka server enforces client authentication)
"property.ssl.certificate.location" = "FILE:client.pem",
"property.ssl.key.location" = "FILE:client.key",
"property.ssl.key.password" = "******"

When using CREATE FILE to upload certificate files, specify the HTTP address of Object Storage Service (OSS) as the URL. For details on OSS endpoints, see Use an endpoint that supports IPv6 to access OSS.

For the CREATE FILE statement syntax, see CREATE FILE.
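
As a hedged sketch, uploading the CA certificate with CREATE FILE might look like the following, assuming the file is reachable at an OSS HTTP URL. The bucket path, region, and catalog name are illustrative placeholders; check the CREATE FILE reference for the exact properties your StarRocks version expects:

```sql
CREATE FILE "ca-cert" IN load_test
PROPERTIES
(
    "url" = "http://<your-bucket>.oss-<region>.aliyuncs.com/ca-cert",
    "catalog" = "kafka"
);
```

Once uploaded, the file is referenced in the load job properties with the FILE: prefix, as shown above.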

Monitor a load job

Check job status

To view all load jobs in a database (including stopped and canceled jobs):

USE load_test;
SHOW ALL ROUTINE LOAD;

To view a specific running job:

SHOW ROUTINE LOAD FOR load_test.example_tbl2_ordertest;
Important

SHOW ROUTINE LOAD without ALL returns only running jobs. Use SHOW ALL ROUTINE LOAD to include jobs in other states.

You can also view the status of a load job in the EMR StarRocks Manager console. Go to the EMR StarRocks Manager page, click Metadata Management in the left-side navigation pane, click the name of the desired database, and then click the Tasks tab.

Sample output:

*************************** 1. row ***************************

                  Id: 14093
                Name: routine_load_wikipedia
          CreateTime: 2020-05-16 16:00:48
           PauseTime: N/A
             EndTime: N/A
              DbName: default_cluster:load_test
           TableName: routine_wiki_edit
               State: RUNNING
      DataSourceType: KAFKA
      CurrentTaskNum: 1
       JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
    CustomProperties: {}
           Statistic: {"receivedBytes":150821770,"errorRows":122,"committedTaskNum":12,"loadedRows":2399878,"loadRowsRate":199000,"abortedTaskNum":1,"totalRows":2400000,"unselectedRows":0,"receivedBytesRate":12523000,"taskExecuteTimeMs":12043}
            Progress: {"0":"13634667"}
ReasonOfStateChanged:
        ErrorLogUrls: http://172.26.**.**:9122/api/_load_error_log?file=__shard_53/error_log_insert_stmt_47e8a1d107ed4932-8f1ddf7b01ad2fee_47e8a1d107ed4932_8f1ddf7b01ad2fee, http://172.26.**.**:9122/api/_load_error_log?file=__shard_54/error_log_insert_stmt_e0c0c6b040c044fd-a162b16f6bad53e6_e0c0c6b040c044fd_a162b16f6bad53e6, http://172.26.**.**:9122/api/_load_error_log?file=__shard_55/error_log_insert_stmt_ce4c95f0c72440ef-a442bb300bd743c8_ce4c95f0c72440ef_a442bb300bd743c8
            OtherMsg:
1 row in set (0.00 sec)

Key health indicators:

  • State: Should be RUNNING. See the job states table below for other states.

  • loadedRows and errorRows in Statistic: If errorRows is rising relative to loadedRows, the job may auto-pause. Check ReasonOfStateChanged and the URLs in ErrorLogUrls.

  • Progress: Shows the current Kafka offset per partition. Verify it is advancing on each check.

Fields in the `Statistic` output:

| Field | Unit | Description |
| --- | --- | --- |
| receivedBytes | Bytes | Total data received since job creation |
| errorRows | Rows | Rows that failed to load |
| committedTaskNum | Tasks | Tasks submitted by the FE |
| loadedRows | Rows | Rows successfully loaded |
| loadRowsRate | Rows/s | Current load throughput |
| abortedTaskNum | Tasks | Tasks that failed on the BE |
| totalRows | Rows | Total rows received (loaded + error + unselected) |
| unselectedRows | Rows | Rows filtered by the WHERE clause |
| receivedBytesRate | Bytes/s | Data receive rate |
| taskExecuteTimeMs | ms | Total task execution time |

Job states

| State | Meaning | Next action |
| --- | --- | --- |
| NEED_SCHEDULE | Job is waiting to be scheduled (just created or just resumed) | Wait; transitions to RUNNING automatically |
| RUNNING | Job is loading data | Monitor loadedRows and errorRows |
| PAUSED | Job is paused; can be resumed | Run RESUME ROUTINE LOAD |
| STOPPED | Job is permanently stopped | Cannot be resumed; create a new job |

If a job transitions to PAUSED automatically, the error row count exceeded max_error_number. Check ReasonOfStateChanged and the error log URLs in ErrorLogUrls to identify the cause.

Manage a load job

Pause a load job

PAUSE ROUTINE LOAD FOR <job_name>;

The job enters the PAUSED state. Data ingestion stops, but the job is not terminated. The Statistic and Progress fields stop updating. Run RESUME ROUTINE LOAD to restart it.

Resume a load job

RESUME ROUTINE LOAD FOR <job_name>;

The job first enters NEED_SCHEDULE, then transitions to RUNNING once rescheduled.

Modify a load job

Only jobs in the PAUSED state can be modified.

ALTER ROUTINE LOAD FOR <job_name>
PROPERTIES
(
    "desired_concurrent_number" = "6"
);

Stop a load job

STOP ROUTINE LOAD FOR <job_name>;

The job enters the STOPPED state and is permanently terminated. Statistic and Progress stop updating, and the job no longer appears in SHOW ROUTINE LOAD output. To resume data ingestion, create a new load job.


End-to-end example

This example creates a Routine Load job that ingests CSV data from Kafka into StarRocks.

Step 1: Prepare Kafka data

  1. Create a topic.

    kafka-topics.sh --create \
      --topic order_sr_topic \
      --replication-factor 3 \
      --partitions 10 \
      --bootstrap-server core-1-1:9092,core-1-2:9092,core-1-3:9092
  2. Start the console producer.

    kafka-console-producer.sh \
      --broker-list core-1-1:9092 \
      --topic order_sr_topic
  3. Enter the following rows:

    2020050802,2020-05-08,Johann Georg Faust,Deutschland,male,895
    2020050802,2020-05-08,Julien Sorel,France,male,893
    2020050803,2020-05-08,Dorian Grey,UK,male,1262
    2020051001,2020-05-10,Tess Durbeyfield,US,female,986
    2020051101,2020-05-11,Edogawa Conan,japan,male,8924

    Each row has six columns: order_id, pay_dt, customer_name, nationality, gender (to be skipped), and price.

Step 2: Create the destination table

Create a table in StarRocks that maps to columns 1, 2, 3, 4, and 6 — skipping the gender column.

CREATE TABLE load_test.routine_load_tbl_csv (
    `order_id`       bigint      NOT NULL COMMENT "Order ID",
    `pay_dt`         date        NOT NULL COMMENT "Purchase date",
    `customer_name`  varchar(26) NULL     COMMENT "Customer name",
    `nationality`    varchar(26) NULL     COMMENT "Country",
    `price`          double      NULL     COMMENT "Payment amount"
)
ENGINE = OLAP
PRIMARY KEY (order_id, pay_dt)
DISTRIBUTED BY HASH(`order_id`) BUCKETS 5;

Step 3: Create the load job

The COLUMNS clause maps all six source columns and uses a placeholder (temp_gender) to skip the fifth column.

CREATE ROUTINE LOAD load_test.routine_load_tbl_ordertest_csv ON routine_load_tbl_csv
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, temp_gender, price)
PROPERTIES
(
    "desired_concurrent_number" = "5"
)
FROM KAFKA
(
    "kafka_broker_list" = "192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092",
    "kafka_topic" = "order_sr_topic",
    "kafka_partitions" = "0,1,2,3,4",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

Step 4: Verify the job

SHOW ROUTINE LOAD FOR load_test.routine_load_tbl_ordertest_csv;

If the status is RUNNING, the job is running normally.

Confirm the following three fields:

  1. State is RUNNING.

  2. DataSourceProperties shows "topic":"order_sr_topic" and the correct broker addresses.

  3. Statistic shows loadedRows increasing and errorRows at 0.

Query the destination table to confirm data is loaded:

SELECT * FROM load_test.routine_load_tbl_csv LIMIT 5;

Expected result:

| order_id | pay_dt | customer_name | nationality | price |
| --- | --- | --- | --- | --- |
| 2020050802 | 2020-05-08 | Johann Georg Faust | Deutschland | 895 |
| 2020050802 | 2020-05-08 | Julien Sorel | France | 893 |
| 2020050803 | 2020-05-08 | Dorian Grey | UK | 1262 |
| 2020051001 | 2020-05-10 | Tess Durbeyfield | US | 986 |
| 2020051101 | 2020-05-11 | Edogawa Conan | japan | 8924 |