All Products
Search
Document Center

ApsaraDB for SelectDB:Gunakan Broker Load untuk mengimpor data

Last Updated:Apr 29, 2026

Broker Load adalah metode impor massal asinkron yang memuat data dari sistem penyimpanan terdistribusi—Hadoop Distributed File System (HDFS), Object Storage Service (OSS), dan Amazon Simple Storage Service (Amazon S3)—langsung ke ApsaraDB for SelectDB. Satu pekerjaan impor dapat menangani ratusan GB tanpa memblokir kluster Anda.

Cara kerja

  1. Kirim pernyataan LOAD LABEL melalui protokol MySQL. Pekerjaan langsung diterima dan dijalankan secara asinkron.

  2. Broker terhubung ke sistem penyimpanan remote dan membaca file yang ditentukan.

  3. Data dimuat ke tabel target. Jika Anda memuat ke beberapa tabel, semua penulisan dalam pekerjaan tersebut bersifat atomik.

  4. Label mencatat status penyelesaian pekerjaan, mencegah impor duplikat jika Anda mengirim ulang permintaan yang sama.

Kinerja

Volume data

Waktu penyelesaian yang diharapkan

~100 MB

~10 detik

~100 GB

~10 menit

Prasyarat

Sebelum memulai, pastikan Anda telah memiliki:

  • Instans ApsaraDB for SelectDB.

  • Akses jaringan ke sistem penyimpanan sumber (HDFS, OSS, atau Amazon S3). Jika sistem penyimpanan remote berada di lingkungan jaringan publik, instans ApsaraDB for SelectDB Anda harus memiliki akses jaringan publik. Lihat Atasi masalah jaringan dengan sumber data.

  • Tabel target yang telah dibuat di instans SelectDB Anda.

Kirim pekerjaan Broker Load

Sintaks

LOAD LABEL load_label
(
    data_desc1[, data_desc2, ...]
)
WITH broker_type
[broker_properties]
[load_properties];

Parameter

Parameter

Deskripsi

load_label

Identifikasi unik untuk pekerjaan, dalam format [database.]label_name. Jika pekerjaan sebelumnya dengan label ini berada dalam status CANCELLED, label tersebut dapat digunakan kembali. Gunakan label yang sama untuk batch data yang sama guna menerapkan semantik at-most-once — pengiriman ulang dengan label yang sama hanya diterima satu kali.

data_desc

Menjelaskan file yang akan diimpor. Lihat Parameter deskripsi data.

WITH broker_type

Jenis broker. Nilai yang valid: HDFS dan S3. Pekerjaan yang menggunakan broker S3 juga disebut sebagai pekerjaan OSS Load. Untuk instruksi khusus OSS, lihat Impor data menggunakan OSS.

broker_properties

Parameter koneksi untuk sistem penyimpanan remote, dalam format ("key1" = "val1", "key2" = "val2", ...).

load_properties

Parameter perilaku impor. Lihat Properti impor.

Parameter deskripsi data

[MERGE|APPEND|DELETE]
DATA INFILE
(
    "file_path1"[, file_path2, ...]
)
[NEGATIVE]
INTO TABLE `table_name`
[PARTITION (p1, p2, ...)]
[COLUMNS TERMINATED BY "column_separator"]
[FORMAT AS "file_type"]
[(column_list)]
[COLUMNS FROM PATH AS (c1, c2, ...)]
[PRECEDING FILTER predicate]
[SET (column_mapping)]
[WHERE predicate]
[DELETE ON expr]
[ORDER BY source_sequence]
[PROPERTIES ("key1"="value1", ...)]

Parameter

Deskripsi

MERGE|APPEND|DELETE

Mode merge. Default: APPEND (penambahan standar). MERGE dan DELETE hanya berlaku untuk tabel yang menggunakan model Unique Key. Dengan MERGE, gunakan DELETE ON untuk menentukan kolom Delete Flag. Dengan DELETE, semua baris yang diimpor dihapus dari tabel.

DATA INFILE

Jalur ke file yang akan diimpor. Mendukung beberapa jalur dan wildcard. Setiap jalur harus mengarah ke file aktual, bukan direktori.

