All Products
Search
Document Center

E-MapReduce:Routine Load

Last Updated:Mar 27, 2026

Routine Load secara terus-menerus mengonsumsi data dari Apache Kafka ke StarRocks pada EMR. Setelah job load berjalan, StarRocks secara otomatis melakukan polling terhadap topik Kafka—Anda mengontrol siklus hidup job tersebut menggunakan Pernyataan SQL (pause, resume, atau stop).

Gunakan Routine Load jika Anda memerlukan pipeline ingestion yang persisten dan selalu aktif dari Kafka. Untuk pemuatan massal satu kali, gunakan Stream Load atau Broker Load.

Cara kerja

Routine Load

Routine Load menggunakan model penjadwalan dua tingkat:

  1. Kirim job load ke frontend (FE) melalui klien apa pun yang kompatibel dengan MySQL.

  2. JobScheduler membagi job tersebut menjadi beberapa task. Setiap task mencakup subset partisi Kafka tertentu.

  3. TaskScheduler menetapkan setiap task ke backend (BE). BE memperlakukan task tersebut sebagai job Stream Load dan menulis data ke StarRocks.

  4. Setelah task selesai, BE melaporkan hasilnya ke FE.

  5. FE membuat task baru untuk batch berikutnya atau mencoba ulang task yang gagal.

  6. Siklus ini berulang terus-menerus, menjaga aliran data tanpa gangguan.

Konsep utama

TermDescription
RoutineLoadJobJob load yang dikirimkan ke FE
JobSchedulerMembagi RoutineLoadJob menjadi task berdasarkan aturan yang dikonfigurasi
TaskSatu unit pekerjaan yang berasal dari RoutineLoadJob
TaskSchedulerMenjadwalkan eksekusi task di seluruh node backend
Gambar dan beberapa informasi dalam topik ini berasal dari Continuously load data from Apache Kafka dalam dokumentasi open-source StarRocks.

Prasyarat

Sebelum memulai, pastikan:

  • Versi Kafka adalah V0.10.0.0 atau lebih baru.

  • Kluster Kafka tidak menggunakan autentikasi atau menggunakan autentikasi SSL (jenis autentikasi lain tidak didukung).

  • Pesan dalam format CSV atau JSON—tipe Array tidak didukung.

  • Untuk CSV: setiap pesan berupa satu baris; baris tersebut tidak boleh diakhiri dengan karakter line feed.

Memulai cepat

Contoh ini menunjukkan cara membuat job Routine Load yang membaca dari kluster Kafka lokal.

1. Create a load job

CREATE ROUTINE LOAD routine_load_wikipedia ON routine_wiki_edit
COLUMNS TERMINATED BY ",",
COLUMNS (event_time, channel, user, is_anonymous, is_minor, is_new, is_robot, is_unpatrolled, delta, added, deleted)
PROPERTIES
(
    "desired_concurrent_number"="1",
    "max_error_number"="1000"
)
FROM KAFKA
(
    "kafka_broker_list"= "localhost:9092",
    "kafka_topic" = "starrocks-load"
);

2. Verifikasi bahwa job sedang berjalan

SHOW ROUTINE LOAD FOR load_test.routine_load_wikipedia;

Cari State: RUNNING dalam output. Jika statusnya NEED_SCHEDULE, tunggu sebentar lalu jalankan perintah tersebut lagi—job akan berubah menjadi RUNNING sesaat setelah penjadwalan selesai.

Buat job load

Sintaksis

CREATE ROUTINE LOAD [database.][job_name] ON [table_name]
    [COLUMNS TERMINATED BY "column_separator" ,]
    [COLUMNS (col1, col2, ...) ,]
    [WHERE where_condition ,]
    [PARTITION (part1, part2, ...)]
    [PROPERTIES ("key" = "value", ...)]
    FROM KAFKA
    [(data_source_properties1 = 'value1',
    data_source_properties2 = 'value2',
    ...)]

Jalankan HELP ROUTINE LOAD; untuk melihat referensi sintaksis lengkap.

Parameter

Parameter wajib

