Broker Load digunakan untuk mengimpor data secara asinkron ke dalam instance ApsaraDB for SelectDB. Anda dapat membaca ratusan GB data secara efisien dari sistem penyimpanan terdistribusi seperti Hadoop Distributed File System (HDFS), Object Storage Service (OSS), dan Amazon Simple Storage Service (Amazon S3). Topik ini menjelaskan cara menggunakan Broker Load untuk mengimpor data ke dalam instance ApsaraDB for SelectDB.
Manfaat
Broker Load menawarkan manfaat berikut:
Volume data besar: Broker Load dapat mengimpor ratusan GB data offline sekaligus.
Konkurensi tinggi dalam mode asinkron: Broker Load memungkinkan impor data tanpa pemblokiran, meningkatkan pemanfaatan sumber daya kluster.
Kompatibilitas tinggi: Dapat membaca data dari sistem penyimpanan jarak jauh seperti HDFS dan Amazon S3.
Kemudahan penggunaan:
Anda dapat membuat pekerjaan Broker Load melalui protokol MySQL untuk mengimpor data.
Anda dapat mengeksekusi pernyataan
SHOW LOADuntuk memantau kemajuan dan hasil impor data secara real-time.
Skenario
Broker Load dapat membaca volume data besar secara efisien dari sistem penyimpanan terdistribusi seperti HDFS, OSS, dan Amazon S3.
Waktu impor data untuk volume kecil, seperti 100 MB, adalah pada level detik.
Waktu impor data untuk volume besar, seperti 100 GB, adalah pada level menit.
Buat pekerjaan Broker Load
Pekerjaan Broker Load menggunakan broker untuk membaca dan mengimpor data dari sistem penyimpanan jarak jauh seperti HDFS atau Amazon S3 ke dalam tabel instance ApsaraDB for SelectDB.
Sintaksis
LOAD LABEL load_label
(
data_desc1[, data_desc2, ...]
)
WITH broker_type
[broker_properties]
[load_properties];Parameter
Parameter | Deskripsi |
| Pengenal unik untuk pekerjaan Broker Load. Anda dapat menyesuaikan label untuk pekerjaan Broker Load Anda dalam pernyataan impor. Setelah pekerjaan Broker Load dikirimkan, Anda dapat memeriksa status pekerjaan berdasarkan label. Label unik juga dapat mencegah Anda mengimpor data yang sama berulang kali. Jika pekerjaan Broker Load yang terkait dengan label berada dalam status CANCELLED, label tersebut dapat digunakan oleh pekerjaan Broker Load lainnya. Format: Catatan Kami merekomendasikan Anda menggunakan label yang sama untuk batch data yang sama. Dengan cara ini, permintaan berulang untuk mengimpor batch data yang sama hanya diterima sekali. Ini memastikan semantik at-most-once. |
| Deskripsi file yang akan diimpor. Untuk informasi lebih lanjut, lihat bagian Parameter dari data_desc1 dari topik ini. |
| Tipe broker yang akan digunakan. Nilai valid: HDFS dan S3. Pekerjaan Broker Load yang menggunakan broker S3 juga disebut sebagai pekerjaan Object Storage Service (OSS) Load. Untuk informasi lebih lanjut, lihat Impor data dengan menggunakan OSS. |
| Parameter yang diperlukan untuk broker mengakses sistem penyimpanan jarak jauh, seperti Baidu Object Storage (BOS) atau HDFS. Sintaksis: |
| Parameter impor. Untuk informasi lebih lanjut, lihat bagian Parameter dari load_properties dari topik ini. |
Parameter dari data_desc1
[MERGE|APPEND|DELETE]
DATA INFILE
(
"file_path1"[, file_path2, ...]
)
[NEGATIVE]
INTO TABLE `table_name`
[PARTITION (p1, p2, ...)]
[COLUMNS TERMINATED BY "column_separator"]
[FORMAT AS "file_type"]
[(column_list)]
[COLUMNS FROM PATH AS (c1, c2, ...)]
[PRECEDING FILTER predicate]
[SET (column_mapping)]
[WHERE predicate]
[DELETE ON expr]
[ORDER BY source_sequence]
[PROPERTIES ("key1"="value1", ...)]Parameter | Deskripsi |
| Mode penggabungan data. Nilai default: APPEND, yang menentukan bahwa impor adalah operasi tambah standar. Anda dapat mengatur parameter ini ke MERGE atau DELETE hanya untuk tabel yang menggunakan model Unique Key. Jika parameter ini diatur ke MERGE, pernyataan DELETE ON harus digunakan untuk menentukan kolom yang berfungsi sebagai kolom Delete Flag. Jika parameter ini diatur ke DELETE, semua data yang terlibat dalam impor akan dihapus dari tabel tempat Anda ingin melakukan impor. |
| Path ke file yang akan diimpor. Anda dapat mencantumkan beberapa path file dan menggunakan wildcard untuk mencocokkan file. Pastikan setiap path menunjuk ke file aktual, bukan hanya ke direktori. Jika tidak, impor gagal. |
| Menentukan bahwa impor adalah impor negatif. Parameter ini valid hanya untuk data tipe INTEGER yang diagregasi menggunakan fungsi SUM. Jika Anda menentukan parameter ini, operasi negasi dilakukan pada data tipe INTEGER yang diagregasi menggunakan fungsi SUM. Ini membantu mengimbangi data kesalahan yang diimpor. |
| Partisi spesifik dari tabel ke mana impor dibatasi. Data yang tidak berada dalam partisi yang ditentukan dikecualikan dari proses impor. |
| Pemisah kolom. Parameter ini valid hanya jika file yang akan diimpor adalah file CSV. Anda hanya dapat menentukan pemisah byte tunggal. |
| Format file yang akan diimpor. Nilai default: CSV. Nilai valid: CSV, PARQUET, dan ORC. |
| Urutan kolom dalam file yang akan diimpor. |
| Kolom yang akan diekstraksi dari file yang akan diimpor. |
| Kondisi pra-filter. Data pertama kali digabungkan menjadi baris data mentah secara berurutan berdasarkan parameter |
| Fungsi untuk konversi kolom. |
| Kondisi filter untuk data. |
| Pernyataan yang digunakan untuk menentukan kolom yang berfungsi sebagai kolom Delete Flag dalam data yang akan diimpor, serta untuk mendefinisikan hubungan perhitungan. Ekspresi diperlukan jika mode impor MERGE digunakan. Parameter ini valid hanya untuk tabel yang menggunakan model Unique Key. |
| Pernyataan yang digunakan untuk menentukan kolom yang berfungsi sebagai kolom urutan dalam data yang akan diimpor. Parameter ini digunakan untuk mempertahankan urutan data yang benar selama impor. Parameter ini valid hanya untuk tabel yang menggunakan model Unique Key. |
| Parameter terkait format untuk file yang akan diimpor. Misalnya, untuk mengimpor file JSON, Anda dapat menentukan parameter seperti json_root, jsonpaths, dan fuzzy_parse. |
Parameter dari load_properties
Parameter | Deskripsi |
| Periode timeout impor. Unit: detik. Nilai default: 14400, yang menentukan 4 jam. |
| Rasio maksimum data yang dapat difilter selama impor karena alasan seperti ketidaksesuaian dengan standar data. Nilai default: 0, yang menentukan kebijakan toleransi nol yang tidak mengizinkan data difilter. Nilai valid: 0 hingga 1. |
| Ukuran maksimum memori yang dapat dialokasikan untuk pekerjaan impor. Unit: byte. Nilai default: 2147483648, yang menentukan 2 GB. |
| Menentukan apakah akan mengaktifkan mode ketat untuk pekerjaan impor. Nilai default: false. |
| Zona waktu yang digunakan untuk fungsi sensitif zona waktu dalam pekerjaan impor. Nilai default: |
| Tingkat paralelisme (DOP) impor. Nilai default: 1. Jika Anda mengatur parameter ini ke nilai lebih besar dari 1, beberapa rencana eksekusi diinisiasi untuk menjalankan beberapa pekerjaan impor secara bersamaan. Ini mempercepat proses impor. |
| DOP untuk mengirim data yang akan diproses dalam batch. Jika nilai parameter ini lebih besar dari nilai max_send_batch_parallelism_per_job dalam konfigurasi backend (BE) kluster komputasi, nilai max_send_batch_parallelism_per_job digunakan untuk kluster komputasi. |
| Menentukan apakah data diimpor hanya ke satu tablet dalam partisi yang sesuai. Nilai default: false. Parameter ini valid hanya jika data diimpor ke tabel yang menggunakan model Duplicate Key dan berisi bucket acak. |
Contoh
Buat tabel di mana Anda ingin mengimpor data dalam instance ApsaraDB for SelectDB. Contoh kode:
CREATE TABLE test_table ( id int, name varchar(50), age int, address varchar(50) ) UNIQUE KEY(`id`) DISTRIBUTED BY HASH(id) BUCKETS 4 PROPERTIES("replication_num" = "1"); CREATE TABLE test_table2 ( id int, name varchar(50), age int, address varchar(50) ) DISTRIBUTED BY HASH(id) BUCKETS 4 PROPERTIES("replication_num" = "1");Buat file untuk mengimpor data.
Buat file bernama
file1.txtdengan konten berikut:1,tomori,32,shanghai 2,anon,22,beijing 3,taki,23,shenzhen 4,rana,45,hangzhou 5,soyo,14,shanghai 6,saki,25,hangzhou 7,mutsumi,45,shanghai 8,uika,26,shanghai 9,umiri,27,shenzhen 10,nyamu,37,shanghaiBuat file bernama
file2.csvdengan konten berikut:1,saki,25,hangzhou 2,mutsumi,45,shanghai 3,uika,26,shanghai 4,umiri,27,shenzhen 5,nyamu,37,shanghai
Impor data file ke dalam tabel.
Impor file
file1.txtdari HDFS. Contoh kode:LOAD LABEL example_db.label1 ( DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt") INTO TABLE `my_table` COLUMNS TERMINATED BY "," ) WITH HDFS ( "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port" );Impor file file1.txt ke dalam tabel test_table. Pisahkan kolom dengan koma (,). Saat mengimpor data dari HDFS, tentukan properti
fs.defaultFSdalam parameter broker_properties untuk memastikan bahwa sistem dapat terhubung ke kluster HDFS dan menemukan file yang sesuai.Impor dua file ke dalam dua tabel dari HDFS. Contoh kode:
LOAD LABEL test_db.test_02 ( DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file2.csv") INTO TABLE `test_table` COLUMNS TERMINATED BY "," (id,name,temp_age,address) SET ( age = temp_age + 1 ), DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt") INTO TABLE `test_table2` COLUMNS TERMINATED BY "," ) WITH HDFS ( "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port" );Impor file
file1.txtke dalam tabeltest_table. Impor filefile2.csvke dalam tabeltest_table2, dan tambahkan nilai dalam kolom age sebesar 1 berdasarkan nilai dari kolom temp_age dalam filefile2.csv.Impor batch data dari kluster HDFS yang diterapkan dalam mode high-availability (HA). Contoh kode:
LOAD LABEL test_db.test_03 ( DATA INFILE("hdfs://hdfs_host:hdfs_port/example/*") INTO TABLE `test_table` COLUMNS TERMINATED BY "\\x01" ) WITH HDFS ( "hadoop.username" = "hive", "fs.defaultFS" = "hdfs://my_ha", "dfs.nameservices" = "my_ha", "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2", "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port", "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port", "dfs.client.failover.proxy.provider.my_ha" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" );Gunakan pemisah default
\\x01dari Hive, dan gunakan wildcard(*)untuk menentukan semua file dalam direktoridata.Filter data dalam file file1.txt untuk mengimpor hanya baris data yang memenuhi kondisi filter. Contoh kode:
LOAD LABEL test_db.test_04 ( DATA INFILE("hdfs://host:port/example/file1.txt") INTO TABLE `test_table2` COLUMNS TERMINATED BY "," (id,name,age,address) WHERE age < 20 ) WITH HDFS ( "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port" );Hanya baris dengan nilai kolom
agelebih kecil dari 20 yang diimpor.Impor file file1.txt dari HDFS. Tentukan periode timeout dan rasio filtering impor. Impor semua baris dari file kecuali baris dengan nilai kolom
agelebih kecil dari 20. Contoh kode:LOAD LABEL test_db.test_05 ( MERGE DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt") INTO TABLE `test_table` COLUMNS TERMINATED BY "," (id,name,age,address) DELETE ON age < 20 ) WITH HDFS ( "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port" ) PROPERTIES ( "timeout" = "3600", "max_filter_ratio" = "0.1" );Impor data dalam mode MERGE. Tabel
test_tableharus menggunakan model Unique Key. Jika nilai kolomagelebih kecil dari 20 dalam baris yang akan diimpor, baris tersebut ditandai untuk dihapus. Periode timeout untuk pekerjaan impor adalah 3.600 detik, dan rasio filtering maksimum untuk baris data kesalahan adalah 10%.
Batalkan pekerjaan Broker Load
Jika pekerjaan Broker Load tidak dalam status CANCELLED atau FINISHED, Anda dapat membatalkannya secara manual. Tentukan label pekerjaan impor yang akan dibatalkan. Setelah pekerjaan impor dibatalkan, data yang telah ditulis dalam pekerjaan tersebut dikembalikan dan tidak berlaku.
Sintaksis
CANCEL LOAD
[FROM db_name]
WHERE [LABEL = "load_label" | LABEL like "label_pattern"];Parameter
Parameter | Deskripsi |
| Nama database. Secara default, jika Anda tidak menentukan parameter ini, database saat ini yang digunakan. |
| Label pekerjaan impor. Cocokkan persis didukung. Jika Anda menggunakan pernyataan LABEL LIKE, pekerjaan impor yang labelnya mengandung label_pattern akan cocok. |
Contoh
Batalkan pekerjaan impor dengan label
example_db_test_load_labeldari databaseexample_db.CANCEL LOAD FROM example_db WHERE LABEL = "example_db_test_load_label";Batalkan pekerjaan impor dengan label yang mengandung
example_dari databaseexample_db.CANCEL LOAD FROM example_db WHERE LABEL like "example_";
Kueri status pekerjaan Broker Load
Broker Load adalah metode impor asinkron. Eksekusi sukses pernyataan impor hanya menunjukkan pengiriman sukses pekerjaan Broker Load, bukan penyelesaian impor data. Untuk menanyakan status pekerjaan Broker Load, eksekusi pernyataan SHOW LOAD.
Sintaksis
SHOW LOAD
[FROM db_name]
[
WHERE
[LABEL [ = "your_label" | LIKE "label_matcher"]]
[STATE = ["PENDING"|"ETL"|"LOADING"|"FINISHED"|"CANCELLED"|]]
]
[ORDER BY ...]
[LIMIT limit][OFFSET offset];Parameter
Parameter | Deskripsi |
| Nama database. Secara default, jika Anda tidak menentukan parameter ini, database saat ini yang digunakan. |
| Label pekerjaan impor. Cocokkan persis didukung. Jika Anda menggunakan pernyataan LABEL LIKE, pekerjaan impor yang labelnya mengandung label_matcher akan cocok. |
| Status pekerjaan impor. Anda hanya dapat melihat pekerjaan impor yang berada dalam status tertentu. |
| Urutan dalam mana catatan data yang dikembalikan diurutkan. |
| Batas jumlah catatan data yang ditampilkan. Jika Anda tidak menentukan parameter ini, semua catatan data ditampilkan. |
| Jumlah rekaman awal yang dilewati sebelum hasil kueri mulai ditampilkan. Nilai default: 0. |
Contoh
Kueri pekerjaan impor dengan label yang mengandung
2014_01_02dalam databaseexample_db, dan tampilkan 10 pekerjaan impor yang disimpan paling lama.SHOW LOAD FROM example_db WHERE LABEL LIKE "2014_01_02" LIMIT 10;Kueri pekerjaan impor dengan label
load_example_db_20140102dalam databaseexample_db. Urutkan pekerjaan ini berdasarkanLoadStartTimedalam urutan menurun.SHOW LOAD FROM example_db WHERE LABEL = "load_example_db_20140102" ORDER BY LoadStartTime DESC;Kueri pekerjaan impor dengan label
load_example_db_20140102dalam databaseexample_db. Pekerjaan impor dalam statusloading.SHOW LOAD FROM example_db WHERE LABEL = "load_example_db_20140102" AND STATE = "loading";Kueri pekerjaan impor dalam database
example_dbdan urutkan pekerjaan ini berdasarkanLoadStartTimedalam urutan menurun. Lewati lima hasil kueri awal dan tampilkan sepuluh hasil kueri berikutnya.SHOW LOAD FROM example_db ORDER BY LoadStartTime DESC limit 5,10; SHOW LOAD FROM example_db ORDER BY LoadStartTime DESC limit 10 offset 5;
Praktik terbaik
Kueri Status Pekerjaan Impor
Broker Load adalah metode impor asinkron. Eksekusi sukses pernyataan impor hanya menunjukkan pengiriman sukses pekerjaan Broker Load, bukan penyelesaian impor data. Untuk menanyakan status pekerjaan impor, eksekusi pernyataan
SHOW LOAD.Batalkan Pekerjaan Impor
Untuk membatalkan pekerjaan impor yang telah dikirimkan tetapi belum selesai, eksekusi pernyataan
CANCEL LOAD. Setelah pekerjaan impor dibatalkan, data yang telah ditulis dalam pekerjaan tersebut dikembalikan dan tidak berlaku.Label, Transaksi Impor, dan Atomicitas Multi-Tabel
Semua pekerjaan impor dalam ApsaraDB for SelectDB bersifat atomik. Sifat atomik juga dipastikan ketika data diimpor ke beberapa tabel dalam satu pekerjaan impor. Selain itu, ApsaraDB for SelectDB menggunakan label untuk memastikan bahwa data yang diimpor tidak hilang atau diduplikasi.
Pemetaan, Derivasi, dan Penyaringan Kolom
ApsaraDB for SelectDB mendukung berbagai operasi pada konversi kolom dan penyaringan kolom dalam pernyataan impor. Sebagian besar fungsi bawaan dan fungsi yang ditentukan pengguna (UDF) didukung. Untuk informasi lebih lanjut, lihat Mengonversi Data Sumber.
Penyaringan Baris Data Kesalahan
ApsaraDB for SelectDB memungkinkan Anda melewati beberapa baris data dalam format salah dalam pekerjaan impor. Rasio penyaringan ditentukan oleh parameter
max_filter_ratio. Nilai default adalah 0, yang menentukan kebijakan toleransi nol yang tidak mengizinkan data difilter. Pekerjaan impor gagal jika satu baris data kesalahan ditemukan. Jika Anda ingin mengabaikan beberapa baris data kesalahan selama impor, Anda dapat mengatur parameter ini ke nilai antara 0 dan 1. Dengan cara ini, ApsaraDB for SelectDB secara otomatis melewati baris dalam format data salah.Untuk informasi lebih lanjut tentang cara menghitung rasio toleransi, lihat Mengonversi Data Sumber.
Mode Ketat
Properti
strict_modedigunakan untuk menentukan apakah pekerjaan impor berjalan dalam mode ketat. Jika pekerjaan impor berjalan dalam mode ketat, pemetaan, konversi, dan penyaringan kolom dipengaruhi.Periode Timeout
Periode timeout default untuk pekerjaan Broker Load adalah 4 jam. Penghitungan dimulai dari waktu ketika pekerjaan impor dikirimkan. Jika pekerjaan impor tidak selesai dalam periode timeout, pekerjaan impor gagal.
Batas Jumlah Data dan Jumlah Pekerjaan
Kami merekomendasikan Anda mengimpor data kurang dari 100 GB dalam satu pekerjaan Broker Load. Secara teori, jumlah data yang dapat diimpor dalam satu pekerjaan impor tidak dibatasi. Namun, jika jumlah data yang akan diimpor terlalu besar, pekerjaan impor mungkin berjalan untuk jangka waktu yang lama, dan biaya ulang adalah tinggi jika pekerjaan impor gagal.
Selain itu, jumlah node dalam kluster dibatasi. Oleh karena itu, batas ditetapkan pada jumlah maksimum data yang akan diimpor, yaitu jumlah node dikalikan dengan 3 GB. Ini memastikan bahwa sumber daya sistem digunakan dengan benar. Jika Anda perlu mengimpor jumlah data yang besar, kami merekomendasikan Anda mengirimkan beberapa pekerjaan impor.
ApsaraDB for SelectDB membatasi jumlah pekerjaan impor konkuren dalam kluster, yang berkisar antara 3 hingga 10. Jika jumlah pekerjaan impor yang Anda kirimkan lebih besar dari batas, pekerjaan impor tambahan menunggu dalam antrian. Panjang maksimum antrian adalah 100. Jika lebih dari 100 pekerjaan impor menunggu, pekerjaan impor tambahan langsung ditolak.
CatatanWaktu tunggu juga dihitung dalam total periode waktu untuk pekerjaan impor. Jika terjadi kesalahan timeout, pekerjaan dibatalkan. Kami merekomendasikan Anda memantau status pekerjaan impor untuk mengontrol frekuensi pengiriman pekerjaan dengan tepat.