Routine Load continuously ingests data from Apache Kafka into StarRocks. Use SQL statements to create, pause, resume, and stop load jobs at any time.
How it works

Routine Load uses four internal components to manage continuous ingestion:
| Component | Role |
|---|---|
| RoutineLoadJob | A submitted load job |
| JobScheduler | Splits a RoutineLoadJob into tasks and schedules them |
| Task | A unit of work split from a RoutineLoadJob |
| TaskScheduler | Schedules and runs individual tasks |
The ingestion process works as follows:
1. Submit a Kafka load job to the frontend (FE) using a MySQL-compatible client.
2. The FE splits the job into tasks. Each task loads a portion of the data.
3. Each task is dispatched to a backend (BE) node, which runs it using the Stream Load mechanism.
4. The BE reports the task result back to the FE.
5. The FE generates new tasks or retries failed ones based on the result.
This cycle repeats continuously, keeping data flowing without interruption.
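The cycle above can be pictured with a small Python model. This is a sketch, not StarRocks code. The class and function names are hypothetical, and the model assumes that the FE splits a job by Kafka partition, spreading partitions round-robin over at most desired_concurrent_number tasks. The `execute` callback stands in for a BE running Stream Load.

```python
from dataclasses import dataclass


@dataclass
class Task:
    """Hypothetical unit of work: the Kafka partitions one task reads."""
    partitions: list


def split_into_tasks(partitions, desired_concurrent_number):
    """Assumption: partitions are assigned round-robin to at most
    desired_concurrent_number tasks."""
    n = min(desired_concurrent_number, len(partitions))
    buckets = [[] for _ in range(n)]
    for i, partition in enumerate(partitions):
        buckets[i % n].append(partition)
    return [Task(bucket) for bucket in buckets]


def run_round(tasks, offsets, execute):
    """One FE scheduling round.

    execute(task, offsets) plays the BE: it returns {partition: messages_consumed}
    on success or raises RuntimeError on failure. Successful results advance the
    committed offsets, and failed tasks are returned so the FE can retry them.
    """
    retry = []
    for task in tasks:
        try:
            consumed = execute(task, offsets)
        except RuntimeError:
            retry.append(task)
            continue
        for partition, count in consumed.items():
            offsets[partition] += count
    return retry
```

Calling `run_round` repeatedly, and feeding the returned tasks back into the next round, mirrors the continuous loop described in the last step.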
The diagram and some information in this topic are from Continuously load data from Apache Kafka in the open-source StarRocks documentation.
Prerequisites
Before you begin, ensure that you have:
- Access to a Kafka cluster that uses either no authentication or Secure Sockets Layer (SSL) authentication
- Messages in the Kafka topic in CSV or JSON format. Each CSV message must occupy one row with no trailing line feed. The Array type is not supported.
- Kafka 0.10.0.0 or later
Create a load job
CREATE ROUTINE LOAD <database>.<job_name> ON <table_name>
[COLUMNS TERMINATED BY "column_separator",]
[COLUMNS (col1, col2, ...),]
[WHERE where_condition,]
[PARTITION (part1, part2, ...)]
[PROPERTIES ("key" = "value", ...)]
FROM KAFKA
(data_source_properties1 = 'value1',
data_source_properties2 = 'value2',
...)
Required parameters
| Parameter | Description |
|---|---|
| job_name | Name of the load job. Must be unique within a database. The recommended format is <timestamp>_<table_name>. |
| table_name | Destination table. |
| kafka_broker_list | Kafka broker connection strings. Format: <ip>:<port>. Separate multiple brokers with commas. |
| kafka_topic | Kafka topic to subscribe to. |
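Because kafka_broker_list is a plain comma-separated string, a typo surfaces only when the job runs. The Python helper below is a hypothetical pre-flight check, not part of StarRocks. It splits the value into host and port pairs and rejects malformed entries. It also accepts host names, which goes beyond the <ip>:<port> format stated above and is an assumption.

```python
def parse_broker_list(value):
    """Split a kafka_broker_list value into (host, port) tuples."""
    brokers = []
    for entry in value.split(","):
        # rpartition keeps everything before the last ':' as the host.
        host, sep, port = entry.strip().rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected <ip>:<port>, got {entry!r}")
        port_number = int(port)
        if not 0 < port_number < 65536:
            raise ValueError(f"port out of range in {entry!r}")
        brokers.append((host, port_number))
    return brokers
```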
Optional parameters
Data shaping
| Parameter | Default | Description |
|---|---|---|
| COLUMNS TERMINATED BY | `\t` | Column delimiter in the source data. |
| COLUMNS | - | Maps source columns to destination columns. Use a placeholder name for columns to skip; use expressions for derived columns (for example, col4 = col1 + col2). |
| WHERE | - | Filters rows before loading. Rows filtered by WHERE are not counted as error rows. |
| PARTITION | - | Loads data into specific partitions. If omitted, StarRocks routes data automatically. |
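The following simplified Python model shows how COLUMNS and WHERE work together to turn one delimited line into a destination row. All names are hypothetical. Derived columns are plain Python callables that stand in for SQL expressions. The sketch evaluates them before WHERE, which is an assumption about ordering. It demonstrates two rules from the table: a placeholder column that has no destination is dropped, and rows rejected by WHERE are counted as unselected, not as errors.

```python
def shape_rows(lines, source_columns, target_columns, derived=None, where=None, separator="\t"):
    """Turn delimited lines into destination rows.

    source_columns names the fields in each line, like COLUMNS. Names that are
    not in target_columns act as placeholders and are dropped. derived maps a
    new column name to a callable that computes it from the row. where is a
    predicate; rows it rejects are counted as unselected rather than as errors.
    """
    derived = derived or {}
    loaded, unselected = [], 0
    for line in lines:
        row = dict(zip(source_columns, line.split(separator)))
        for name, compute in derived.items():
            row[name] = compute(row)
        if where is not None and not where(row):
            unselected += 1
            continue
        loaded.append({name: row[name] for name in target_columns})
    return loaded, unselected
```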
Job behavior
| Parameter | Default | Valid range | Description |
|---|---|---|---|
| desired_concurrent_number | 3 | > 0 | Maximum number of concurrent tasks the job can run. |
| max_batch_interval | 10 | 5–60 (seconds) | Maximum execution time per task. In V1.15 and later, this controls the task scheduling interval, that is, how often tasks are dispatched. |
| max_batch_rows | 200000 | ≥ 200000 | Maximum number of rows each task can read. In V1.15 and later, this defines the error-detection window size (10 × max_batch_rows). |
| max_batch_size | 100 MB | 100 MB–1 GB | Maximum data read per task. Deprecated in V1.15 and later; use routine_load_task_consume_second in fe.conf instead. |
| max_error_number | 0 | ≥ 0 | Maximum number of error rows allowed in a sampling window. 0 means no errors are tolerated. |
| strict_mode | enabled | - | When enabled, a row is filtered out if a non-null source value becomes NULL after column type conversion. Set to false to disable. |
| timezone | Session time zone | - | Time zone used by all time zone-related functions in the load job. |
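The relationship between max_batch_rows and max_error_number in V1.15 and later can be sketched as follows. This is a simplified model with hypothetical function names. It assumes fixed, non-overlapping windows of 10 × max_batch_rows rows; the real scheduler may sample differently.

```python
def error_window(max_batch_rows=200_000):
    """Rows per error-detection window: 10 x max_batch_rows."""
    return 10 * max_batch_rows


def should_pause(row_is_error, window, max_error_number=0):
    """Return True once any window holds more than max_error_number error rows.

    row_is_error is an iterable of booleans, one per row in arrival order.
    Windows are assumed to be consecutive and non-overlapping.
    """
    errors_in_window = 0
    for index, is_error in enumerate(row_is_error):
        if index % window == 0:
            errors_in_window = 0  # a new window starts
        errors_in_window += int(is_error)
        if errors_in_window > max_error_number:
            return True
    return False
```

Under this model, the default max_error_number of 0 means a single bad row is enough to pause the job.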
Kafka source settings
| Parameter | Description |
|---|---|
| kafka_partitions | Partitions to subscribe to, as comma-separated partition IDs. |
| kafka_offsets | Start offset for each partition in kafka_partitions, comma-separated and listed in the same order. |
| property.kafka_default_offsets | Default start offset. Accepts OFFSET_BEGINNING, OFFSET_END, or a Unix timestamp. |
| property.* | Additional Kafka properties, equivalent to --property in the Kafka CLI. |
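If a script generates the FROM KAFKA clause, a small helper can render the properties and catch a mismatch between kafka_partitions and kafka_offsets before the statement is submitted. The helper below is hypothetical. It assumes that kafka_offsets must list exactly one offset for each partition in kafka_partitions.

```python
def render_from_kafka(props):
    """Render a FROM KAFKA clause from an ordered dict of data source properties."""
    partitions = props.get("kafka_partitions")
    offsets = props.get("kafka_offsets")
    if offsets is not None:
        # Assumption: one offset per partition, in the same order.
        if partitions is None or len(offsets.split(",")) != len(partitions.split(",")):
            raise ValueError("kafka_offsets needs one entry per kafka_partitions entry")
    body = ",\n".join(f'    "{key}" = "{value}"' for key, value in props.items())
    return f"FROM KAFKA\n(\n{body}\n)"
```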
Frontend (FE) configuration (fe.conf)
| Parameter | Default | Description |
|---|---|---|
| routine_load_task_consume_second | 3s | Time each task spends consuming data from Kafka. |
| routine_load_task_timeout_second | 15s | Maximum execution time per task before it times out. |
Example: Load JSON data from Kafka (unauthenticated)
This example creates a load job named example_tbl2_ordertest that continuously consumes messages from the ordertest2 topic and loads them into the example_tbl table, starting from the earliest available offset. The pay_dt column is derived from the pay_time Unix timestamp using from_unixtime().
CREATE ROUTINE LOAD load_test.example_tbl2_ordertest ON example_tbl
COLUMNS(commodity_id, customer_name, country, pay_time, price, pay_dt=from_unixtime(pay_time, '%Y%m%d'))
PROPERTIES
(
"desired_concurrent_number" = "5",
"format" = "json",
"jsonpaths" = "[\"$.commodity_id\",\"$.customer_name\",\"$.country\",\"$.pay_time\",\"$.price\"]"
)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest2",
"kafka_partitions" = "0,1,2,3,4",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
Run SHOW ROUTINE LOAD FOR load_test.example_tbl2_ordertest\G and confirm that State is RUNNING and that loadedRows in Statistic is increasing.
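You can preview the values this job will write by reproducing the jsonpaths mapping and the pay_dt expression offline. This Python sketch handles only top-level `$.field` paths. It evaluates from_unixtime in UTC, whereas the load job uses its configured time zone (see timezone above). The sample message is made up for illustration.

```python
import json
from datetime import datetime, timezone

JSONPATHS = ["$.commodity_id", "$.customer_name", "$.country", "$.pay_time", "$.price"]


def extract(message, jsonpaths=JSONPATHS):
    """Pick values from a JSON message in jsonpaths order (top-level keys only)."""
    document = json.loads(message)
    return [document.get(path[2:]) for path in jsonpaths]


def pay_dt(pay_time, tz=timezone.utc):
    """Approximate from_unixtime(pay_time, '%Y%m%d') in the given time zone."""
    return datetime.fromtimestamp(pay_time, tz).strftime("%Y%m%d")
```

For a hypothetical message whose pay_time is 1589155200, `pay_dt` returns "20200511" in UTC.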
Example: Load data over SSL
Add the following properties to the FROM KAFKA clause to enable SSL authentication:
-- Security protocol
"property.security.protocol" = "ssl",
-- CA certificate path
"property.ssl.ca.location" = "FILE:ca-cert",
-- Client certificate and key (required if the Kafka server enforces client authentication)
"property.ssl.certificate.location" = "FILE:client.pem",
"property.ssl.key.location" = "FILE:client.key",
"property.ssl.key.password" = "******"
The FILE: prefix refers to a certificate file uploaded with the CREATE FILE statement. When you use CREATE FILE to upload certificate files, specify the HTTP address of Object Storage Service (OSS) as the URL. For details on OSS endpoints, see Use an endpoint that supports IPv6 to access OSS.
For the CREATE FILE statement syntax, see CREATE FILE.
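If a script assembles the job statement, it can generate the SSL properties above in one step. This hypothetical Python helper mirrors the listed keys. It assumes that the client certificate and key are either both supplied or both omitted, and that the key password is needed only for an encrypted key. The file names are the names you registered with CREATE FILE.

```python
def ssl_properties(ca_file, client_cert=None, client_key=None, key_password=None):
    """Build the property.* SSL entries for a FROM KAFKA clause."""
    if (client_cert is None) != (client_key is None):
        raise ValueError("client certificate and key must be supplied together")
    props = {
        "property.security.protocol": "ssl",
        "property.ssl.ca.location": f"FILE:{ca_file}",
    }
    if client_cert is not None:
        # Only needed when the Kafka server enforces client authentication.
        props["property.ssl.certificate.location"] = f"FILE:{client_cert}"
        props["property.ssl.key.location"] = f"FILE:{client_key}"
        if key_password is not None:
            props["property.ssl.key.password"] = key_password
    return props
```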
Monitor a load job
Check job status
To view all load jobs in a database (including stopped and canceled jobs):
USE load_test;
SHOW ALL ROUTINE LOAD;
To view a specific running job:
SHOW ROUTINE LOAD FOR load_test.example_tbl2_ordertest;
SHOW ROUTINE LOAD without ALL returns only running jobs. Use SHOW ALL ROUTINE LOAD to include jobs in other states.
You can also view the status of a load job in the EMR StarRocks Manager console. Go to the EMR StarRocks Manager page, click Metadata Management in the left-side navigation pane, click the name of the desired database, and then click the Tasks tab.
Sample output:
*************************** 1. row ***************************
Id: 14093
Name: routine_load_wikipedia
CreateTime: 2020-05-16 16:00:48
PauseTime: N/A
EndTime: N/A
DbName: default_cluster:load_test
TableName: routine_wiki_edit
State: RUNNING
DataSourceType: KAFKA
CurrentTaskNum: 1
JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
CustomProperties: {}
Statistic: {"receivedBytes":150821770,"errorRows":122,"committedTaskNum":12,"loadedRows":2399878,"loadRowsRate":199000,"abortedTaskNum":1,"totalRows":2400000,"unselectedRows":0,"receivedBytesRate":12523000,"taskExecuteTimeMs":12043}
Progress: {"0":"13634667"}
ReasonOfStateChanged:
ErrorLogUrls: http://172.26.**.**:9122/api/_load_error_log?file=__shard_53/error_log_insert_stmt_47e8a1d107ed4932-8f1ddf7b01ad2fee_47e8a1d107ed4932_8f1ddf7b01ad2fee, http://172.26.**.**:9122/api/_load_error_log?file=__shard_54/error_log_insert_stmt_e0c0c6b040c044fd-a162b16f6bad53e6_e0c0c6b040c044fd_a162b16f6bad53e6, http://172.26.**.**:9122/api/_load_error_log?file=__shard_55/error_log_insert_stmt_ce4c95f0c72440ef-a442bb300bd743c8_ce4c95f0c72440ef_a442bb300bd743c8
OtherMsg:
1 row in set (0.00 sec)
Key health indicators:
- `State`: should be `RUNNING`. See the job states table below for other states.
- `loadedRows` and `errorRows` in `Statistic`: if `errorRows` is rising relative to `loadedRows`, the job may pause automatically. Check `ReasonOfStateChanged` and the URLs in `ErrorLogUrls`.
- `Progress`: shows the current Kafka offset for each partition. Verify that it advances between checks.
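To check that Progress is advancing, compare two snapshots taken some time apart. The following is a minimal Python sketch. It assumes that each snapshot is the Progress JSON copied from SHOW ROUTINE LOAD and that every offset is numeric; the function name is hypothetical. A partition can also look stalled simply because no new messages arrived, so treat a stalled result as a prompt to investigate rather than as proof of a fault.

```python
import json


def stalled_partitions(before, after):
    """Return partitions whose offset did not grow between two Progress snapshots."""
    old, new = json.loads(before), json.loads(after)
    return sorted(
        partition
        for partition, offset in new.items()
        if partition in old and int(offset) <= int(old[partition])
    )
```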
Fields in the `Statistic` output:
| Field | Unit | Description |
|---|---|---|
| receivedBytes | Bytes | Total data received since job creation |
| errorRows | Rows | Rows that failed to load |
| committedTaskNum | Tasks | Tasks submitted by the FE |
| loadedRows | Rows | Rows successfully loaded |
| loadRowsRate | Rows/s | Current load throughput |
| abortedTaskNum | Tasks | Tasks that failed on the BE |
| totalRows | Rows | Total rows received (loaded + error + unselected) |
| unselectedRows | Rows | Rows filtered by the WHERE clause |
| receivedBytesRate | Bytes/s | Data receive rate |
| taskExecuteTimeMs | ms | Total task execution time |
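The totalRows breakdown in the table gives a quick consistency check. The Python sketch below uses a hypothetical helper to parse the Statistic JSON from the sample output, confirm that loadedRows + errorRows + unselectedRows equals totalRows, and compute the error ratio, which is worth tracking between checks.

```python
import json

# Statistic value copied from the sample output above.
SAMPLE_STATISTIC = '{"receivedBytes":150821770,"errorRows":122,"committedTaskNum":12,"loadedRows":2399878,"loadRowsRate":199000,"abortedTaskNum":1,"totalRows":2400000,"unselectedRows":0,"receivedBytesRate":12523000,"taskExecuteTimeMs":12043}'


def summarize(statistic_json):
    """Check the totalRows breakdown and compute the share of error rows."""
    stats = json.loads(statistic_json)
    accounted = stats["loadedRows"] + stats["errorRows"] + stats["unselectedRows"]
    total = stats["totalRows"]
    return {
        "rows_accounted_for": accounted == total,
        "error_ratio": stats["errorRows"] / total if total else 0.0,
    }
```

For the sample job, 2,399,878 + 122 + 0 = 2,400,000, so every received row is accounted for.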
Job states
| State | Meaning | Next action |
|---|---|---|
| NEED_SCHEDULE | Job is waiting to be scheduled (just created or just resumed) | Wait; transitions to RUNNING automatically |
| RUNNING | Job is loading data | Monitor loadedRows and errorRows |
| PAUSED | Job is paused; can be resumed | Run RESUME ROUTINE LOAD |
| STOPPED | Job is permanently stopped | Cannot be resumed; create a new job |
If a job moves to PAUSED on its own, the usual cause is that the number of error rows exceeded max_error_number. Check ReasonOfStateChanged and the error log URLs in ErrorLogUrls to identify the cause.
Manage a load job
Pause a load job
PAUSE ROUTINE LOAD FOR <job_name>;
The job enters the PAUSED state. Data ingestion stops, but the job is not terminated. The Statistic and Progress fields stop updating. Run RESUME ROUTINE LOAD to restart it.
Resume a load job
RESUME ROUTINE LOAD FOR <job_name>;
The job first enters NEED_SCHEDULE, then transitions to RUNNING once rescheduled.
Modify a load job
Only jobs in the PAUSED state can be modified.
ALTER ROUTINE LOAD FOR <job_name>
PROPERTIES
(
"desired_concurrent_number" = "6"
);
Stop a load job
STOP ROUTINE LOAD FOR <job_name>;
The job enters the STOPPED state and is permanently terminated. Statistic and Progress stop updating, and the job no longer appears in SHOW ROUTINE LOAD output. To resume data ingestion, create a new load job.
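The state changes described in this section and in the job states table can be summarized as a small state machine. This Python class is a hypothetical model for reasoning about which statements are legal in which state; it is not a StarRocks API. It makes two assumptions: PAUSE applies to a RUNNING job, and STOP is accepted from any state except STOPPED.

```python
class InvalidTransition(Exception):
    pass


class RoutineLoadJobModel:
    """Simplified lifecycle of a Routine Load job."""

    def __init__(self, **properties):
        self.state = "NEED_SCHEDULE"  # a newly created job waits to be scheduled
        self.properties = dict(properties)

    def _require(self, *allowed):
        if self.state not in allowed:
            raise InvalidTransition(f"not allowed in state {self.state}")

    def schedule(self):  # done by the FE, not by a statement
        self._require("NEED_SCHEDULE")
        self.state = "RUNNING"

    def pause(self):  # PAUSE ROUTINE LOAD, or an automatic pause on errors
        self._require("RUNNING")
        self.state = "PAUSED"

    def resume(self):  # RESUME ROUTINE LOAD
        self._require("PAUSED")
        self.state = "NEED_SCHEDULE"

    def alter(self, **properties):  # ALTER ROUTINE LOAD: PAUSED jobs only
        self._require("PAUSED")
        self.properties.update(properties)

    def stop(self):  # STOP ROUTINE LOAD: permanent
        self._require("NEED_SCHEDULE", "RUNNING", "PAUSED")
        self.state = "STOPPED"
```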

End-to-end example
This example creates a Routine Load job that ingests CSV data from Kafka into StarRocks.
Step 1: Prepare Kafka data
Create a topic with five partitions, matching "kafka_partitions" = "0,1,2,3,4" in the load job in Step 3.
kafka-topics.sh --create \
  --topic order_sr_topic \
  --replication-factor 3 \
  --partitions 5 \
  --bootstrap-server core-1-1:9092,core-1-2:9092,core-1-3:9092
Start the console producer.
kafka-console-producer.sh \
  --broker-list core-1-1:9092 \
  --topic order_sr_topic
Enter the following rows:
2020050802,2020-05-08,Johann Georg Faust,Deutschland,male,895
2020050802,2020-05-08,Julien Sorel,France,male,893
2020050803,2020-05-08,Dorian Grey,UK,male,1262
2020051001,2020-05-10,Tess Durbeyfield,US,female,986
2020051101,2020-05-11,Edogawa Conan,japan,male,8924
Each row has six columns: order_id, pay_dt, customer_name, nationality, gender (to be skipped), and price.
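Before you create the table, you can preview how the COLUMNS mapping in Step 3 will treat these rows. The Python sketch below splits each line on commas, names the fields using the same list as the load job (temp_gender is the placeholder for the skipped column), and keeps the five destination columns. It is an offline approximation only. It assumes that no field contains a comma, and it performs no type conversion.

```python
SAMPLE_ROWS = [
    "2020050802,2020-05-08,Johann Georg Faust,Deutschland,male,895",
    "2020050802,2020-05-08,Julien Sorel,France,male,893",
    "2020050803,2020-05-08,Dorian Grey,UK,male,1262",
    "2020051001,2020-05-10,Tess Durbeyfield,US,female,986",
    "2020051101,2020-05-11,Edogawa Conan,japan,male,8924",
]

# Source columns in message order, as in the COLUMNS clause of the load job.
SOURCE_COLUMNS = ["order_id", "pay_dt", "customer_name", "nationality", "temp_gender", "price"]
# Destination table columns; temp_gender has no match, so it is dropped.
TABLE_COLUMNS = ["order_id", "pay_dt", "customer_name", "nationality", "price"]


def to_table_row(line):
    """Map one CSV message to a tuple in destination column order."""
    fields = dict(zip(SOURCE_COLUMNS, line.split(",")))
    return tuple(fields[name] for name in TABLE_COLUMNS)


for row in map(to_table_row, SAMPLE_ROWS):
    print(row)
```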
Step 2: Create the destination table
Create a table in StarRocks that maps to columns 1, 2, 3, 4, and 6 — skipping the gender column.
CREATE TABLE load_test.routine_load_tbl_csv (
`order_id` bigint NOT NULL COMMENT "Order ID",
`pay_dt` date NOT NULL COMMENT "Purchase date",
`customer_name` varchar(26) NULL COMMENT "Customer name",
`nationality` varchar(26) NULL COMMENT "Country",
`price` double NULL COMMENT "Payment amount"
)
ENGINE = OLAP
DUPLICATE KEY (order_id, pay_dt)
DISTRIBUTED BY HASH(`order_id`) BUCKETS 5;
Step 3: Create the load job
The COLUMNS clause maps all six source columns and uses a placeholder (temp_gender) to skip the fifth column.
CREATE ROUTINE LOAD load_test.routine_load_tbl_ordertest_csv ON routine_load_tbl_csv
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, temp_gender, price)
PROPERTIES
(
"desired_concurrent_number" = "5"
)
FROM KAFKA
(
"kafka_broker_list" = "192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092",
"kafka_topic" = "order_sr_topic",
"kafka_partitions" = "0,1,2,3,4",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
Step 4: Verify the job
SHOW ROUTINE LOAD FOR load_test.routine_load_tbl_ordertest_csv;
If State is RUNNING, the job is running normally. Confirm the following three fields:
- `State` is `RUNNING`.
- `DataSourceProperties` shows `"topic":"order_sr_topic"` and the correct broker addresses.
- `Statistic` shows `loadedRows` increasing and `errorRows` at 0.
Query the destination table to confirm data is loaded:
SELECT * FROM load_test.routine_load_tbl_csv LIMIT 5;
Expected result (row order may vary):
| order_id | pay_dt | customer_name | nationality | price |
|---|---|---|---|---|
| 2020050802 | 2020-05-08 | Johann Georg Faust | Deutschland | 895 |
| 2020050802 | 2020-05-08 | Julien Sorel | France | 893 |
| 2020050803 | 2020-05-08 | Dorian Grey | UK | 1262 |
| 2020051001 | 2020-05-10 | Tess Durbeyfield | US | 986 |
| 2020051101 | 2020-05-11 | Edogawa Conan | japan | 8924 |