全部产品
Search
文档中心

ApsaraDB for SelectDB:Gunakan Broker Load untuk mengimpor data

更新时间:Jul 30, 2025

Broker Load digunakan untuk mengimpor data secara asinkron ke dalam instance ApsaraDB for SelectDB. Anda dapat membaca ratusan GB data secara efisien dari sistem penyimpanan terdistribusi seperti Hadoop Distributed File System (HDFS), Object Storage Service (OSS), dan Amazon Simple Storage Service (Amazon S3). Topik ini menjelaskan cara menggunakan Broker Load untuk mengimpor data ke dalam instance ApsaraDB for SelectDB.

Manfaat

Broker Load menawarkan manfaat berikut:

  • Volume data besar: Broker Load dapat mengimpor ratusan GB data offline sekaligus.

  • Konkurensi tinggi dalam mode asinkron: Broker Load memungkinkan impor data tanpa pemblokiran, meningkatkan pemanfaatan sumber daya kluster.

  • Kompatibilitas tinggi: Dapat membaca data dari sistem penyimpanan jarak jauh seperti HDFS dan Amazon S3.

  • Kemudahan penggunaan:

    • Anda dapat membuat pekerjaan Broker Load melalui protokol MySQL untuk mengimpor data.

    • Anda dapat mengeksekusi pernyataan SHOW LOAD untuk memantau kemajuan dan hasil impor data secara real-time.

Skenario

Broker Load dapat membaca volume data besar secara efisien dari sistem penyimpanan terdistribusi seperti HDFS, OSS, dan Amazon S3.

  • Waktu impor data untuk volume kecil, seperti 100 MB, adalah pada level detik.

  • Waktu impor data untuk volume besar, seperti 100 GB, adalah pada level menit.

Buat pekerjaan Broker Load

Pekerjaan Broker Load menggunakan broker untuk membaca dan mengimpor data dari sistem penyimpanan jarak jauh seperti HDFS atau Amazon S3 ke dalam tabel instance ApsaraDB for SelectDB.

Sintaksis

LOAD LABEL load_label
(
data_desc1[, data_desc2, ...]
)
WITH broker_type
[broker_properties]
[load_properties];

Parameter

Parameter

Deskripsi

load_label

Pengenal unik untuk pekerjaan Broker Load. Anda dapat menyesuaikan label untuk pekerjaan Broker Load Anda dalam pernyataan impor. Setelah pekerjaan Broker Load dikirimkan, Anda dapat memeriksa status pekerjaan berdasarkan label. Label unik juga dapat mencegah Anda mengimpor data yang sama berulang kali. Jika pekerjaan Broker Load yang terkait dengan label berada dalam status CANCELLED, label tersebut dapat digunakan oleh pekerjaan Broker Load lainnya.

Format: [database.]label_name.

Catatan

Kami merekomendasikan Anda menggunakan label yang sama untuk batch data yang sama. Dengan cara ini, permintaan berulang untuk mengimpor batch data yang sama hanya diterima sekali. Ini memastikan semantik at-most-once.

data_desc1

Deskripsi file yang akan diimpor. Untuk informasi lebih lanjut, lihat bagian Parameter dari data_desc1 dari topik ini.

WITH broker_type

Tipe broker yang akan digunakan. Nilai valid: HDFS dan S3. Pekerjaan Broker Load yang menggunakan broker S3 juga disebut sebagai pekerjaan Object Storage Service (OSS) Load. Untuk informasi lebih lanjut, lihat Impor data dengan menggunakan OSS.

broker_properties

Parameter yang diperlukan untuk broker mengakses sistem penyimpanan jarak jauh, seperti Baidu Object Storage (BOS) atau HDFS.

Sintaksis:

( "key1" = "val1", "key2" = "val2", ...)

load_properties

Parameter impor. Untuk informasi lebih lanjut, lihat bagian Parameter dari load_properties dari topik ini.

Parameter dari data_desc1

