全部产品
Search
文档中心

E-MapReduce:Routine Load

更新时间:Jul 02, 2025

Routine Load adalah metode impor rutin. StarRocks memungkinkan Anda menggunakan metode ini untuk terus mengimpor data dari Apache Kafka dan mengontrol jeda, melanjutkan, serta menghentikan tugas impor dengan pernyataan SQL. Topik ini menjelaskan prinsip dasar, contoh impor, dan FAQ dari Routine Load.

Istilah

  • RoutineLoadJob: Pekerjaan impor rutin yang telah diserahkan.
  • JobScheduler: Penjadwal pekerjaan impor rutin yang digunakan untuk menjadwalkan dan membagi sebuah RoutineLoadJob menjadi beberapa tugas.
  • Tugas: Tugas yang dibagi dari sebuah RoutineLoadJob oleh JobScheduler berdasarkan aturan tertentu.
  • TaskScheduler: Penjadwal tugas yang digunakan untuk menjadwalkan eksekusi sebuah tugas.

Prinsip Dasar

Gambar berikut menunjukkan proses impor dari Routine Load. Routine Load
Langkah-langkah berikut menjelaskan cara mengimpor data menggunakan Routine Load.
  1. Anda mengirimkan pekerjaan impor Kafka ke frontend menggunakan klien yang mendukung protokol MySQL.
  2. Frontend membagi pekerjaan impor menjadi beberapa tugas. Setiap tugas mengimpor bagian data tertentu.
  3. Setiap tugas ditugaskan ke backend tertentu untuk dieksekusi. Di backend, sebuah tugas dianggap sebagai pekerjaan impor biasa dan mengimpor data berdasarkan mekanisme impor Stream Load.
  4. Setelah proses impor selesai di backend, backend melaporkan hasil impor ke frontend.
  5. Frontend terus menghasilkan tugas baru atau mencoba kembali tugas yang gagal berdasarkan hasil impor.
  6. Frontend terus menghasilkan tugas baru untuk mencapai impor data tanpa gangguan.
Catatan Gambar dan beberapa informasi dalam topik ini berasal dari Continuously load data from Apache Kafka dari StarRocks open source.

Proses Impor

Persyaratan Lingkungan

  • Anda dapat mengakses kluster Kafka yang tidak memerlukan otentikasi dan kluster Kafka yang memerlukan otentikasi Secure Sockets Layer (SSL).

  • Sebuah pesan dapat berada dalam salah satu format berikut:

    • Format CSV. Dalam format ini, setiap pesan menempati satu baris, dan akhir baris tidak mengandung karakter baris baru.

    • Format JSON.

  • Tipe Array tidak didukung.

  • Hanya Kafka versi 0.10.0.0 dan yang lebih baru yang didukung.

