Routine Load adalah metode impor rutin. StarRocks memungkinkan Anda menggunakan metode ini untuk terus mengimpor data dari Apache Kafka dan mengontrol jeda, melanjutkan, serta menghentikan tugas impor dengan pernyataan SQL. Topik ini menjelaskan prinsip dasar, contoh impor, dan FAQ dari Routine Load.
Istilah
- RoutineLoadJob: Pekerjaan impor rutin yang telah diserahkan.
- JobScheduler: Penjadwal pekerjaan impor rutin yang digunakan untuk menjadwalkan dan membagi sebuah RoutineLoadJob menjadi beberapa tugas.
- Tugas: Tugas yang dibagi dari sebuah RoutineLoadJob oleh JobScheduler berdasarkan aturan tertentu.
- TaskScheduler: Penjadwal tugas yang digunakan untuk menjadwalkan eksekusi sebuah tugas.
Prinsip Dasar

- Anda mengirimkan pekerjaan impor Kafka ke frontend menggunakan klien yang mendukung protokol MySQL.
- Frontend membagi pekerjaan impor menjadi beberapa tugas. Setiap tugas mengimpor bagian data tertentu.
- Setiap tugas ditugaskan ke backend tertentu untuk dieksekusi. Di backend, sebuah tugas dianggap sebagai pekerjaan impor biasa dan mengimpor data berdasarkan mekanisme impor Stream Load.
- Setelah proses impor selesai di backend, backend melaporkan hasil impor ke frontend.
- Frontend terus menghasilkan tugas baru atau mencoba kembali tugas yang gagal berdasarkan hasil impor.
- Frontend terus menghasilkan tugas baru untuk mencapai impor data tanpa gangguan.
Proses Impor
Persyaratan Lingkungan
Anda dapat mengakses kluster Kafka yang tidak memerlukan otentikasi dan kluster Kafka yang memerlukan otentikasi Secure Sockets Layer (SSL).
Sebuah pesan dapat berada dalam salah satu format berikut:
Format CSV. Dalam format ini, setiap pesan menempati satu baris, dan akhir baris tidak mengandung karakter baris baru.
Format JSON.
Tipe Array tidak didukung.
Hanya Kafka versi 0.10.0.0 dan yang lebih baru yang didukung.
Membuat Pekerjaan Impor
Sintaksis
CREATE ROUTINE LOAD <database>.<job_name> ON <table_name> [COLUMNS TERMINATED BY "column_separator" ,] [COLUMNS (col1, col2, ...) ,] [WHERE where_condition ,] [PARTITION (part1, part2, ...)] [PROPERTIES ("key" = "value", ...)] FROM [DATA_SOURCE] [(data_source_properties1 = 'value1', data_source_properties2 = 'value2', ...)]Tabel berikut menjelaskan parameter dalam perintah di atas.
Parameter Diperlukan Deskripsi job_name Ya Nama pekerjaan impor. Nama database impor dapat ditempatkan di depan. Nama tersebut biasanya dalam format timestamp ditambah nama tabel. Nama pekerjaan harus unik dalam sebuah database. table_name Ya Nama tabel tujuan. Klausa COLUMNS TERMINATED Tidak Pemisah kolom dalam file data sumber. Nilai default: \t. Klausa COLUMNS Tidak Pemetaan antara kolom dalam file data sumber dan kolom dalam tabel tujuan. - Kolom yang dipetakan: Misalnya, tabel tujuan memiliki tiga kolom, col1, col2, dan col3, sedangkan file data sumber memiliki empat kolom, dan kolom pertama, kedua, dan keempat dalam tabel tujuan sesuai dengan col2, col1, dan col3 dalam file data sumber. Dalam hal ini, klausa dapat ditulis sebagai
COLUMNS (col2, col1, temp, col3). Kolom temp tidak ada dan digunakan untuk melewati kolom ketiga dalam file data sumber. - Kolom turunan: StarRocks tidak hanya membaca data dalam kolom file data sumber tetapi juga menyediakan operasi pemrosesan pada kolom data. Misalnya, kolom col4 ditambahkan ke tabel tujuan, dan nilai col4 sama dengan nilai col1 ditambah nilai col2. Dalam hal ini, klausa dapat ditulis sebagai
COLUMNS (col2, col1, temp, col3, col4 = col1 + col2).
Klausa WHERE Tidak Kondisi filter yang ingin Anda gunakan untuk menyaring baris yang tidak Anda butuhkan. Kondisi filter dapat ditentukan pada kolom yang dipetakan atau kolom turunan. Sebagai contoh, jika hanya baris dengan k1 lebih besar dari 100 dan k2 sama dengan 1000 yang diimpor, klausa dapat ditulis sebagai
WHERE k1 > 100 and k2 = 1000.Klausa PARTITION Tidak Partisi tabel tujuan. Jika Anda tidak menentukan partisi, file data sumber secara otomatis diimpor ke partisi yang sesuai. Klausa PROPERTIES Tidak Parameter umum untuk pekerjaan impor. desired_concurrent_number Tidak Jumlah maksimum tugas ke dalam mana pekerjaan impor dapat dibagi. Nilainya harus lebih besar dari 0. Nilai default: 3. max_batch_interval Tidak Waktu eksekusi maksimum setiap tugas. Nilai valid: 5 hingga 60. Unit: detik. Nilai default: 10. Dalam V1.15 dan yang lebih baru, parameter ini menentukan waktu penjadwalan tugas. Anda dapat menentukan seberapa sering tugas dijalankan. routine_load_task_consume_second dalam fe.conf menentukan jumlah waktu yang diperlukan oleh tugas untuk mengonsumsi data. Nilai default: 3s. routine_load_task_timeout_second dalam fe.conf menentukan periode timeout eksekusi tugas. Nilai default: 15s.
max_batch_rows Tidak Jumlah maksimum baris yang dapat dibaca setiap tugas. Nilainya harus lebih besar dari atau sama dengan 200000. Nilai default: 200000. Dalam V1.15 dan yang lebih baru, parameter ini hanya digunakan untuk mendefinisikan rentang jendela deteksi kesalahan. Rentang jendela adalah 10 × max-batch-rows.
max_batch_size Tidak Jumlah maksimum byte yang dapat dibaca setiap tugas. Unit: byte. Nilai valid: 100 MB hingga 1 GB. Nilai default: 100 MB. Dalam V1.15 dan yang lebih baru, parameter ini diabaikan. routine_load_task_consume_second dalam fe.conf menentukan jumlah waktu yang diperlukan oleh tugas untuk mengonsumsi data. Nilai default: 3s.
max_error_number Tidak Jumlah maksimum baris kesalahan yang diizinkan dalam jendela sampling. Nilainya harus lebih besar dari atau sama dengan 0. Nilai default: 0. Tidak ada baris kesalahan yang diizinkan. Penting Baris yang difilter oleh kondisi WHERE bukanlah baris kesalahan.strict_mode Tidak Menentukan apakah akan mengaktifkan mode ketat. Secara default, mode ini diaktifkan. Jika tipe kolom data mentah non-kosong diubah menjadi NULL setelah Anda mengaktifkan mode ketat, data tersebut disaring. Untuk menonaktifkan mode ketat, atur parameter ini ke false.
timezone Tidak Zona waktu yang digunakan untuk pekerjaan impor. Secara default, nilai parameter timezone dari sesi digunakan. Parameter ini memengaruhi hasil semua fungsi terkait zona waktu yang terlibat dalam impor.
DATA_SOURCE Ya Tipe sumber data. Atur nilainya ke KAFKA. data_source_properties Tidak Informasi tentang sumber data. Nilainya mencakup bidang berikut: - kafka_broker_list: informasi koneksi tentang broker Kafka. Format:
ip:host. Pisahkan beberapa broker dengan koma (,). - kafka_topic: topik Kafka yang ingin Anda langgani. Catatan Bidang kafka_broker_list dan kafka_topic wajib.
- kafka_partitions dan kafka_offsets: partisi Kafka yang ingin Anda langgani dan offset awal setiap partisi.
- property: properti terkait Kafka. Bidang ini setara dengan parameter
--propertydalam Kafka Shell. Anda dapat menjalankan perintahHELP ROUTINE LOAD;untuk melihat sintaksis lebih rinci untuk membuat pekerjaan impor.
- Kolom yang dipetakan: Misalnya, tabel tujuan memiliki tiga kolom, col1, col2, dan col3, sedangkan file data sumber memiliki empat kolom, dan kolom pertama, kedua, dan keempat dalam tabel tujuan sesuai dengan col2, col1, dan col3 dalam file data sumber. Dalam hal ini, klausa dapat ditulis sebagai
Contoh: Kirim pekerjaan impor Routine Load tanpa otentikasi bernama example_tbl2_ordertest ke StarRocks untuk terus mengonsumsi pesan dari topik ordertest2 kluster Kafka dan mengimpor pesan ke tabel example_tbl2. Pekerjaan impor mengonsumsi pesan dari offset paling awal partisi tertentu dari topik.
CREATE ROUTINE LOAD load_test.example_tbl2_ordertest ON example_tbl COLUMNS(commodity_id, customer_name, country, pay_time, price, pay_dt=from_unixtime(pay_time, '%Y%m%d')) PROPERTIES ( "desired_concurrent_number"="5", "format" ="json", "jsonpaths" ="[\"$.commodity_id\",\"$.customer_name\",\"$.country\",\"$.pay_time\",\"$.price\"]" ) FROM KAFKA ( "kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>", "kafka_topic" = "ordertest2", "kafka_partitions" ="0,1,2,3,4", "property.kafka_default_offsets" = "OFFSET_BEGINNING" );Contoh: Gunakan protokol SSL untuk mengakses Kafka. Contoh kode:
-- Tentukan SSL sebagai protokol keamanan. "property.security.protocol" = "ssl", -- Tentukan lokasi penyimpanan sertifikat CA. "property.ssl.ca.location" = "FILE:ca-cert", -- Jika autentikasi klien diaktifkan untuk server Kafka, Anda harus mengonfigurasi parameter berikut: -- Tentukan lokasi kunci publik untuk klien Kafka. "property.ssl.certificate.location" = "FILE:client.pem", -- Tentukan lokasi kunci privat untuk klien Kafka. "property.ssl.key.location" = "FILE:client.key", -- Tentukan kata sandi kunci privat untuk klien Kafka. "property.ssl.key.password" = "******"Untuk informasi tentang pernyataan CREATE FILE, lihat CREATE FILE.
CatatanSaat menggunakan pernyataan CREATE FILE, tentukan alamat HTTP Object Storage Service (OSS) sebagai
URL. Untuk informasi lebih lanjut, lihat Gunakan titik akhir yang mendukung IPv6 untuk mengakses OSS.
Lihat status pekerjaan impor
Kueri semua pekerjaan impor rutin dalam database load_test, termasuk pekerjaan yang dihentikan atau dibatalkan. Pekerjaan ditampilkan dalam satu atau lebih baris.
USE load_test; SHOW ALL ROUTINE LOAD;Kueri pekerjaan impor rutin yang sedang berjalan bernama example_tbl2_ordertest dalam load_test.
SHOW ROUTINE LOAD FOR load_test.example_tbl2_ordertest;Lakukan operasi berikut untuk melihat status pekerjaan impor pada tab Kafka Import: Buka halaman EMR StarRocks Manager. Di panel navigasi di sebelah kiri, klik Metadata Management. Klik nama database yang diinginkan. Pada halaman yang muncul, klik tab Tasks.
StarRocks memungkinkan Anda melihat hanya pekerjaan yang sedang berjalan. Anda tidak dapat melihat pekerjaan yang selesai dan pekerjaan yang belum dijalankan.
Anda dapat menjalankan perintah SHOW ALL ROUTINE LOAD untuk melihat semua pekerjaan Routine Load yang sedang berjalan. Keluaran berikut dikembalikan:
*************************** 1. row ***************************
Id: 14093
Name: routine_load_wikipedia
CreateTime: 2020-05-16 16:00:48
PauseTime: N/A
EndTime: N/A
DbName: default_cluster:load_test
TableName: routine_wiki_edit
State: RUNNING
DataSourceType: KAFKA
CurrentTaskNum: 1
JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
CustomProperties: {}
Statistic: {"receivedBytes":150821770,"errorRows":122,"committedTaskNum":12,"loadedRows":2399878,"loadRowsRate":199000,"abortedTaskNum":1,"totalRows":2400000,"unselectedRows":0,"receivedBytesRate":12523000,"taskExecuteTimeMs":12043}
Progress: {"0":"13634667"}
ReasonOfStateChanged:
ErrorLogUrls: http://172.26.**.**:9122/api/_load_error_log?file=__shard_53/error_log_insert_stmt_47e8a1d107ed4932-8f1ddf7b01ad2fee_47e8a1d107ed4932_8f1ddf7b01ad2fee, http://172.26.**.**:9122/api/_load_error_log?file=__shard_54/error_log_insert_stmt_e0c0c6b040c044fd-a162b16f6bad53e6_e0c0c6b040c044fd_a162b16f6bad53e6, http://172.26.**.**:9122/api/_load_error_log?file=__shard_55/error_log_insert_stmt_ce4c95f0c72440ef-a442bb300bd743c8_ce4c95f0c72440ef_a442bb300bd743c8
OtherMsg:
1 row in set (0.00 sec)Dalam contoh ini, pekerjaan impor bernama routine_load_wikipedia dibuat. Tabel berikut menjelaskan parameter.
Parameter | Deskripsi |
State | Status pekerjaan impor. Nilai RUNNING menunjukkan bahwa pekerjaan impor terus berjalan. |
Statistic | Informasi kemajuan, yang mencakup informasi impor sejak pekerjaan dibuat. |
receivedBytes | Ukuran data yang diterima. Unit: byte. |
errorRows | Jumlah baris kesalahan yang diimpor. |
committedTaskNum | Jumlah tugas yang dikirimkan oleh node frontend. |
loadedRows | Jumlah baris yang diimpor. |
loadRowsRate | Laju impor data. Unit: baris/detik. |
abortedTaskNum | Jumlah tugas yang gagal pada node backend. |
totalRows | Jumlah total baris yang diterima. |
unselectedRows | Jumlah baris yang difilter oleh kondisi WHERE. |
receivedBytesRate | Laju penerimaan data. Unit: byte/detik. |
taskExecuteTimeMs | Durasi impor. Unit: milidetik. |
ErrorLogUrls | Log pesan kesalahan. Anda dapat menggunakan URL untuk melihat pesan kesalahan selama proses impor. |
Jeda pekerjaan impor
Eksekusi pernyataan PAUSE untuk menjeda pekerjaan impor. Setelah Anda mengeksekusi pernyataan, pekerjaan impor masuk ke dalam status PAUSED. Impor data dijeda, tetapi pekerjaan tidak dihentikan. Anda dapat mengeksekusi pernyataan RESUME untuk melanjutkan pekerjaan tersebut.
PAUSE ROUTINE LOAD FOR <job_name>;Setelah pekerjaan impor dijeda, informasi impor dalam Statistic dan Progress berhenti diperbarui. Anda dapat mengeksekusi pernyataan SHOW ROUTINE LOAD untuk melihat pekerjaan impor yang dijeda.
Melanjutkan pekerjaan impor
Eksekusi pernyataan RESUME untuk melanjutkan pekerjaan impor. Setelah Anda mengeksekusi pernyataan, pekerjaan sementara memasuki status NEED_SCHEDULE, yang menunjukkan bahwa pekerjaan sedang dijadwalkan ulang. Setelah periode waktu tertentu, pekerjaan masuk ke status RUNNING dan impor data dilanjutkan.
RESUME ROUTINE LOAD FOR <job_name>;Menghentikan pekerjaan impor
Eksekusi pernyataan STOP untuk menghentikan pekerjaan impor. Setelah Anda mengeksekusi pernyataan, pekerjaan impor masuk ke dalam status STOPPED. Impor data dihentikan, dan pekerjaan diakhiri. Dalam hal ini, impor data tidak dapat dilanjutkan.
STOP ROUTINE LOAD FOR <job_name>;Setelah pekerjaan impor dihentikan, informasi impor dalam Statistic dan Progress tidak lagi diperbarui. Dalam hal ini, Anda tidak dapat mengeksekusi pernyataan SHOW ROUTINE LOAD untuk melihat pekerjaan impor yang telah dihentikan.
Praktik Terbaik
Dalam contoh ini, sebuah pekerjaan impor Routine Load dibuat untuk terus mengonsumsi data dalam format CSV dari kluster Kafka dan kemudian mengimpor data CSV tersebut ke StarRocks.
Lakukan operasi berikut dalam kluster Kafka:
Buat topik uji.
kafka-topics.sh --create --topic order_sr_topic --replication-factor 3 --partitions 10 --bootstrap-server "core-1-1:9092,core-1-2:9092,core-1-3:9092"Jalankan perintah berikut untuk memproduksi data.
kafka-console-producer.sh --broker-list core-1-1:9092 --topic order_sr_topicMasukkan data berikut:
2020050802,2020-05-08,Johann Georg Faust,Deutschland,male,895 2020050802,2020-05-08,Julien Sorel,France,male,893 2020050803,2020-05-08,Dorian Grey,UK,male,1262 2020051001,2020-05-10,Tess Durbeyfield,US,female,986 2020051101,2020-05-11,Edogawa Conan,japan,male,8924
Lakukan operasi berikut dalam kluster StarRocks:
Jalankan perintah berikut untuk membuat database tujuan dan tabel.
Buat tabel bernama routine_load_tbl_csv dalam database load_test kluster StarRocks berdasarkan kolom yang ingin Anda impor. Dalam contoh ini, semua kolom, kecuali kolom kelima, diimpor. Kolom kelima menampilkan informasi gender.
CREATE TABLE load_test.routine_load_tbl_csv ( `order_id` bigint NOT NULL COMMENT "Order ID", 'pay_dt' date NOT NULL COMMENT "Tanggal pembelian", 'customer_name' varchar(26) NULL COMMENT "Nama pelanggan", 'nationality' varchar(26) NULL COMMENT "Negara", 'price' double NULL COMMENT "Jumlah pembayaran" ) ENGINE=OLAP PRIMARY KEY (order_id,pay_dt) DISTRIBUTED BY HASH(`order_id`) BUCKETS 5;Eksekusi pernyataan berikut untuk membuat pekerjaan impor:
CREATE ROUTINE LOAD load_test.routine_load_tbl_ordertest_csv ON routine_load_tbl_csv COLUMNS TERMINATED BY ",", COLUMNS (order_id, pay_dt, customer_name, nationality, temp_gender, price) PROPERTIES ( "desired_concurrent_number" = "5" ) FROM KAFKA ( "kafka_broker_list" ="192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092", "kafka_topic" = "order_sr_topic", "kafka_partitions" ="0,1,2,3,4", "property.kafka_default_offsets" = "OFFSET_BEGINNING" )Eksekusi pernyataan berikut untuk melihat informasi tentang pekerjaan impor routine_load_tbl_ordertest_csv:
SHOW ROUTINE LOAD FOR routine_load_tbl_ordertest_csv;Jika statusnya adalah RUNNING, pekerjaan berjalan sesuai harapan.
Jalankan perintah berikut untuk menanyakan tabel tujuan. Hasilnya menunjukkan bahwa sinkronisasi data selesai.
Anda juga dapat melakukan operasi berikut pada pekerjaan:
Jeda pekerjaan impor
PAUSE ROUTINE LOAD FOR routine_load_tbl_ordertest_csv;Lanjutkan pekerjaan impor
RESUME ROUTINE LOAD FOR routine_load_tbl_ordertest_csv;Ubah pekerjaan impor
CatatanHanya pekerjaan dalam status PAUSED yang dapat diubah.
Sebagai contoh, Anda dapat mengubah nilai desired_concurrent_number menjadi 6.
ALTER ROUTINE LOAD FOR routine_load_tbl_ordertest_csv PROPERTIES ( "desired_concurrent_number" = "6" )Hentikan pekerjaan impor
STOP ROUTINE LOAD FOR routine_load_tbl_ordertest_csv;