全部产品
Search
文档中心

E-MapReduce:Stream Load

更新时间:Jun 24, 2025

StarRocks memungkinkan Anda mengimpor file CSV berukuran hingga 10 GB dari mesin lokal. Topik ini menjelaskan prinsip dasar, praktik terbaik, serta contoh penggunaan Stream Load untuk mengimpor data.

Informasi latar belakang

Stream Load adalah metode impor sinkron yang memungkinkan Anda mengimpor file lokal atau aliran data ke StarRocks melalui permintaan HTTP. Dalam mode ini, hasil impor dikembalikan setelah proses selesai. Anda dapat menentukan keberhasilan impor berdasarkan nilai balasan dari permintaan tersebut.

Istilah

coordinator: Node yang menerima data, mendistribusikannya ke node lain, dan mengembalikan hasil setelah impor selesai.

Prinsip dasar

Stream Load memungkinkan Anda mengirimkan pekerjaan impor melalui permintaan HTTP. Jika permintaan dikirim ke node frontend (FE), FE akan meneruskannya ke node backend (BE) dengan pengalihan HTTP. Anda juga dapat mengirimkan permintaan langsung ke node BE. Node BE bertindak sebagai koordinator untuk membagi data berdasarkan skema tabel dan mendistribusikannya ke node BE terkait. Koordinator kemudian mengembalikan hasil pekerjaan impor kepada Anda.

Gambar berikut mengilustrasikan cara kerja Stream Load.Stream Load

Contoh

Membuat pekerjaan impor

Dalam mode Stream Load, data dikirimkan dan ditransfer menggunakan protokol HTTP. Contoh ini menggunakan perintah curl untuk mengirimkan pekerjaan impor. Anda juga dapat menggunakan klien HTTP lainnya untuk tujuan yang sama.

  • Sintaksis

curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT \
    http://fe_host:http_port/api/{db}/{table}/_stream_load
null
  • HTTP mendukung chunked transfer encoding dan non-chunked transfer encoding. Untuk non-chunked transfer encoding, Anda harus menggunakan header Content-Length untuk menentukan panjang konten yang diunggah guna memastikan integritas data.

  • Disarankan untuk mengatur header Expect ke 100-continue guna mencegah transmisi data yang tidak perlu saat terjadi kesalahan.

Properti header yang didukung dapat dilihat pada deskripsi parameter pekerjaan impor dalam tabel berikut. Parameter dikonfigurasi dalam format -H "key1:value1". Jika ada beberapa parameter, gunakan beberapa -H untuk menunjukkan parameter tersebut. Contoh: -H "key1:value1" -H "key2:value2". Dalam mode Stream Load, semua parameter terkait pekerjaan impor dikonfigurasi di header. Tabel berikut menjelaskan parameter tersebut.

Parameter

Deskripsi

Parameter tanda tangan

user:passwd

Dalam Stream Load, pekerjaan impor dibuat menggunakan protokol HTTP. Tanda tangan dihasilkan untuk pekerjaan impor menggunakan otentikasi akses dasar. StarRocks mengotentikasi identitas pengguna dan izin impor berdasarkan tanda tangan.

Parameter pekerjaan impor

label

Label pekerjaan impor. Data dengan label yang sama tidak dapat diimpor secara berulang.

Anda dapat menentukan label untuk pekerjaan impor untuk mencegah data diimpor secara berulang. StarRocks menyimpan label untuk pekerjaan yang telah selesai dalam 30 menit terakhir.

column_separator

Pemisah kolom file yang ingin Anda impor. Nilai default: \t.

Jika karakter non-cetak digunakan sebagai pemisah kolom, pemisah harus dalam format heksadesimal dan dimulai dengan awalan \x. Misalnya, jika pemisah kolom file Hive adalah \x01, konfigurasikan parameter ini dalam format berikut: -H "column_separator:\x01".

row_delimiter

Pemisah baris file yang ingin Anda impor. Nilai default: \n.

null