Membuat Pekerjaan Impor

  • Sintaksis

    CREATE ROUTINE LOAD <database>.<job_name> ON <table_name>
        [COLUMNS TERMINATED BY "column_separator" ,]
        [COLUMNS (col1, col2, ...) ,]
        [WHERE where_condition ,]
        [PARTITION (part1, part2, ...)]
        [PROPERTIES ("key" = "value", ...)]
        FROM [DATA_SOURCE]
        [(data_source_properties1 = 'value1',
        data_source_properties2 = 'value2',
        ...)]

    Tabel berikut menjelaskan parameter dalam perintah di atas.

    ParameterDiperlukanDeskripsi
    job_nameYaNama pekerjaan impor. Nama database impor dapat ditempatkan di depan. Nama tersebut biasanya dalam format timestamp ditambah nama tabel. Nama pekerjaan harus unik dalam sebuah database.
    table_nameYaNama tabel tujuan.
    Klausa COLUMNS TERMINATEDTidakPemisah kolom dalam file data sumber. Nilai default: \t.
    Klausa COLUMNSTidakPemetaan antara kolom dalam file data sumber dan kolom dalam tabel tujuan.
    • Kolom yang dipetakan: Misalnya, tabel tujuan memiliki tiga kolom, col1, col2, dan col3, sedangkan file data sumber memiliki empat kolom, dan kolom pertama, kedua, dan keempat dalam tabel tujuan sesuai dengan col2, col1, dan col3 dalam file data sumber. Dalam hal ini, klausa dapat ditulis sebagai COLUMNS (col2, col1, temp, col3). Kolom temp tidak ada dan digunakan untuk melewati kolom ketiga dalam file data sumber.
    • Kolom turunan: StarRocks tidak hanya membaca data dalam kolom file data sumber tetapi juga menyediakan operasi pemrosesan pada kolom data. Misalnya, kolom col4 ditambahkan ke tabel tujuan, dan nilai col4 sama dengan nilai col1 ditambah nilai col2. Dalam hal ini, klausa dapat ditulis sebagai COLUMNS (col2, col1, temp, col3, col4 = col1 + col2).
    Klausa WHERETidakKondisi filter yang ingin Anda gunakan untuk menyaring baris yang tidak Anda butuhkan. Kondisi filter dapat ditentukan pada kolom yang dipetakan atau kolom turunan.

    Sebagai contoh, jika hanya baris dengan k1 lebih besar dari 100 dan k2 sama dengan 1000 yang diimpor, klausa dapat ditulis sebagai WHERE k1 > 100 and k2 = 1000.

    Klausa PARTITIONTidakPartisi tabel tujuan. Jika Anda tidak menentukan partisi, file data sumber secara otomatis diimpor ke partisi yang sesuai.
    Klausa PROPERTIESTidakParameter umum untuk pekerjaan impor.
    desired_concurrent_numberTidakJumlah maksimum tugas ke dalam mana pekerjaan impor dapat dibagi. Nilainya harus lebih besar dari 0. Nilai default: 3.
    max_batch_intervalTidakWaktu eksekusi maksimum setiap tugas. Nilai valid: 5 hingga 60. Unit: detik. Nilai default: 10.

    Dalam V1.15 dan yang lebih baru, parameter ini menentukan waktu penjadwalan tugas. Anda dapat menentukan seberapa sering tugas dijalankan. routine_load_task_consume_second dalam fe.conf menentukan jumlah waktu yang diperlukan oleh tugas untuk mengonsumsi data. Nilai default: 3s. routine_load_task_timeout_second dalam fe.conf menentukan periode timeout eksekusi tugas. Nilai default: 15s.

    max_batch_rowsTidakJumlah maksimum baris yang dapat dibaca setiap tugas. Nilainya harus lebih besar dari atau sama dengan 200000. Nilai default: 200000.

    Dalam V1.15 dan yang lebih baru, parameter ini hanya digunakan untuk mendefinisikan rentang jendela deteksi kesalahan. Rentang jendela adalah 10 × max-batch-rows.

    max_batch_sizeTidakJumlah maksimum byte yang dapat dibaca setiap tugas. Unit: byte. Nilai valid: 100 MB hingga 1 GB. Nilai default: 100 MB.

    Dalam V1.15 dan yang lebih baru, parameter ini diabaikan. routine_load_task_consume_second dalam fe.conf menentukan jumlah waktu yang diperlukan oleh tugas untuk mengonsumsi data. Nilai default: 3s.

    max_error_numberTidakJumlah maksimum baris kesalahan yang diizinkan dalam jendela sampling. Nilainya harus lebih besar dari atau sama dengan 0. Nilai default: 0. Tidak ada baris kesalahan yang diizinkan.
    Penting Baris yang difilter oleh kondisi WHERE bukanlah baris kesalahan.
    strict_modeTidakMenentukan apakah akan mengaktifkan mode ketat. Secara default, mode ini diaktifkan.

    Jika tipe kolom data mentah non-kosong diubah menjadi NULL setelah Anda mengaktifkan mode ketat, data tersebut disaring. Untuk menonaktifkan mode ketat, atur parameter ini ke false.

    timezoneTidakZona waktu yang digunakan untuk pekerjaan impor.

    Secara default, nilai parameter timezone dari sesi digunakan. Parameter ini memengaruhi hasil semua fungsi terkait zona waktu yang terlibat dalam impor.

    DATA_SOURCEYaTipe sumber data. Atur nilainya ke KAFKA.
    data_source_propertiesTidakInformasi tentang sumber data. Nilainya mencakup bidang berikut:
    • kafka_broker_list: informasi koneksi tentang broker Kafka. Format: ip:host. Pisahkan beberapa broker dengan koma (,).
    • kafka_topic: topik Kafka yang ingin Anda langgani.
      Catatan Bidang kafka_broker_list dan kafka_topic wajib.
    • kafka_partitions dan kafka_offsets: partisi Kafka yang ingin Anda langgani dan offset awal setiap partisi.
    • property: properti terkait Kafka. Bidang ini setara dengan parameter --property dalam Kafka Shell. Anda dapat menjalankan perintah HELP ROUTINE LOAD; untuk melihat sintaksis lebih rinci untuk membuat pekerjaan impor.
  • Contoh: Kirim pekerjaan impor Routine Load tanpa otentikasi bernama example_tbl2_ordertest ke StarRocks untuk terus mengonsumsi pesan dari topik ordertest2 kluster Kafka dan mengimpor pesan ke tabel example_tbl2. Pekerjaan impor mengonsumsi pesan dari offset paling awal partisi tertentu dari topik.

    CREATE ROUTINE LOAD load_test.example_tbl2_ordertest ON example_tbl
    COLUMNS(commodity_id, customer_name, country, pay_time, price, pay_dt=from_unixtime(pay_time, '%Y%m%d'))
    PROPERTIES
    (
        "desired_concurrent_number"="5",
        "format" ="json",
        "jsonpaths" ="[\"$.commodity_id\",\"$.customer_name\",\"$.country\",\"$.pay_time\",\"$.price\"]"
     )
    FROM KAFKA
    (
        "kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
        "kafka_topic" = "ordertest2",
        "kafka_partitions" ="0,1,2,3,4",
        "property.kafka_default_offsets" = "OFFSET_BEGINNING"
    );
  • Contoh: Gunakan protokol SSL untuk mengakses Kafka. Contoh kode:

    -- Tentukan SSL sebagai protokol keamanan. 
    "property.security.protocol" = "ssl", 
    
     -- Tentukan lokasi penyimpanan sertifikat CA. 
    "property.ssl.ca.location" = "FILE:ca-cert",
    
    -- Jika autentikasi klien diaktifkan untuk server Kafka, Anda harus mengonfigurasi parameter berikut:
    -- Tentukan lokasi kunci publik untuk klien Kafka. 
    "property.ssl.certificate.location" = "FILE:client.pem", 
    -- Tentukan lokasi kunci privat untuk klien Kafka. 
    "property.ssl.key.location" = "FILE:client.key", 
    -- Tentukan kata sandi kunci privat untuk klien Kafka. 
    "property.ssl.key.password" = "******"

    Untuk informasi tentang pernyataan CREATE FILE, lihat CREATE FILE.

    Catatan

    Saat menggunakan pernyataan CREATE FILE, tentukan alamat HTTP Object Storage Service (OSS) sebagai URL. Untuk informasi lebih lanjut, lihat Gunakan titik akhir yang mendukung IPv6 untuk mengakses OSS.