NEGATIVE

Membalik nilai integer yang diagregasi oleh fungsi SUM. Gunakan ini untuk mengimbangi data yang sebelumnya diimpor namun mengandung kesalahan. Hanya berlaku untuk kolom INTEGER yang diagregasi dengan SUM.

PARTITION(p1, p2, ...)

Membatasi impor hanya ke partisi tertentu. Data di luar partisi yang ditentukan akan dikecualikan.

COLUMNS TERMINATED BY

Pemisah kolom. Hanya berlaku untuk file CSV. Hanya pemisah satu byte.

FORMAT AS

Format file. Default: CSV. Nilai yang valid: CSV, PARQUET, ORC.

column_list

Urutan kolom dalam file sumber.

COLUMNS FROM PATH AS

Ekstraksi kolom dari jalur file.

PRECEDING FILTER predicate

Filter awal yang diterapkan setelah kolom dirakit dari column_list dan COLUMNS FROM PATH AS, sebelum pemetaan kolom apa pun.

SET (column_mapping)

Ekspresi transformasi kolom yang diterapkan selama impor.

WHERE predicate

Memfilter baris setelah pemetaan kolom. Hanya baris yang sesuai yang diimpor.

DELETE ON expr

Diperlukan saat menggunakan mode MERGE. Menentukan ekspresi yang menandai baris untuk dihapus. Hanya berlaku untuk tabel model Unique Key.

ORDER BY source_sequence

Menentukan kolom urutan untuk menjaga urutan data. Hanya berlaku untuk tabel model Unique Key.

PROPERTIES ("key1"="value1", ...)

Parameter khusus format, seperti json_root, jsonpaths, dan fuzzy_parse untuk file JSON.

Properti impor

Parameter

Default

Deskripsi

timeout

14400 (4 jam)

Timeout dalam detik. Penghitung dimulai saat pekerjaan dikirim. Jika pekerjaan tidak selesai dalam periode ini, pekerjaan gagal.

max_filter_ratio

0

Proporsi maksimum baris error yang dapat ditoleransi, dari 0 hingga 1. Nilai default 0 berarti pekerjaan gagal jika ada baris yang mengalami kesalahan format. Atur nilai ini lebih besar dari 0 agar SelectDB dapat melewati baris dengan format salah.

exec_mem_limit

2147483648 (2 GB)

Memori maksimum yang dialokasikan untuk pekerjaan, dalam byte.

strict_mode

false

Jika true, mengaktifkan mode ketat, yang memengaruhi pemetaan kolom, konversi tipe, dan perilaku filter.

timezone

Asia/Shanghai

Zona waktu untuk fungsi yang sensitif terhadap zona waktu: strftime, alignment_timestamp, dan from_unixtime.

load_parallelism

1

Tingkat paralelisme (DOP) untuk impor. Mengatur nilai ini lebih besar dari 1 akan menjalankan beberapa rencana eksekusi secara konkuren, sehingga mempercepat impor data besar.

send_batch_parallelism

DOP untuk mengirim batch data. Dibatasi oleh max_send_batch_parallelism_per_job dalam konfigurasi backend (BE) kluster komputasi.

load_to_single_tablet

false

Jika true, mengimpor data ke satu tablet per partisi. Hanya berlaku untuk tabel model Duplicate Key dengan bucketing acak.

Contoh

Semua contoh berikut menggunakan struktur tabel yang sama. Jalankan pernyataan berikut terlebih dahulu untuk membuat tabel dan file data sampel.

Buat tabel

CREATE TABLE test_table
(
    id      int,
    name    varchar(50),
    age     int,
    address varchar(50)
)
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(id) BUCKETS 4
PROPERTIES("replication_num" = "1");

CREATE TABLE test_table2
(
    id      int,
    name    varchar(50),
    age     int,
    address varchar(50)
)
DISTRIBUTED BY HASH(id) BUCKETS 4
PROPERTIES("replication_num" = "1");

File data sampel

file1.txt (dipisahkan koma):

