Broker Load adalah metode impor massal asinkron yang memuat data dari sistem penyimpanan terdistribusi—Hadoop Distributed File System (HDFS), Object Storage Service (OSS), dan Amazon Simple Storage Service (Amazon S3)—langsung ke ApsaraDB for SelectDB. Satu pekerjaan impor dapat menangani ratusan GB tanpa memblokir kluster Anda.
Cara kerja
Kirim pernyataan
LOAD LABELmelalui protokol MySQL. Pekerjaan langsung diterima dan dijalankan secara asinkron.Broker terhubung ke sistem penyimpanan remote dan membaca file yang ditentukan.
Data dimuat ke tabel target. Jika Anda memuat ke beberapa tabel, semua penulisan dalam pekerjaan tersebut bersifat atomik.
Label mencatat status penyelesaian pekerjaan, mencegah impor duplikat jika Anda mengirim ulang permintaan yang sama.
Kinerja
Volume data | Waktu penyelesaian yang diharapkan |
~100 MB | ~10 detik |
~100 GB | ~10 menit |
Prasyarat
Sebelum memulai, pastikan Anda telah memiliki:
Instans ApsaraDB for SelectDB.
Akses jaringan ke sistem penyimpanan sumber (HDFS, OSS, atau Amazon S3). Jika sistem penyimpanan remote berada di lingkungan jaringan publik, instans ApsaraDB for SelectDB Anda harus memiliki akses jaringan publik. Lihat Atasi masalah jaringan dengan sumber data.
Tabel target yang telah dibuat di instans SelectDB Anda.
Kirim pekerjaan Broker Load
Sintaks
LOAD LABEL load_label
(
data_desc1[, data_desc2, ...]
)
WITH broker_type
[broker_properties]
[load_properties];Parameter
Parameter | Deskripsi |
| Identifikasi unik untuk pekerjaan, dalam format |
| Menjelaskan file yang akan diimpor. Lihat Parameter deskripsi data. |
| Jenis broker. Nilai yang valid: |
| Parameter koneksi untuk sistem penyimpanan remote, dalam format |
| Parameter perilaku impor. Lihat Properti impor. |
Parameter deskripsi data
[MERGE|APPEND|DELETE]
DATA INFILE
(
"file_path1"[, file_path2, ...]
)
[NEGATIVE]
INTO TABLE `table_name`
[PARTITION (p1, p2, ...)]
[COLUMNS TERMINATED BY "column_separator"]
[FORMAT AS "file_type"]
[(column_list)]
[COLUMNS FROM PATH AS (c1, c2, ...)]
[PRECEDING FILTER predicate]
[SET (column_mapping)]
[WHERE predicate]
[DELETE ON expr]
[ORDER BY source_sequence]
[PROPERTIES ("key1"="value1", ...)]Parameter | Deskripsi |
| Mode merge. Default: |
| Jalur ke file yang akan diimpor. Mendukung beberapa jalur dan wildcard. Setiap jalur harus mengarah ke file aktual, bukan direktori. |
| Membalik nilai integer yang diagregasi oleh fungsi SUM. Gunakan ini untuk mengimbangi data yang sebelumnya diimpor namun mengandung kesalahan. Hanya berlaku untuk kolom INTEGER yang diagregasi dengan SUM. |
| Membatasi impor hanya ke partisi tertentu. Data di luar partisi yang ditentukan akan dikecualikan. |
| Pemisah kolom. Hanya berlaku untuk file CSV. Hanya pemisah satu byte. |
| Format file. Default: |
| Urutan kolom dalam file sumber. |
| Ekstraksi kolom dari jalur file. |
| Filter awal yang diterapkan setelah kolom dirakit dari |
| Ekspresi transformasi kolom yang diterapkan selama impor. |
| Memfilter baris setelah pemetaan kolom. Hanya baris yang sesuai yang diimpor. |
| Diperlukan saat menggunakan mode |
| Menentukan kolom urutan untuk menjaga urutan data. Hanya berlaku untuk tabel model Unique Key. |
| Parameter khusus format, seperti |
Properti impor
Parameter | Default | Deskripsi |
|
| Timeout dalam detik. Penghitung dimulai saat pekerjaan dikirim. Jika pekerjaan tidak selesai dalam periode ini, pekerjaan gagal. |
|
| Proporsi maksimum baris error yang dapat ditoleransi, dari |
|
| Memori maksimum yang dialokasikan untuk pekerjaan, dalam byte. |
|
| Jika |
|
| Zona waktu untuk fungsi yang sensitif terhadap zona waktu: |
|
| Tingkat paralelisme (DOP) untuk impor. Mengatur nilai ini lebih besar dari |
| — | DOP untuk mengirim batch data. Dibatasi oleh |
|
| Jika |
Contoh
Semua contoh berikut menggunakan struktur tabel yang sama. Jalankan pernyataan berikut terlebih dahulu untuk membuat tabel dan file data sampel.
Buat tabel
CREATE TABLE test_table
(
id int,
name varchar(50),
age int,
address varchar(50)
)
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(id) BUCKETS 4
PROPERTIES("replication_num" = "1");
CREATE TABLE test_table2
(
id int,
name varchar(50),
age int,
address varchar(50)
)
DISTRIBUTED BY HASH(id) BUCKETS 4
PROPERTIES("replication_num" = "1");File data sampel
file1.txt (dipisahkan koma):
1,tomori,32,shanghai
2,anon,22,beijing
3,taki,23,shenzhen
4,rana,45,hangzhou
5,soyo,14,shanghai
6,saki,25,hangzhou
7,mutsumi,45,shanghai
8,uika,26,shanghai
9,umiri,27,shenzhen
10,nyamu,37,shanghaifile2.csv (dipisahkan koma):
1,saki,25,hangzhou
2,mutsumi,45,shanghai
3,uika,26,shanghai
4,umiri,27,shenzhen
5,nyamu,37,shanghaiContoh 1: Impor file CSV dari HDFS
Mengimpor file1.txt ke test_table menggunakan koma sebagai pemisah kolom. Properti fs.defaultFS diperlukan untuk terhubung ke kluster HDFS.
LOAD LABEL example_db.label1
(
DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt")
INTO TABLE `test_table`
COLUMNS TERMINATED BY ","
)
WITH HDFS
(
"fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
);Contoh 2: Impor beberapa file ke beberapa tabel dalam satu pekerjaan
Mengimpor file2.csv ke test_table dan file1.txt ke test_table2 dalam satu pekerjaan atomik. Klausul SET menambahkan kolom age sebesar 1 menggunakan kolom temp_age dari file sumber.
LOAD LABEL test_db.test_02
(
DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file2.csv")
INTO TABLE `test_table`
COLUMNS TERMINATED BY ","
(id, name, temp_age, address)
SET (
age = temp_age + 1
),
DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt")
INTO TABLE `test_table2`
COLUMNS TERMINATED BY ","
)
WITH HDFS
(
"fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
);Contoh 3: Impor dari kluster HDFS dalam mode high availability (HA)
Menggunakan wildcard untuk mengimpor semua file dalam direktori /example/. Pemisah \\x01 adalah pemisah bidang Hive default.
LOAD LABEL test_db.test_03
(
DATA INFILE("hdfs://hdfs_host:hdfs_port/example/*")
INTO TABLE `test_table`
COLUMNS TERMINATED BY "\\x01"
)
WITH HDFS
(
"hadoop.username" = "hive",
"fs.defaultFS" = "hdfs://my_ha",
"dfs.nameservices" = "my_ha",
"dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
"dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
"dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
"dfs.client.failover.proxy.provider.my_ha" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
);Contoh 4: Impor dengan filter baris
Hanya mengimpor baris di mana age < 20 dari file1.txt.
LOAD LABEL test_db.test_04
(
DATA INFILE("hdfs://host:port/example/file1.txt")
INTO TABLE `test_table2`
COLUMNS TERMINATED BY ","
(id, name, age, address)
WHERE age < 20
)
WITH HDFS
(
"fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
);Contoh 5: Mode MERGE dengan timeout dan toleransi error
Mengimpor file1.txt dalam mode MERGE ke test_table (yang menggunakan model Unique Key). Baris di mana age < 20 ditandai untuk dihapus. Pekerjaan ini mengizinkan hingga 10% baris error dan memiliki timeout 1 jam.
LOAD LABEL test_db.test_05
(
MERGE DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt")
INTO TABLE `test_table`
COLUMNS TERMINATED BY ","
(id, name, age, address)
DELETE ON age < 20
)
WITH HDFS
(
"fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
)
PROPERTIES
(
"timeout" = "3600",
"max_filter_ratio" = "0.1"
);Periksa status pekerjaan
Broker Load dijalankan secara asinkron. Pengiriman LOAD LABEL yang berhasil hanya mengonfirmasi bahwa pekerjaan diterima—bukan bahwa data telah selesai dimuat. Gunakan SHOW LOAD untuk melacak progres.
Sintaks
SHOW LOAD
[FROM db_name]
[
WHERE
[LABEL [ = "your_label" | LIKE "label_matcher"]]
[STATE = ["PENDING"|"ETL"|"LOADING"|"FINISHED"|"CANCELLED"]]
]
[ORDER BY ...]
[LIMIT limit][OFFSET offset];Parameter
Parameter | Deskripsi |
| Database yang akan diinterogasi. Default ke database saat ini. |
| Memfilter berdasarkan label. Mendukung pencocokan eksak dan pencocokan pola |
| Memfilter berdasarkan status pekerjaan. |
| Mengurutkan hasil. |
| Membatasi jumlah hasil yang dikembalikan. Mengembalikan semua hasil jika dihilangkan. |
| Melewati N hasil pertama. Default: |
Status pekerjaan
State | Makna | Tindakan yang perlu dilakukan |
| Pekerjaan sedang dalam antrean, menunggu eksekusi. | Tunggu. Pantau kedalaman antrean jika pekerjaan telah lama dalam status pending. |
| Data sedang diekstraksi dan ditransformasi. | Tunggu. |
| Data sedang ditulis ke tabel. | Tunggu. |
| Impor berhasil diselesaikan. | Tidak perlu tindakan. |
| Pekerjaan dibatalkan atau gagal. Label dapat digunakan kembali untuk pekerjaan baru. | Periksa kolom |
Kolom output
SHOW LOAD mengembalikan kolom berikut untuk setiap pekerjaan:
Kolom | Deskripsi |
| ID pekerjaan internal yang ditetapkan oleh SelectDB. |
| Label yang Anda tentukan dalam |
| Status pekerjaan saat ini. Lihat Status pekerjaan. |
| Progres impor dalam persentase. |
| Jenis impor, misalnya |
| Statistik fase ETL, termasuk jumlah baris dan baris yang difilter. |
| Rincian tingkat tugas: kluster, timeout, dan |
| Pesan error jika pekerjaan gagal atau dibatalkan. |
| Waktu pekerjaan dibuat. |
| Waktu fase ETL dimulai. |
| Waktu fase ETL selesai. |
| Waktu fase pemuatan dimulai. |
| Waktu fase pemuatan selesai. |
| URL ke log error. Buka URL ini untuk melihat daftar lengkap baris yang ditolak beserta alasannya saat pekerjaan gagal atau memiliki |
| Objek JSON dengan statistik per file: baris yang dipindai, baris yang difilter, dan baris yang tidak dipilih. |
Jika pekerjaan berada dalam status CANCELLED, gunakan kolomErrorMsgdanURLuntuk mendiagnosis kegagalan sebelum mengirim ulang.
Contoh
Kueri pekerjaan berdasarkan pola label, kembalikan 10 yang tertua
SHOW LOAD FROM example_db WHERE LABEL LIKE "2014_01_02" LIMIT 10;Kueri pekerjaan tertentu, diurutkan berdasarkan waktu mulai
SHOW LOAD FROM example_db
WHERE LABEL = "load_example_db_20140102"
ORDER BY LoadStartTime DESC;Kueri pekerjaan dalam status LOADING
SHOW LOAD FROM example_db
WHERE LABEL = "load_example_db_20140102" AND STATE = "loading";Paginasi hasil: lewati 5 pertama, kembalikan 10 berikutnya
SHOW LOAD FROM example_db ORDER BY LoadStartTime DESC LIMIT 10 OFFSET 5;Batalkan pekerjaan
Batalkan pekerjaan yang berada dalam status PENDING, ETL, atau LOADING menggunakan CANCEL LOAD. Data yang telah ditulis oleh pekerjaan yang dibatalkan akan dikembalikan secara otomatis.
Sintaks
CANCEL LOAD
[FROM db_name]
WHERE [LABEL = "load_label" | LABEL LIKE "label_pattern"];Parameter
Parameter | Deskripsi |
| Database yang berisi pekerjaan. Default ke database saat ini. |
| Label eksak dari pekerjaan yang akan dibatalkan, atau pola untuk |
Contoh
Batalkan pekerjaan tertentu
CANCEL LOAD
FROM example_db
WHERE LABEL = "example_db_test_load_label";Batalkan semua pekerjaan yang labelnya diawali dengan `example_`
CANCEL LOAD
FROM example_db
WHERE LABEL LIKE "example_";Praktik terbaik
Batasi ukuran pekerjaan individu di bawah 100 GB
Batas data per pekerjaan adalah jumlah node dikalikan 3 GB. Untuk dataset besar, bagi data menjadi beberapa pekerjaan daripada satu pekerjaan besar. Hal ini mengurangi biaya pengulangan jika pekerjaan gagal dan mempermudah pelacakan progres.
Rancang label untuk kemudahan pelacakan
Gunakan label yang menyertakan metadata—misalnya, etl_orders_20260328_batch01. Hal ini mempermudah identifikasi pekerjaan yang gagal, menjalankan ulang batch tertentu, dan audit riwayat impor dengan SHOW LOAD.
Monitor antrean sebelum mengirim
SelectDB membatasi pekerjaan impor konkuren per kluster antara 3 hingga 10, dengan batas antrean 100. Pekerjaan yang melebihi batas antrean akan ditolak segera. Monitor pekerjaan aktif dan dalam antrean dengan SHOW LOAD sebelum mengirim pekerjaan baru.
Waktu dalam antrean dihitung terhadap timeout pekerjaan. Jika pekerjaan menunggu terlalu lama dalam antrean sebelum dieksekusi, pekerjaan tersebut mungkin timeout dan dibatalkan. Sesuaikan properti timeout jika workload Anda mencakup penundaan antrean.Pilih max_filter_ratio berdasarkan kualitas data
Nilai | Perilaku | Kapan digunakan |
| Pekerjaan gagal jika ada baris yang mengalami kesalahan format. Tidak ada baris yang dilewati. | Data sumber yang tepercaya dan telah divalidasi. |
| SelectDB melewati hingga proporsi baris rusak tersebut dan menyelesaikan pekerjaan. | Impor besar di mana sejumlah kecil baris rusak dapat diterima. |
| Sama seperti di atas, tetapi rasio tinggi biasanya menunjukkan adanya masalah kualitas data di hulu. | Investigasi data sumber sebelum menaikkan ambang batas ini. |
Jika pekerjaan gagal atau melewati baris, buka URL dari kolom URL dalam SHOW LOAD untuk melihat secara tepat baris mana yang ditolak dan alasannya.
Gunakan load_parallelism untuk impor besar
Mengatur load_parallelism ke nilai lebih besar dari 1 menjalankan beberapa rencana eksekusi secara konkuren. Ini mempercepat impor untuk file besar tetapi meningkatkan penggunaan memori. Mulailah dengan 2 atau 4 dan sesuaikan berdasarkan kapasitas kluster.
Jamin idempotensi dengan label
Broker Load menerapkan semantik at-most-once per label. Jika pekerjaan dengan label tertentu berhasil (status FINISHED), pengiriman ulang dengan label yang sama tidak berpengaruh. Jika pekerjaan gagal atau dibatalkan, label tersebut tersedia untuk digunakan kembali. Selalu periksa SHOW LOAD sebelum memutuskan apakah akan mengirim ulang.
Batasan
Batasan | Nilai |
Pekerjaan impor konkuren per kluster | 3–10 |
Panjang antrean maksimum | 100 (pekerjaan yang melebihi batas ini akan ditolak) |
Timeout default | 4 jam (penghitung dimulai sejak pengiriman pekerjaan, termasuk waktu tunggu dalam antrean) |
Pemisah kolom | Hanya pemisah satu byte; hanya untuk file CSV |
Mode MERGE dan DELETE | Hanya untuk tabel model Unique Key |
NEGATIVE | Hanya untuk kolom INTEGER yang diagregasi dengan SUM |
Langkah selanjutnya
Impor data menggunakan OSS — Broker Load dengan broker S3 untuk Object Storage Service
Mengonversi data sumber — Detail pemetaan kolom, derivasi, dan filter