Lihat status pekerjaan impor

  • Kueri semua pekerjaan impor rutin dalam database load_test, termasuk pekerjaan yang dihentikan atau dibatalkan. Pekerjaan ditampilkan dalam satu atau lebih baris.

    USE load_test;
    SHOW ALL ROUTINE LOAD;
  • Kueri pekerjaan impor rutin yang sedang berjalan bernama example_tbl2_ordertest dalam load_test.

    SHOW ROUTINE LOAD FOR load_test.example_tbl2_ordertest;
  • Lakukan operasi berikut untuk melihat status pekerjaan impor pada tab Kafka Import: Buka halaman EMR StarRocks Manager. Di panel navigasi di sebelah kiri, klik Metadata Management. Klik nama database yang diinginkan. Pada halaman yang muncul, klik tab Tasks.

Penting

StarRocks memungkinkan Anda melihat hanya pekerjaan yang sedang berjalan. Anda tidak dapat melihat pekerjaan yang selesai dan pekerjaan yang belum dijalankan.

Anda dapat menjalankan perintah SHOW ALL ROUTINE LOAD untuk melihat semua pekerjaan Routine Load yang sedang berjalan. Keluaran berikut dikembalikan:

*************************** 1. row ***************************

                  Id: 14093
                Name: routine_load_wikipedia
          CreateTime: 2020-05-16 16:00:48
           PauseTime: N/A
             EndTime: N/A
              DbName: default_cluster:load_test
           TableName: routine_wiki_edit
               State: RUNNING
      DataSourceType: KAFKA
      CurrentTaskNum: 1
       JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
    CustomProperties: {}
           Statistic: {"receivedBytes":150821770,"errorRows":122,"committedTaskNum":12,"loadedRows":2399878,"loadRowsRate":199000,"abortedTaskNum":1,"totalRows":2400000,"unselectedRows":0,"receivedBytesRate":12523000,"taskExecuteTimeMs":12043}
            Progress: {"0":"13634667"}
