All Products
Search
Document Center

E-MapReduce:Broker Load

Last Updated:Mar 27, 2026

Broker Load adalah metode impor asinkron di StarRocks yang membaca data dari sistem penyimpanan eksternal melalui proses broker, lalu memproses dan memuat data tersebut menggunakan sumber daya komputasi StarRocks sendiri. Gunakan Broker Load untuk mengimpor data berukuran puluhan hingga ratusan GB per pekerjaan dari Hadoop Distributed File System (HDFS) atau Alibaba Cloud Object Storage Service (OSS). Satu pekerjaan Broker Load dapat menargetkan beberapa tabel tujuan secara atomik—artinya semua tabel berhasil dimuat atau semuanya gagal.

Cara kerja Broker Load

Broker Load memiliki dua mode operasi tergantung pada versi StarRocks yang Anda gunakan:

  • Pemuatan berbasis broker (StarRocks versi sebelum 2.5.8): StarRocks bergantung pada proses broker untuk terhubung ke penyimpanan eksternal. Tentukan nama broker secara eksplisit dengan WITH BROKER "<broker_name>" dalam pernyataan pemuatan.

  • Pemuatan tanpa broker (StarRocks 2.5.8 dan lebih baru): StarRocks terhubung langsung ke penyimpanan eksternal. Abaikan nama broker tetapi tetap gunakan kata kunci WITH BROKER dalam pernyataan pemuatan.

Saat Anda membuat kluster StarRocks di E-MapReduce (EMR), proses broker secara otomatis dideploy dan dijalankan di setiap node inti.

Sumber dan format yang didukung

Dimensi Nilai yang didukung
Sumber data HDFS, OSS
Format file CSV (default), ORC, Parquet (.parquet atau .parq)

Prasyarat

Sebelum memulai, pastikan Anda telah memiliki:

  • Kluster StarRocks yang sedang berjalan di EMR dengan minimal satu node inti

  • Kredensial akses untuk HDFS atau OSS

  • Database dan tabel tujuan di StarRocks

Kueri Broker

Untuk memastikan broker sedang berjalan di kluster Anda, jalankan:

SHOW PROC "/brokers"\G

Output akan menampilkan daftar setiap broker beserta alamat IP-nya, port (default: 8000), status alive, dan timestamp:

*************************** 1. row ***************************
          Name: broker
            IP: 10.0.**.**
          Port: 8000
         Alive: true
 LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
        ErrMsg:

Membuat pekerjaan impor

Semua pekerjaan impor menggunakan pernyataan LOAD LABEL. Sintaksnya sedikit berbeda tergantung versi StarRocks.

Untuk StarRocks versi sebelum 2.5.8

LOAD LABEL db_name.label_name
    (data_desc, ...)
WITH BROKER broker_name broker_properties
    [PROPERTIES (key1=value1, ... )]

Untuk StarRocks versi 2.5.8 dan lebih baru

LOAD LABEL db_name.label_name
    (data_desc, ...)
WITH BROKER broker_properties
    [PROPERTIES (key1=value1, ... )]
Di kluster StarRocks EMR, gunakan broker sebagai nama broker.

Untuk melihat referensi sintaks lengkap, jalankan HELP BROKER LOAD.

Label

Setiap pekerjaan impor memerlukan label unik. Anda dapat menentukan label kustom atau membiarkan sistem menghasilkannya. Label ini memiliki dua fungsi:

  • Memantau status pekerjaan dengan SHOW LOAD WHERE label = '<label>'

  • Mencegah impor duplikat—StarRocks akan menolak pekerjaan baru jika sudah ada pekerjaan dengan status FINISHED yang menggunakan label yang sama

Setelah pekerjaan mencapai status FINISHED, label tersebut kedaluwarsa. Jika pekerjaan dibatalkan (CANCELLED), label yang sama dapat digunakan kembali untuk mengirim ulang pekerjaan tersebut.

Parameter data_desc

Klausa data_desc menjelaskan data sumber untuk satu tabel tujuan. Satu pekerjaan dapat berisi beberapa klausa data_desc yang menargetkan tabel berbeda—StarRocks menjamin atomicity di seluruh tabel dalam satu pekerjaan.

data_desc:
    DATA INFILE ('file_path', ...)
    [NEGATIVE]
    INTO TABLE tbl_name
    [PARTITION (p1, p2)]
    [COLUMNS TERMINATED BY column_separator]
    [FORMAT AS file_type]
    [(col1, ...)]
    [COLUMNS FROM PATH AS (colx, ...)]
    [SET (k1=f1(xx), k2=f2(xx))]
    [WHERE predicate]