1,tomori,32,shanghai
2,anon,22,beijing
3,taki,23,shenzhen
4,rana,45,hangzhou
5,soyo,14,shanghai
6,saki,25,hangzhou
7,mutsumi,45,shanghai
8,uika,26,shanghai
9,umiri,27,shenzhen
10,nyamu,37,shanghai

file2.csv (dipisahkan koma):

1,saki,25,hangzhou
2,mutsumi,45,shanghai
3,uika,26,shanghai
4,umiri,27,shenzhen
5,nyamu,37,shanghai

Contoh 1: Impor file CSV dari HDFS

Mengimpor file1.txt ke test_table menggunakan koma sebagai pemisah kolom. Properti fs.defaultFS diperlukan untuk terhubung ke kluster HDFS.

LOAD LABEL example_db.label1
(
    DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt")
    INTO TABLE `test_table`
    COLUMNS TERMINATED BY ","
)
WITH HDFS
(
    "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
);

Contoh 2: Impor beberapa file ke beberapa tabel dalam satu pekerjaan

Mengimpor file2.csv ke test_table dan file1.txt ke test_table2 dalam satu pekerjaan atomik. Klausul SET menambahkan kolom age sebesar 1 menggunakan kolom temp_age dari file sumber.

LOAD LABEL test_db.test_02
(
    DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file2.csv")
    INTO TABLE `test_table`
    COLUMNS TERMINATED BY ","
    (id, name, temp_age, address)
    SET (
        age = temp_age + 1
    ),
    DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt")
    INTO TABLE `test_table2`
    COLUMNS TERMINATED BY ","
)
WITH HDFS
(
    "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
);

Contoh 3: Impor dari kluster HDFS dalam mode high availability (HA)

Menggunakan wildcard untuk mengimpor semua file dalam direktori /example/. Pemisah \\x01 adalah pemisah bidang Hive default.

LOAD LABEL test_db.test_03
(
    DATA INFILE("hdfs://hdfs_host:hdfs_port/example/*")
    INTO TABLE `test_table`
    COLUMNS TERMINATED BY "\\x01"
)
WITH HDFS
(
    "hadoop.username"                                          = "hive",
    "fs.defaultFS"                                            = "hdfs://my_ha",
    "dfs.nameservices"                                        = "my_ha",
    "dfs.ha.namenodes.my_ha"                                  = "my_namenode1, my_namenode2",
    "dfs.namenode.rpc-address.my_ha.my_namenode1"             = "nn1_host:rpc_port",
    "dfs.namenode.rpc-address.my_ha.my_namenode2"             = "nn2_host:rpc_port",
    "dfs.client.failover.proxy.provider.my_ha"                = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
);

Contoh 4: Impor dengan filter baris

Hanya mengimpor baris di mana age < 20 dari file1.txt.

LOAD LABEL test_db.test_04
(
    DATA INFILE("hdfs://host:port/example/file1.txt")
    INTO TABLE `test_table2`
    COLUMNS TERMINATED BY ","
    (id, name, age, address)
    WHERE age < 20
)
WITH HDFS
(
    "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
);

Contoh 5: Mode MERGE dengan timeout dan toleransi error

Mengimpor file1.txt dalam mode MERGE ke test_table (yang menggunakan model Unique Key). Baris di mana age < 20 ditandai untuk dihapus. Pekerjaan ini mengizinkan hingga 10% baris error dan memiliki timeout 1 jam.

LOAD LABEL test_db.test_05
(
    MERGE DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt")
    INTO TABLE `test_table`
    COLUMNS TERMINATED BY ","
    (id, name, age, address)
    DELETE ON age < 20
)
WITH HDFS
(
    "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
)
PROPERTIES
(
    "timeout"          = "3600",
    "max_filter_ratio" = "0.1"
);

Periksa status pekerjaan

Broker Load dijalankan secara asinkron. Pengiriman LOAD LABEL yang berhasil hanya mengonfirmasi bahwa pekerjaan diterima—bukan bahwa data telah selesai dimuat. Gunakan SHOW LOAD untuk melacak progres.

Sintaks

SHOW LOAD
[FROM db_name]
[
    WHERE
    [LABEL [ = "your_label" | LIKE "label_matcher"]]
    [STATE = ["PENDING"|"ETL"|"LOADING"|"FINISHED"|"CANCELLED"]]
]
[ORDER BY ...]
[LIMIT limit][OFFSET offset];