ReasonOfStateChanged:
        ErrorLogUrls: http://172.26.**.**:9122/api/_load_error_log?file=__shard_53/error_log_insert_stmt_47e8a1d107ed4932-8f1ddf7b01ad2fee_47e8a1d107ed4932_8f1ddf7b01ad2fee, http://172.26.**.**:9122/api/_load_error_log?file=__shard_54/error_log_insert_stmt_e0c0c6b040c044fd-a162b16f6bad53e6_e0c0c6b040c044fd_a162b16f6bad53e6, http://172.26.**.**:9122/api/_load_error_log?file=__shard_55/error_log_insert_stmt_ce4c95f0c72440ef-a442bb300bd743c8_ce4c95f0c72440ef_a442bb300bd743c8
            OtherMsg:
1 row in set (0.00 sec)

Dalam contoh ini, pekerjaan impor bernama routine_load_wikipedia dibuat. Tabel berikut menjelaskan parameter.

Parameter

Deskripsi

State

Status pekerjaan impor. Nilai RUNNING menunjukkan bahwa pekerjaan impor terus berjalan.

Statistic

Informasi kemajuan, yang mencakup informasi impor sejak pekerjaan dibuat.

receivedBytes

Ukuran data yang diterima. Unit: byte.

errorRows

Jumlah baris kesalahan yang diimpor.

committedTaskNum

Jumlah tugas yang dikirimkan oleh node frontend.

loadedRows

Jumlah baris yang diimpor.

loadRowsRate

Laju impor data. Unit: baris/detik.

abortedTaskNum

Jumlah tugas yang gagal pada node backend.

totalRows

Jumlah total baris yang diterima.

unselectedRows

Jumlah baris yang difilter oleh kondisi WHERE.

receivedBytesRate

Laju penerimaan data. Unit: byte/detik.

taskExecuteTimeMs

Durasi impor. Unit: milidetik.

ErrorLogUrls

Log pesan kesalahan. Anda dapat menggunakan URL untuk melihat pesan kesalahan selama proses impor.

Jeda pekerjaan impor

Eksekusi pernyataan PAUSE untuk menjeda pekerjaan impor. Setelah Anda mengeksekusi pernyataan, pekerjaan impor masuk ke dalam status PAUSED. Impor data dijeda, tetapi pekerjaan tidak dihentikan. Anda dapat mengeksekusi pernyataan RESUME untuk melanjutkan pekerjaan tersebut.

PAUSE ROUTINE LOAD FOR <job_name>;

Setelah pekerjaan impor dijeda, informasi impor dalam Statistic dan Progress berhenti diperbarui. Anda dapat mengeksekusi pernyataan SHOW ROUTINE LOAD untuk melihat pekerjaan impor yang dijeda.

Melanjutkan pekerjaan impor

Eksekusi pernyataan RESUME untuk melanjutkan pekerjaan impor. Setelah Anda mengeksekusi pernyataan, pekerjaan sementara memasuki status NEED_SCHEDULE, yang menunjukkan bahwa pekerjaan sedang dijadwalkan ulang. Setelah periode waktu tertentu, pekerjaan masuk ke status RUNNING dan impor data dilanjutkan.

RESUME ROUTINE LOAD FOR <job_name>;

Menghentikan pekerjaan impor

Eksekusi pernyataan STOP untuk menghentikan pekerjaan impor. Setelah Anda mengeksekusi pernyataan, pekerjaan impor masuk ke dalam status STOPPED. Impor data dihentikan, dan pekerjaan diakhiri. Dalam hal ini, impor data tidak dapat dilanjutkan.

STOP ROUTINE LOAD FOR <job_name>;

Setelah pekerjaan impor dihentikan, informasi impor dalam Statistic dan Progress tidak lagi diperbarui. Dalam hal ini, Anda tidak dapat mengeksekusi pernyataan SHOW ROUTINE LOAD untuk melihat pekerjaan impor yang telah dihentikan.stop