ParameterDescription
job_nameNama job load. Harus unik dalam satu database. Biasanya diformat sebagai timestamp ditambah nama tabel. Opsional dengan awalan nama database: database.job_name.
table_nameNama tabel tujuan.
DATA_SOURCEJenis sumber data. Atur ke KAFKA.

Pemetaan dan penyaringan data

ParameterDescription
COLUMNS TERMINATED BYPemisah kolom dalam data sumber. Default: \t.
COLUMNSMemetakan kolom sumber ke kolom tabel tujuan, serta mendefinisikan kolom turunan. Lihat Column mapping di bawah.
WHEREMenyaring baris sebelum dimuat. Baris yang disaring oleh kondisi WHERE tidak dihitung sebagai baris error.
PARTITIONMemuat data ke partisi tertentu dari tabel tujuan. Jika dihilangkan, StarRocks secara otomatis mengarahkan data ke partisi yang sesuai.

Job Behavior (PROPERTIES)

ParameterDefaultDescription
desired_concurrent_number3Jumlah maksimum task konkuren yang dapat dibagi oleh job. Harus lebih besar dari 0.
max_batch_interval10Interval penjadwalan task dalam detik. Rentang valid: 5–60. Di V1.15 dan versi setelahnya, parameter ini mengatur seberapa sering task dijadwalkan; waktu konsumsi data aktual dikendalikan oleh routine_load_task_consume_second dalam fe.conf (default: 3s), dan timeout task dikendalikan oleh routine_load_task_timeout_second dalam fe.conf (default: 15s).
max_batch_rows200000Jumlah maksimum baris yang dapat dibaca setiap task. Harus ≥ 200.000. Di V1.15 dan versi setelahnya, parameter ini hanya digunakan untuk menentukan ukuran jendela deteksi error: 10 × max_batch_rows.
max_batch_size100 MBJumlah maksimum byte yang dapat dibaca setiap task. Rentang valid: 100 MB–1 GB. Tidak digunakan lagi di V1.15 dan versi setelahnya — gunakan routine_load_task_consume_second dalam fe.conf sebagai gantinya.
max_error_number0Jumlah maksimum baris error yang diperbolehkan dalam jendela sampling. Harus ≥ 0. Nilai default 0 berarti tidak ada baris error yang diperbolehkan.
strict_modeDiaktifkanSaat diaktifkan, baris yang nilai sumber non-null-nya berubah menjadi NULL setelah casting tipe akan diabaikan. Atur ke false untuk menonaktifkan.
timezoneZona waktu sesiZona waktu yang diterapkan pada semua fungsi yang sensitif terhadap zona waktu selama proses load.

Properti sumber Kafka

PropertyRequiredDescription
kafka_broker_listYaInformasi koneksi broker. Format: ip:port. Pisahkan beberapa broker dengan koma.
kafka_topicYaTopik Kafka yang akan di-subscribe.
kafka_partitionsTidakPartisi tertentu yang akan di-subscribe.
kafka_offsetsTidakOffset awal untuk setiap partisi yang di-subscribe.
propertyTidakProperti Kafka tambahan, setara dengan --property dalam Kafka Shell.

Column mapping

Gunakan klausa COLUMNS untuk memetakan kolom sumber ke kolom tabel tujuan.

Kolom yang dipetakan—lewati atau ubah urutan kolom sumber. Misalnya, jika tabel tujuan memiliki kolom col1, col2, col3, tetapi sumber memiliki empat kolom di mana kolom keempat dipetakan ke col3:

COLUMNS (col2, col1, temp, col3)
-- "temp" adalah placeholder yang menyerap kolom sumber ketiga tanpa memuatnya

Kolom turunan—hitung nilai kolom berdasarkan data sumber. Misalnya, untuk mengisi col4 sebagai jumlah dari col1 dan col2:

COLUMNS (col2, col1, temp, col3, col4 = col1 + col2)

Mengelola job load

Lihat status job

Daftar semua job load dalam suatu database, termasuk yang dihentikan dan dibatalkan:

USE <database>;
SHOW ALL ROUTINE LOAD;

Lihat job yang sedang berjalan berdasarkan nama:

SHOW ROUTINE LOAD FOR <database>.<job_name>;
Penting

