Routine Load secara terus-menerus mengonsumsi data dari Apache Kafka ke StarRocks pada EMR. Setelah job load berjalan, StarRocks secara otomatis melakukan polling terhadap topik Kafka—Anda mengontrol siklus hidup job tersebut menggunakan Pernyataan SQL (pause, resume, atau stop).
Gunakan Routine Load jika Anda memerlukan pipeline ingestion yang persisten dan selalu aktif dari Kafka. Untuk pemuatan massal satu kali, gunakan Stream Load atau Broker Load.
Cara kerja

Routine Load menggunakan model penjadwalan dua tingkat:
Kirim job load ke frontend (FE) melalui klien apa pun yang kompatibel dengan MySQL.
JobScheduler membagi job tersebut menjadi beberapa task. Setiap task mencakup subset partisi Kafka tertentu.
TaskScheduler menetapkan setiap task ke backend (BE). BE memperlakukan task tersebut sebagai job Stream Load dan menulis data ke StarRocks.
Setelah task selesai, BE melaporkan hasilnya ke FE.
FE membuat task baru untuk batch berikutnya atau mencoba ulang task yang gagal.
Siklus ini berulang terus-menerus, menjaga aliran data tanpa gangguan.
Konsep utama
| Term | Description |
|---|---|
| RoutineLoadJob | Job load yang dikirimkan ke FE |
| JobScheduler | Membagi RoutineLoadJob menjadi task berdasarkan aturan yang dikonfigurasi |
| Task | Satu unit pekerjaan yang berasal dari RoutineLoadJob |
| TaskScheduler | Menjadwalkan eksekusi task di seluruh node backend |
Gambar dan beberapa informasi dalam topik ini berasal dari Continuously load data from Apache Kafka dalam dokumentasi open-source StarRocks.
Prasyarat
Sebelum memulai, pastikan:
Versi Kafka adalah V0.10.0.0 atau lebih baru.
Kluster Kafka tidak menggunakan autentikasi atau menggunakan autentikasi SSL (jenis autentikasi lain tidak didukung).
Pesan dalam format CSV atau JSON—tipe Array tidak didukung.
Untuk CSV: setiap pesan berupa satu baris; baris tersebut tidak boleh diakhiri dengan karakter line feed.
Memulai cepat
Contoh ini menunjukkan cara membuat job Routine Load yang membaca dari kluster Kafka lokal.
1. Create a load job
CREATE ROUTINE LOAD routine_load_wikipedia ON routine_wiki_edit
COLUMNS TERMINATED BY ",",
COLUMNS (event_time, channel, user, is_anonymous, is_minor, is_new, is_robot, is_unpatrolled, delta, added, deleted)
PROPERTIES
(
"desired_concurrent_number"="1",
"max_error_number"="1000"
)
FROM KAFKA
(
"kafka_broker_list"= "localhost:9092",
"kafka_topic" = "starrocks-load"
);2. Verifikasi bahwa job sedang berjalan
SHOW ROUTINE LOAD FOR load_test.routine_load_wikipedia;Cari State: RUNNING dalam output. Jika statusnya NEED_SCHEDULE, tunggu sebentar lalu jalankan perintah tersebut lagi—job akan berubah menjadi RUNNING sesaat setelah penjadwalan selesai.
Buat job load
Sintaksis
CREATE ROUTINE LOAD [database.][job_name] ON [table_name]
[COLUMNS TERMINATED BY "column_separator" ,]
[COLUMNS (col1, col2, ...) ,]
[WHERE where_condition ,]
[PARTITION (part1, part2, ...)]
[PROPERTIES ("key" = "value", ...)]
FROM KAFKA
[(data_source_properties1 = 'value1',
data_source_properties2 = 'value2',
...)]Jalankan HELP ROUTINE LOAD; untuk melihat referensi sintaksis lengkap.
Parameter
Parameter wajib
| Parameter | Description |
|---|---|
job_name | Nama job load. Harus unik dalam satu database. Biasanya diformat sebagai timestamp ditambah nama tabel. Opsional dengan awalan nama database: database.job_name. |
table_name | Nama tabel tujuan. |
DATA_SOURCE | Jenis sumber data. Atur ke KAFKA. |
Pemetaan dan penyaringan data
| Parameter | Description |
|---|---|
COLUMNS TERMINATED BY | Pemisah kolom dalam data sumber. Default: \t. |
COLUMNS | Memetakan kolom sumber ke kolom tabel tujuan, serta mendefinisikan kolom turunan. Lihat Column mapping di bawah. |
WHERE | Menyaring baris sebelum dimuat. Baris yang disaring oleh kondisi WHERE tidak dihitung sebagai baris error. |
PARTITION | Memuat data ke partisi tertentu dari tabel tujuan. Jika dihilangkan, StarRocks secara otomatis mengarahkan data ke partisi yang sesuai. |
Job Behavior (PROPERTIES)
| Parameter | Default | Description |
|---|---|---|
desired_concurrent_number | 3 | Jumlah maksimum task konkuren yang dapat dibagi oleh job. Harus lebih besar dari 0. |
max_batch_interval | 10 | Interval penjadwalan task dalam detik. Rentang valid: 5–60. Di V1.15 dan versi setelahnya, parameter ini mengatur seberapa sering task dijadwalkan; waktu konsumsi data aktual dikendalikan oleh routine_load_task_consume_second dalam fe.conf (default: 3s), dan timeout task dikendalikan oleh routine_load_task_timeout_second dalam fe.conf (default: 15s). |
max_batch_rows | 200000 | Jumlah maksimum baris yang dapat dibaca setiap task. Harus ≥ 200.000. Di V1.15 dan versi setelahnya, parameter ini hanya digunakan untuk menentukan ukuran jendela deteksi error: 10 × max_batch_rows. |
max_batch_size | 100 MB | Jumlah maksimum byte yang dapat dibaca setiap task. Rentang valid: 100 MB–1 GB. Tidak digunakan lagi di V1.15 dan versi setelahnya — gunakan routine_load_task_consume_second dalam fe.conf sebagai gantinya. |
max_error_number | 0 | Jumlah maksimum baris error yang diperbolehkan dalam jendela sampling. Harus ≥ 0. Nilai default 0 berarti tidak ada baris error yang diperbolehkan. |
strict_mode | Diaktifkan | Saat diaktifkan, baris yang nilai sumber non-null-nya berubah menjadi NULL setelah casting tipe akan diabaikan. Atur ke false untuk menonaktifkan. |
timezone | Zona waktu sesi | Zona waktu yang diterapkan pada semua fungsi yang sensitif terhadap zona waktu selama proses load. |
Properti sumber Kafka
| Property | Required | Description |
|---|---|---|
kafka_broker_list | Ya | Informasi koneksi broker. Format: ip:port. Pisahkan beberapa broker dengan koma. |
kafka_topic | Ya | Topik Kafka yang akan di-subscribe. |
kafka_partitions | Tidak | Partisi tertentu yang akan di-subscribe. |
kafka_offsets | Tidak | Offset awal untuk setiap partisi yang di-subscribe. |
property | Tidak | Properti Kafka tambahan, setara dengan --property dalam Kafka Shell. |
Column mapping
Gunakan klausa COLUMNS untuk memetakan kolom sumber ke kolom tabel tujuan.
Kolom yang dipetakan—lewati atau ubah urutan kolom sumber. Misalnya, jika tabel tujuan memiliki kolom col1, col2, col3, tetapi sumber memiliki empat kolom di mana kolom keempat dipetakan ke col3:
COLUMNS (col2, col1, temp, col3)
-- "temp" adalah placeholder yang menyerap kolom sumber ketiga tanpa memuatnyaKolom turunan—hitung nilai kolom berdasarkan data sumber. Misalnya, untuk mengisi col4 sebagai jumlah dari col1 dan col2:
COLUMNS (col2, col1, temp, col3, col4 = col1 + col2)Mengelola job load
Lihat status job
Daftar semua job load dalam suatu database, termasuk yang dihentikan dan dibatalkan:
USE <database>;
SHOW ALL ROUTINE LOAD;Lihat job yang sedang berjalan berdasarkan nama:
SHOW ROUTINE LOAD FOR <database>.<job_name>;StarRocks hanya menampilkan job yang sedang berjalan dengan SHOW ROUTINE LOAD. Job yang telah selesai dan belum dimulai tidak dikembalikan. Gunakan SHOW ALL ROUTINE LOAD untuk menyertakan job yang dihentikan dan dibatalkan.
Jalankan HELP SHOW ROUTINE LOAD; atau HELP SHOW ROUTINE LOAD TASK; untuk opsi lainnya.
Bidang output
Output SHOW ROUTINE LOAD mencakup bidang-bidang utama berikut:
| Field | Description |
|---|---|
State | Status job: RUNNING, PAUSED, NEED_SCHEDULE, atau STOPPED |
Statistic | Statistik kumulatif load sejak pembuatan job |
receivedBytes | Total byte yang diterima |
errorRows | Jumlah baris yang gagal dimuat |
committedTaskNum | Jumlah task yang dikomit oleh FE |
loadedRows | Jumlah baris yang berhasil dimuat |
loadRowsRate | Throughput load dalam baris/detik |
abortedTaskNum | Jumlah task yang dibatalkan di backend |
totalRows | Total baris yang diterima (termasuk baris error dan yang tersaring) |
unselectedRows | Baris yang disaring oleh kondisi WHERE |
receivedBytesRate | Laju penerimaan data dalam byte/detik |
taskExecuteTimeMs | Total waktu eksekusi task dalam milidetik |
ErrorLogUrls | URL untuk mengunduh log error dari proses load |
Progress | Offset konsumen Kafka saat ini per partisi |
Contoh output
*************************** 1. row ***************************
Id: 14093
Name: routine_load_wikipedia
CreateTime: 2020-05-16 16:00:48
PauseTime: N/A
EndTime: N/A
DbName: default_cluster:load_test
TableName: routine_wiki_edit
State: RUNNING
DataSourceType: KAFKA
CurrentTaskNum: 1
JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
CustomProperties: {}
Statistic: {"receivedBytes":150821770,"errorRows":122,"committedTaskNum":12,"loadedRows":2399878,"loadRowsRate":199000,"abortedTaskNum":1,"totalRows":2400000,"unselectedRows":0,"receivedBytesRate":12523000,"taskExecuteTimeMs":12043}
Progress: {"0":"13634667"}
ReasonOfStateChanged:
ErrorLogUrls: http://172.26.**.**:9122/api/_load_error_log?file=__shard_53/error_log_insert_stmt_47e8a1d107ed4932-8f1ddf7b01ad2fee_47e8a1d107ed4932_8f1ddf7b01ad2fee, http://172.26.**.**:9122/api/_load_error_log?file=__shard_54/error_log_insert_stmt_e0c0c6b040c044fd-a162b16f6bad53e6_e0c0c6b040c044fd_a162b16f6bad53e6, http://172.26.**.**:9122/api/_load_error_log?file=__shard_55/error_log_insert_stmt_ce4c95f0c72440ef-a442bb300bd743c8_ce4c95f0c72440ef_a442bb300bd743c8
OtherMsg:
1 row in set (0.00 sec)Hentikan sementara job load
PAUSE ROUTINE LOAD FOR <job_name>;Job masuk ke status PAUSED. Ingestion data berhenti, tetapi job tidak dihentikan—Statistic dan Progress berhenti diperbarui. Lanjutkan job dengan RESUME ROUTINE LOAD saat siap.
Jalankan HELP PAUSE ROUTINE LOAD; untuk contoh.
Lanjutkan job load
RESUME ROUTINE LOAD FOR <job_name>;Job sementara masuk ke status NEED_SCHEDULE saat FE menjadwalkan ulang task. Statusnya kembali menjadi RUNNING sesaat setelah itu, dan Statistic serta Progress mulai diperbarui kembali.
Jalankan HELP RESUME ROUTINE LOAD; untuk contoh.
Hentikan job load
STOP ROUTINE LOAD FOR <job_name>;Job masuk ke status STOPPED. Ingestion dihentikan secara permanen—job yang dihentikan tidak dapat dilanjutkan. Job yang dihentikan tidak muncul dalam output SHOW ROUTINE LOAD.
Jalankan HELP STOP ROUTINE LOAD; untuk contoh.