\n tidak dapat dilewatkan dengan menggunakan perintah curl. Jika Anda menentukan \n sebagai pemisah baris, Shell pertama-tama melewati garis miring mundur (\) dan kemudian n, bukan langsung melewati \n.

Anda dapat meloloskan string dengan menggunakan skrip Bash. Jika Anda ingin melewati \n dan \t, Anda dapat memulai string dengan tanda dolar ($) dan tanda kutip tunggal penuh ('), dan mengakhiri string dengan tanda kutip tunggal setengah lebar ('). Contoh: -H $'row_delimiter:\n'.

columns

Pemetaan antara kolom dalam file yang ingin Anda impor dan kolom dalam tabel StarRocks.

Jika kolom dalam file sumber sama dengan yang ada di tabel StarRocks, Anda tidak perlu mengonfigurasi parameter ini. Sebaliknya, Anda harus mengonfigurasi parameter ini untuk menentukan aturan konversi data. Anda dapat mengonfigurasi parameter ini dengan menggunakan metode berikut: Tentukan nama kolom dalam tabel StarRocks yang sesuai dengan nama kolom dalam file secara berurutan. Atau, tentukan kolom berdasarkan perhitungan.

  • Contoh 1: Tabel StarRocks berisi kolom berikut: c1, c2, and c3. File sumber berisi tiga kolom yang sesuai dengan kolom berikut dalam tabel secara berurutan: c3, c2, c1. Dalam hal ini, Anda dapat mengatur parameter ini menjadi -H "columns: c3, c2, c1".

  • Contoh 2: Tabel StarRocks berisi kolom berikut: c1, c2, and c3. File sumber berisi empat kolom. Tiga kolom pertama dalam file sumber sesuai dengan c1, c2, dan c3 dalam tabel secara berurutan, dan kolom keempat tidak memiliki kolom yang sesuai. Dalam hal ini, Anda dapat mengatur parameter ini menjadi -H "columns: c1, c2, c3, temp". Anda dapat menentukan nama kustom untuk kolom keempat sebagai placeholder.

  • Contoh 3: Tabel StarRocks berisi kolom berikut: year, month, and day. File sumber hanya berisi kolom waktu dalam format 2018-06-01 01:02:03. Dalam hal ini, Anda dapat mengatur parameter ini menjadi -H "columns: col, year = year(col), month=month(col), day=day(col)".

where

Kondisi filter data. Anda dapat mengonfigurasi parameter ini untuk menyaring data yang tidak diperlukan.

Sebagai contoh, jika Anda hanya ingin mengimpor data yang nilainya di kolom k1 adalah 20180601, Anda dapat mengatur parameter ini menjadi -H "where: k1 = 20180601" selama impor data.

max_filter_ratio

Rasio maksimum data yang dapat disaring. Misalnya, data disaring karena tidak sesuai dengan standar tertentu. Nilai default: 0. Nilai valid: 0 hingga 1.

null

Data yang tidak sesuai dengan standar tidak termasuk data yang disaring oleh kondisi WHERE.

partitions

Partisi ke mana data diimpor.

Kami merekomendasikan Anda untuk mengonfigurasi parameter ini jika Anda memastikan partisi ke mana data diimpor. Data yang tidak termasuk dalam partisi yang ditentukan akan disaring. Sebagai contoh, jika Anda ingin mengimpor data ke partisi p1 dan p2, Anda dapat mengatur parameter ini menjadi -H "partitions: p1, p2".

timeout

Periode timeout pekerjaan impor. Nilai default: 600.

Nilai valid: 1 hingga 259200. Unit: detik.

strict_mode

Menentukan apakah akan mengaktifkan mode ketat untuk pekerjaan impor. Secara default, mode ketat diaktifkan.

Untuk menonaktifkan mode ketat, atur parameter ini menjadi -H "strict_mode: false".

timezone

Zona waktu pekerjaan impor. Zona waktu default adalah UTC+8.

Parameter ini memengaruhi hasil semua fungsi terkait zona waktu yang terlibat dalam pekerjaan impor.

exec_mem_limit