[MERGE|APPEND|DELETE]
DATA INFILE
(
"file_path1"[, file_path2, ...]
)
[NEGATIVE]
INTO TABLE `table_name`
[PARTITION (p1, p2, ...)]
[COLUMNS TERMINATED BY "column_separator"]
[FORMAT AS "file_type"]
[(column_list)]
[COLUMNS FROM PATH AS (c1, c2, ...)]
[PRECEDING FILTER predicate]
[SET (column_mapping)]
[WHERE predicate]
[DELETE ON expr]
[ORDER BY source_sequence]
[PROPERTIES ("key1"="value1", ...)]

Parameter

Deskripsi

[MERGE|APPEND|DELETE]

Mode penggabungan data. Nilai default: APPEND, yang menentukan bahwa impor adalah operasi tambah standar. Anda dapat mengatur parameter ini ke MERGE atau DELETE hanya untuk tabel yang menggunakan model Unique Key. Jika parameter ini diatur ke MERGE, pernyataan DELETE ON harus digunakan untuk menentukan kolom yang berfungsi sebagai kolom Delete Flag. Jika parameter ini diatur ke DELETE, semua data yang terlibat dalam impor akan dihapus dari tabel tempat Anda ingin melakukan impor.

DATA INFILE

Path ke file yang akan diimpor. Anda dapat mencantumkan beberapa path file dan menggunakan wildcard untuk mencocokkan file. Pastikan setiap path menunjuk ke file aktual, bukan hanya ke direktori. Jika tidak, impor gagal.

NEGATIVE

Menentukan bahwa impor adalah impor negatif. Parameter ini valid hanya untuk data tipe INTEGER yang diagregasi menggunakan fungsi SUM. Jika Anda menentukan parameter ini, operasi negasi dilakukan pada data tipe INTEGER yang diagregasi menggunakan fungsi SUM. Ini membantu mengimbangi data kesalahan yang diimpor.

PARTITION(p1, p2, ...)

Partisi spesifik dari tabel ke mana impor dibatasi. Data yang tidak berada dalam partisi yang ditentukan dikecualikan dari proses impor.

COLUMNS TERMINATED BY

Pemisah kolom. Parameter ini valid hanya jika file yang akan diimpor adalah file CSV. Anda hanya dapat menentukan pemisah byte tunggal.

FORMAT AS

Format file yang akan diimpor. Nilai default: CSV. Nilai valid: CSV, PARQUET, dan ORC.

column list

Urutan kolom dalam file yang akan diimpor.

COLUMNS FROM PATH AS

Kolom yang akan diekstraksi dari file yang akan diimpor.

PRECEDING FILTER predicate

Kondisi pra-filter. Data pertama kali digabungkan menjadi baris data mentah secara berurutan berdasarkan parameter column list dan COLUMNS FROM PATH AS. Kemudian, data difilter berdasarkan kondisi pra-filter.

SET (column_mapping)

Fungsi untuk konversi kolom.

WHERE predicate

Kondisi filter untuk data.

DELETE ON expr

Pernyataan yang digunakan untuk menentukan kolom yang berfungsi sebagai kolom Delete Flag dalam data yang akan diimpor, serta untuk mendefinisikan hubungan perhitungan. Ekspresi diperlukan jika mode impor MERGE digunakan. Parameter ini valid hanya untuk tabel yang menggunakan model Unique Key.

ORDER BY

Pernyataan yang digunakan untuk menentukan kolom yang berfungsi sebagai kolom urutan dalam data yang akan diimpor. Parameter ini digunakan untuk mempertahankan urutan data yang benar selama impor. Parameter ini valid hanya untuk tabel yang menggunakan model Unique Key.

PROPERTIES ("key1"="value1", ...)

Parameter terkait format untuk file yang akan diimpor. Misalnya, untuk mengimpor file JSON, Anda dapat menentukan parameter seperti json_root, jsonpaths, dan fuzzy_parse.

