全部产品
Search
文档中心

E-MapReduce:Stream Load

更新时间:Nov 10, 2025

Anda dapat menggunakan Stream Load untuk mengimpor file lokal atau aliran data ke StarRocks. Topik ini menjelaskan proses pengimporan data menggunakan Stream Load.

Informasi latar belakang

Stream Load adalah metode impor sinkron yang memungkinkan Anda mengimpor file lokal atau aliran data ke StarRocks melalui permintaan HTTP. Tanggapan HTTP akan mengembalikan hasil impor, yang menunjukkan apakah tugas berhasil atau tidak. Stream Load mendukung format file CSV dan JSON, dengan ukuran data maksimum 10 GB per impor.

Buat tugas impor

Stream Load mengirimkan dan mentransfer data menggunakan protokol HTTP. Topik ini menggunakan perintah curl untuk menunjukkan cara mengirimkan tugas impor. Anda juga dapat menggunakan klien HTTP lainnya.

Sintaks

curl --location-trusted -u <username>:<password> -XPUT <url>
(
    data_desc
)
[opt_properties]        
Catatan
  • Kami menyarankan agar Anda menentukan 100-continue dalam header permintaan HTTP Expect, misalnya, "Expect:100-continue". Praktik ini mencegah transmisi data yang tidak perlu dan mengurangi beban sumber daya jika server menolak tugas impor.

  • Di StarRocks, beberapa kata merupakan kata kunci yang dicadangkan dalam SQL dan tidak dapat digunakan secara langsung dalam pernyataan SQL. Untuk menggunakan kata kunci yang dicadangkan dalam pernyataan SQL, Anda harus membungkusnya dengan tanda backtick (`). Untuk informasi lebih lanjut tentang kata kunci yang dicadangkan, lihat Keywords.

Parameter

  • <username>:<password>: Nama pengguna dan kata sandi untuk kluster StarRocks. Parameter ini wajib diisi. Jika akun tidak memiliki kata sandi, cukup tentukan nama pengguna diikuti tanda titik dua, seperti <username>:.

  • XPUT: Metode permintaan HTTP. Parameter ini wajib diisi. Untuk Stream Load, tentukan metode PUT.

  • <url>: URL untuk permintaan Stream Load. Parameter ini wajib diisi. Formatnya adalah http://<fe_host>:<fe_http_port>/api/<database_name>/<table_name>/_stream_load.

    Tabel berikut menjelaskan parameter dalam URL tersebut.

    Parameter

    Wajib

    Deskripsi

    <fe_host>

    Ya

    Alamat IP node frontend (FE) dalam kluster StarRocks.

    <fe_http_port>

    Ya

    Port HTTP node FE dalam kluster StarRocks. Nilai default-nya adalah 18030.

    Anda dapat mencari parameter http_port pada tab Configuration halaman layanan Cluster Service StarRocks untuk melihat nomor port tersebut. Selain itu, Anda juga dapat menjalankan perintah SHOW FRONTENDS untuk melihat alamat IP dan nomor port HTTP node FE.

    <database_name>

    Ya

    Nama database tempat tabel StarRocks tujuan berada.

    <table_name>

    Ya

    Nama tabel StarRocks tujuan.

  • desc: Menentukan properti file data sumber, termasuk nama file, format, pemisah kolom, pemisah baris, partisi tujuan, dan pemetaan kolom ke tabel StarRocks. Formatnya sebagai berikut.

    -T <file_path>
    -H "format: CSV | JSON"
    -H "column_separator: <column_separator>"
    -H "row_delimiter: <row_delimiter>"
    -H "columns: <column1_name>[, <column2_name>, ... ]"
    -H "partitions: <partition1_name>[, <partition2_name>, ...]"
    -H "temporary_partitions: <temporary_partition1_name>[, <temporary_partition2_name>, ...]"
    -H "jsonpaths: [ \"<json_path1>\"[, \"<json_path2>\", ...] ]"
    -H "strip_outer_array: true | false"
    -H "json_root: <json_path>"
    -H "ignore_json_size: true | false"
    -H "compression: <compression_algorithm> | Content-Encoding: <compression_algorithm>"

    Parameter dalam data_desc terbagi menjadi tiga kategori: parameter umum, parameter untuk CSV, dan parameter untuk JSON.

    • Parameter umum

      Parameter

      Wajib

      Deskripsi

      <file_path>

      Ya

      Jalur tempat file data sumber disimpan. Nama file boleh menyertakan ekstensi secara opsional.

      format

      Tidak

      Format data yang akan diimpor. Nilai yang valid adalah CSV dan JSON. Nilai default-nya adalah CSV.

      partitions

      Tidak

      Partisi tertentu tempat Anda ingin mengimpor data. Jika Anda tidak menentukan parameter ini, data akan diimpor ke semua partisi tabel StarRocks secara default.

      temporary_partitions

      Tidak

      Partisi sementara tempat Anda ingin mengimpor data.

      columns

      Tidak

      Pemetaan antara kolom dalam file data sumber dan kolom dalam tabel StarRocks.

      • Jika kolom dalam file data sumber sesuai dengan kolom dalam tabel StarRocks secara berurutan, Anda tidak perlu menentukan parameter ini.

      • Jika file data sumber tidak sesuai dengan skema tabel, Anda harus menentukan parameter ini untuk mengonfigurasi aturan transformasi data. Kolom dapat berupa dua bentuk. Satu bentuk secara langsung sesuai dengan bidang dalam file yang diimpor dan dapat direpresentasikan dengan nama bidang tersebut. Bentuk lainnya perlu diturunkan melalui perhitungan.

        • Contoh 1: Tabel memiliki tiga kolom: c1, c2, c3. Tiga kolom dalam file sumber sesuai dengan c3,c2,c1 secara berurutan. Anda harus menentukan -H "columns: c3, c2, c1".

        • Contoh 2: Tabel memiliki tiga kolom: c1, c2, c3. Tiga kolom pertama dalam file sumber sesuai dengan kolom tabel, tetapi ada kolom tambahan. Anda harus menentukan -H "columns: c1, c2, c3, temp". Kolom terakhir dapat memiliki nama apa pun sebagai placeholder.

        • Contoh 3: Tabel memiliki tiga kolom: year, month, day. File sumber hanya memiliki satu kolom waktu dalam format 2018-06-01 01:02:03. Anda dapat menentukan -H "columns: col, year = year(col), month=month(col), day=day(col)" untuk menyelesaikan impor.

    • Parameter untuk CSV

      Parameter

      Wajib

      Deskripsi

      column_separator

      Tidak

      Pemisah kolom dalam file data sumber. Nilai default-nya adalah \t.

      Untuk karakter tak terlihat, Anda harus menambahkan awalan \x dan menggunakan representasi heksadesimal untuk pemisah tersebut. Misalnya, untuk pemisah file Hive \x01, tentukan -H "column_separator:\x01".

      row_delimiter

      Tidak

      Pemisah baris dalam file data sumber. Nilai default-nya adalah \n.

      Penting

      Perintah curl tidak dapat meneruskan \n. Saat Anda menentukan secara manual line feed sebagai \n, shell akan meneruskan backslash (\) lalu n, bukan meneruskan karakter line feed \n secara langsung.

      Bash mendukung sintaks string escape lainnya. Untuk meneruskan \n dan \t, mulailah string dengan tanda dolar dan tanda kutip tunggal ($') serta akhiri dengan tanda kutip tunggal ('). Misalnya, -H $'row_delimiter:\n'.

      skip_header

      Tidak

      Jumlah baris header yang dilewati di awal file CSV. Nilainya harus berupa bilangan bulat. Nilai default-nya adalah 0.

      Dalam beberapa file CSV, beberapa baris pertama digunakan untuk mendefinisikan metadata seperti nama dan tipe kolom. Dengan mengatur parameter ini, Anda dapat membuat StarRocks mengabaikan beberapa baris pertama file CSV selama impor. Misalnya, jika Anda mengatur parameter ini ke 1, StarRocks akan mengabaikan baris pertama file CSV selama impor.

      Pemisah baris yang digunakan di sini harus sama dengan yang Anda tetapkan dalam perintah impor.

      where

      Tidak

      Digunakan untuk mengekstrak sebagian data. Untuk memfilter data yang tidak diinginkan, Anda dapat mengatur opsi ini.

      Misalnya, untuk hanya mengimpor data di mana kolom k1 sama dengan 20180601, tentukan -H "where: k1 = 20180601".

      max_filter_ratio

      Tidak

      Rasio maksimum baris yang dapat difilter karena masalah seperti kualitas data. Nilai default-nya adalah 0, yang berarti toleransi nol.

      Catatan

      Data yang difilter oleh klausa WHERE tidak termasuk.

      partitions

      Tidak

      Partisi yang terlibat dalam impor ini.

      Jika Anda dapat menentukan partisi untuk data tersebut, kami menyarankan agar Anda menentukan parameter ini. Data yang tidak memenuhi partisi yang ditentukan akan difilter. Misalnya, untuk mengimpor data ke partisi p1 dan p2, tentukan -H "partitions: p1, p2".

      timeout

      Tidak

      Waktu tenggang untuk impor. Nilai default-nya adalah 600 detik.

      Nilainya berkisar antara 1 hingga 259200. Satuannya adalah detik.

      strict_mode

      Tidak

      Menentukan apakah akan mengaktifkan mode ketat untuk impor ini. Nilai yang valid:

      • false (default): Mode ketat dinonaktifkan.

      • true: Mode ketat diaktifkan. Saat diaktifkan, filter ketat diterapkan pada konversi tipe kolom selama proses impor.

      timezone

      Tidak

      Zona waktu untuk impor ini. Default-nya adalah UTC+8.

      Parameter ini memengaruhi hasil semua fungsi terkait zona waktu yang terlibat dalam impor.

      exec_mem_limit

      Tidak

      Batas memori untuk impor. Nilai default-nya adalah 2 GB.

    • Parameter untuk JSON

      Parameter

      Wajib

      Deskripsi

      jsonpaths

      Tidak

      Nama bidang yang akan diimpor. Parameter ini hanya wajib saat Anda mengimpor data berformat JSON dalam mode pencocokan. Nilai parameter harus dalam format JSON.

      strip_outer_array

      Tidak

      Menentukan apakah akan memangkas struktur array terluar. Nilai yang valid:

      • false (default): Mempertahankan struktur asli data JSON tanpa menghapus array terluar. Seluruh array JSON diimpor sebagai satu nilai tunggal.

        Misalnya, untuk data sampel [{"k1" : 1, "k2" : 2},{"k1" : 3, "k2" : 4}], jika strip_outer_array diatur ke false, data diurai sebagai satu array tunggal dan diimpor ke tabel.

      • true: Saat data yang diimpor berupa array JSON, Anda harus mengatur strip_outer_array ke true.

        Misalnya, untuk data sampel [{"k1" : 1, "k2" : 2},{"k1" : 3, "k2" : 4}], jika strip_outer_array diatur ke true, data diurai sebagai dua catatan dan diimpor ke tabel.

      json_root

      Tidak

      Elemen akar dari data JSON yang akan diimpor. Parameter ini hanya wajib saat Anda mengimpor data berformat JSON dalam mode pencocokan. Nilai parameter harus berupa string JsonPath yang valid. Nilai default-nya kosong, yang berarti seluruh file data JSON diimpor.

      ignore_json_size

      Tidak

      Menentukan apakah akan memeriksa ukuran Body JSON dalam permintaan HTTP.

      Catatan

      Secara default, ukuran body JSON dalam permintaan HTTP tidak boleh melebihi 100 MB. Jika ukuran body JSON melebihi 100 MB, kesalahan berikut dilaporkan: "The size of this batch exceed the max size [104857600] of json type data data [8617627793]. Tetapkan ignore_json_size untuk melewati pemeriksaan, meskipun hal ini dapat menyebabkan konsumsi memori yang sangat besar." Untuk menghindari kesalahan ini, Anda dapat menambahkan pengaturan "ignore_json_size:true" ke header permintaan HTTP untuk melewati pemeriksaan ukuran body JSON.

      compression, Content-Encoding

      Tidak

      Algoritma kompresi yang digunakan selama transmisi data untuk Stream Load. Algoritma yang didukung meliputi GZIP, BZIP2, LZ4_FRAME, dan Zstandard.

      Misalnya, curl --location-trusted -u root: -v 'http://127.0.0.1:18030/api/db0/tbl_simple/_stream_load' \-X PUT -H "expect:100-continue" \-H 'format: json' -H 'compression: lz4_frame' -T ./b.json.lz4.

  • opt_properties: Parameter opsional untuk impor. Pengaturan yang ditentukan berlaku untuk seluruh tugas impor.

    Formatnya sebagai berikut.

    -H "label: <label_name>"
    -H "where: <kondisi1>[, <kondisi2>, ...]"
    -H "max_filter_ratio: <num>"
    -H "timeout: <num>"
    -H "strict_mode: true | false"
    -H "timezone: <string>"
    -H "load_mem_limit: <num>"
    -H "partial_update: true | false"
    -H "partial_update_mode: row | column"
    -H "merge_condition: <column_name>"

    Tabel berikut menjelaskan parameter-parameter tersebut.

    Parameter

    Wajib

    Deskripsi

    label

    Tidak

    Label untuk tugas impor. Data dengan label yang sama tidak dapat diimpor berulang kali.

    Anda dapat menentukan label untuk mencegah impor data duplikat. StarRocks menyimpan label tugas yang berhasil diselesaikan selama 30 menit terakhir.

    where

    Tidak

    Kondisi filter. Jika Anda menentukan parameter ini, StarRocks akan memfilter data yang telah ditransformasi berdasarkan kondisi yang ditentukan. Hanya data yang memenuhi kondisi filter yang didefinisikan dalam klausa where yang diimpor.

    Misalnya, untuk hanya mengimpor data di mana kolom k1 sama dengan 20180601, tentukan -H "where: k1 = 20180601".

    max_filter_ratio

    Tidak

    Rasio maksimum baris yang dapat difilter karena masalah seperti kualitas data. Nilai default-nya adalah 0, yang berarti toleransi nol.

    Catatan

    Data yang difilter oleh klausa where tidak termasuk.

    log_rejected_record_num

    Tidak

    Jumlah maksimum baris yang dapat difilter karena kualitas data buruk. Parameter ini didukung mulai versi 3.1. Nilai yang valid: 0, -1, atau bilangan bulat positif. Nilai default-nya adalah 0.

    • 0: Tidak mencatat baris yang difilter.

    • -1: Mencatat semua baris yang difilter.

    • Bilangan bulat positif (seperti n) mencatat maksimal n baris yang difilter pada setiap node BE (atau CN).

    timeout

    Tidak

    Waktu tenggang untuk impor. Nilai default-nya adalah 600 detik.

    Nilainya berkisar antara 1 hingga 259200. Satuannya adalah detik.

    strict_mode

    Tidak

    Menentukan apakah akan mengaktifkan mode ketat untuk impor ini.

    • false (default): Mode ketat dinonaktifkan.

    • true: Mode ketat diaktifkan.

    timezone

    Tidak

    Zona waktu untuk impor ini. Default-nya adalah UTC+8.

    Parameter ini memengaruhi hasil semua fungsi terkait zona waktu yang terlibat dalam impor.

    load_mem_limit

    Tidak

    Batas memori untuk tugas impor. Nilai default-nya adalah 2 GB.

    partial_update

    Tidak

    Menentukan apakah akan menggunakan pembaruan kolom parsial. Nilai yang valid adalah TRUE dan FALSE. Nilai default-nya adalah FALSE.

    partial_update_mode

    Tidak

    Mode untuk pembaruan parsial. Nilai yang valid adalah row dan column.

    • row (default): Menentukan mode baris untuk pembaruan parsial. Mode ini cocok untuk pembaruan real-time banyak kolom dalam batch kecil.

    • column: Menentukan mode kolom untuk pembaruan parsial. Mode ini cocok untuk pembaruan batch beberapa kolom di banyak baris. Dalam skenario ini, mengaktifkan mode kolom memberikan pembaruan yang lebih cepat.

      Misalnya, dalam tabel dengan 100 kolom, jika Anda memperbarui 10 kolom (10% dari total) untuk semua baris, mengaktifkan mode kolom dapat meningkatkan kinerja pembaruan hingga 10 kali lipat.

    merge_condition

    Tidak

    Nama kolom yang digunakan sebagai kondisi agar pembaruan berlaku. Pembaruan hanya berlaku jika nilai kolom ini dalam data yang diimpor lebih besar dari atau sama dengan nilai saat ini.

    Catatan

    Kolom yang ditentukan harus berupa kolom non-kunci primer. Pembaruan bersyarat hanya didukung untuk tabel kunci primer.

Contoh

Contoh ini menunjukkan cara mengimpor file data.csv ke tabel `example_table` dalam database `load_test` di kluster StarRocks. Untuk contoh lengkap, lihat Contoh lengkap impor data.

curl --location-trusted -u "root:" \
     -H "Expect:100-continue" \
     -H "label:label2" \
     -H "column_separator: ," \
     -T data.csv -XPUT \
     http://172.17.**.**:18030/api/load_test/example_table/_stream_load

Nilai kembali

Setelah impor selesai, hasil tugas impor dikembalikan dalam format JSON. Berikut ini adalah contoh tanggapan.

{
    "TxnId": 9,
    "Label": "label2",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 4,
    "NumberLoadedRows": 4,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 45,
    "LoadTimeMs": 235,
    "BeginTxnTimeMs": 101,
    "StreamLoadPlanTimeMs": 102,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 11,
    "CommitAndPublishTimeMs": 19
}

Tabel berikut menjelaskan parameter dalam tanggapan tersebut.

Parameter

Deskripsi

TxnId

ID transaksi impor.

Label

Label impor.

Status

Status impor. Nilai yang valid:

  • Success: Data berhasil diimpor dan terlihat.

  • Publish Timeout: Impor berhasil dikirim, tetapi data mungkin terlihat dengan penundaan. Anda tidak perlu mengulangi impor.

  • Label Already Exists: Label duplikat. Anda harus mengubah Label.

  • Fail: Impor data gagal.

ExistingJobStatus

Status tugas impor yang sesuai dengan label yang sudah ada. Bidang ini hanya ditampilkan ketika Status adalah Label Already Exists. Anda dapat menggunakan status ini untuk mengetahui status tugas impor yang sesuai dengan label yang sudah ada.

  • RUNNING: Tugas sedang berjalan.

  • FINISHED: Tugas berhasil.

Message

Rincian tentang status tugas impor. Jika impor gagal, alasan spesifik kegagalannya dikembalikan.

NumberTotalRows

Jumlah total baris yang dibaca dari aliran data.

NumberLoadedRows

Jumlah baris yang diimpor. Ini hanya berlaku jika status impor adalah Success.

NumberFilteredRows

Jumlah baris yang difilter, yaitu baris dengan kualitas data yang tidak memenuhi syarat.

NumberUnselectedRows

Jumlah baris yang difilter oleh kondisi Where.

LoadBytes

Ukuran file sumber untuk tugas impor.

LoadTimeMs

Waktu yang dibutuhkan untuk tugas impor, dalam milidetik.

BeginTxnTimeMs

Waktu yang dibutuhkan untuk memulai transaksi untuk tugas impor.

StreamLoadPlanTimeMs

Waktu yang dibutuhkan untuk menghasilkan rencana eksekusi untuk tugas impor.

ReadDataTimeMs

Waktu yang dibutuhkan untuk membaca data untuk tugas impor.

WriteDataTimeMs

Waktu yang dibutuhkan untuk menulis data untuk tugas impor.

ErrorURL

Parameter ini dikembalikan jika tugas gagal mengimpor data.

Anda dapat menggunakan ErrorURL untuk melihat rincian baris kesalahan yang difilter karena kualitas data buruk selama proses impor. Saat Anda mengirimkan tugas impor, Anda dapat menggunakan parameter opsional log_rejected_record_num untuk menentukan jumlah maksimum baris kesalahan yang akan dicatat.

Anda dapat menjalankan perintah curl "url" untuk melihat informasi baris kesalahan secara langsung, atau menjalankan perintah wget "url" untuk mengekspor informasi baris kesalahan tersebut.

Misalnya, untuk mengekspor informasi baris kesalahan:

wget "http://172.17.**.**:18040/api/_load_error_log?file=error_log_b74dccdcf0ceb4de_e82b2709c6c013ad"

Informasi baris kesalahan yang diekspor disimpan ke file lokal bernama _load_error_log?file=error_log_b74dccdcf0ceb4de_e82b2709c6c013ad. Anda dapat menjalankan perintah cat _load_error_log?file=error_log_b74dccdcf0ceb4de_e82b2709c6c013ad untuk melihat isi file tersebut.

Anda dapat menyesuaikan tugas impor berdasarkan pesan kesalahan tersebut, lalu mengirim ulang tugas tersebut.

Membatalkan tugas impor

Anda tidak dapat membatalkan tugas Stream Load secara manual. Tugas akan dibatalkan secara otomatis jika melebihi waktu tenggang atau terjadi kesalahan. Anda dapat menggunakan ErrorURL dari nilai kembali untuk mengunduh pesan kesalahan guna pemecahan masalah.

Contoh lengkap impor data

Contoh ini menggunakan perintah curl untuk membuat tugas impor.

  1. Buat tabel untuk data yang akan diimpor.

    1. Login ke node master kluster StarRocks menggunakan Secure Shell (SSH). Untuk informasi lebih lanjut, lihat Login ke kluster.

    2. Jalankan perintah berikut untuk terhubung ke kluster StarRocks menggunakan klien MySQL.

      mysql -h127.0.0.1  -P 9030 -uroot
    3. Jalankan perintah berikut untuk membuat database dan tabel.

      CREATE DATABASE IF NOT EXISTS load_test;
      USE load_test;
      
      CREATE TABLE IF NOT EXISTS example_table (
          id INT,
          name VARCHAR(50),
          age INT
      )
      DUPLICATE KEY(id)
      DISTRIBUTED BY HASH(id) BUCKETS 3
      PROPERTIES (
          "replication_num" = "1"  -- Atur jumlah replika menjadi 1.
      );
      

      Setelah menjalankan perintah tersebut, tekan Ctrl+D untuk keluar dari klien MySQL.

  2. Persiapkan data uji.

    Persiapkan data CSV

    Misalnya, buat file bernama data.csv dengan konten berikut.

    id,name,age
    1,Alice,25
    2,Bob,30
    3,Charlie,35

    Persiapkan data JSON

    Misalnya, buat file bernama json.data dengan konten berikut.

    {"id":1,"name":"Emily","age":25}
    {"id":2,"name":"Benjamin","age":35}
    {"id":3,"name":"Olivia","age":28}
    {"id":4,"name":"Alexander","age":60}
    {"id":5,"name":"Ava","age":17}
  3. Jalankan perintah berikut untuk membuat tugas impor.

    Impor data CSV

    curl --location-trusted -u "root:" \
         -H "Expect:100-continue" \
         -H "label:label1" \
         -H "column_separator: ," \
         -T data.csv -XPUT \
         http://172.17.**.**:18030/api/load_test/example_table/_stream_load

    Impor data JSON

    curl --location-trusted -u "root:" \
         -H "Expect:100-continue" \
         -H "label:label2" \
         -H "format:json" \
         -T json.data -XPUT \
         http://172.17.**.**:18030/api/load_test/example_table/_stream_load

Contoh integrasi kode