Ukuran maksimum memori yang tersedia untuk pekerjaan impor. Nilai default: 2. Unit: GB.

  • Contoh

curl --location-trusted -u root -T date -H "label:123" \
    http://abc.com:8030/api/test/date/_stream_load

Setelah pekerjaan impor selesai, informasi tentang pekerjaan tersebut dikembalikan dalam format JSON. Contoh hasil balasan:

{
    "TxnId": 11672,
    "Label": "f6b62abf-4e16-4564-9009-b77823f3c024",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 199563535,
    "NumberLoadedRows": 199563535,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 50706674331,
    "LoadTimeMs": 801327,
    "BeginTxnTimeMs": 103,
    "StreamLoadPlanTimeMs": 0,
    "ReadDataTimeMs": 760189,
    "WriteDataTimeMs": 801023,
    "CommitAndPublishTimeMs": 199"
}

Parameter

Deskripsi

TxnId

ID transaksi pekerjaan impor. ID transaksi dapat sepenuhnya dikelola oleh Alibaba Cloud.

Label

Label pekerjaan impor. Jika Anda telah menentukan label, label tersebut dikembalikan. Jika Anda belum menentukan label, sistem secara otomatis menghasilkan label.

Status

Status pekerjaan impor. Nilai valid:

  • Success: Pekerjaan impor berhasil.

  • Publish Timeout: Pekerjaan impor selesai, tetapi data mungkin terlihat setelah penundaan. Anda tidak perlu mencoba ulang pekerjaan.

  • Label Already Exists: Label sudah ada. Anda harus mengubah label.

  • Fail: Pekerjaan impor gagal.

ExistingJobStatus

Status pekerjaan impor yang sesuai dengan label yang ada. Parameter ini hanya ditampilkan jika nilai parameter Status adalah Label Already Exists. Anda dapat memperoleh status pekerjaan impor yang sesuai dengan label yang ada berdasarkan nilai balasan. Nilai valid:

  • RUNNING: Pekerjaan impor sedang berlangsung.

  • FINISHED: Pekerjaan impor berhasil.

Message

Deskripsi rinci status pekerjaan impor. Jika pekerjaan impor gagal, penyebab kegagalan rinci dikembalikan.

NumberTotalRows

Jumlah total baris data yang dibaca dari aliran data.

NumberLoadedRows

Jumlah baris data yang diimpor dalam pekerjaan impor. Parameter ini hanya dikembalikan jika status pekerjaan impor adalah Success.

NumberFilteredRows

Jumlah baris data yang disaring dalam pekerjaan impor. Baris data yang kualitasnya tidak sesuai dengan standar disaring.

NumberUnselectedRows

Jumlah baris data yang disaring oleh kondisi WHERE.

LoadBytes

Ukuran data file sumber.

LoadTimeMs

Durasi pekerjaan impor. Unit: milidetik.

ErrorURL

URL entri data yang disaring. Hanya 1.000 entri data pertama yang disimpan. Jika pekerjaan impor gagal, Anda dapat menjalankan perintah berikut untuk mendapatkan data yang disaring. Kemudian, Anda dapat menganalisis data dan membuat penyesuaian.

wget http://host:port/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005

Membatalkan pekerjaan impor

Dalam mode Stream Load, Anda dapat menghentikan proses untuk membatalkan pekerjaan impor. Jika pekerjaan impor timeout atau terjadi kesalahan, sistem secara otomatis membatalkan pekerjaan tersebut.

ps -ef | grep stream_load

Praktik terbaik

Skenario

Stream Load cocok untuk skenario di mana file sumber disimpan di memori atau disk lokal. Sebagai metode impor sinkron, Stream Load memungkinkan Anda mendapatkan hasil pekerjaan impor secara langsung.

Ukuran data

Dalam mode Stream Load, node BE mengimpor dan mendistribusikan data. Disarankan untuk menggunakan Stream Load untuk mengimpor data berukuran antara 1 GB hingga 10 GB. Secara default, ukuran maksimum data yang dapat diimpor adalah 10 GB. Untuk mengimpor file yang lebih besar, modifikasi parameter streaming_load_max_mb node BE. Misalnya, jika ukuran file adalah 15 GB (15.360 MB), atur parameter streaming_load_max_mb menjadi nilai lebih besar dari 15 GB.