Parameter Deskripsi
file_path Jalur file spesifik atau pola glob menggunakan wildcard (?, *, [], {}, ^). Misalnya, hdfs://hdfs_host:hdfs_port/user/data/tablename/*/ mengimpor semua file di semua partisi di bawah /tablename. Lihat FileSystem.globStatus untuk sintaks wildcard.
NEGATIVE Menandai baris sumber sebagai penghapusan. Gunakan ini untuk membatalkan impor batch ke kolom agregat tipe SUM.
PARTITION Membatasi impor hanya ke partisi yang ditentukan di tabel tujuan. Baris yang tidak termasuk dalam partisi yang terdaftar dihitung sebagai error. Gunakan predikat WHERE untuk menyaringnya.
column_separator Pemisah kolom di file sumber. Default: \t. Untuk karakter tak terlihat, gunakan format heksadesimal (misalnya, \\x01 untuk default Hive \x01).
file_type Format file sumber: csv (default), orc, atau parquet.
COLUMNS FROM PATH AS Mengekstrak nilai bidang partisi dari jalur file. Misalnya, jalur /path/col_name=col_value/dt=20210101/file1 memungkinkan Anda mengimpor col_value dan 20210101 ke kolom tabel col_name dan dt.
SET Fungsi konversi tipe kolom. Diperlukan ketika tipe kolom sumber dan tujuan berbeda.
WHERE predicate Menyaring baris setelah konversi tipe kolom. Baris yang disaring tidak dihitung dalam perhitungan laju error. Beberapa predikat WHERE untuk tabel yang sama digabungkan dengan AND.

broker_properties dan properti tingkat pekerjaan

Klausa broker_properties meneruskan kredensial penyimpanan dan pengaturan koneksi. Blok opsional PROPERTIES mengontrol perilaku eksekusi pekerjaan.

broker_properties:
    (key2=value2, ...)
Penting

Ini merupakan cakupan yang berbeda: broker_properties (di dalam WITH BROKER) mengonfigurasi cara StarRocks terhubung ke sistem penyimpanan. PROPERTIES (blok luar) mengontrol cara pekerjaan dijalankan—timeout, rasio filter, memori, dan mode.

Parameter Deskripsi
timeout Timeout pekerjaan dalam detik. Default: 14.400 (4 jam). Jika pekerjaan tidak selesai dalam periode ini, statusnya berubah menjadi CANCELLED. Perkirakan timeout minimum dengan rumus: (Ukuran total file dalam MB × Jumlah tabel sumber dan rollup) / (30 × konkurensi). Misalnya, file 1 GB dengan satu tabel sumber, dua tabel rollup, dan konkurensi 3 memerlukan minimal (1 × 1.024 × 3) / (10 × 3) = 102 detik. Tetapkan timeout hanya jika Anda memperkirakan pekerjaan akan melebihi nilai default.
max_filter_ratio Laju error maksimum yang dapat diterima, dari 0 hingga 1. Default: 0. Jika laju error melebihi nilai ini, pekerjaan gagal. Tetapkan nilai lebih dari 0 untuk mengizinkan beberapa baris error dilewati. Hitung rasionya sebagai: max_filter_ratio = dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL).
load_mem_limit Batas memori untuk pekerjaan dalam byte. Default: 0 (tanpa batas).
strict_mode Ketika diatur ke true, baris yang nilai sumber non-null-nya dikonversi menjadi null dianggap sebagai error dan disaring. Default: dinonaktifkan. Aktifkan dengan PROPERTIES ("strict_mode" = "true"). Mode ketat tidak berlaku untuk kolom yang dihasilkan fungsi atau nilai yang valid tetapi berada di luar rentang kolom tujuan.

Memuat data dari OSS

Contoh berikut memuat file lineitem TPC-H dari OSS ke tabel StarRocks.

Untuk StarRocks versi sebelum 2.5.8

LOAD LABEL tpch.lineitem
(
    DATA INFILE("oss://bucket/tpc_h/sf1/lineitem.tbl")
    INTO TABLE `lineitem`
    COLUMNS TERMINATED BY '|'
    (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice,
     l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate,
     l_receiptdate, l_shipinstruct, l_shipmode, l_comment)
)

Untuk StarRocks versi 2.5.8 dan lebih baru