Parameter dari load_properties

Parameter

Deskripsi

timeout

Periode timeout impor. Unit: detik. Nilai default: 14400, yang menentukan 4 jam.

max_filter_ratio

Rasio maksimum data yang dapat difilter selama impor karena alasan seperti ketidaksesuaian dengan standar data. Nilai default: 0, yang menentukan kebijakan toleransi nol yang tidak mengizinkan data difilter. Nilai valid: 0 hingga 1.

exec_mem_limit

Ukuran maksimum memori yang dapat dialokasikan untuk pekerjaan impor. Unit: byte. Nilai default: 2147483648, yang menentukan 2 GB.

strict_mode

Menentukan apakah akan mengaktifkan mode ketat untuk pekerjaan impor. Nilai default: false.

timezone

Zona waktu yang digunakan untuk fungsi sensitif zona waktu dalam pekerjaan impor. Nilai default: Asia/Shanghai. Fungsi yang terpengaruh termasuk strftime, alignment_timestamp, dan from_unixtime.

load_parallelism

Tingkat paralelisme (DOP) impor. Nilai default: 1. Jika Anda mengatur parameter ini ke nilai lebih besar dari 1, beberapa rencana eksekusi diinisiasi untuk menjalankan beberapa pekerjaan impor secara bersamaan. Ini mempercepat proses impor.

send_batch_parallelism

DOP untuk mengirim data yang akan diproses dalam batch. Jika nilai parameter ini lebih besar dari nilai max_send_batch_parallelism_per_job dalam konfigurasi backend (BE) kluster komputasi, nilai max_send_batch_parallelism_per_job digunakan untuk kluster komputasi.

load_to_single_tablet

Menentukan apakah data diimpor hanya ke satu tablet dalam partisi yang sesuai. Nilai default: false. Parameter ini valid hanya jika data diimpor ke tabel yang menggunakan model Duplicate Key dan berisi bucket acak.

