StarRocks memungkinkan Anda mengimpor file CSV berukuran hingga 10 GB dari mesin lokal. Topik ini menjelaskan prinsip dasar, praktik terbaik, serta contoh penggunaan Stream Load untuk mengimpor data.
Informasi latar belakang
Stream Load adalah metode impor sinkron yang memungkinkan Anda mengimpor file lokal atau aliran data ke StarRocks melalui permintaan HTTP. Dalam mode ini, hasil impor dikembalikan setelah proses selesai. Anda dapat menentukan keberhasilan impor berdasarkan nilai balasan dari permintaan tersebut.
Istilah
coordinator: Node yang menerima data, mendistribusikannya ke node lain, dan mengembalikan hasil setelah impor selesai.
Prinsip dasar
Stream Load memungkinkan Anda mengirimkan pekerjaan impor melalui permintaan HTTP. Jika permintaan dikirim ke node frontend (FE), FE akan meneruskannya ke node backend (BE) dengan pengalihan HTTP. Anda juga dapat mengirimkan permintaan langsung ke node BE. Node BE bertindak sebagai koordinator untuk membagi data berdasarkan skema tabel dan mendistribusikannya ke node BE terkait. Koordinator kemudian mengembalikan hasil pekerjaan impor kepada Anda.
Gambar berikut mengilustrasikan cara kerja Stream Load.
Contoh
Membuat pekerjaan impor
Dalam mode Stream Load, data dikirimkan dan ditransfer menggunakan protokol HTTP. Contoh ini menggunakan perintah curl untuk mengirimkan pekerjaan impor. Anda juga dapat menggunakan klien HTTP lainnya untuk tujuan yang sama.
Sintaksis
curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT \
http://fe_host:http_port/api/{db}/{table}/_stream_loadHTTP mendukung chunked transfer encoding dan non-chunked transfer encoding. Untuk non-chunked transfer encoding, Anda harus menggunakan header Content-Length untuk menentukan panjang konten yang diunggah guna memastikan integritas data.
Disarankan untuk mengatur header Expect ke 100-continue guna mencegah transmisi data yang tidak perlu saat terjadi kesalahan.
Properti header yang didukung dapat dilihat pada deskripsi parameter pekerjaan impor dalam tabel berikut. Parameter dikonfigurasi dalam format -H "key1:value1". Jika ada beberapa parameter, gunakan beberapa -H untuk menunjukkan parameter tersebut. Contoh: -H "key1:value1" -H "key2:value2". Dalam mode Stream Load, semua parameter terkait pekerjaan impor dikonfigurasi di header. Tabel berikut menjelaskan parameter tersebut.
Parameter | Deskripsi | |
Parameter tanda tangan | user:passwd | Dalam Stream Load, pekerjaan impor dibuat menggunakan protokol HTTP. Tanda tangan dihasilkan untuk pekerjaan impor menggunakan otentikasi akses dasar. StarRocks mengotentikasi identitas pengguna dan izin impor berdasarkan tanda tangan. |
Parameter pekerjaan impor | label | Label pekerjaan impor. Data dengan label yang sama tidak dapat diimpor secara berulang. Anda dapat menentukan label untuk pekerjaan impor untuk mencegah data diimpor secara berulang. StarRocks menyimpan label untuk pekerjaan yang telah selesai dalam 30 menit terakhir. |
column_separator | Pemisah kolom file yang ingin Anda impor. Nilai default: \t. Jika karakter non-cetak digunakan sebagai pemisah kolom, pemisah harus dalam format heksadesimal dan dimulai dengan awalan \x. Misalnya, jika pemisah kolom file Hive adalah \x01, konfigurasikan parameter ini dalam format berikut: | |
row_delimiter | Pemisah baris file yang ingin Anda impor. Nilai default: \n. null \n tidak dapat dilewatkan dengan menggunakan perintah curl. Jika Anda menentukan \n sebagai pemisah baris, Shell pertama-tama melewati garis miring mundur (\) dan kemudian n, bukan langsung melewati \n. Anda dapat meloloskan string dengan menggunakan skrip Bash. Jika Anda ingin melewati \n dan \t, Anda dapat memulai string dengan tanda dolar ($) dan tanda kutip tunggal penuh ('), dan mengakhiri string dengan tanda kutip tunggal setengah lebar ('). Contoh: | |
columns | Pemetaan antara kolom dalam file yang ingin Anda impor dan kolom dalam tabel StarRocks. Jika kolom dalam file sumber sama dengan yang ada di tabel StarRocks, Anda tidak perlu mengonfigurasi parameter ini. Sebaliknya, Anda harus mengonfigurasi parameter ini untuk menentukan aturan konversi data. Anda dapat mengonfigurasi parameter ini dengan menggunakan metode berikut: Tentukan nama kolom dalam tabel StarRocks yang sesuai dengan nama kolom dalam file secara berurutan. Atau, tentukan kolom berdasarkan perhitungan.
| |
where | Kondisi filter data. Anda dapat mengonfigurasi parameter ini untuk menyaring data yang tidak diperlukan. Sebagai contoh, jika Anda hanya ingin mengimpor data yang nilainya di kolom k1 adalah 20180601, Anda dapat mengatur parameter ini menjadi | |
max_filter_ratio | Rasio maksimum data yang dapat disaring. Misalnya, data disaring karena tidak sesuai dengan standar tertentu. Nilai default: 0. Nilai valid: 0 hingga 1. null Data yang tidak sesuai dengan standar tidak termasuk data yang disaring oleh kondisi WHERE. | |
partitions | Partisi ke mana data diimpor. Kami merekomendasikan Anda untuk mengonfigurasi parameter ini jika Anda memastikan partisi ke mana data diimpor. Data yang tidak termasuk dalam partisi yang ditentukan akan disaring. Sebagai contoh, jika Anda ingin mengimpor data ke partisi p1 dan p2, Anda dapat mengatur parameter ini menjadi | |
timeout | Periode timeout pekerjaan impor. Nilai default: 600. Nilai valid: 1 hingga 259200. Unit: detik. | |
strict_mode | Menentukan apakah akan mengaktifkan mode ketat untuk pekerjaan impor. Secara default, mode ketat diaktifkan. Untuk menonaktifkan mode ketat, atur parameter ini menjadi | |
timezone | Zona waktu pekerjaan impor. Zona waktu default adalah UTC+8. Parameter ini memengaruhi hasil semua fungsi terkait zona waktu yang terlibat dalam pekerjaan impor. | |
exec_mem_limit | Ukuran maksimum memori yang tersedia untuk pekerjaan impor. Nilai default: 2. Unit: GB. | |
Contoh
curl --location-trusted -u root -T date -H "label:123" \
http://abc.com:8030/api/test/date/_stream_loadSetelah pekerjaan impor selesai, informasi tentang pekerjaan tersebut dikembalikan dalam format JSON. Contoh hasil balasan:
{
"TxnId": 11672,
"Label": "f6b62abf-4e16-4564-9009-b77823f3c024",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 199563535,
"NumberLoadedRows": 199563535,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 50706674331,
"LoadTimeMs": 801327,
"BeginTxnTimeMs": 103,
"StreamLoadPlanTimeMs": 0,
"ReadDataTimeMs": 760189,
"WriteDataTimeMs": 801023,
"CommitAndPublishTimeMs": 199"
}Parameter | Deskripsi |
TxnId | ID transaksi pekerjaan impor. ID transaksi dapat sepenuhnya dikelola oleh Alibaba Cloud. |
Label | Label pekerjaan impor. Jika Anda telah menentukan label, label tersebut dikembalikan. Jika Anda belum menentukan label, sistem secara otomatis menghasilkan label. |
Status | Status pekerjaan impor. Nilai valid:
|
ExistingJobStatus | Status pekerjaan impor yang sesuai dengan label yang ada. Parameter ini hanya ditampilkan jika nilai parameter Status adalah Label Already Exists. Anda dapat memperoleh status pekerjaan impor yang sesuai dengan label yang ada berdasarkan nilai balasan. Nilai valid:
|
Message | Deskripsi rinci status pekerjaan impor. Jika pekerjaan impor gagal, penyebab kegagalan rinci dikembalikan. |
NumberTotalRows | Jumlah total baris data yang dibaca dari aliran data. |
NumberLoadedRows | Jumlah baris data yang diimpor dalam pekerjaan impor. Parameter ini hanya dikembalikan jika status pekerjaan impor adalah Success. |
NumberFilteredRows | Jumlah baris data yang disaring dalam pekerjaan impor. Baris data yang kualitasnya tidak sesuai dengan standar disaring. |
NumberUnselectedRows | Jumlah baris data yang disaring oleh kondisi WHERE. |
LoadBytes | Ukuran data file sumber. |
LoadTimeMs | Durasi pekerjaan impor. Unit: milidetik. |
ErrorURL | URL entri data yang disaring. Hanya 1.000 entri data pertama yang disimpan. Jika pekerjaan impor gagal, Anda dapat menjalankan perintah berikut untuk mendapatkan data yang disaring. Kemudian, Anda dapat menganalisis data dan membuat penyesuaian. |
Membatalkan pekerjaan impor
Dalam mode Stream Load, Anda dapat menghentikan proses untuk membatalkan pekerjaan impor. Jika pekerjaan impor timeout atau terjadi kesalahan, sistem secara otomatis membatalkan pekerjaan tersebut.
ps -ef | grep stream_loadPraktik terbaik
Skenario
Stream Load cocok untuk skenario di mana file sumber disimpan di memori atau disk lokal. Sebagai metode impor sinkron, Stream Load memungkinkan Anda mendapatkan hasil pekerjaan impor secara langsung.
Ukuran data
Dalam mode Stream Load, node BE mengimpor dan mendistribusikan data. Disarankan untuk menggunakan Stream Load untuk mengimpor data berukuran antara 1 GB hingga 10 GB. Secara default, ukuran maksimum data yang dapat diimpor adalah 10 GB. Untuk mengimpor file yang lebih besar, modifikasi parameter streaming_load_max_mb node BE. Misalnya, jika ukuran file adalah 15 GB (15.360 MB), atur parameter streaming_load_max_mb menjadi nilai lebih besar dari 15 GB.
curl --location-trusted -u 'admin:****' -XPOST http://be-c-****-internal.starrocks.aliyuncs.com:8040/api/update_config?streaming_load_max_mb=15360Periode timeout default untuk pekerjaan impor dalam Stream Load adalah 600 detik. Untuk menyesuaikan periode timeout, ubah nilai parameter timeout node FE di instance StarRocks Anda melalui Konsol EMR.
Contoh lengkap
Data disimpan di direktori /mnt/disk1/customer.tbl klien tertentu. Anda ingin mengimpor data ke tabel customer database stream_load instance StarRocks Anda.
Unduh data: customer.tbl
Jumlah pekerjaan impor dalam mode Stream Load yang dapat diproses secara bersamaan tidak dipengaruhi oleh ukuran instance.
Prosedur:
Modifikasi file konfigurasi BE.conf node BE jika ukuran file melebihi batas atas default. Sebagai contoh, atur parameter streaming_load_max_mb menjadi 15360. Unit: MB.
curl --location-trusted -u 'admin:*****' -XPOST http://be-c-****-internal.starrocks.aliyuncs.com:8040/api/update_config?streaming_load_max_mb=15360Di tab Konfigurasi Instance instance Anda, ubah nilai parameter stream_load_default_timeout_second. Dalam contoh ini, atur nilainya menjadi 3600.
Buat tabel tujuan bernama customer.
CREATE TABLE `customer` ( `c_custkey` bigint(20) NULL COMMENT "", `c_name` varchar(65533) NULL COMMENT "", `c_address` varchar(65533) NULL COMMENT "", `c_nationkey` bigint(20) NULL COMMENT "", `c_phone` varchar(65533) NULL COMMENT "", `c_acctbal` double NULL COMMENT "", `c_mktsegment` varchar(65533) NULL COMMENT "", `c_comment` varchar(65533) NULL COMMENT "" ) ENGINE=OLAP DUPLICATE KEY(`c_custkey`) COMMENT "OLAP" DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 24 PROPERTIES ( "replication_num" = "1", "in_memory" = "false", "storage_format" = "DEFAULT", "enable_persistent_index" = "false", "compression" = "LZ4" );Buat pekerjaan impor. Jika kumpulan data berisi banyak data, jalankan operasi di latar belakang.
curl --location-trusted -u 'admin:*****' -T /mnt/disk1/customer.tbl -H "label:labelname" -H "column_separator:|" http://fe-c-****-internal.starrocks.aliyuncs.com:8030/api/load_test/customer/_stream_loadInformasi berikut dikembalikan:
{ "TxnId": 575, "Label": "labelname", "Status": "Success", "Message": "OK", "NumberTotalRows": 150000, "NumberLoadedRows": 150000, "NumberFilteredRows": 0, "NumberUnselectedRows": 0, "LoadBytes": 24196144, "LoadTimeMs": 1081, "BeginTxnTimeMs": 104, "StreamLoadPlanTimeMs": 106, "ReadDataTimeMs": 85, "WriteDataTimeMs": 850, "CommitAndPublishTimeMs": 20 }nullJika terjadi kesalahan
"ErrorURL": "http://***:8040/api/_load_error_log?file=error_log_***", Anda dapat menjalankan perintahcurluntuk melihat detailnya.
Contoh kode untuk integrasi
Untuk informasi tentang cara menggunakan Java untuk mengirimkan pekerjaan impor dalam Stream Load, lihat stream_load.
Untuk informasi tentang cara menggunakan Spark untuk mengirimkan pekerjaan impor dalam Stream Load, lihat 01_sparkStreaming2StarRocks.