Parameter

Parameter

Deskripsi

db_name

Database yang akan diinterogasi. Default ke database saat ini.

LABEL

Memfilter berdasarkan label. Mendukung pencocokan eksak dan pencocokan pola LIKE.

STATE

Memfilter berdasarkan status pekerjaan.

ORDER BY

Mengurutkan hasil.

LIMIT

Membatasi jumlah hasil yang dikembalikan. Mengembalikan semua hasil jika dihilangkan.

OFFSET

Melewati N hasil pertama. Default: 0.

Status pekerjaan

State

Makna

Tindakan yang perlu dilakukan

PENDING

Pekerjaan sedang dalam antrean, menunggu eksekusi.

Tunggu. Pantau kedalaman antrean jika pekerjaan telah lama dalam status pending.

ETL

Data sedang diekstraksi dan ditransformasi.

Tunggu.

LOADING

Data sedang ditulis ke tabel.

Tunggu.

FINISHED

Impor berhasil diselesaikan.

Tidak perlu tindakan.

CANCELLED

Pekerjaan dibatalkan atau gagal. Label dapat digunakan kembali untuk pekerjaan baru.

Periksa kolom ErrorMsg dan URL dalam output SHOW LOAD untuk detail kegagalan.

Kolom output

SHOW LOAD mengembalikan kolom berikut untuk setiap pekerjaan:

Kolom

Deskripsi

JobId

ID pekerjaan internal yang ditetapkan oleh SelectDB.

Label

Label yang Anda tentukan dalam LOAD LABEL.

State

Status pekerjaan saat ini. Lihat Status pekerjaan.

Progress

Progres impor dalam persentase.

Type

Jenis impor, misalnya BROKER.

EtlInfo

Statistik fase ETL, termasuk jumlah baris dan baris yang difilter.

TaskInfo

Rincian tingkat tugas: kluster, timeout, dan max_filter_ratio.

ErrorMsg

Pesan error jika pekerjaan gagal atau dibatalkan.

CreateTime

Waktu pekerjaan dibuat.

EtlStartTime

Waktu fase ETL dimulai.

EtlFinishTime

Waktu fase ETL selesai.

LoadStartTime

Waktu fase pemuatan dimulai.

LoadFinishTime

Waktu fase pemuatan selesai.

URL

URL ke log error. Buka URL ini untuk melihat daftar lengkap baris yang ditolak beserta alasannya saat pekerjaan gagal atau memiliki max_filter_ratio yang tidak nol.

JobDetails

Objek JSON dengan statistik per file: baris yang dipindai, baris yang difilter, dan baris yang tidak dipilih.

Jika pekerjaan berada dalam status CANCELLED, gunakan kolom ErrorMsg dan URL untuk mendiagnosis kegagalan sebelum mengirim ulang.

Contoh

Kueri pekerjaan berdasarkan pola label, kembalikan 10 yang tertua

SHOW LOAD FROM example_db WHERE LABEL LIKE "2014_01_02" LIMIT 10;

Kueri pekerjaan tertentu, diurutkan berdasarkan waktu mulai

SHOW LOAD FROM example_db
WHERE LABEL = "load_example_db_20140102"
ORDER BY LoadStartTime DESC;

Kueri pekerjaan dalam status LOADING

SHOW LOAD FROM example_db
WHERE LABEL = "load_example_db_20140102" AND STATE = "loading";

Paginasi hasil: lewati 5 pertama, kembalikan 10 berikutnya

SHOW LOAD FROM example_db ORDER BY LoadStartTime DESC LIMIT 10 OFFSET 5;

Batalkan pekerjaan

Batalkan pekerjaan yang berada dalam status PENDING, ETL, atau LOADING menggunakan CANCEL LOAD. Data yang telah ditulis oleh pekerjaan yang dibatalkan akan dikembalikan secara otomatis.

Sintaks

CANCEL LOAD
[FROM db_name]
WHERE [LABEL = "load_label" | LABEL LIKE "label_pattern"];

Parameter

