Broker Load adalah metode impor asinkron di StarRocks yang membaca data dari sistem penyimpanan eksternal melalui proses broker, lalu memproses dan memuat data tersebut menggunakan sumber daya komputasi StarRocks sendiri. Gunakan Broker Load untuk mengimpor data berukuran puluhan hingga ratusan GB per pekerjaan dari Hadoop Distributed File System (HDFS) atau Alibaba Cloud Object Storage Service (OSS). Satu pekerjaan Broker Load dapat menargetkan beberapa tabel tujuan secara atomik—artinya semua tabel berhasil dimuat atau semuanya gagal.
Cara kerja Broker Load
Broker Load memiliki dua mode operasi tergantung pada versi StarRocks yang Anda gunakan:
-
Pemuatan berbasis broker (StarRocks versi sebelum 2.5.8): StarRocks bergantung pada proses broker untuk terhubung ke penyimpanan eksternal. Tentukan nama broker secara eksplisit dengan
WITH BROKER "<broker_name>"dalam pernyataan pemuatan. -
Pemuatan tanpa broker (StarRocks 2.5.8 dan lebih baru): StarRocks terhubung langsung ke penyimpanan eksternal. Abaikan nama broker tetapi tetap gunakan kata kunci
WITH BROKERdalam pernyataan pemuatan.
Saat Anda membuat kluster StarRocks di E-MapReduce (EMR), proses broker secara otomatis dideploy dan dijalankan di setiap node inti.
Sumber dan format yang didukung
| Dimensi | Nilai yang didukung |
|---|---|
| Sumber data | HDFS, OSS |
| Format file | CSV (default), ORC, Parquet (.parquet atau .parq) |
Prasyarat
Sebelum memulai, pastikan Anda telah memiliki:
-
Kluster StarRocks yang sedang berjalan di EMR dengan minimal satu node inti
-
Kredensial akses untuk HDFS atau OSS
-
Database dan tabel tujuan di StarRocks
Kueri Broker
Untuk memastikan broker sedang berjalan di kluster Anda, jalankan:
SHOW PROC "/brokers"\G
Output akan menampilkan daftar setiap broker beserta alamat IP-nya, port (default: 8000), status alive, dan timestamp:
*************************** 1. row ***************************
Name: broker
IP: 10.0.**.**
Port: 8000
Alive: true
LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
ErrMsg:
Membuat pekerjaan impor
Semua pekerjaan impor menggunakan pernyataan LOAD LABEL. Sintaksnya sedikit berbeda tergantung versi StarRocks.
Untuk StarRocks versi sebelum 2.5.8
LOAD LABEL db_name.label_name
(data_desc, ...)
WITH BROKER broker_name broker_properties
[PROPERTIES (key1=value1, ... )]
Untuk StarRocks versi 2.5.8 dan lebih baru
LOAD LABEL db_name.label_name
(data_desc, ...)
WITH BROKER broker_properties
[PROPERTIES (key1=value1, ... )]
Di kluster StarRocks EMR, gunakan broker sebagai nama broker.
Untuk melihat referensi sintaks lengkap, jalankan HELP BROKER LOAD.
Label
Setiap pekerjaan impor memerlukan label unik. Anda dapat menentukan label kustom atau membiarkan sistem menghasilkannya. Label ini memiliki dua fungsi:
-
Memantau status pekerjaan dengan
SHOW LOAD WHERE label = '<label>' -
Mencegah impor duplikat—StarRocks akan menolak pekerjaan baru jika sudah ada pekerjaan dengan status FINISHED yang menggunakan label yang sama
Setelah pekerjaan mencapai status FINISHED, label tersebut kedaluwarsa. Jika pekerjaan dibatalkan (CANCELLED), label yang sama dapat digunakan kembali untuk mengirim ulang pekerjaan tersebut.
Parameter data_desc
Klausa data_desc menjelaskan data sumber untuk satu tabel tujuan. Satu pekerjaan dapat berisi beberapa klausa data_desc yang menargetkan tabel berbeda—StarRocks menjamin atomicity di seluruh tabel dalam satu pekerjaan.
data_desc:
DATA INFILE ('file_path', ...)
[NEGATIVE]
INTO TABLE tbl_name
[PARTITION (p1, p2)]
[COLUMNS TERMINATED BY column_separator]
[FORMAT AS file_type]
[(col1, ...)]
[COLUMNS FROM PATH AS (colx, ...)]
[SET (k1=f1(xx), k2=f2(xx))]
[WHERE predicate]
| Parameter | Deskripsi |
|---|---|
file_path |
Jalur file spesifik atau pola glob menggunakan wildcard (?, *, [], {}, ^). Misalnya, hdfs://hdfs_host:hdfs_port/user/data/tablename/*/ mengimpor semua file di semua partisi di bawah /tablename. Lihat FileSystem.globStatus untuk sintaks wildcard. |
NEGATIVE |
Menandai baris sumber sebagai penghapusan. Gunakan ini untuk membatalkan impor batch ke kolom agregat tipe SUM. |
PARTITION |
Membatasi impor hanya ke partisi yang ditentukan di tabel tujuan. Baris yang tidak termasuk dalam partisi yang terdaftar dihitung sebagai error. Gunakan predikat WHERE untuk menyaringnya. |
column_separator |
Pemisah kolom di file sumber. Default: \t. Untuk karakter tak terlihat, gunakan format heksadesimal (misalnya, \\x01 untuk default Hive \x01). |
file_type |
Format file sumber: csv (default), orc, atau parquet. |
COLUMNS FROM PATH AS |
Mengekstrak nilai bidang partisi dari jalur file. Misalnya, jalur /path/col_name=col_value/dt=20210101/file1 memungkinkan Anda mengimpor col_value dan 20210101 ke kolom tabel col_name dan dt. |
SET |
Fungsi konversi tipe kolom. Diperlukan ketika tipe kolom sumber dan tujuan berbeda. |
WHERE predicate |
Menyaring baris setelah konversi tipe kolom. Baris yang disaring tidak dihitung dalam perhitungan laju error. Beberapa predikat WHERE untuk tabel yang sama digabungkan dengan AND. |
broker_properties dan properti tingkat pekerjaan
Klausa broker_properties meneruskan kredensial penyimpanan dan pengaturan koneksi. Blok opsional PROPERTIES mengontrol perilaku eksekusi pekerjaan.
broker_properties:
(key2=value2, ...)
Ini merupakan cakupan yang berbeda: broker_properties (di dalam WITH BROKER) mengonfigurasi cara StarRocks terhubung ke sistem penyimpanan. PROPERTIES (blok luar) mengontrol cara pekerjaan dijalankan—timeout, rasio filter, memori, dan mode.
| Parameter | Deskripsi |
|---|---|
timeout |
Timeout pekerjaan dalam detik. Default: 14.400 (4 jam). Jika pekerjaan tidak selesai dalam periode ini, statusnya berubah menjadi CANCELLED. Perkirakan timeout minimum dengan rumus: (Ukuran total file dalam MB × Jumlah tabel sumber dan rollup) / (30 × konkurensi). Misalnya, file 1 GB dengan satu tabel sumber, dua tabel rollup, dan konkurensi 3 memerlukan minimal (1 × 1.024 × 3) / (10 × 3) = 102 detik. Tetapkan timeout hanya jika Anda memperkirakan pekerjaan akan melebihi nilai default. |
max_filter_ratio |
Laju error maksimum yang dapat diterima, dari 0 hingga 1. Default: 0. Jika laju error melebihi nilai ini, pekerjaan gagal. Tetapkan nilai lebih dari 0 untuk mengizinkan beberapa baris error dilewati. Hitung rasionya sebagai: max_filter_ratio = dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL). |
load_mem_limit |
Batas memori untuk pekerjaan dalam byte. Default: 0 (tanpa batas). |
strict_mode |
Ketika diatur ke true, baris yang nilai sumber non-null-nya dikonversi menjadi null dianggap sebagai error dan disaring. Default: dinonaktifkan. Aktifkan dengan PROPERTIES ("strict_mode" = "true"). Mode ketat tidak berlaku untuk kolom yang dihasilkan fungsi atau nilai yang valid tetapi berada di luar rentang kolom tujuan. |
Memuat data dari OSS
Contoh berikut memuat file lineitem TPC-H dari OSS ke tabel StarRocks.
Untuk StarRocks versi sebelum 2.5.8
LOAD LABEL tpch.lineitem
(
DATA INFILE("oss://bucket/tpc_h/sf1/lineitem.tbl")
INTO TABLE `lineitem`
COLUMNS TERMINATED BY '|'
(l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice,
l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate,
l_receiptdate, l_shipinstruct, l_shipmode, l_comment)
)
Untuk StarRocks versi 2.5.8 dan lebih baru
LOAD LABEL tpch.lineitem
(
DATA INFILE("oss://bucket/tpc_h/sf1/lineitem.tbl")
INTO TABLE `lineitem`
COLUMNS TERMINATED BY '|'
(l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice,
l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate,
l_receiptdate, l_shipinstruct, l_shipmode, l_comment)
)
WITH BROKER broker
(
"fs.oss.accessKeyId" = "xxx",
"fs.oss.accessKeySecret" = "xxx",
"fs.oss.endpoint" = "oss-cn-beijing-internal.aliyuncs.com"
);
Memuat data dari HDFS
Otentikasi
HDFS mendukung dua mode otentikasi.
Otentikasi Sederhana — identitas pengguna ditentukan oleh OS Klien yang terhubung:
| Parameter | Deskripsi |
|---|---|
hadoop.security.authentication |
Atur ke simple. |
username |
Username HDFS. |
password |
Password HDFS. |
Otentikasi Kerberos—identitas pengguna ditentukan oleh kredensial Kerberos:
| Parameter | Deskripsi |
|---|---|
hadoop.security.authentication |
Atur ke kerberos. |
kerberos_principal |
Principal Kerberos. |
kerberos_keytab |
Jalur ke file keytab Kerberos. File harus berada di server yang sama dengan proses broker. |
kerberos_keytab_content |
Konten file keytab dalam format Base64-encoded. Konfigurasikan salah satu parameter ini atau kerberos_keytab, jangan keduanya. |
Konfigurasi ketersediaan tinggi
Untuk kluster HDFS dengan high availability (HA) NameNode, konfigurasikan parameter berikut agar StarRocks dapat mendeteksi failover NameNode aktif secara otomatis:
| Parameter | Deskripsi |
|---|---|
dfs.nameservices |
Nama kustom untuk layanan HDFS (misalnya, my-ha). |
dfs.ha.namenodes.xxx |
Daftar nama NameNode yang dipisahkan koma. Ganti xxx dengan nilai dfs.nameservices. |
dfs.namenode.rpc-address.xxx.nn |
Alamat Remote Procedure Call (RPC) setiap NameNode (Hostname:Port). Ganti nn dengan masing-masing nama NameNode. |
dfs.client.failover.proxy.provider.xxx |
Penyedia proxy failover. Default: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider. |
Contoh broker_properties untuk kluster HDFS HA dengan otentikasi simple:
(
"username" = "user",
"password" = "passwd",
"dfs.nameservices" = "my-ha",
"dfs.ha.namenodes.my-ha" = "my-namenode1,my-namenode2",
"dfs.namenode.rpc-address.my-ha.my-namenode1" = "nn1-host:rpc_port",
"dfs.namenode.rpc-address.my-ha.my-namenode2" = "nn2-host:rpc_port",
"dfs.client.failover.proxy.provider.my-ha" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
)
Jika konfigurasi HDFS kluster disimpan dalam hdfs-site.xml, cukup tentukan jalur file dan parameter otentikasi saja.
Contoh pemuatan HDFS
LOAD LABEL db1.label1
(
DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/hive/test.db/ml/file1")
INTO TABLE tbl1
COLUMNS TERMINATED BY ","
(tmp_c1, tmp_c2)
SET
(
id = tmp_c2,
name = tmp_c1
),
DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/hive/test.db/ml/file2")
INTO TABLE tbl2
COLUMNS TERMINATED BY ","
(col1, col2)
WHERE col1 > 1
)
WITH BROKER 'broker1'
(
"username" = "hdfs_username",
"password" = "hdfs_password"
)
PROPERTIES
(
"timeout" = "3600"
);
Memeriksa status pekerjaan
Broker Load bersifat asinkron. Setelah mengirim pekerjaan, gunakan SHOW LOAD untuk melacak progresnya:
SHOW LOAD WHERE label = 'label1'\G
Untuk melihat sintaks lengkap SHOW LOAD, jalankan HELP SHOW LOAD.
SHOW LOAD hanya berlaku untuk metode impor asinkron. Tidak berlaku untuk metode sinkron seperti Stream Load.
Outputnya tampak seperti:
*************************** 1. row ***************************
JobId: 7****
Label: label1
State: FINISHED
Progress: ETL:N/A; LOAD:100%
Type: BROKER
EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
TaskInfo: cluster:N/A; timeout(s):10800; max_filter_ratio:5.0E-5
ErrorMsg: N/A
CreateTime: 2019-07-27 11:46:42
EtlStartTime: 2019-07-27 11:46:44
EtlFinishTime: 2019-07-27 11:46:44
LoadStartTime: 2019-07-27 11:46:44
LoadFinishTime: 2019-07-27 11:50:16
URL: http://192.168.**.**:8040/api/_load_error_log?file=__shard_4/error_log_insert_stmt_...
JobDetails: {"Unfinished backends":{"9c3441027ff948a0-8287923329a2****":[10002]},"ScannedRows":2390016,"TaskNumber":1,"All backends":{"9c3441027ff948a0-8287923329a2****":[10002]},"FileNumber":1,"FileSize":1073741824}
| Bidang | Deskripsi |
|---|---|
JobId |
ID unik yang dihasilkan sistem untuk pekerjaan. Tidak pernah digunakan ulang. |
Label |
Label yang Anda tentukan atau yang dihasilkan otomatis. |
State |
Status pekerjaan saat ini: PENDING (menunggu), LOADING (berjalan), CANCELLED (gagal), atau FINISHED (berhasil). |
Progress |
Broker Load tidak memiliki fase ETL, sehingga ETL selalu menunjukkan N/A. Progres LOADING adalah (jumlah tabel yang diimpor / total tabel sumber) × 100%. Nilainya mencapai 99% ketika semua tabel telah diimpor tetapi pekerjaan masih dalam proses finalisasi — 100% hanya setelah pekerjaan benar-benar selesai. Progres tidak linear; persentase yang stabil tidak berarti pekerjaan telah berhenti. |
Type |
Selalu BROKER untuk pekerjaan Broker Load. |
EtlInfo |
Tiga penghitung: unselected.rows (baris yang disaring oleh WHERE), dpp.norm.ALL (baris yang berhasil dimuat), dpp.abnorm.ALL (baris dengan error). Jumlah ketiganya sama dengan total baris di file sumber. |
TaskInfo |
Parameter pekerjaan yang Anda konfigurasi: kluster, timeout, dan max_filter_ratio. |
ErrorMsg |
Detail error ketika pekerjaan berstatus CANCELLED. N/A ketika FINISHED. Bidang type dapat berupa: USER-CANCEL, ETL-RUN-FAIL, ETL-QUALITY-UNSATISFIED, LOAD-RUN-FAIL, TIMEOUT, atau UNKNOWN. |
CreateTime / EtlStartTime / EtlFinishTime / LoadStartTime / LoadFinishTime |
Timestamp untuk setiap fase. Broker Load tidak memiliki fase ETL, sehingga EtlStartTime, EtlFinishTime, dan LoadStartTime memiliki nilai yang sama. Jika hanya CreateTime yang terisi dan LoadStartTime tetap N/A dalam waktu lama, berarti terlalu banyak pekerjaan dalam antrean — hentikan pengiriman pekerjaan baru hingga antrean kosong. |
URL |
Tautan ke contoh baris yang error. N/A jika tidak ada error. |
JobDetails |
Detail tentang backend, jumlah baris yang dipindai (diperbarui setiap 5 detik), jumlah tugas, jumlah file, dan ukuran file. |
Membatalkan pekerjaan impor
Batalkan pekerjaan yang berstatus PENDING atau LOADING:
CANCEL LOAD
[FROM db_name]
WHERE [LABEL = "load_label" | LABEL LIKE "label_pattern"];
Untuk melihat sintaks lengkap, jalankan HELP CANCEL LOAD.
Contoh end-to-end: muat dari OSS
Contoh ini memuat data lineitem TPC-H dari OSS ke StarRocks.
Langkah 1: Buat tabel tujuan.
CREATE TABLE lineitem (
l_orderkey bigint,
l_partkey bigint,
l_suppkey bigint,
l_linenumber int,
l_quantity double,
l_extendedprice double,
l_discount double,
l_tax double,
l_returnflag string,
l_linestatus string,
l_shipdate date,
l_commitdate date,
l_receiptdate date,
l_shipinstruct string,
l_shipmode string,
l_comment string
)
ENGINE = OLAP
DUPLICATE KEY(l_orderkey)
DISTRIBUTED BY HASH(l_orderkey) BUCKETS 96
PROPERTIES (
"replication_num" = "1"
);
Langkah 2: Kirim pekerjaan impor.
Untuk StarRocks versi sebelum 2.5.8
Untuk StarRocks versi sebelum 2.5.8, tambahkan WITH BROKER broker ("fs.oss.accessKeyId" = "xxx", "fs.oss.accessKeySecret" = "xxx", "fs.oss.endpoint" = "xxx") setelah tanda kurung tutup.
Untuk StarRocks versi 2.5.8 dan lebih baru
Untuk StarRocks 2.5.8 dan lebih baru:
LOAD LABEL tpch.lineitem
(
DATA INFILE("oss://xxx/tpc_h/sf1/lineitem.tbl")
INTO TABLE `lineitem`
COLUMNS TERMINATED BY '|'
(l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice,
l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate,
l_receiptdate, l_shipinstruct, l_shipmode, l_comment)
)
Langkah 3: Pantau pekerjaan.
SHOW LOAD WHERE label = 'lineitem'\G
Saat pekerjaan berhasil, output menunjukkan State: FINISHED dan Progress: ETL:100%; LOAD:100%.
Langkah 4: Verifikasi data.
-- Periksa jumlah total baris
SELECT COUNT(*) FROM lineitem;
-- Pratinjau dua baris pertama
SELECT * FROM lineitem LIMIT 2;
Konkurensi
Pekerjaan Broker Load dibagi menjadi tugas, lalu tugas tersebut dibagi lagi menjadi instans yang didistribusikan secara paralel di seluruh backend.
Pemisahan tugas: Satu pekerjaan menghasilkan satu tugas per klausa data_desc yang mengarah ke alamat sumber berbeda atau kumpulan partisi berbeda.
Pemisahan instans: Setiap tugas dibagi menjadi instans berdasarkan tiga parameter frontend:
| Parameter | Default | Deskripsi |
|---|---|---|
min_bytes_per_broker_scanner |
64 MB | Volume data minimum per instans. |
max_broker_concurrency |
100 | Instans paralel maksimum per tugas. |
load_parallel_instance_num |
1 | Instans paralel per backend. |
Total instans = min(ukuran_file / min_bytes_per_broker_scanner, max_broker_concurrency, load_parallel_instance_num × jumlah backend)
Dalam praktiknya, sebagian besar pekerjaan memiliki satu klausa data_desc sehingga hanya menghasilkan satu tugas. Tugas tersebut dibagi menjadi sebanyak mungkin instans sesuai jumlah backend, dengan setiap instans berjalan di backend berbeda.
Pemecahan masalah
Pekerjaan masuk ke status CANCELLED
Periksa bidang ErrorMsg dalam output SHOW LOAD. Tabel berikut memetakan setiap jenis error ke penyebab dan solusinya:
| Tipe ErrorMsg | Penyebab | Perbaikan |
|---|---|---|
ETL-QUALITY-UNSATISFIED |
Laju error melebihi max_filter_ratio. |
Tingkatkan max_filter_ratio dalam PROPERTIES untuk mengizinkan lebih banyak baris error dilewati, atau perbaiki kualitas data sumber. |
TIMEOUT |
Pekerjaan melebihi periode timeout. | Hitung ulang timeout minimum menggunakan rumus (Ukuran total file dalam MB × jumlah tabel sumber dan rollup) / (30 × konkurensi) dan tetapkan nilai yang lebih besar dalam PROPERTIES ("timeout" = "..."). |
LOAD-RUN-FAIL |
Backend gagal selama fase LOADING. | Periksa bidang URL untuk contoh baris error. Periksa log backend untuk masalah perangkat keras atau jaringan. |
ETL-RUN-FAIL |
Kegagalan dalam fase ETL. | Periksa bidang URL untuk detail error. |
USER-CANCEL |
Pekerjaan dibatalkan secara manual. | Kirim ulang dengan label yang sama (label CANCELLED dapat digunakan ulang). |
UNKNOWN |
Terjadi error tak terduga. | Periksa log backend dan frontend untuk detailnya. |
Progres LOADING mandek
Progres yang ditampilkan dalam SHOW LOAD tidak linear—hanya diperbarui ketika tabel lengkap berhasil diimpor. Persentase yang stabil tidak berarti pekerjaan telah berhenti. Tunggu hingga bidang LoadFinishTime terisi sebelum menyimpulkan pekerjaan macet.
Jika LoadStartTime tetap N/A dalam waktu lama setelah pekerjaan dibuat, berarti banyak pekerjaan dalam antrean. Hentikan pengiriman pekerjaan baru hingga antrean kosong.
Langkah selanjutnya
-
Tinjau dokumentasi StarRocks untuk opsi lanjutan
LOAD LABELdan penggunaanHELP BROKER LOAD -
Pantau pekerjaan yang sedang berjalan dengan
SHOW LOADdan tinjau log backend untuk penyetelan performa -
Sesuaikan
min_bytes_per_broker_scanner,max_broker_concurrency, danload_parallel_instance_numdi frontend untuk menyetel konkurensi impor sesuai kluster Anda