Contoh

  1. Buat tabel di mana Anda ingin mengimpor data dalam instance ApsaraDB for SelectDB. Contoh kode:

    CREATE TABLE test_table
    (
        id int,
        name varchar(50),
        age int,
        address varchar(50)
    )
    UNIQUE KEY(`id`)
    DISTRIBUTED BY HASH(id) BUCKETS 4
    PROPERTIES("replication_num" = "1");
    
    CREATE TABLE test_table2
    (
        id int,
        name varchar(50),
        age int,
        address varchar(50)
    )
    DISTRIBUTED BY HASH(id) BUCKETS 4
    PROPERTIES("replication_num" = "1");
  2. Buat file untuk mengimpor data.

    • Buat file bernama file1.txt dengan konten berikut:

      1,tomori,32,shanghai
      2,anon,22,beijing
      3,taki,23,shenzhen
      4,rana,45,hangzhou
      5,soyo,14,shanghai
      6,saki,25,hangzhou
      7,mutsumi,45,shanghai
      8,uika,26,shanghai
      9,umiri,27,shenzhen
      10,nyamu,37,shanghai
    • Buat file bernama file2.csv dengan konten berikut:

      1,saki,25,hangzhou
      2,mutsumi,45,shanghai
      3,uika,26,shanghai
      4,umiri,27,shenzhen
      5,nyamu,37,shanghai
  3. Impor data file ke dalam tabel.

    • Impor file file1.txt dari HDFS. Contoh kode:

      LOAD LABEL example_db.label1
      (
          DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt")
          INTO TABLE `my_table`
          COLUMNS TERMINATED BY ","
      )
      WITH HDFS
      (
          "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
      );

      Impor file file1.txt ke dalam tabel test_table. Pisahkan kolom dengan koma (,). Saat mengimpor data dari HDFS, tentukan properti fs.defaultFS dalam parameter broker_properties untuk memastikan bahwa sistem dapat terhubung ke kluster HDFS dan menemukan file yang sesuai.

    • Impor dua file ke dalam dua tabel dari HDFS. Contoh kode:

      LOAD LABEL test_db.test_02
      (
          DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file2.csv")
          INTO TABLE `test_table`
          COLUMNS TERMINATED BY ","
          (id,name,temp_age,address)    
          SET (
              age = temp_age + 1
          ),
          DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt")
          INTO TABLE `test_table2`
          COLUMNS TERMINATED BY ","
      )
      WITH HDFS
      (
          "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
      );

      Impor file file1.txt ke dalam tabel test_table. Impor file file2.csv ke dalam tabel test_table2, dan tambahkan nilai dalam kolom age sebesar 1 berdasarkan nilai dari kolom temp_age dalam file file2.csv.

    • Impor batch data dari kluster HDFS yang diterapkan dalam mode high-availability (HA). Contoh kode:

      LOAD LABEL test_db.test_03
      (
          DATA INFILE("hdfs://hdfs_host:hdfs_port/example/*")
          INTO TABLE `test_table`
          COLUMNS TERMINATED BY "\\x01"
      )
      WITH HDFS
      (
          "hadoop.username" = "hive",
          "fs.defaultFS" = "hdfs://my_ha",
          "dfs.nameservices" = "my_ha",
          "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
          "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
          "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
          "dfs.client.failover.proxy.provider.my_ha" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
      );

      Gunakan pemisah default \\x01 dari Hive, dan gunakan wildcard (*) untuk menentukan semua file dalam direktori data.

    • Filter data dalam file file1.txt untuk mengimpor hanya baris data yang memenuhi kondisi filter. Contoh kode:

      LOAD LABEL test_db.test_04
      (
          DATA INFILE("hdfs://host:port/example/file1.txt")
          INTO TABLE `test_table2`
          COLUMNS TERMINATED BY ","
          (id,name,age,address)   
          WHERE age < 20
      )
      WITH HDFS
      (
          "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
      );
      

      Hanya baris dengan nilai kolom age lebih kecil dari 20 yang diimpor.

    • Impor file file1.txt dari HDFS. Tentukan periode timeout dan rasio filtering impor. Impor semua baris dari file kecuali baris dengan nilai kolom age lebih kecil dari 20. Contoh kode:

      LOAD LABEL test_db.test_05
      (
          MERGE DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt")
          INTO TABLE `test_table`
          COLUMNS TERMINATED BY ","
          (id,name,age,address)   
          DELETE ON age < 20
      )
      WITH HDFS
      (
          "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
      )
      PROPERTIES
      (
          "timeout" = "3600",
          "max_filter_ratio" = "0.1"
      );

      Impor data dalam mode MERGE. Tabel test_table harus menggunakan model Unique Key. Jika nilai kolom age lebih kecil dari 20 dalam baris yang akan diimpor, baris tersebut ditandai untuk dihapus. Periode timeout untuk pekerjaan impor adalah 3.600 detik, dan rasio filtering maksimum untuk baris data kesalahan adalah 10%.

Batalkan pekerjaan Broker Load

Jika pekerjaan Broker Load tidak dalam status CANCELLED atau FINISHED, Anda dapat membatalkannya secara manual. Tentukan label pekerjaan impor yang akan dibatalkan. Setelah pekerjaan impor dibatalkan, data yang telah ditulis dalam pekerjaan tersebut dikembalikan dan tidak berlaku.

Sintaksis

CANCEL LOAD
[FROM db_name]
WHERE [LABEL = "load_label" | LABEL like "label_pattern"];

Parameter

Parameter

Deskripsi

db_name

Nama database. Secara default, jika Anda tidak menentukan parameter ini, database saat ini yang digunakan.

load_label

Label pekerjaan impor. Cocokkan persis didukung. Jika Anda menggunakan pernyataan LABEL LIKE, pekerjaan impor yang labelnya mengandung label_pattern akan cocok.