LOAD LABEL tpch.lineitem
(
    DATA INFILE("oss://bucket/tpc_h/sf1/lineitem.tbl")
    INTO TABLE `lineitem`
    COLUMNS TERMINATED BY '|'
    (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice,
     l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate,
     l_receiptdate, l_shipinstruct, l_shipmode, l_comment)
)
WITH BROKER broker
(
    "fs.oss.accessKeyId" = "xxx",
    "fs.oss.accessKeySecret" = "xxx",
    "fs.oss.endpoint" = "oss-cn-beijing-internal.aliyuncs.com"
);

Memuat data dari HDFS

Otentikasi

HDFS mendukung dua mode otentikasi.

Otentikasi Sederhana — identitas pengguna ditentukan oleh OS Klien yang terhubung:

Parameter Deskripsi
hadoop.security.authentication Atur ke simple.
username Username HDFS.
password Password HDFS.

Otentikasi Kerberos—identitas pengguna ditentukan oleh kredensial Kerberos:

Parameter Deskripsi
hadoop.security.authentication Atur ke kerberos.
kerberos_principal Principal Kerberos.
kerberos_keytab Jalur ke file keytab Kerberos. File harus berada di server yang sama dengan proses broker.
kerberos_keytab_content Konten file keytab dalam format Base64-encoded. Konfigurasikan salah satu parameter ini atau kerberos_keytab, jangan keduanya.

Konfigurasi ketersediaan tinggi

Untuk kluster HDFS dengan high availability (HA) NameNode, konfigurasikan parameter berikut agar StarRocks dapat mendeteksi failover NameNode aktif secara otomatis:

Parameter Deskripsi
dfs.nameservices Nama kustom untuk layanan HDFS (misalnya, my-ha).
dfs.ha.namenodes.xxx Daftar nama NameNode yang dipisahkan koma. Ganti xxx dengan nilai dfs.nameservices.
dfs.namenode.rpc-address.xxx.nn Alamat Remote Procedure Call (RPC) setiap NameNode (Hostname:Port). Ganti nn dengan masing-masing nama NameNode.
dfs.client.failover.proxy.provider.xxx Penyedia proxy failover. Default: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.

Contoh broker_properties untuk kluster HDFS HA dengan otentikasi simple:

(
    "username" = "user",
    "password" = "passwd",
    "dfs.nameservices" = "my-ha",
    "dfs.ha.namenodes.my-ha" = "my-namenode1,my-namenode2",
    "dfs.namenode.rpc-address.my-ha.my-namenode1" = "nn1-host:rpc_port",
    "dfs.namenode.rpc-address.my-ha.my-namenode2" = "nn2-host:rpc_port",
    "dfs.client.failover.proxy.provider.my-ha" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
)

Jika konfigurasi HDFS kluster disimpan dalam hdfs-site.xml, cukup tentukan jalur file dan parameter otentikasi saja.

Contoh pemuatan HDFS

LOAD LABEL db1.label1
(
    DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/hive/test.db/ml/file1")
    INTO TABLE tbl1
    COLUMNS TERMINATED BY ","
    (tmp_c1, tmp_c2)
    SET
    (
        id = tmp_c2,
        name = tmp_c1
    ),

    DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/hive/test.db/ml/file2")
    INTO TABLE tbl2
    COLUMNS TERMINATED BY ","
    (col1, col2)
    WHERE col1 > 1
)
WITH BROKER 'broker1'
(
    "username" = "hdfs_username",
    "password" = "hdfs_password"
)
PROPERTIES
(
    "timeout" = "3600"
);

Memeriksa status pekerjaan

Broker Load bersifat asinkron. Setelah mengirim pekerjaan, gunakan SHOW LOAD untuk melacak progresnya:

SHOW LOAD WHERE label = 'label1'\G

Untuk melihat sintaks lengkap SHOW LOAD, jalankan HELP SHOW LOAD.

SHOW LOAD hanya berlaku untuk metode impor asinkron. Tidak berlaku untuk metode sinkron seperti Stream Load.

Outputnya tampak seperti:

*************************** 1. row ***************************
         JobId: 7****
         Label: label1
         State: FINISHED
      Progress: ETL:N/A; LOAD:100%
          Type: BROKER
       EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
      TaskInfo: cluster:N/A; timeout(s):10800; max_filter_ratio:5.0E-5
      ErrorMsg: N/A
    CreateTime: 2019-07-27 11:46:42
  EtlStartTime: 2019-07-27 11:46:44
 EtlFinishTime: 2019-07-27 11:46:44
 LoadStartTime: 2019-07-27 11:46:44