StarRocks hanya menampilkan job yang sedang berjalan dengan SHOW ROUTINE LOAD. Job yang telah selesai dan belum dimulai tidak dikembalikan. Gunakan SHOW ALL ROUTINE LOAD untuk menyertakan job yang dihentikan dan dibatalkan.

Jalankan HELP SHOW ROUTINE LOAD; atau HELP SHOW ROUTINE LOAD TASK; untuk opsi lainnya.

Bidang output

Output SHOW ROUTINE LOAD mencakup bidang-bidang utama berikut:

FieldDescription
StateStatus job: RUNNING, PAUSED, NEED_SCHEDULE, atau STOPPED
StatisticStatistik kumulatif load sejak pembuatan job
receivedBytesTotal byte yang diterima
errorRowsJumlah baris yang gagal dimuat
committedTaskNumJumlah task yang dikomit oleh FE
loadedRowsJumlah baris yang berhasil dimuat
loadRowsRateThroughput load dalam baris/detik
abortedTaskNumJumlah task yang dibatalkan di backend
totalRowsTotal baris yang diterima (termasuk baris error dan yang tersaring)
unselectedRowsBaris yang disaring oleh kondisi WHERE
receivedBytesRateLaju penerimaan data dalam byte/detik
taskExecuteTimeMsTotal waktu eksekusi task dalam milidetik
ErrorLogUrlsURL untuk mengunduh log error dari proses load
ProgressOffset konsumen Kafka saat ini per partisi

Contoh output

*************************** 1. row ***************************

                  Id: 14093
                Name: routine_load_wikipedia
          CreateTime: 2020-05-16 16:00:48
           PauseTime: N/A
             EndTime: N/A
              DbName: default_cluster:load_test
           TableName: routine_wiki_edit
               State: RUNNING
      DataSourceType: KAFKA
      CurrentTaskNum: 1
       JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
    CustomProperties: {}
           Statistic: {"receivedBytes":150821770,"errorRows":122,"committedTaskNum":12,"loadedRows":2399878,"loadRowsRate":199000,"abortedTaskNum":1,"totalRows":2400000,"unselectedRows":0,"receivedBytesRate":12523000,"taskExecuteTimeMs":12043}
            Progress: {"0":"13634667"}
ReasonOfStateChanged:
        ErrorLogUrls: http://172.26.**.**:9122/api/_load_error_log?file=__shard_53/error_log_insert_stmt_47e8a1d107ed4932-8f1ddf7b01ad2fee_47e8a1d107ed4932_8f1ddf7b01ad2fee, http://172.26.**.**:9122/api/_load_error_log?file=__shard_54/error_log_insert_stmt_e0c0c6b040c044fd-a162b16f6bad53e6_e0c0c6b040c044fd_a162b16f6bad53e6, http://172.26.**.**:9122/api/_load_error_log?file=__shard_55/error_log_insert_stmt_ce4c95f0c72440ef-a442bb300bd743c8_ce4c95f0c72440ef_a442bb300bd743c8
            OtherMsg:
1 row in set (0.00 sec)

Hentikan sementara job load

PAUSE ROUTINE LOAD FOR <job_name>;

Job masuk ke status PAUSED. Ingestion data berhenti, tetapi job tidak dihentikan—Statistic dan Progress berhenti diperbarui. Lanjutkan job dengan RESUME ROUTINE LOAD saat siap.

Jalankan HELP PAUSE ROUTINE LOAD; untuk contoh.

Lanjutkan job load

RESUME ROUTINE LOAD FOR <job_name>;

Job sementara masuk ke status NEED_SCHEDULE saat FE menjadwalkan ulang task. Statusnya kembali menjadi RUNNING sesaat setelah itu, dan Statistic serta Progress mulai diperbarui kembali.

Jalankan HELP RESUME ROUTINE LOAD; untuk contoh.

Hentikan job load

STOP ROUTINE LOAD FOR <job_name>;

Job masuk ke status STOPPED. Ingestion dihentikan secara permanen—job yang dihentikan tidak dapat dilanjutkan. Job yang dihentikan tidak muncul dalam output SHOW ROUTINE LOAD.

Jalankan HELP STOP ROUTINE LOAD; untuk contoh.