Contoh

  • Batalkan pekerjaan impor dengan label example_db_test_load_label dari database example_db.

    CANCEL LOAD
    FROM example_db
    WHERE LABEL = "example_db_test_load_label";
  • Batalkan pekerjaan impor dengan label yang mengandung example_ dari database example_db.

    CANCEL LOAD
    FROM example_db
    WHERE LABEL like "example_";

Kueri status pekerjaan Broker Load

Broker Load adalah metode impor asinkron. Eksekusi sukses pernyataan impor hanya menunjukkan pengiriman sukses pekerjaan Broker Load, bukan penyelesaian impor data. Untuk menanyakan status pekerjaan Broker Load, eksekusi pernyataan SHOW LOAD.

Sintaksis

SHOW LOAD
[FROM db_name]
[
   WHERE
   [LABEL [ = "your_label" | LIKE "label_matcher"]]
   [STATE = ["PENDING"|"ETL"|"LOADING"|"FINISHED"|"CANCELLED"|]]
]
[ORDER BY ...]
[LIMIT limit][OFFSET offset];

Parameter

Parameter

Deskripsi

db_name

Nama database. Secara default, jika Anda tidak menentukan parameter ini, database saat ini yang digunakan.

your_label

Label pekerjaan impor. Cocokkan persis didukung. Jika Anda menggunakan pernyataan LABEL LIKE, pekerjaan impor yang labelnya mengandung label_matcher akan cocok.

STATE

Status pekerjaan impor. Anda hanya dapat melihat pekerjaan impor yang berada dalam status tertentu.

ORDER BY

Urutan dalam mana catatan data yang dikembalikan diurutkan.

LIMIT

Batas jumlah catatan data yang ditampilkan. Jika Anda tidak menentukan parameter ini, semua catatan data ditampilkan.

OFFSET

Jumlah rekaman awal yang dilewati sebelum hasil kueri mulai ditampilkan. Nilai default: 0.

Contoh

  • Kueri pekerjaan impor dengan label yang mengandung 2014_01_02 dalam database example_db, dan tampilkan 10 pekerjaan impor yang disimpan paling lama.

    SHOW LOAD FROM example_db WHERE LABEL LIKE "2014_01_02" LIMIT 10;
  • Kueri pekerjaan impor dengan label load_example_db_20140102 dalam database example_db. Urutkan pekerjaan ini berdasarkan LoadStartTime dalam urutan menurun.

    SHOW LOAD FROM example_db WHERE LABEL = "load_example_db_20140102" ORDER BY LoadStartTime DESC;
  • Kueri pekerjaan impor dengan label load_example_db_20140102 dalam database example_db. Pekerjaan impor dalam status loading.

    SHOW LOAD FROM example_db WHERE LABEL = "load_example_db_20140102" AND STATE = "loading";
  • Kueri pekerjaan impor dalam database example_db dan urutkan pekerjaan ini berdasarkan LoadStartTime dalam urutan menurun. Lewati lima hasil kueri awal dan tampilkan sepuluh hasil kueri berikutnya.

    SHOW LOAD FROM example_db ORDER BY LoadStartTime DESC limit 5,10;
    SHOW LOAD FROM example_db ORDER BY LoadStartTime DESC limit 10 offset 5;