curl --location-trusted -u 'admin:****'   -XPOST http://be-c-****-internal.starrocks.aliyuncs.com:8040/api/update_config?streaming_load_max_mb=15360

Periode timeout default untuk pekerjaan impor dalam Stream Load adalah 600 detik. Untuk menyesuaikan periode timeout, ubah nilai parameter timeout node FE di instance StarRocks Anda melalui Konsol EMR.

Contoh lengkap

Data disimpan di direktori /mnt/disk1/customer.tbl klien tertentu. Anda ingin mengimpor data ke tabel customer database stream_load instance StarRocks Anda.

Unduh data: customer.tbl

Jumlah pekerjaan impor dalam mode Stream Load yang dapat diproses secara bersamaan tidak dipengaruhi oleh ukuran instance.

Prosedur:

  1. Modifikasi file konfigurasi BE.conf node BE jika ukuran file melebihi batas atas default. Sebagai contoh, atur parameter streaming_load_max_mb menjadi 15360. Unit: MB.

    curl --location-trusted -u 'admin:*****'  -XPOST http://be-c-****-internal.starrocks.aliyuncs.com:8040/api/update_config?streaming_load_max_mb=15360
  2. Di tab Konfigurasi Instance instance Anda, ubah nilai parameter stream_load_default_timeout_second. Dalam contoh ini, atur nilainya menjadi 3600.

  3. Buat tabel tujuan bernama customer.

    CREATE TABLE `customer` (
      `c_custkey` bigint(20) NULL COMMENT "",
      `c_name` varchar(65533) NULL COMMENT "",
      `c_address` varchar(65533) NULL COMMENT "",
      `c_nationkey` bigint(20) NULL COMMENT "",
      `c_phone` varchar(65533) NULL COMMENT "",
      `c_acctbal` double NULL COMMENT "",
      `c_mktsegment` varchar(65533) NULL COMMENT "",
      `c_comment` varchar(65533) NULL COMMENT ""
    ) ENGINE=OLAP
    DUPLICATE KEY(`c_custkey`)
    COMMENT "OLAP"
    DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 24
    PROPERTIES (
    "replication_num" = "1",
    "in_memory" = "false",
    "storage_format" = "DEFAULT",
    "enable_persistent_index" = "false",
    "compression" = "LZ4"
    );

  4. Buat pekerjaan impor. Jika kumpulan data berisi banyak data, jalankan operasi di latar belakang.

    curl --location-trusted -u 'admin:*****' -T /mnt/disk1/customer.tbl -H "label:labelname" -H "column_separator:|"  http://fe-c-****-internal.starrocks.aliyuncs.com:8030/api/load_test/customer/_stream_load

    Informasi berikut dikembalikan:

    {
        "TxnId": 575,
        "Label": "labelname",
        "Status": "Success",
        "Message": "OK",
        "NumberTotalRows": 150000,
        "NumberLoadedRows": 150000,
        "NumberFilteredRows": 0,
        "NumberUnselectedRows": 0,
        "LoadBytes": 24196144,
        "LoadTimeMs": 1081,
        "BeginTxnTimeMs": 104,
        "StreamLoadPlanTimeMs": 106,
        "ReadDataTimeMs": 85,
        "WriteDataTimeMs": 850,
        "CommitAndPublishTimeMs": 20
    }
    null

    Jika terjadi kesalahan "ErrorURL": "http://***:8040/api/_load_error_log?file=error_log_***", Anda dapat menjalankan perintah curl untuk melihat detailnya.

Contoh kode untuk integrasi

  • Untuk informasi tentang cara menggunakan Java untuk mengirimkan pekerjaan impor dalam Stream Load, lihat stream_load.

  • Untuk informasi tentang cara menggunakan Spark untuk mengirimkan pekerjaan impor dalam Stream Load, lihat 01_sparkStreaming2StarRocks.