LoadFinishTime: 2019-07-27 11:50:16
           URL: http://192.168.**.**:8040/api/_load_error_log?file=__shard_4/error_log_insert_stmt_...
    JobDetails: {"Unfinished backends":{"9c3441027ff948a0-8287923329a2****":[10002]},"ScannedRows":2390016,"TaskNumber":1,"All backends":{"9c3441027ff948a0-8287923329a2****":[10002]},"FileNumber":1,"FileSize":1073741824}
Bidang Deskripsi
JobId ID unik yang dihasilkan sistem untuk pekerjaan. Tidak pernah digunakan ulang.
Label Label yang Anda tentukan atau yang dihasilkan otomatis.
State Status pekerjaan saat ini: PENDING (menunggu), LOADING (berjalan), CANCELLED (gagal), atau FINISHED (berhasil).
Progress Broker Load tidak memiliki fase ETL, sehingga ETL selalu menunjukkan N/A. Progres LOADING adalah (jumlah tabel yang diimpor / total tabel sumber) × 100%. Nilainya mencapai 99% ketika semua tabel telah diimpor tetapi pekerjaan masih dalam proses finalisasi — 100% hanya setelah pekerjaan benar-benar selesai. Progres tidak linear; persentase yang stabil tidak berarti pekerjaan telah berhenti.
Type Selalu BROKER untuk pekerjaan Broker Load.
EtlInfo Tiga penghitung: unselected.rows (baris yang disaring oleh WHERE), dpp.norm.ALL (baris yang berhasil dimuat), dpp.abnorm.ALL (baris dengan error). Jumlah ketiganya sama dengan total baris di file sumber.
TaskInfo Parameter pekerjaan yang Anda konfigurasi: kluster, timeout, dan max_filter_ratio.
ErrorMsg Detail error ketika pekerjaan berstatus CANCELLED. N/A ketika FINISHED. Bidang type dapat berupa: USER-CANCEL, ETL-RUN-FAIL, ETL-QUALITY-UNSATISFIED, LOAD-RUN-FAIL, TIMEOUT, atau UNKNOWN.
CreateTime / EtlStartTime / EtlFinishTime / LoadStartTime / LoadFinishTime Timestamp untuk setiap fase. Broker Load tidak memiliki fase ETL, sehingga EtlStartTime, EtlFinishTime, dan LoadStartTime memiliki nilai yang sama. Jika hanya CreateTime yang terisi dan LoadStartTime tetap N/A dalam waktu lama, berarti terlalu banyak pekerjaan dalam antrean — hentikan pengiriman pekerjaan baru hingga antrean kosong.
URL Tautan ke contoh baris yang error. N/A jika tidak ada error.
JobDetails Detail tentang backend, jumlah baris yang dipindai (diperbarui setiap 5 detik), jumlah tugas, jumlah file, dan ukuran file.

Membatalkan pekerjaan impor

Batalkan pekerjaan yang berstatus PENDING atau LOADING:

CANCEL LOAD
[FROM db_name]
WHERE [LABEL = "load_label" | LABEL LIKE "label_pattern"];

Untuk melihat sintaks lengkap, jalankan HELP CANCEL LOAD.

Contoh end-to-end: muat dari OSS

Contoh ini memuat data lineitem TPC-H dari OSS ke StarRocks.

Langkah 1: Buat tabel tujuan.

CREATE TABLE lineitem (
  l_orderkey      bigint,
  l_partkey       bigint,
  l_suppkey       bigint,
  l_linenumber    int,
  l_quantity      double,
  l_extendedprice double,
  l_discount      double,
  l_tax           double,
  l_returnflag    string,
  l_linestatus    string,
  l_shipdate      date,
  l_commitdate    date,
  l_receiptdate   date,
  l_shipinstruct  string,
  l_shipmode      string,
  l_comment       string
)
ENGINE = OLAP
DUPLICATE KEY(l_orderkey)
DISTRIBUTED BY HASH(l_orderkey) BUCKETS 96
PROPERTIES (
  "replication_num" = "1"
);

Langkah 2: Kirim pekerjaan impor.

Untuk StarRocks versi sebelum 2.5.8

Untuk StarRocks versi sebelum 2.5.8, tambahkan WITH BROKER broker ("fs.oss.accessKeyId" = "xxx", "fs.oss.accessKeySecret" = "xxx", "fs.oss.endpoint" = "xxx") setelah tanda kurung tutup.

Untuk StarRocks versi 2.5.8 dan lebih baru