Praktik terbaik

  • Kueri Status Pekerjaan Impor

    Broker Load adalah metode impor asinkron. Eksekusi sukses pernyataan impor hanya menunjukkan pengiriman sukses pekerjaan Broker Load, bukan penyelesaian impor data. Untuk menanyakan status pekerjaan impor, eksekusi pernyataan SHOW LOAD.

  • Batalkan Pekerjaan Impor

    Untuk membatalkan pekerjaan impor yang telah dikirimkan tetapi belum selesai, eksekusi pernyataan CANCEL LOAD. Setelah pekerjaan impor dibatalkan, data yang telah ditulis dalam pekerjaan tersebut dikembalikan dan tidak berlaku.

  • Label, Transaksi Impor, dan Atomicitas Multi-Tabel

    Semua pekerjaan impor dalam ApsaraDB for SelectDB bersifat atomik. Sifat atomik juga dipastikan ketika data diimpor ke beberapa tabel dalam satu pekerjaan impor. Selain itu, ApsaraDB for SelectDB menggunakan label untuk memastikan bahwa data yang diimpor tidak hilang atau diduplikasi.

  • Pemetaan, Derivasi, dan Penyaringan Kolom

    ApsaraDB for SelectDB mendukung berbagai operasi pada konversi kolom dan penyaringan kolom dalam pernyataan impor. Sebagian besar fungsi bawaan dan fungsi yang ditentukan pengguna (UDF) didukung. Untuk informasi lebih lanjut, lihat Mengonversi Data Sumber.

  • Penyaringan Baris Data Kesalahan

    ApsaraDB for SelectDB memungkinkan Anda melewati beberapa baris data dalam format salah dalam pekerjaan impor. Rasio penyaringan ditentukan oleh parameter max_filter_ratio. Nilai default adalah 0, yang menentukan kebijakan toleransi nol yang tidak mengizinkan data difilter. Pekerjaan impor gagal jika satu baris data kesalahan ditemukan. Jika Anda ingin mengabaikan beberapa baris data kesalahan selama impor, Anda dapat mengatur parameter ini ke nilai antara 0 dan 1. Dengan cara ini, ApsaraDB for SelectDB secara otomatis melewati baris dalam format data salah.

    Untuk informasi lebih lanjut tentang cara menghitung rasio toleransi, lihat Mengonversi Data Sumber.

  • Mode Ketat

    Properti strict_mode digunakan untuk menentukan apakah pekerjaan impor berjalan dalam mode ketat. Jika pekerjaan impor berjalan dalam mode ketat, pemetaan, konversi, dan penyaringan kolom dipengaruhi.

  • Periode Timeout

    Periode timeout default untuk pekerjaan Broker Load adalah 4 jam. Penghitungan dimulai dari waktu ketika pekerjaan impor dikirimkan. Jika pekerjaan impor tidak selesai dalam periode timeout, pekerjaan impor gagal.

  • Batas Jumlah Data dan Jumlah Pekerjaan

    Kami merekomendasikan Anda mengimpor data kurang dari 100 GB dalam satu pekerjaan Broker Load. Secara teori, jumlah data yang dapat diimpor dalam satu pekerjaan impor tidak dibatasi. Namun, jika jumlah data yang akan diimpor terlalu besar, pekerjaan impor mungkin berjalan untuk jangka waktu yang lama, dan biaya ulang adalah tinggi jika pekerjaan impor gagal.

    Selain itu, jumlah node dalam kluster dibatasi. Oleh karena itu, batas ditetapkan pada jumlah maksimum data yang akan diimpor, yaitu jumlah node dikalikan dengan 3 GB. Ini memastikan bahwa sumber daya sistem digunakan dengan benar. Jika Anda perlu mengimpor jumlah data yang besar, kami merekomendasikan Anda mengirimkan beberapa pekerjaan impor.

    ApsaraDB for SelectDB membatasi jumlah pekerjaan impor konkuren dalam kluster, yang berkisar antara 3 hingga 10. Jika jumlah pekerjaan impor yang Anda kirimkan lebih besar dari batas, pekerjaan impor tambahan menunggu dalam antrian. Panjang maksimum antrian adalah 100. Jika lebih dari 100 pekerjaan impor menunggu, pekerjaan impor tambahan langsung ditolak.

    Catatan

    Waktu tunggu juga dihitung dalam total periode waktu untuk pekerjaan impor. Jika terjadi kesalahan timeout, pekerjaan dibatalkan. Kami merekomendasikan Anda memantau status pekerjaan impor untuk mengontrol frekuensi pengiriman pekerjaan dengan tepat.