Praktik Terbaik

Dalam contoh ini, sebuah pekerjaan impor Routine Load dibuat untuk terus mengonsumsi data dalam format CSV dari kluster Kafka dan kemudian mengimpor data CSV tersebut ke StarRocks.

  1. Lakukan operasi berikut dalam kluster Kafka:

    1. Buat topik uji.

      kafka-topics.sh --create  --topic order_sr_topic --replication-factor 3 --partitions 10 --bootstrap-server "core-1-1:9092,core-1-2:9092,core-1-3:9092"
    2. Jalankan perintah berikut untuk memproduksi data.

      kafka-console-producer.sh  --broker-list core-1-1:9092 --topic order_sr_topic
    3. Masukkan data berikut:

      2020050802,2020-05-08,Johann Georg Faust,Deutschland,male,895
      2020050802,2020-05-08,Julien Sorel,France,male,893
      2020050803,2020-05-08,Dorian Grey,UK,male,1262
      2020051001,2020-05-10,Tess Durbeyfield,US,female,986
      2020051101,2020-05-11,Edogawa Conan,japan,male,8924
  2. Lakukan operasi berikut dalam kluster StarRocks:

    1. Jalankan perintah berikut untuk membuat database tujuan dan tabel.

      Buat tabel bernama routine_load_tbl_csv dalam database load_test kluster StarRocks berdasarkan kolom yang ingin Anda impor. Dalam contoh ini, semua kolom, kecuali kolom kelima, diimpor. Kolom kelima menampilkan informasi gender.

      CREATE TABLE load_test.routine_load_tbl_csv (
          `order_id` bigint NOT NULL COMMENT "Order ID",
          'pay_dt' date NOT NULL COMMENT "Tanggal pembelian",
          'customer_name' varchar(26) NULL COMMENT "Nama pelanggan",
          'nationality' varchar(26) NULL COMMENT "Negara",
          'price' double NULL COMMENT "Jumlah pembayaran"
      )
      ENGINE=OLAP
      PRIMARY KEY (order_id,pay_dt)
      DISTRIBUTED BY HASH(`order_id`) BUCKETS 5;
    2. Eksekusi pernyataan berikut untuk membuat pekerjaan impor:

      CREATE ROUTINE LOAD load_test.routine_load_tbl_ordertest_csv ON routine_load_tbl_csv
      COLUMNS TERMINATED BY ",",
      COLUMNS (order_id, pay_dt, customer_name, nationality, temp_gender, price)
      PROPERTIES
      (
          "desired_concurrent_number" = "5"
      )
      FROM KAFKA
      (
          "kafka_broker_list" ="192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092",
          "kafka_topic" = "order_sr_topic",
          "kafka_partitions" ="0,1,2,3,4",
          "property.kafka_default_offsets" = "OFFSET_BEGINNING"
      )
    3. Eksekusi pernyataan berikut untuk melihat informasi tentang pekerjaan impor routine_load_tbl_ordertest_csv:

      SHOW ROUTINE LOAD FOR routine_load_tbl_ordertest_csv;

      Jika statusnya adalah RUNNING, pekerjaan berjalan sesuai harapan.

    4. Jalankan perintah berikut untuk menanyakan tabel tujuan. Hasilnya menunjukkan bahwa sinkronisasi data selesai.

      Anda juga dapat melakukan operasi berikut pada pekerjaan:

      • Jeda pekerjaan impor

        PAUSE ROUTINE LOAD FOR routine_load_tbl_ordertest_csv;
      • Lanjutkan pekerjaan impor

        RESUME ROUTINE LOAD FOR routine_load_tbl_ordertest_csv;
      • Ubah pekerjaan impor

        Catatan

        Hanya pekerjaan dalam status PAUSED yang dapat diubah.

        Sebagai contoh, Anda dapat mengubah nilai desired_concurrent_number menjadi 6.

        ALTER ROUTINE LOAD FOR routine_load_tbl_ordertest_csv
        PROPERTIES
        (
            "desired_concurrent_number" = "6"
        )
      • Hentikan pekerjaan impor

        STOP ROUTINE LOAD FOR routine_load_tbl_ordertest_csv;