Routine Load continuously reads messages from a Kafka topic and loads them into ApsaraDB for SelectDB. Once created, a Routine Load job runs persistently — no need to resubmit after each batch.
How it works
When you create a Routine Load job, the frontend (FE) generates a persistent load job and splits it into multiple tasks. Each task is an independent transaction that consumes a bounded slice of Kafka messages. A task ends when the first of these thresholds is reached: max_batch_interval, max_batch_rows, or max_batch_size. After a task commits, a new task is immediately scheduled.
This architecture means concurrency, batch size, and error tolerance are all tunable without stopping the job.
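The three stop conditions can be expressed as a simple predicate. This is a minimal sketch, not SelectDB code; the constant names mirror the job properties documented later in this topic, and the values shown are the documented defaults:

```python
# Illustrative defaults mirroring the documented job properties.
MAX_BATCH_INTERVAL_S = 10            # max_batch_interval (seconds)
MAX_BATCH_ROWS = 200_000             # max_batch_rows
MAX_BATCH_SIZE_BYTES = 104_857_600   # max_batch_size (100 MB)

def task_should_end(elapsed_s, rows_read, bytes_read):
    # A task commits as soon as the FIRST threshold is reached.
    return (
        elapsed_s >= MAX_BATCH_INTERVAL_S
        or rows_read >= MAX_BATCH_ROWS
        or bytes_read >= MAX_BATCH_SIZE_BYTES
    )
```

Because each condition is checked independently, tuning any one property changes batch boundaries without affecting the other two limits.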
Prerequisites
Before you begin, ensure that you have:
- A Kafka cluster (version 0.10.0.0 or later) accessible from your ApsaraDB for SelectDB instance
- A Kafka topic with messages in CSV or JSON format (CSV messages must be a single line with no trailing line feed)
- A destination table in ApsaraDB for SelectDB
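The single-line CSV requirement is easy to violate with naive producers that append newlines. The helper below is a hypothetical sketch of how to build conforming message bytes; the commented publishing step assumes the third-party kafka-python package and placeholder broker/topic names:

```python
def to_csv_message(fields):
    # A CSV message must be a single line with no trailing line feed,
    # so fields are joined with commas and no '\n' is appended.
    return ",".join(str(f) for f in fields).encode("utf-8")

# Hypothetical publishing step (assumes the kafka-python package and a
# reachable broker; addresses and topic name are placeholders):
#
# from kafka import KafkaProducer
# producer = KafkaProducer(bootstrap_servers="broker1:9092")
# for row in [(1, "Alice", 30), (2, "Bob", 25), (3, "Carol", 35)]:
#     producer.send("users", to_csv_message(row))
```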
Usage notes
Kafka 0.10.0.0 and later are supported by default. To use an older Kafka version (0.9.0, 0.8.2, 0.8.1, or 0.8.0), use one of the following approaches:
- Set `kafka_broker_version_fallback` in the backend (BE) configuration to the target Kafka version.
- Set `property.broker.version.fallback` when creating the Routine Load job.
Some features are unavailable with Kafka versions earlier than 0.10.0.0. For example, time-based partition offsets are not supported.
Quick start
This example loads CSV data from Kafka into a SelectDB table.
Step 1: Verify sample data in Kafka
1,Alice,30
2,Bob,25
3,Carol,35

Step 2: Create the destination table
CREATE TABLE testdb.users (
id INT NOT NULL,
name VARCHAR(50),
age INT
)
DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 4
PROPERTIES ("replication_num" = "1");

Step 3: Create the Routine Load job
CREATE ROUTINE LOAD testdb.load_users ON users
COLUMNS TERMINATED BY ",",
COLUMNS(id, name, age)
PROPERTIES (
"desired_concurrent_number" = "3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false"
)
FROM KAFKA (
"kafka_broker_list" = "broker1:9092,broker2:9092",
"kafka_topic" = "users",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

Step 4: Check job status
SHOW ROUTINE LOAD FOR testdb.load_users;

Create a Routine Load job
Syntax
CREATE ROUTINE LOAD [db.]job_name ON tbl_name
[merge_type]
[load_properties]
[job_properties]
FROM data_source [data_source_properties]

Parameters
| Parameter | Description |
|---|---|
[db.]job_name | Job name. Within a database, only one job with a given name can run at a time. |
tbl_name | Destination table name. |
merge_type | Data merge mode. Default: APPEND (standard append). For Unique Key model tables only, set to MERGE (requires DELETE ON) or DELETE (deletes all imported rows). |
load_properties | Parameters for processing the imported data. See load_properties parameters. |
job_properties | Job-level parameters. See job_properties parameters. |
data_source_properties | Kafka connection parameters. See data_source_properties parameters. |
load_properties parameters
[column_separator],
[columns_mapping],
[preceding_filter],
[where_predicates],
[partitions],
[DELETE ON],
[ORDER BY]

| Parameter | Example | Description |
|---|---|---|
column_separator | COLUMNS TERMINATED BY "," | Column delimiter. Default: \t. |
columns_mapping | (k1, k2, tmpk1, k3=tmpk1+1) | Maps source columns to destination columns and applies transformations. See Converting source data. |
preceding_filter | — | Filters source data before column mapping. See Converting source data. |
where_predicates | WHERE k1 > 100 AND k2 = 1000 | Filters rows after column mapping. See Converting source data. |
partitions | PARTITION(p1, p2, p3) | Target partitions. If omitted, data is routed to the matching partition automatically. |
DELETE ON | DELETE ON v3 > 100 | Specifies the Delete Flag column expression. Required when merge_type is MERGE. Valid only for Unique Key model tables. |
ORDER BY | — | Specifies the Sequence Col to maintain row order during import. Valid only for Unique Key model tables. |
job_properties parameters
PROPERTIES (
"key1" = "val1",
"key2" = "val2"
)

A task ends when the first of `max_batch_interval`, `max_batch_rows`, or `max_batch_size` is reached.
| Parameter | Default | Valid range | Description |
|---|---|---|---|
desired_concurrent_number | 3 | Integer > 0 | Maximum tasks that can run concurrently for the job. The actual concurrency depends on cluster nodes, load, and the data source. For optimal throughput, set this to number of cluster cores / 16. |
max_batch_interval | 10 seconds | 5–60 seconds | Maximum execution duration per task. |
max_batch_rows | 200000 | ≥ 200000 | Maximum rows read per task. |
max_batch_size | 104857600 (100 MB) | 100 MB–1 GB | Maximum bytes read per task. |
max_error_number | 0 | Integer ≥ 0 | Maximum error rows allowed within a sampling window (10 × max_batch_rows). If this limit is exceeded, the job pauses and requires manual intervention. Rows filtered out by WHERE conditions are not counted as errors. |
strict_mode | false | true / false | When true, rows where a NOT NULL source value converts to NULL in the destination column are filtered out. Does not apply to function-derived columns. |
timezone | Session time zone | — | Time zone for all time-zone-related functions in the job (for example, "Africa/Abidjan"). |
format | CSV | CSV / json | Message format. |
jsonpaths | — | — | JSON field extraction paths for JSON-format data (for example, "[\"$.k2\",\"$.k1\"]"). |
strip_outer_array | false | true / false | When true, treats the top-level JSON array as multiple rows. |
json_root | — | — | Root node path for JSON extraction (for example, "$.RECORDS"). |
send_batch_parallelism | — | — | Maximum parallel threads for sending batch data. Capped at max_send_batch_parallelism_per_job in BE configuration. |
load_to_single_tablet | false | true / false | When true, loads data into a single tablet per partition. Only applies to Duplicate Key model tables with random partitioning. |
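The effect of the `jsonpaths` property can be illustrated with a sketch that resolves simple top-level `$.field` paths against a decoded record. `extract_with_jsonpaths` is a hypothetical helper handling only the path form shown in this topic, not the full jsonpaths syntax:

```python
import json

def extract_with_jsonpaths(message, jsonpaths):
    # Resolve each top-level "$.field" path against the decoded record;
    # the output order follows the jsonpaths order, not the source order.
    record = json.loads(message)
    return [record.get(path.removeprefix("$.")) for path in jsonpaths]
```

This ordering behavior is why `jsonpaths` such as `"[\"$.k2\",\"$.k1\"]"` can map source fields to destination columns in a different order than they appear in the message.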
Strict mode behavior
Strict mode controls how type-conversion failures are handled.
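The rule illustrated by the tables below can be sketched as a small decision function (a hypothetical helper, not SelectDB code; it assumes the destination column allows NULL):

```python
def strict_mode_outcome(source_is_null, converted_value, strict_mode):
    # A NULL source value always loads as NULL (when the column allows NULL).
    if source_is_null:
        return "NULL"
    # A non-NULL source that converts to NULL failed type conversion:
    # strict mode filters the row; otherwise it is imported as NULL.
    if converted_value is None:
        return "filtered" if strict_mode else "NULL"
    # Successful conversions are imported regardless of strict mode.
    return "imported"
```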
Importing a TINYINT column (NULL allowed):
| Source data type | Example value | Conversion result | Strict mode | Outcome |
|---|---|---|---|---|
| NULL | \N | — | true or false | NULL |
| NOT NULL | aaa or 2000 | NULL | true | Filtered (invalid) |
| NOT NULL | aaa | NULL | false | NULL |
| NOT NULL | 1 | 1 | true or false | Imported |
Importing a DECIMAL(1,0) column (NULL allowed):
| Source data type | Example value | Conversion result | Strict mode | Outcome |
|---|---|---|---|---|
| NULL | \N | — | true or false | NULL |
| NOT NULL | aaa | NULL | true | Filtered (invalid) |
| NOT NULL | aaa | NULL | false | NULL |
| NOT NULL | 1 or 10 | 1 | true or false | Imported |
The value 10 exceeds the DECIMAL(1,0) range but is not filtered by strict mode, because the value itself passes type conversion. It is filtered later during the extract, transform, and load (ETL) process.

data_source_properties parameters
FROM KAFKA (
"key1" = "val1",
"key2" = "val2"
)

| Parameter | Description |
|---|---|
kafka_broker_list | Broker addresses. Format: host:port. Separate multiple brokers with commas. Example: "broker1:9092,broker2:9092". |
kafka_topic | Kafka topic to subscribe to. |
kafka_partitions | Comma-separated partition IDs to subscribe to. Example: "0,1,2,3". |
kafka_offsets | Start offset for each partition listed in kafka_partitions. Must have the same number of entries. Accepts numeric offsets, OFFSET_BEGINNING, OFFSET_END, or a timestamp in yyyy-MM-dd HH:mm:ss format. If omitted, defaults to the end offset of all partitions. |
property.* | Custom Kafka client properties, equivalent to --property in the Kafka shell. For file-based values, prefix with FILE: (for example, "FILE:ca.pem"). |
Time-based offsets and numeric offsets cannot be mixed in the same kafka_offsets value.
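A client-side sanity check for these two constraints (matching entry counts, no mixing of time-based and numeric offsets) might look like the following. `validate_offsets` is a hypothetical helper; SelectDB performs its own validation when the job is created:

```python
import re

# Matches the documented yyyy-MM-dd HH:mm:ss timestamp form.
TIMESTAMP_RE = re.compile(r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$")

def validate_offsets(kafka_partitions, kafka_offsets):
    partitions = [p.strip() for p in kafka_partitions.split(",")]
    offsets = [o.strip() for o in kafka_offsets.split(",")]
    # Each listed partition needs exactly one start offset.
    if len(partitions) != len(offsets):
        return False
    # Time-based and numeric offsets cannot be mixed in one value.
    has_time = any(TIMESTAMP_RE.match(o) for o in offsets)
    has_numeric = any(o.lstrip("-").isdigit() for o in offsets)
    return not (has_time and has_numeric)
```

Note that `OFFSET_BEGINNING` and `OFFSET_END` count as neither numeric nor time-based here, so they can appear alongside numeric offsets, as in the mixed example below.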
Partition and offset combinations
kafka_partitions, kafka_offsets, and property.kafka_default_offsets interact as follows:
| Method | kafka_partitions | kafka_offsets | property.kafka_default_offsets | Behavior |
|---|---|---|---|---|
| 1 | Not set | Not set | Not set | All partitions, starting from the end offset. |
| 2 | Not set | Not set | Set | All partitions, starting from the specified default offset. |
| 3 | Set | Not set | Not set | Specified partitions, starting from the end offset. |
| 4 | Set | Set | Not set | Specified partitions, starting from the specified offsets. |
| 5 | Set | Not set | Set | Specified partitions, starting from the specified default offset. |
Example — partitions with mixed offsets:
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END"

Example — partitions with timestamp-based offsets:
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "2021-05-22 11:00:00,2021-05-22 11:00:00,2021-05-22 11:00:00,2021-05-22 11:00:00"

Examples
Load CSV data
Create a destination table:
CREATE TABLE test_table (
    id INT,
    name VARCHAR(50),
    age INT,
    address VARCHAR(50),
    url VARCHAR(500)
)
UNIQUE KEY(`id`, `name`)
DISTRIBUTED BY HASH(id) BUCKETS 4
PROPERTIES ("replication_num" = "1");

Create a Routine Load job that reads from the beginning of all partitions:
CREATE ROUTINE LOAD example_db.test1 ON test_table
COLUMNS TERMINATED BY ",",
COLUMNS(id, name, age, address, url)
PROPERTIES (
    "desired_concurrent_number" = "3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false"
)
FROM KAFKA (
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

Create a Routine Load job with strict mode enabled:
CREATE ROUTINE LOAD example_db.test2 ON test_table
COLUMNS TERMINATED BY ",",
COLUMNS(id, name, age, address, url)
PROPERTIES (
    "desired_concurrent_number" = "3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "true"
)
FROM KAFKA (
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

Start consuming from a specific timestamp:
CREATE ROUTINE LOAD example_db.test4 ON test_table
PROPERTIES (
    "desired_concurrent_number" = "3",
    "max_batch_interval" = "30",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200"
)
FROM KAFKA (
    "kafka_broker_list" = "broker1:9092,broker2:9092",
    "kafka_topic" = "my_topic",
    "property.kafka_default_offsets" = "2024-01-21 10:00:00"
);
Load JSON data
Routine Load supports two JSON message structures:
Single JSON object — one record per message:
{"key1":"value1","key2":"value2","key3":"value3"}

JSON array — multiple records per message:
[
    {"key1":"value11","key2":"value12","key3":"value13","key4":14},
    {"key1":"value21","key2":"value22","key3":"value23","key4":24}
]
For multi-table import mode, prefix each message with the target table name:
table_name|{"key1":"value1","key2":"value2","key3":"value3"}

Example: load JSON data
Create a destination table:
CREATE TABLE `example_tbl` (
    `category` VARCHAR(24) NULL,
    `author` VARCHAR(24) NULL,
    `timestamp` BIGINT(20) NULL,
    `dt` INT(11) NULL,
    `price` DOUBLE REPLACE
) ENGINE=OLAP
AGGREGATE KEY(`category`, `author`, `timestamp`, `dt`)
PARTITION BY RANGE(`dt`) (
    PARTITION p0 VALUES [("-2147483648"), ("20230509")),
    PARTITION p20200509 VALUES [("20230509"), ("20231010")),
    PARTITION p20200510 VALUES [("20231010"), ("20231211")),
    PARTITION p20200511 VALUES [("20231211"), ("20240512"))
)
DISTRIBUTED BY HASH(`category`, `author`, `timestamp`) BUCKETS 4;

Publish both message types to a Kafka topic:
{"category":"value1331","author":"value1233","timestamp":1700346050,"price":1413}

[
    {"category":"value13z2","author":"vaelue13","timestamp":1705645251,"price":14330},
    {"category":"lvalue211","author":"lvalue122","timestamp":1684448450,"price":24440}
]

Load in simple mode (field names match column names):
CREATE ROUTINE LOAD example_db.test_json_label_1 ON example_tbl
COLUMNS(category, price, author)
PROPERTIES (
    "desired_concurrent_number" = "3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false",
    "format" = "json"
)
FROM KAFKA (
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "kafka_partitions" = "0,1,2",
    "kafka_offsets" = "0,0,0"
);

Load with explicit JSON path extraction and outer array stripping:
The `dt` column does not appear in the source data. Its value is derived from `timestamp` using `dt=from_unixtime(timestamp, '%Y%m%d')` in the `COLUMNS` clause.

CREATE ROUTINE LOAD example_db.test_json_label_3 ON example_tbl
COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
PROPERTIES (
    "desired_concurrent_number" = "3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false",
    "format" = "json",
    "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
    "strip_outer_array" = "true"
)
FROM KAFKA (
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "kafka_partitions" = "0,1,2",
    "kafka_offsets" = "0,0,0"
);
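The multi-table framing shown earlier (`table_name|{...}`) can be decomposed as in this sketch. This is illustrative only; SelectDB performs this parsing server-side:

```python
def split_multi_table_message(message):
    # "table_name|{json}" -> (table name, raw JSON payload)
    table, _, payload = message.partition(b"|")
    return table.decode("utf-8"), payload
```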
Connect to Kafka clusters with authentication
ApsaraDB for SelectDB uses the librdkafka C++ client library to connect to Kafka. For supported configuration properties, see the librdkafka configuration reference.
SSL authentication
Upload the required certificate files first, then create the Routine Load job.
Upload certificate files:
CREATE FILE "ca.pem" PROPERTIES ("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka");
CREATE FILE "client.key" PROPERTIES ("url" = "https://example_url/kafka-key/client.key", "catalog" = "kafka");
CREATE FILE "client.pem" PROPERTIES ("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");

Create the Routine Load job:
| Property | Required | Description |
|---|---|---|
| property.security.protocol | Always | Set to ssl. |
| property.ssl.ca.location | Always | Path to the CA certificate that authenticates Kafka broker public keys. |
| property.ssl.certificate.location | Only when client authentication is enabled on the Kafka server | Path to the client public key certificate. |
| property.ssl.key.location | Only when client authentication is enabled on the Kafka server | Path to the client private key file. |
| property.ssl.key.password | Only when client authentication is enabled on the Kafka server | Password for the client private key. |

CREATE ROUTINE LOAD db1.job1 ON tbl1
PROPERTIES (
    "desired_concurrent_number" = "1"
)
FROM KAFKA (
    "kafka_broker_list" = "broker1:9091,broker2:9091",
    "kafka_topic" = "my_topic",
    "property.security.protocol" = "ssl",
    "property.ssl.ca.location" = "FILE:ca.pem",
    "property.ssl.certificate.location" = "FILE:client.pem",
    "property.ssl.key.location" = "FILE:client.key",
    "property.ssl.key.password" = "abcdefg"
);
PLAIN authentication
CREATE ROUTINE LOAD db1.job1 ON tbl1
PROPERTIES (
"desired_concurrent_number" = "1"
)
FROM KAFKA (
"kafka_broker_list" = "broker1:9092,broker2:9092",
"kafka_topic" = "my_topic",
"property.security.protocol" = "SASL_PLAINTEXT",
"property.sasl.mechanism" = "PLAIN",
"property.sasl.username" = "admin",
"property.sasl.password" = "admin"
);

| Property | Value | Description |
|---|---|---|
property.security.protocol | SASL_PLAINTEXT | Use Simple Authentication and Security Layer (SASL) plaintext. |
property.sasl.mechanism | PLAIN | SASL mechanism. |
property.sasl.username | — | SASL username. |
property.sasl.password | — | SASL password. |
Kerberos authentication
Before creating the job, deploy the Kerberos client kinit on all nodes in your ApsaraDB for SelectDB cluster, configure krb5.conf, and specify the Key Distribution Center (KDC) service.
CREATE ROUTINE LOAD db1.job1 ON tbl1
PROPERTIES (
"desired_concurrent_number" = "1"
)
FROM KAFKA (
"kafka_broker_list" = "broker1:9092,broker2:9092",
"kafka_topic" = "my_topic",
"property.security.protocol" = "SASL_PLAINTEXT",
"property.sasl.kerberos.service.name" = "kafka",
"property.sasl.kerberos.keytab" = "/etc/krb5.keytab",
"property.sasl.kerberos.principal" = "id@your.com"
);

| Property | Description |
|---|---|
property.security.protocol | Set to SASL_PLAINTEXT. |
property.sasl.kerberos.service.name | Kafka broker service name. |
property.sasl.kerberos.keytab | Absolute path to the local .keytab file. ApsaraDB for SelectDB processes must have read access to this file. |
property.sasl.kerberos.principal | Kerberos principal used to connect to the Kafka cluster. |
Modify a Routine Load job
Only jobs in the PAUSED state can be modified.
Syntax
ALTER ROUTINE LOAD FOR [db.]job_name
[job_properties]
FROM data_source
[data_source_properties]

Modifiable parameters
job_properties — the following parameters can be changed:
`desired_concurrent_number`, `max_error_number`, `max_batch_interval`, `max_batch_rows`, `max_batch_size`, `jsonpaths`, `json_root`, `strip_outer_array`, `strict_mode`, `timezone`, `num_as_string`, `fuzzy_parse`
data_source_properties — only Kafka properties can be changed:
`kafka_partitions`, `kafka_offsets`, `kafka_broker_list`, `kafka_topic`, and custom properties (for example, `property.group.id`)
`kafka_partitions` and `kafka_offsets` can only modify the offsets of partitions that are already being consumed. Adding new partitions is not supported.
Examples
Change the concurrency of job db1.label1 to 1:
ALTER ROUTINE LOAD FOR db1.label1
PROPERTIES (
"desired_concurrent_number" = "1"
);

Change the concurrency to 10 and adjust partition offsets and the consumer group ID:
ALTER ROUTINE LOAD FOR db1.label1
PROPERTIES (
"desired_concurrent_number" = "10"
)
FROM KAFKA (
"kafka_partitions" = "0, 1, 2",
"kafka_offsets" = "100, 200, 100",
"property.group.id" = "new_group"
);

Pause a Routine Load job
Syntax
PAUSE ROUTINE LOAD FOR [db.]job_name;
PAUSE ALL ROUTINE LOAD;

Examples
Pause job test1:
PAUSE ROUTINE LOAD FOR test1;

Pause all Routine Load jobs in the current database:
PAUSE ALL ROUTINE LOAD;

Resume a Routine Load job
A resumed job continues consuming from the last committed offset.
Syntax
RESUME ROUTINE LOAD FOR [db.]job_name;
RESUME ALL ROUTINE LOAD;

Examples
Resume job test1:
RESUME ROUTINE LOAD FOR test1;

Resume all paused Routine Load jobs in the current database:
RESUME ALL ROUTINE LOAD;

Stop a Routine Load job
A stopped job cannot be restarted, and data imported before the stop cannot be rolled back.
Syntax
STOP ROUTINE LOAD FOR [db.]job_name;

Example
STOP ROUTINE LOAD FOR test1;

View Routine Load job status
Syntax
SHOW [ALL] ROUTINE LOAD [FOR [db.]job_name];

Without ALL, only running and paused jobs are returned. With ALL, stopped and canceled jobs are also included.
Output fields
| Field | Description |
|---|---|
Id | Job ID, assigned automatically by ApsaraDB for SelectDB. |
Name | Job name. |
CreateTime | Time the job was created. |
PauseTime | Most recent time the job was paused. |
EndTime | Time the job ended (stopped or canceled). |
State | Current job state: NEED_SCHEDULED, RUNNING, PAUSED, STOPPED, or CANCELLED. |
DataSourceType | Data source type. Always KAFKA for Routine Load jobs. |
CurrentTaskNum | Number of tasks currently running. |
ErrorLogUrls | URLs pointing to error logs. Open any URL in a browser to view invalid-row details. |
Examples
Query all jobs named test1, including stopped and canceled ones:
SHOW ALL ROUTINE LOAD FOR test1;

Query only running jobs named test1:
SHOW ROUTINE LOAD FOR test1;

Query all jobs in example_db, including stopped and canceled ones:
USE example_db;
SHOW ALL ROUTINE LOAD;

Query only running jobs in example_db:
USE example_db;
SHOW ROUTINE LOAD;

Query a specific job by database and name:
SHOW ROUTINE LOAD FOR example_db.test1;

System configuration
The following FE and BE parameters affect Routine Load behavior. All FE parameters can be modified at runtime.
| Parameter | Scope | Default | Description |
|---|---|---|---|
max_routine_load_task_concurrent_num | FE | 5 | Maximum concurrent tasks that can be run at a time for a Routine Load job. The default value is recommended. Setting it too high can exhaust cluster resources. |
max_routine_load_task_num_per_be | FE | 5 | Maximum concurrent tasks on each BE node. The default value is recommended. |
max_routine_load_job_num | FE | 100 | Maximum number of Routine Load jobs (in NEED_SCHEDULED, RUNNING, or PAUSED state). No new jobs can be created after this limit is reached. |
max_consumer_num_per_group | BE | 3 | Maximum consumers generated per task. For example, a task consuming 6 Kafka partitions spawns 3 consumers, each handling 2 partitions. |
max_tolerable_backend_down_num | FE | 0 | Maximum number of BE nodes that can be down before automatic job rescheduling is blocked. The value 0 means all BE nodes must be alive for rescheduling. |
period_of_auto_resume_min | FE | 5 minutes | ApsaraDB for SelectDB attempts to reschedule paused jobs up to 3 times within this window. After 3 failed attempts, the job is locked and requires manual intervention to resume. |
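The consumer fan-out described for `max_consumer_num_per_group` can be sketched as follows. `consumers_for_task` is a hypothetical helper that reproduces the 6-partition example from the table above, assuming partitions are spread evenly:

```python
import math

def consumers_for_task(partition_count, max_consumer_num_per_group=3):
    # A task spawns at most max_consumer_num_per_group consumers and
    # spreads its Kafka partitions as evenly as possible across them.
    consumers = min(partition_count, max_consumer_num_per_group)
    partitions_per_consumer = math.ceil(partition_count / consumers)
    return consumers, partitions_per_consumer
```

For example, a task consuming 6 partitions yields 3 consumers with 2 partitions each, matching the documented behavior.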
Other considerations
Routine Load and schema changes
- A Routine Load job does not block `SCHEMA CHANGE` or `ROLLUP` operations.
- After a `SCHEMA CHANGE`, if source columns no longer match the destination table, error row counts increase and the job may pause. To prevent this, use explicit column mappings in the `COLUMNS` clause and define destination columns as `NULLABLE` or with a `DEFAULT` value.
- If a partition is deleted, the job pauses because the target partition cannot be found.
Routine Load and other write operations
- Routine Load does not conflict with `LOAD` or `INSERT` operations.
- Before running a `DELETE` operation on a table, pause the Routine Load job for that table and wait for all in-progress tasks to complete.
Routine Load and table or database deletion
If the destination table or database is deleted, the Routine Load job is automatically canceled.
Kafka topic auto-creation
If the topic specified in CREATE ROUTINE LOAD does not exist, Kafka brokers may auto-create it based on the auto.create.topics.enable setting:
- `true`: Kafka creates the topic automatically with `num.partitions` partitions. The job reads from it immediately.
- `false`: The job pauses until the topic is created and data is available.
Network access requirements
- All brokers listed in `kafka_broker_list` must be reachable from your ApsaraDB for SelectDB cluster.
- If `advertised.listeners` is configured in Kafka, the advertised addresses must also be reachable.
STOPPED vs. PAUSED
| State | Behavior |
|---|---|
PAUSED | Job can be resumed with RESUME ROUTINE LOAD. |
STOPPED | Job is permanently ended and is cleared by the FE on a regular basis. Cannot be restarted. |