Anda dapat menggunakan Stream Load untuk mengimpor file lokal atau aliran data ke StarRocks. Topik ini menjelaskan proses pengimporan data menggunakan Stream Load.
Informasi latar belakang
Stream Load adalah metode impor sinkron yang memungkinkan Anda mengimpor file lokal atau aliran data ke StarRocks melalui permintaan HTTP. Tanggapan HTTP akan mengembalikan hasil impor, yang menunjukkan apakah tugas berhasil atau tidak. Stream Load mendukung format file CSV dan JSON, dengan ukuran data maksimum 10 GB per impor.
Buat tugas impor
Stream Load mengirimkan dan mentransfer data menggunakan protokol HTTP. Topik ini menggunakan perintah curl untuk menunjukkan cara mengirimkan tugas impor. Anda juga dapat menggunakan klien HTTP lainnya.
Sintaks
curl --location-trusted -u <username>:<password> -XPUT <url>
(
data_desc
)
[opt_properties] Kami menyarankan agar Anda menentukan
100-continuedalam header permintaan HTTPExpect, misalnya,"Expect:100-continue". Praktik ini mencegah transmisi data yang tidak perlu dan mengurangi beban sumber daya jika server menolak tugas impor.Di StarRocks, beberapa kata merupakan kata kunci yang dicadangkan dalam SQL dan tidak dapat digunakan secara langsung dalam pernyataan SQL. Untuk menggunakan kata kunci yang dicadangkan dalam pernyataan SQL, Anda harus membungkusnya dengan tanda backtick (`). Untuk informasi lebih lanjut tentang kata kunci yang dicadangkan, lihat Keywords.
Parameter
<username>:<password>: Nama pengguna dan kata sandi untuk kluster StarRocks. Parameter ini wajib diisi. Jika akun tidak memiliki kata sandi, cukup tentukan nama pengguna diikuti tanda titik dua, seperti<username>:.XPUT: Metode permintaan HTTP. Parameter ini wajib diisi. Untuk Stream Load, tentukan metode PUT.<url>: URL untuk permintaan Stream Load. Parameter ini wajib diisi. Formatnya adalahhttp://<fe_host>:<fe_http_port>/api/<database_name>/<table_name>/_stream_load.Tabel berikut menjelaskan parameter dalam URL tersebut.
Parameter
Wajib
Deskripsi
<fe_host>Ya
Alamat IP node frontend (FE) dalam kluster StarRocks.
<fe_http_port>Ya
Port HTTP node FE dalam kluster StarRocks. Nilai default-nya adalah 18030.
Anda dapat mencari parameter http_port pada tab Configuration halaman layanan Cluster Service StarRocks untuk melihat nomor port tersebut. Selain itu, Anda juga dapat menjalankan perintah
SHOW FRONTENDSuntuk melihat alamat IP dan nomor port HTTP node FE.<database_name>Ya
Nama database tempat tabel StarRocks tujuan berada.
<table_name>Ya
Nama tabel StarRocks tujuan.
desc: Menentukan properti file data sumber, termasuk nama file, format, pemisah kolom, pemisah baris, partisi tujuan, dan pemetaan kolom ke tabel StarRocks. Formatnya sebagai berikut.-T <file_path> -H "format: CSV | JSON" -H "column_separator: <column_separator>" -H "row_delimiter: <row_delimiter>" -H "columns: <column1_name>[, <column2_name>, ... ]" -H "partitions: <partition1_name>[, <partition2_name>, ...]" -H "temporary_partitions: <temporary_partition1_name>[, <temporary_partition2_name>, ...]" -H "jsonpaths: [ \"<json_path1>\"[, \"<json_path2>\", ...] ]" -H "strip_outer_array: true | false" -H "json_root: <json_path>" -H "ignore_json_size: true | false" -H "compression: <compression_algorithm> | Content-Encoding: <compression_algorithm>"Parameter dalam
data_descterbagi menjadi tiga kategori: parameter umum, parameter untuk CSV, dan parameter untuk JSON.Parameter umum
Parameter
Wajib
Deskripsi
<file_path>Ya
Jalur tempat file data sumber disimpan. Nama file boleh menyertakan ekstensi secara opsional.
formatTidak
Format data yang akan diimpor. Nilai yang valid adalah
CSVdanJSON. Nilai default-nya adalahCSV.partitionsTidak
Partisi tertentu tempat Anda ingin mengimpor data. Jika Anda tidak menentukan parameter ini, data akan diimpor ke semua partisi tabel StarRocks secara default.
temporary_partitionsTidak
Partisi sementara tempat Anda ingin mengimpor data.
columnsTidak
Pemetaan antara kolom dalam file data sumber dan kolom dalam tabel StarRocks.
Jika kolom dalam file data sumber sesuai dengan kolom dalam tabel StarRocks secara berurutan, Anda tidak perlu menentukan parameter ini.
Jika file data sumber tidak sesuai dengan skema tabel, Anda harus menentukan parameter ini untuk mengonfigurasi aturan transformasi data. Kolom dapat berupa dua bentuk. Satu bentuk secara langsung sesuai dengan bidang dalam file yang diimpor dan dapat direpresentasikan dengan nama bidang tersebut. Bentuk lainnya perlu diturunkan melalui perhitungan.
Contoh 1: Tabel memiliki tiga kolom:
c1, c2, c3. Tiga kolom dalam file sumber sesuai denganc3,c2,c1secara berurutan. Anda harus menentukan-H "columns: c3, c2, c1".Contoh 2: Tabel memiliki tiga kolom:
c1, c2, c3. Tiga kolom pertama dalam file sumber sesuai dengan kolom tabel, tetapi ada kolom tambahan. Anda harus menentukan-H "columns: c1, c2, c3, temp". Kolom terakhir dapat memiliki nama apa pun sebagai placeholder.Contoh 3: Tabel memiliki tiga kolom:
year, month, day. File sumber hanya memiliki satu kolom waktu dalam format 2018-06-01 01:02:03. Anda dapat menentukan-H "columns: col, year = year(col), month=month(col), day=day(col)"untuk menyelesaikan impor.
Parameter untuk CSV
Parameter
Wajib
Deskripsi
column_separatorTidak
Pemisah kolom dalam file data sumber. Nilai default-nya adalah
\t.Untuk karakter tak terlihat, Anda harus menambahkan awalan
\xdan menggunakan representasi heksadesimal untuk pemisah tersebut. Misalnya, untuk pemisah file Hive\x01, tentukan-H "column_separator:\x01".row_delimiterTidak
Pemisah baris dalam file data sumber. Nilai default-nya adalah
\n.PentingPerintah curl tidak dapat meneruskan \n. Saat Anda menentukan secara manual line feed sebagai \n, shell akan meneruskan backslash (\) lalu n, bukan meneruskan karakter line feed \n secara langsung.
Bash mendukung sintaks string escape lainnya. Untuk meneruskan
\ndan\t, mulailah string dengan tanda dolar dan tanda kutip tunggal ($') serta akhiri dengan tanda kutip tunggal ('). Misalnya,-H $'row_delimiter:\n'.skip_headerTidak
Jumlah baris header yang dilewati di awal file CSV. Nilainya harus berupa bilangan bulat. Nilai default-nya adalah 0.
Dalam beberapa file CSV, beberapa baris pertama digunakan untuk mendefinisikan metadata seperti nama dan tipe kolom. Dengan mengatur parameter ini, Anda dapat membuat StarRocks mengabaikan beberapa baris pertama file CSV selama impor. Misalnya, jika Anda mengatur parameter ini ke 1, StarRocks akan mengabaikan baris pertama file CSV selama impor.
Pemisah baris yang digunakan di sini harus sama dengan yang Anda tetapkan dalam perintah impor.
whereTidak
Digunakan untuk mengekstrak sebagian data. Untuk memfilter data yang tidak diinginkan, Anda dapat mengatur opsi ini.
Misalnya, untuk hanya mengimpor data di mana kolom k1 sama dengan 20180601, tentukan
-H "where: k1 = 20180601".max_filter_ratioTidak
Rasio maksimum baris yang dapat difilter karena masalah seperti kualitas data. Nilai default-nya adalah 0, yang berarti toleransi nol.
CatatanData yang difilter oleh klausa WHERE tidak termasuk.
partitionsTidak
Partisi yang terlibat dalam impor ini.
Jika Anda dapat menentukan partisi untuk data tersebut, kami menyarankan agar Anda menentukan parameter ini. Data yang tidak memenuhi partisi yang ditentukan akan difilter. Misalnya, untuk mengimpor data ke partisi p1 dan p2, tentukan
-H "partitions: p1, p2".timeoutTidak
Waktu tenggang untuk impor. Nilai default-nya adalah 600 detik.
Nilainya berkisar antara 1 hingga 259200. Satuannya adalah detik.
strict_modeTidak
Menentukan apakah akan mengaktifkan mode ketat untuk impor ini. Nilai yang valid:
false (default): Mode ketat dinonaktifkan.
true: Mode ketat diaktifkan. Saat diaktifkan, filter ketat diterapkan pada konversi tipe kolom selama proses impor.
timezoneTidak
Zona waktu untuk impor ini. Default-nya adalah UTC+8.
Parameter ini memengaruhi hasil semua fungsi terkait zona waktu yang terlibat dalam impor.
exec_mem_limitTidak
Batas memori untuk impor. Nilai default-nya adalah 2 GB.
Parameter untuk JSON
Parameter
Wajib
Deskripsi
jsonpathsTidak
Nama bidang yang akan diimpor. Parameter ini hanya wajib saat Anda mengimpor data berformat JSON dalam mode pencocokan. Nilai parameter harus dalam format JSON.
strip_outer_arrayTidak
Menentukan apakah akan memangkas struktur array terluar. Nilai yang valid:
false (default): Mempertahankan struktur asli data JSON tanpa menghapus array terluar. Seluruh array JSON diimpor sebagai satu nilai tunggal.
Misalnya, untuk data sampel
[{"k1" : 1, "k2" : 2},{"k1" : 3, "k2" : 4}], jikastrip_outer_arraydiatur ke false, data diurai sebagai satu array tunggal dan diimpor ke tabel.true: Saat data yang diimpor berupa array JSON, Anda harus mengatur
strip_outer_arrayke true.Misalnya, untuk data sampel
[{"k1" : 1, "k2" : 2},{"k1" : 3, "k2" : 4}], jikastrip_outer_arraydiatur ke true, data diurai sebagai dua catatan dan diimpor ke tabel.
json_rootTidak
Elemen akar dari data JSON yang akan diimpor. Parameter ini hanya wajib saat Anda mengimpor data berformat JSON dalam mode pencocokan. Nilai parameter harus berupa string JsonPath yang valid. Nilai default-nya kosong, yang berarti seluruh file data JSON diimpor.
ignore_json_sizeTidak
Menentukan apakah akan memeriksa ukuran Body JSON dalam permintaan HTTP.
CatatanSecara default, ukuran body JSON dalam permintaan HTTP tidak boleh melebihi 100 MB. Jika ukuran body JSON melebihi 100 MB, kesalahan berikut dilaporkan:
"The size of this batch exceed the max size [104857600] of json type data data [8617627793]. Tetapkan ignore_json_size untuk melewati pemeriksaan, meskipun hal ini dapat menyebabkan konsumsi memori yang sangat besar." Untuk menghindari kesalahan ini, Anda dapat menambahkan pengaturan"ignore_json_size:true"ke header permintaan HTTP untuk melewati pemeriksaan ukuran body JSON.compression, Content-EncodingTidak
Algoritma kompresi yang digunakan selama transmisi data untuk Stream Load. Algoritma yang didukung meliputi GZIP, BZIP2, LZ4_FRAME, dan Zstandard.
Misalnya,
curl --location-trusted -u root: -v 'http://127.0.0.1:18030/api/db0/tbl_simple/_stream_load' \-X PUT -H "expect:100-continue" \-H 'format: json' -H 'compression: lz4_frame' -T ./b.json.lz4.
opt_properties: Parameter opsional untuk impor. Pengaturan yang ditentukan berlaku untuk seluruh tugas impor.Formatnya sebagai berikut.
-H "label: <label_name>" -H "where: <kondisi1>[, <kondisi2>, ...]" -H "max_filter_ratio: <num>" -H "timeout: <num>" -H "strict_mode: true | false" -H "timezone: <string>" -H "load_mem_limit: <num>" -H "partial_update: true | false" -H "partial_update_mode: row | column" -H "merge_condition: <column_name>"Tabel berikut menjelaskan parameter-parameter tersebut.
Parameter
Wajib
Deskripsi
labelTidak
Label untuk tugas impor. Data dengan label yang sama tidak dapat diimpor berulang kali.
Anda dapat menentukan label untuk mencegah impor data duplikat. StarRocks menyimpan label tugas yang berhasil diselesaikan selama 30 menit terakhir.
whereTidak
Kondisi filter. Jika Anda menentukan parameter ini, StarRocks akan memfilter data yang telah ditransformasi berdasarkan kondisi yang ditentukan. Hanya data yang memenuhi kondisi filter yang didefinisikan dalam klausa
whereyang diimpor.Misalnya, untuk hanya mengimpor data di mana kolom k1 sama dengan 20180601, tentukan
-H "where: k1 = 20180601".max_filter_ratioTidak
Rasio maksimum baris yang dapat difilter karena masalah seperti kualitas data. Nilai default-nya adalah 0, yang berarti toleransi nol.
CatatanData yang difilter oleh klausa
wheretidak termasuk.log_rejected_record_numTidak
Jumlah maksimum baris yang dapat difilter karena kualitas data buruk. Parameter ini didukung mulai versi 3.1. Nilai yang valid:
0,-1, atau bilangan bulat positif. Nilai default-nya adalah0.0: Tidak mencatat baris yang difilter.-1: Mencatat semua baris yang difilter.Bilangan bulat positif (seperti
n) mencatat maksimalnbaris yang difilter pada setiap node BE (atau CN).
timeoutTidak
Waktu tenggang untuk impor. Nilai default-nya adalah 600 detik.
Nilainya berkisar antara 1 hingga 259200. Satuannya adalah detik.
strict_modeTidak
Menentukan apakah akan mengaktifkan mode ketat untuk impor ini.
false (default): Mode ketat dinonaktifkan.
true: Mode ketat diaktifkan.
timezoneTidak
Zona waktu untuk impor ini. Default-nya adalah UTC+8.
Parameter ini memengaruhi hasil semua fungsi terkait zona waktu yang terlibat dalam impor.
load_mem_limitTidak
Batas memori untuk tugas impor. Nilai default-nya adalah 2 GB.
partial_updateTidak
Menentukan apakah akan menggunakan pembaruan kolom parsial. Nilai yang valid adalah
TRUEdanFALSE. Nilai default-nya adalahFALSE.partial_update_modeTidak
Mode untuk pembaruan parsial. Nilai yang valid adalah
rowdancolumn.row(default): Menentukan mode baris untuk pembaruan parsial. Mode ini cocok untuk pembaruan real-time banyak kolom dalam batch kecil.column: Menentukan mode kolom untuk pembaruan parsial. Mode ini cocok untuk pembaruan batch beberapa kolom di banyak baris. Dalam skenario ini, mengaktifkan mode kolom memberikan pembaruan yang lebih cepat.Misalnya, dalam tabel dengan 100 kolom, jika Anda memperbarui 10 kolom (10% dari total) untuk semua baris, mengaktifkan mode kolom dapat meningkatkan kinerja pembaruan hingga 10 kali lipat.
merge_conditionTidak
Nama kolom yang digunakan sebagai kondisi agar pembaruan berlaku. Pembaruan hanya berlaku jika nilai kolom ini dalam data yang diimpor lebih besar dari atau sama dengan nilai saat ini.
CatatanKolom yang ditentukan harus berupa kolom non-kunci primer. Pembaruan bersyarat hanya didukung untuk tabel kunci primer.
Contoh
Contoh ini menunjukkan cara mengimpor file data.csv ke tabel `example_table` dalam database `load_test` di kluster StarRocks. Untuk contoh lengkap, lihat Contoh lengkap impor data.
curl --location-trusted -u "root:" \
-H "Expect:100-continue" \
-H "label:label2" \
-H "column_separator: ," \
-T data.csv -XPUT \
http://172.17.**.**:18030/api/load_test/example_table/_stream_loadNilai kembali
Setelah impor selesai, hasil tugas impor dikembalikan dalam format JSON. Berikut ini adalah contoh tanggapan.
{
"TxnId": 9,
"Label": "label2",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 4,
"NumberLoadedRows": 4,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 45,
"LoadTimeMs": 235,
"BeginTxnTimeMs": 101,
"StreamLoadPlanTimeMs": 102,
"ReadDataTimeMs": 0,
"WriteDataTimeMs": 11,
"CommitAndPublishTimeMs": 19
}Tabel berikut menjelaskan parameter dalam tanggapan tersebut.
Parameter | Deskripsi |
| ID transaksi impor. |
| Label impor. |
| Status impor. Nilai yang valid:
|
| Status tugas impor yang sesuai dengan label yang sudah ada. Bidang ini hanya ditampilkan ketika
|
| Rincian tentang status tugas impor. Jika impor gagal, alasan spesifik kegagalannya dikembalikan. |
| Jumlah total baris yang dibaca dari aliran data. |
| Jumlah baris yang diimpor. Ini hanya berlaku jika status impor adalah Success. |
| Jumlah baris yang difilter, yaitu baris dengan kualitas data yang tidak memenuhi syarat. |
| Jumlah baris yang difilter oleh kondisi Where. |
| Ukuran file sumber untuk tugas impor. |
| Waktu yang dibutuhkan untuk tugas impor, dalam milidetik. |
| Waktu yang dibutuhkan untuk memulai transaksi untuk tugas impor. |
| Waktu yang dibutuhkan untuk menghasilkan rencana eksekusi untuk tugas impor. |
| Waktu yang dibutuhkan untuk membaca data untuk tugas impor. |
| Waktu yang dibutuhkan untuk menulis data untuk tugas impor. |
| Parameter ini dikembalikan jika tugas gagal mengimpor data. Anda dapat menggunakan Anda dapat menjalankan perintah Misalnya, untuk mengekspor informasi baris kesalahan: Informasi baris kesalahan yang diekspor disimpan ke file lokal bernama Anda dapat menyesuaikan tugas impor berdasarkan pesan kesalahan tersebut, lalu mengirim ulang tugas tersebut. |
Membatalkan tugas impor
Anda tidak dapat membatalkan tugas Stream Load secara manual. Tugas akan dibatalkan secara otomatis jika melebihi waktu tenggang atau terjadi kesalahan. Anda dapat menggunakan ErrorURL dari nilai kembali untuk mengunduh pesan kesalahan guna pemecahan masalah.
Contoh lengkap impor data
Contoh ini menggunakan perintah curl untuk membuat tugas impor.
Buat tabel untuk data yang akan diimpor.
Login ke node master kluster StarRocks menggunakan Secure Shell (SSH). Untuk informasi lebih lanjut, lihat Login ke kluster.
Jalankan perintah berikut untuk terhubung ke kluster StarRocks menggunakan klien MySQL.
mysql -h127.0.0.1 -P 9030 -urootJalankan perintah berikut untuk membuat database dan tabel.
CREATE DATABASE IF NOT EXISTS load_test; USE load_test; CREATE TABLE IF NOT EXISTS example_table ( id INT, name VARCHAR(50), age INT ) DUPLICATE KEY(id) DISTRIBUTED BY HASH(id) BUCKETS 3 PROPERTIES ( "replication_num" = "1" -- Atur jumlah replika menjadi 1. );Setelah menjalankan perintah tersebut, tekan
Ctrl+Duntuk keluar dari klien MySQL.
Persiapkan data uji.
Persiapkan data CSV
Misalnya, buat file bernama
data.csvdengan konten berikut.id,name,age 1,Alice,25 2,Bob,30 3,Charlie,35Persiapkan data JSON
Misalnya, buat file bernama
json.datadengan konten berikut.{"id":1,"name":"Emily","age":25} {"id":2,"name":"Benjamin","age":35} {"id":3,"name":"Olivia","age":28} {"id":4,"name":"Alexander","age":60} {"id":5,"name":"Ava","age":17}Jalankan perintah berikut untuk membuat tugas impor.
Impor data CSV
curl --location-trusted -u "root:" \ -H "Expect:100-continue" \ -H "label:label1" \ -H "column_separator: ," \ -T data.csv -XPUT \ http://172.17.**.**:18030/api/load_test/example_table/_stream_loadImpor data JSON
curl --location-trusted -u "root:" \ -H "Expect:100-continue" \ -H "label:label2" \ -H "format:json" \ -T json.data -XPUT \ http://172.17.**.**:18030/api/load_test/example_table/_stream_load
Contoh integrasi kode
Untuk contoh pengembangan Java, lihat stream_load.
Untuk contoh cara mengintegrasikan Stream Load dengan Spark, lihat 01_sparkStreaming2StarRocks.