Parameter

Deskripsi

db_name

Database yang berisi pekerjaan. Default ke database saat ini.

load_label

Label eksak dari pekerjaan yang akan dibatalkan, atau pola untuk LABEL LIKE.

Contoh

Batalkan pekerjaan tertentu

CANCEL LOAD
FROM example_db
WHERE LABEL = "example_db_test_load_label";

Batalkan semua pekerjaan yang labelnya diawali dengan `example_`

CANCEL LOAD
FROM example_db
WHERE LABEL LIKE "example_";

Praktik terbaik

Batasi ukuran pekerjaan individu di bawah 100 GB

Batas data per pekerjaan adalah jumlah node dikalikan 3 GB. Untuk dataset besar, bagi data menjadi beberapa pekerjaan daripada satu pekerjaan besar. Hal ini mengurangi biaya pengulangan jika pekerjaan gagal dan mempermudah pelacakan progres.

Rancang label untuk kemudahan pelacakan

Gunakan label yang menyertakan metadata—misalnya, etl_orders_20260328_batch01. Hal ini mempermudah identifikasi pekerjaan yang gagal, menjalankan ulang batch tertentu, dan audit riwayat impor dengan SHOW LOAD.

Monitor antrean sebelum mengirim

SelectDB membatasi pekerjaan impor konkuren per kluster antara 3 hingga 10, dengan batas antrean 100. Pekerjaan yang melebihi batas antrean akan ditolak segera. Monitor pekerjaan aktif dan dalam antrean dengan SHOW LOAD sebelum mengirim pekerjaan baru.

Waktu dalam antrean dihitung terhadap timeout pekerjaan. Jika pekerjaan menunggu terlalu lama dalam antrean sebelum dieksekusi, pekerjaan tersebut mungkin timeout dan dibatalkan. Sesuaikan properti timeout jika workload Anda mencakup penundaan antrean.

Pilih max_filter_ratio berdasarkan kualitas data

Nilai

Perilaku

Kapan digunakan

0 (default)

Pekerjaan gagal jika ada baris yang mengalami kesalahan format. Tidak ada baris yang dilewati.

Data sumber yang tepercaya dan telah divalidasi.

0.010.1

SelectDB melewati hingga proporsi baris rusak tersebut dan menyelesaikan pekerjaan.

Impor besar di mana sejumlah kecil baris rusak dapat diterima.

>0.1

Sama seperti di atas, tetapi rasio tinggi biasanya menunjukkan adanya masalah kualitas data di hulu.

Investigasi data sumber sebelum menaikkan ambang batas ini.

Jika pekerjaan gagal atau melewati baris, buka URL dari kolom URL dalam SHOW LOAD untuk melihat secara tepat baris mana yang ditolak dan alasannya.

Gunakan load_parallelism untuk impor besar

Mengatur load_parallelism ke nilai lebih besar dari 1 menjalankan beberapa rencana eksekusi secara konkuren. Ini mempercepat impor untuk file besar tetapi meningkatkan penggunaan memori. Mulailah dengan 2 atau 4 dan sesuaikan berdasarkan kapasitas kluster.

Jamin idempotensi dengan label

Broker Load menerapkan semantik at-most-once per label. Jika pekerjaan dengan label tertentu berhasil (status FINISHED), pengiriman ulang dengan label yang sama tidak berpengaruh. Jika pekerjaan gagal atau dibatalkan, label tersebut tersedia untuk digunakan kembali. Selalu periksa SHOW LOAD sebelum memutuskan apakah akan mengirim ulang.

Batasan

Batasan

Nilai

Pekerjaan impor konkuren per kluster

3–10

Panjang antrean maksimum

100 (pekerjaan yang melebihi batas ini akan ditolak)

Timeout default

4 jam (penghitung dimulai sejak pengiriman pekerjaan, termasuk waktu tunggu dalam antrean)

Pemisah kolom

Hanya pemisah satu byte; hanya untuk file CSV

Mode MERGE dan DELETE

Hanya untuk tabel model Unique Key

NEGATIVE

Hanya untuk kolom INTEGER yang diagregasi dengan SUM

Langkah selanjutnya