Untuk StarRocks 2.5.8 dan lebih baru:

LOAD LABEL tpch.lineitem
(
    DATA INFILE("oss://xxx/tpc_h/sf1/lineitem.tbl")
    INTO TABLE `lineitem`
    COLUMNS TERMINATED BY '|'
    (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice,
     l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate,
     l_receiptdate, l_shipinstruct, l_shipmode, l_comment)
)

Langkah 3: Pantau pekerjaan.

SHOW LOAD WHERE label = 'lineitem'\G

Saat pekerjaan berhasil, output menunjukkan State: FINISHED dan Progress: ETL:100%; LOAD:100%.

Langkah 4: Verifikasi data.

-- Periksa jumlah total baris
SELECT COUNT(*) FROM lineitem;

-- Pratinjau dua baris pertama
SELECT * FROM lineitem LIMIT 2;

Konkurensi

Pekerjaan Broker Load dibagi menjadi tugas, lalu tugas tersebut dibagi lagi menjadi instans yang didistribusikan secara paralel di seluruh backend.

Pemisahan tugas: Satu pekerjaan menghasilkan satu tugas per klausa data_desc yang mengarah ke alamat sumber berbeda atau kumpulan partisi berbeda.

Pemisahan instans: Setiap tugas dibagi menjadi instans berdasarkan tiga parameter frontend:

Parameter Default Deskripsi
min_bytes_per_broker_scanner 64 MB Volume data minimum per instans.
max_broker_concurrency 100 Instans paralel maksimum per tugas.
load_parallel_instance_num 1 Instans paralel per backend.

Total instans = min(ukuran_file / min_bytes_per_broker_scanner, max_broker_concurrency, load_parallel_instance_num × jumlah backend)

Dalam praktiknya, sebagian besar pekerjaan memiliki satu klausa data_desc sehingga hanya menghasilkan satu tugas. Tugas tersebut dibagi menjadi sebanyak mungkin instans sesuai jumlah backend, dengan setiap instans berjalan di backend berbeda.

Pemecahan masalah

Pekerjaan masuk ke status CANCELLED

Periksa bidang ErrorMsg dalam output SHOW LOAD. Tabel berikut memetakan setiap jenis error ke penyebab dan solusinya:

Tipe ErrorMsg Penyebab Perbaikan
ETL-QUALITY-UNSATISFIED Laju error melebihi max_filter_ratio. Tingkatkan max_filter_ratio dalam PROPERTIES untuk mengizinkan lebih banyak baris error dilewati, atau perbaiki kualitas data sumber.
TIMEOUT Pekerjaan melebihi periode timeout. Hitung ulang timeout minimum menggunakan rumus (Ukuran total file dalam MB × jumlah tabel sumber dan rollup) / (30 × konkurensi) dan tetapkan nilai yang lebih besar dalam PROPERTIES ("timeout" = "...").
LOAD-RUN-FAIL Backend gagal selama fase LOADING. Periksa bidang URL untuk contoh baris error. Periksa log backend untuk masalah perangkat keras atau jaringan.
ETL-RUN-FAIL Kegagalan dalam fase ETL. Periksa bidang URL untuk detail error.
USER-CANCEL Pekerjaan dibatalkan secara manual. Kirim ulang dengan label yang sama (label CANCELLED dapat digunakan ulang).
UNKNOWN Terjadi error tak terduga. Periksa log backend dan frontend untuk detailnya.

Progres LOADING mandek

Progres yang ditampilkan dalam SHOW LOAD tidak linear—hanya diperbarui ketika tabel lengkap berhasil diimpor. Persentase yang stabil tidak berarti pekerjaan telah berhenti. Tunggu hingga bidang LoadFinishTime terisi sebelum menyimpulkan pekerjaan macet.

Jika LoadStartTime tetap N/A dalam waktu lama setelah pekerjaan dibuat, berarti banyak pekerjaan dalam antrean. Hentikan pengiriman pekerjaan baru hingga antrean kosong.

Langkah selanjutnya

  • Tinjau dokumentasi StarRocks untuk opsi lanjutan LOAD LABEL dan penggunaan HELP BROKER LOAD

  • Pantau pekerjaan yang sedang berjalan dengan SHOW LOAD dan tinjau log backend untuk penyetelan performa

  • Sesuaikan min_bytes_per_broker_scanner, max_broker_concurrency, dan load_parallel_instance_num di frontend untuk menyetel konkurensi impor sesuai kluster Anda