全部产品
Search
文档中心

ApsaraDB for SelectDB:Stream Load

更新时间:Nov 11, 2025

Anda dapat menggunakan Stream Load untuk mengimpor file lokal atau aliran data ke dalam instans ApsaraDB for SelectDB. Topik ini menjelaskan cara menggunakan Stream Load untuk mengimpor data ke ApsaraDB for SelectDB.

Informasi latar belakang

Stream Load adalah metode impor data sinkron. Anda dapat mengirim permintaan HTTP untuk mengimpor file lokal atau aliran data ke ApsaraDB for SelectDB. Stream Load segera mengembalikan hasil impor. Anda dapat memeriksa nilai kembali permintaan untuk menentukan apakah impor berhasil. Stream Load mendukung format data CSV (teks), JSON, PARQUET, dan ORC.

Penting

Stream Load menawarkan throughput tinggi, latensi rendah, serta fleksibel dan andal. Kami sangat merekomendasikan agar Anda menggunakan Stream Load sebagai metode impor data utama Anda.

Persiapan

  1. Pastikan terminal yang digunakan untuk mengirim permintaan Stream Load dapat terhubung ke instans SelectDB melalui jaringan:

    1. Ajukan titik akhir publik untuk instans ApsaraDB for SelectDB. Untuk informasi selengkapnya, lihat Ajukan dan lepas titik akhir publik.

      Jika terminal yang mengirim permintaan Stream Load berada dalam VPC yang sama dengan instans ApsaraDB for SelectDB, Anda dapat melewati langkah ini.

    2. Tambahkan alamat IP terminal yang mengirim permintaan Stream Load ke daftar putih instans ApsaraDB for SelectDB. Untuk informasi selengkapnya, lihat Konfigurasi daftar putih.

    3. Jika terminal yang mengirim permintaan Stream Load memiliki daftar putih yang dikonfigurasi, tambahkan rentang alamat IP instans SelectDB ke daftar putih tersebut.

  2. Opsi: Ubah konfigurasi kluster komputasi (backend) untuk mengaktifkan pencatatan operasi Stream Load.

    Secara default, kluster komputasi tidak mencatat operasi Stream Load.

    Untuk melacak operasi Stream Load, atur enable_stream_load_record ke true dan mulai ulang kluster sebelum membuat tugas impor. Untuk mengaktifkan fitur ini, Anda harus mengajukan tiket untuk dukungan teknis.

  3. Opsi: Ubah konfigurasi kluster komputasi untuk menyesuaikan ukuran impor maksimum untuk Stream Load.

    Ukuran maksimum file default yang dapat diimpor menggunakan Stream Load adalah 10.240 MB.

    Jika file sumber Anda melebihi ukuran ini, Anda dapat menyesuaikan parameter backend streaming_load_max_mb. Untuk informasi selengkapnya tentang cara mengubah parameter, lihat Konfigurasi parameter.

  4. Opsi: Ubah konfigurasi frontend (FE) untuk menyesuaikan waktu tunggu impor.

    Waktu tunggu default untuk tugas Stream Load adalah 600 detik. Jika tugas impor tidak selesai dalam batas waktu yang ditentukan, sistem akan membatalkan tugas tersebut dan mengubah statusnya menjadi CANCELLED.

    Jika file sumber tidak dapat diimpor dalam batas waktu tersebut, Anda dapat menetapkan waktu tunggu tertentu dalam permintaan Stream Load atau menyesuaikan parameter FE stream_load_default_timeout_second dan memulai ulang instans untuk menetapkan waktu tunggu default global. Untuk melakukan penyesuaian ini, Anda harus mengajukan tiket untuk dukungan teknis.

Catatan penggunaan

Stream Load tunggal dapat menulis data sebesar beberapa ratus MB hingga 1 GB. Dalam beberapa skenario bisnis, sering menulis data dalam jumlah kecil dapat menurunkan kinerja instans secara signifikan dan bahkan menyebabkan deadlock tabel. Kami sangat merekomendasikan agar Anda mengurangi frekuensi penulisan dan menggunakan pemrosesan batch untuk data Anda.

  • Pemrosesan batch di sisi aplikasi: Kumpulkan data bisnis di sisi aplikasi, lalu kirim permintaan Stream Load ke SelectDB.

  • Pemrosesan batch di sisi server: Setelah SelectDB menerima permintaan Stream Load, server melakukan pemrosesan batch pada data tersebut. Untuk informasi selengkapnya, lihat Group Commit.

Buat tugas impor

Stream Load mengirimkan dan mentransfer data melalui protokol HTTP. Contoh berikut menunjukkan cara mengirimkan tugas impor menggunakan perintah curl. Anda dapat menjalankan perintah ini di terminal Linux atau macOS, atau di command prompt Windows. Anda juga dapat menggunakan klien HTTP lain untuk operasi Stream Load.

Sintaks

curl --location-trusted -u <username>:<password> [-H ""] -H "expect:100-continue" -T <file_path> -XPUT http://<host>:<port>/api/<db
_name>/<table_name>/_stream_load

Parameter

Parameter

Wajib

Deskripsi

--location-trusted

Ya

Jika autentikasi diperlukan, ini meneruskan username dan password ke server tempat permintaan dialihkan.

-u

Ya

Tentukan nama pengguna dan kata sandi untuk instans SelectDB.

  • username: Nama pengguna.

  • password: Kata sandi.

-H

Tidak

Tentukan konten header permintaan (Header) untuk permintaan impor Stream Load ini. Formatnya sebagai berikut:

-H "key1:value1"

Parameter umum sebagai berikut:

  • label: ID unik tugas impor.

  • column_separator: Menentukan pemisah kolom dalam file impor. Defaultnya adalah \t.

    Anda juga dapat menggunakan kombinasi beberapa karakter sebagai pemisah kolom.

    Untuk karakter tak terlihat, Anda perlu menambahkan \x sebagai awalan dan menggunakan heksadesimal untuk merepresentasikan pemisah.

Untuk informasi selengkapnya tentang parameter header permintaan, lihat Parameter header permintaan.

-T

Ya

Tentukan jalur file data yang akan diimpor.

file_path: Jalur file objek.

-XPUT

Ya

Metode permintaan HTTP. Gunakan metode permintaan PUT untuk menentukan alamat impor data SelectDB. Parameter spesifiknya sebagai berikut:

  • host: Titik akhir VPC atau titik akhir publik instans SelectDB.

    • Gunakan titik akhir publik jika tidak dalam VPC yang sama: Jika perangkat tempat Anda menjalankan perintah tidak berada dalam VPC yang sama dengan instans SelectDB target, Anda harus menggunakan titik akhir publik. Untuk mengajukan titik akhir publik, lihat Ajukan dan lepas titik akhir publik.

    • Gunakan titik akhir VPC jika dalam VPC yang sama: Jika perangkat tempat Anda menjalankan perintah adalah produk Alibaba Cloud dan berada dalam VPC yang sama dengan instans SelectDB target, kami merekomendasikan agar Anda menggunakan titik akhir VPC.

  • port: Nomor port HTTP instans SelectDB. Defaultnya adalah 8080.

    Anda dapat melihat titik akhir dan nomor port instans di halaman produk instans di SelectDB.

  • db_name: Nama database.

  • table_name: Nama tabel.

Parameter header permintaan

Stream Load menggunakan protokol HTTP. Oleh karena itu, parameter yang terkait dengan tugas impor diatur dalam header permintaan. Tabel berikut menjelaskan parameter impor umum.

Parameter

Deskripsi

label

ID unik tugas impor.

Label memiliki beberapa tujuan:

  • Nama kustom dalam perintah impor.

  • Dapat digunakan untuk melihat status eksekusi tugas impor yang sesuai.

  • Dapat digunakan untuk mencegah impor data yang sama berulang kali.

  • Dapat digunakan kembali jika status pekerjaan impor yang sesuai adalah CANCELLED.

Penting

Kami merekomendasikan menggunakan Label yang sama untuk batch data yang sama. Dengan cara ini, permintaan berulang untuk batch data yang sama hanya akan diterima sekali, memastikan pengiriman At-Most-Once.

format

Menentukan format data impor.

  • Format yang didukung: CSV, JSON, PARQUET, ORC, csv_with_names (melewatkan baris pertama file CSV), dan csv_with_names_and_types (melewatkan dua baris pertama file CSV).

  • Format default: CSV.

Untuk informasi selengkapnya tentang persyaratan format file dan parameter terkait, lihat Format File.

line_delimiter

Menentukan pemisah baris dalam file impor.

Anda juga dapat menggunakan kombinasi beberapa karakter sebagai pemisah baris. Misalnya, di sistem Windows, gunakan \r\n sebagai pemisah baris.

column_separator

Menentukan pemisah kolom dalam file impor.

Anda juga dapat menggunakan kombinasi beberapa karakter sebagai pemisah kolom. Misalnya, Anda dapat menggunakan garis vertikal ganda || sebagai pemisah kolom.

Untuk karakter tak terlihat, Anda perlu menambahkan \x sebagai awalan dan menggunakan heksadesimal untuk merepresentasikan pemisah. Misalnya, untuk pemisah file Hive \x01, Anda perlu menentukan -H"column_separator:\x01".

compress_type

Menentukan format kompresi file. Kompresi hanya didukung untuk file CSV dan JSON.

Format kompresi yang didukung: gz, lzo, bz2, lz4, lzop, dan deflate.

max_filter_ratio

Menentukan toleransi kesalahan maksimum untuk tugas impor.

Jika tingkat kesalahan impor melebihi ambang batas ini, impor akan gagal. Untuk mengabaikan baris yang salah, Anda harus mengatur parameter ini ke nilai lebih besar dari 0 agar impor berhasil.

  • Nilai default: 0, yang berarti toleransi nol.

  • Rentang nilai: [0, 1].

strict_mode

Menentukan apakah akan mengaktifkan mode ketat.

  • false (default): Tidak mengaktifkan mode ketat.

  • true: Mengaktifkan mode ketat. Saat diaktifkan, penyaringan ketat diterapkan pada konversi tipe kolom selama proses impor.

    • Data yang salah akan disaring.

    • Jika konversi tipe kolom dari data sumber non-null menghasilkan nilai NULL, data tersebut juga akan disaring.

cloud_cluster

Menentukan kluster yang akan digunakan untuk impor.

Secara default, kluster default instans digunakan. Jika instans tidak memiliki kluster default yang ditetapkan, kluster yang Anda miliki izinnya akan dipilih secara otomatis.

load_to_single_tablet

Menentukan apakah akan mengimpor data hanya ke satu tablet partisi yang sesuai. Parameter ini hanya dapat diatur saat mengimpor data ke tabel Duplicate Key dengan bucketing acak.

  • false (default): Saat mengimpor data ke tabel model Duplicate Key dengan bucketing acak, data tidak ditulis hanya ke satu bucket partisi yang sesuai.

  • true: Saat mengimpor data ke tabel model Duplicate Key dengan bucketing acak, data ditulis hanya ke satu bucket partisi yang sesuai. Hal ini dapat meningkatkan konkurensi dan throughput impor data.

where

Menentukan kondisi filter untuk tugas impor.

Anda dapat menentukan pernyataan where untuk menyaring data sumber. Data yang disaring tidak akan diimpor atau dimasukkan dalam perhitungan rasio filter. Namun, data tersebut akan dihitung dalam jumlah baris yang disaring oleh kondisi where,

num_rows_unselected.

partitions

Menentukan informasi partisi untuk data yang akan diimpor.

Jika data yang akan diimpor tidak termasuk dalam partisi yang ditentukan, data tersebut tidak akan diimpor. Baris data ini dihitung dalam dpp.abnorm.ALL.

dpp.abnorm.ALL adalah metrik penghitung di SelectDB. Ini merepresentasikan jumlah total baris yang disaring selama tahap pra-pemrosesan data. NumberFilteredRows dalam hasil impor mencakup jumlah baris abnormal yang dihitung oleh dpp.abnorm.ALL.

columns

Menentukan konfigurasi transformasi fungsi untuk data yang akan diimpor.

Metode transformasi fungsi yang didukung mencakup perubahan urutan kolom dan transformasi ekspresi. Metode untuk transformasi ekspresi sama seperti dalam pernyataan pencarian.

merge_type

Menentukan jenis penggabungan data.

  • APPEND (default): Menunjukkan bahwa impor ini adalah operasi penulisan tambahan normal.

  • MERGE: Harus digunakan bersama parameter delete untuk menandai kolom Delete Flag.

  • DELETE: Menunjukkan bahwa semua data dalam impor ini adalah operasi penghapusan.

Penting

MERGE dan DELETE hanya berlaku untuk model Unique Key.

delete

Ini hanya berarti ketika merge_type diatur ke MERGE. Ini menentukan kondisi untuk menghapus data.

function_column.sequence_col

Ini hanya berlaku untuk model Unique Key. Untuk kolom kunci yang sama, ini memastikan bahwa kolom nilai diganti sesuai dengan kolom source_sequence. Kolom source_sequence dapat berupa kolom dari sumber data atau kolom dalam skema tabel.

exec_mem_limit

Menentukan batas memori impor.

  • Satuan: byte.

  • Nilai default: 2147483648, yaitu 2 GiB.

timeout

Menentukan waktu tunggu impor.

  • Satuan: detik.

  • Nilai default: 600.

  • Rentang: [1, 259200].

timezone

Menentukan zona waktu yang digunakan untuk impor ini. Parameter ini memengaruhi hasil semua fungsi terkait zona waktu yang terlibat dalam impor. Untuk informasi selengkapnya tentang zona waktu, lihat IANA Time Zone Database.

Nilai default: Asia/Shanghai, yaitu UTC+8.

two_phase_commit

Menentukan apakah akan mengaktifkan mode komit dua fase.

  • false (default): Tidak mengaktifkan komit dua fase.

  • true: Mengaktifkan komit dua fase. Saat diaktifkan, informasi dikembalikan kepada pengguna segera setelah penulisan data selesai. Pada titik ini, data belum terlihat, dan status transaksi adalah PRECOMMITTED. Data menjadi terlihat hanya setelah pengguna secara manual memicu operasi komit.

  • Kapan mengaktifkan atau menonaktifkan:

    Saat diaktifkan, impor data bersifat atomik. Impor tersebut berhasil sepenuhnya atau gagal sepenuhnya. Ini juga mencegah situasi di mana sebagian data menjadi terlihat selama proses impor.

    Aktifkan dalam skenario berikut:

    • Data transaksi keuangan: Memerlukan jaminan ketat terhadap integritas dan konsistensi data.

    • Data sistem penagihan: Impor data parsial tidak diperbolehkan.

    • Data bisnis kritis: Skenario dengan persyaratan akurasi data yang sangat tinggi.

    Nonaktifkan dalam skenario berikut:

    • Analisis log: Persyaratan konsistensi tidak tinggi, dan kecepatan impor menjadi prioritas.

    • Pemrosesan batch skala besar: Sumber daya terbatas, dan impor perlu diselesaikan dengan cepat.

    • Data yang dapat diimpor ulang: Data yang dapat diimpor kembali apabila proses impor gagal.

jsonpaths

Ada dua cara untuk mengimpor data dalam format JSON:

  • Mode dasar: Anda tidak perlu menentukan jsonpaths. Kunci dalam data JSON harus berkorespondensi satu-satu dengan nama kolom di tabel, tetapi urutannya dapat berbeda. Misalnya, dalam data JSON {"k1":1, "k2":2, "k3":"hello"}, k1, k2, dan k3 berkorespondensi dengan nama kolom di tabel.

  • Mode pencocokan: Saat data JSON kompleks, gunakan parameter jsonpaths untuk mencocokkan kunci ke kolom tabel yang sesuai. Misalnya, jsonpaths:["$.status", "$.res.id", "$.res.count"] dapat mengekstrak field bersarang dari data JSON dan menuliskannya ke tabel yang sesuai. Secara default, field yang diekstrak oleh jsonpaths dipetakan ke kolom tabel secara berurutan.

json_root

Parameter json_root dapat digunakan untuk menentukan objek anak dalam data JSON sebagai node akar untuk penguraian.

Nilai default adalah "", yang berarti seluruh objek JSON dipilih sebagai node akar.

read_json_by_line

Parameter penting dalam Stream Load untuk memproses data JSON. Parameter ini mengontrol cara mengurai file input yang berisi beberapa baris data JSON.

  • false (default): Seluruh file input diperlakukan sebagai satu nilai JSON atau array. Kernel mencoba mengurai seluruh konten file sebagai satu objek JSON atau array.

    Misalnya, jika file berisi konten berikut:

    [
     {"id":1, "name":"Alice", "age":25},
     {"id":2, "name":"Bob", "age":30},
     {"id":3, "name":"Charlie", "age":35}
    ]

    Seluruh konten file diurai sebagai satu array JSON.

  • true: Setiap baris data yang diimpor adalah objek JSON.

    Misalnya, jika file berisi konten berikut:

    {"id":1, "name":"Alice", "age":25}
    {"id":2, "name":"Bob", "age":30}
    {"id":3, "name":"Charlie", "age":35}

    Setiap baris dalam file diurai sebagai objek JSON.

strip_outer_array

Parameter penting dalam Stream Load untuk memproses data JSON yang berisi array luar.

  • false (default): Struktur asli data JSON dipertahankan. Array luar tidak dihapus. Seluruh array JSON diimpor sebagai satu catatan.

    Misalnya, untuk data sampel [{"k1" : 1, "k2" : 2},{"k1" : 3, "k2" : 4}], jika strip_outer_array diatur ke false, data tersebut diurai sebagai satu array dan diimpor ke tabel.

  • true: Saat data yang diimpor adalah array JSON, Anda harus mengatur strip_outer_array ke true.

    Misalnya, untuk data sampel [{"k1" : 1, "k2" : 2},{"k1" : 3, "k2" : 4}], jika strip_outer_array diatur ke true, data tersebut diurai sebagai dua catatan data dan diimpor ke tabel.

Penting

Saat Anda mengimpor data dalam format JSON, kinerja untuk format non-array jauh lebih tinggi daripada format array.

Contoh

Contoh ini menunjukkan cara mengimpor file CSV data.csv ke tabel test_table dalam database test_db. Titik akhir VPC instans adalah selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com. Ini hanya contoh perintah curl. Untuk contoh lengkap, lihat Contoh impor data lengkap.

curl --location-trusted -u admin:admin_123 -T data.csv -H "label:123" -H "expect:100-continue" http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load

Nilai kembali

Stream Load adalah metode impor sinkron. Hasil impor dikembalikan langsung dalam tanggapan terhadap permintaan pembuatan. Blok kode berikut menunjukkan contoh nilai kembali.

{
    "TxnId": 17,
    "Label": "707717c0-271a-44c5-be0b-4e71bfeacaa5",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 5,
    "NumberLoadedRows": 5,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 28,
    "LoadTimeMs": 27,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 2,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 3,
    "CommitAndPublishTimeMs": 18
}

Tabel berikut menjelaskan parameter dalam nilai kembali.

Parameter

Deskripsi

TxnId

ID transaksi impor.

Label

ID impor.

Anda dapat menentukan ID kustom atau membiarkan sistem menghasilkannya.

Status

Status impor. Nilai yang valid adalah:

  • Success: Impor berhasil.

  • Publish Timeout: Tugas impor selesai, tetapi data mungkin terlihat setelah penundaan. Jangan coba ulang.

  • Label Already Exists: Label duplikat. Anda harus mengubah Label.

  • Fail: Impor gagal.

ExistingJobStatus

Status pekerjaan impor yang sesuai dengan Label yang sudah ada.

Field ini hanya ditampilkan ketika Status adalah Label Already Exists.

Anda dapat menggunakan status ini untuk menentukan keadaan tugas impor yang sesuai dengan Label yang sudah ada.

  • RUNNING: Tugas masih berjalan.

  • FINISHED: Tugas berhasil.

Message

Pesan kesalahan.

NumberTotalRows

Jumlah total baris yang diproses.

NumberLoadedRows

Jumlah baris yang berhasil diimpor.

NumberFilteredRows

Jumlah baris dengan kualitas data yang tidak memenuhi syarat.

NumberUnselectedRows

Jumlah baris yang disaring oleh kondisi where.

LoadBytes

Jumlah byte yang diimpor.

LoadTimeMs

Waktu yang dibutuhkan untuk impor.

Satuan: milidetik.

BeginTxnTimeMs

Waktu yang dihabiskan untuk meminta FE memulai transaksi.

Satuan: milidetik.

StreamLoadPutTimeMs

Waktu yang dihabiskan untuk meminta FE mendapatkan rencana eksekusi data impor.

Satuan: milidetik.

ReadDataTimeMs

Waktu yang dihabiskan untuk membaca data.

Satuan: milidetik.

WriteDataTimeMs

Waktu yang dihabiskan untuk melakukan operasi penulisan data.

Satuan: milidetik.

CommitAndPublishTimeMs

Waktu yang dihabiskan untuk meminta FE melakukan komit dan publikasi transaksi.

Satuan: milidetik.

ErrorURL

Jika ada masalah kualitas data, Anda dapat mengakses URL ini untuk melihat baris kesalahan spesifik.

Membatalkan tugas impor

Anda tidak dapat membatalkan tugas Stream Load secara manual setelah dibuat. Sistem hanya akan membatalkan tugas secara otomatis jika terjadi waktu tunggu habis atau dilaporkan kesalahan impor. Anda dapat menggunakan errorUrl dari nilai kembali untuk mengunduh pesan kesalahan dan memecahkan masalah tersebut.

Menampilkan tugas Stream Load

Jika Anda telah mengaktifkan pencatatan operasi Stream Load, Anda dapat menghubungkan ke instans ApsaraDB for SelectDB menggunakan klien MySQL dan menjalankan pernyataan show stream load untuk melihat tugas Stream Load yang telah selesai.

Contoh impor data lengkap

Persiapan

Sebelum memulai operasi impor, selesaikan persiapan.

Impor data CSV

Contoh: Impor menggunakan skrip

  1. Buat tabel tujuan untuk data.

    1. Hubungkan ke instans SelectDB. Untuk informasi selengkapnya, lihat Hubungkan ke instans ApsaraDB for SelectDB menggunakan DMS.

    2. Jalankan pernyataan berikut untuk membuat database.

      CREATE DATABASE test_db;
    3. Jalankan pernyataan berikut untuk membuat tabel.

      CREATE TABLE test_table
      (
          id int,
          name varchar(50),
          age int,
          address varchar(50),
          url varchar(500)
      )
      UNIQUE KEY(`id`, `name`)
      DISTRIBUTED BY HASH(id) BUCKETS 16
      PROPERTIES("replication_num" = "1");
  2. Di perangkat tempat terminal Stream Load berada, buat file bernama test.csv yang akan diimpor.

    1,yang,32,shanghai,http://example.com
    2,wang,22,beijing,http://example.com
    3,xiao,23,shenzhen,http://example.com
    4,jess,45,hangzhou,http://example.com
    5,jack,14,shanghai,http://example.com
    6,tomy,25,hangzhou,http://example.com
    7,lucy,45,shanghai,http://example.com
    8,tengyin,26,shanghai,http://example.com
    9,wangli,27,shenzhen,http://example.com
    10,xiaohua,37,shanghai,http://example.com
  3. Impor data.

    Buka terminal di perangkat target dan jalankan perintah curl untuk memulai tugas Stream Load dan mengimpor data.

    Untuk sintaks dan deskripsi parameter pembuatan tugas impor, lihat Buat tugas impor. Contoh berikut menunjukkan skenario impor umum.

    • Gunakan Label untuk menghapus duplikat dan menentukan waktu tunggu.

      Impor data dari file test.csv ke tabel test_table di database test_db. Gunakan Label untuk menghindari impor batch data duplikat, dan atur waktu tunggu menjadi 100 detik.

       curl --location-trusted -u admin:admin_123 -H "label:123" -H "timeout:100" -H "expect:100-continue" -H "column_separator:," -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
    • Gunakan Label untuk menghapus duplikat dan gunakan kolom untuk menyaring data dari file.

      Impor data dari file test.csv ke tabel test_table di database test_db. Gunakan Label untuk menghindari impor batch data duplikat, tentukan nama kolom dari file, dan impor hanya data di mana kolom 'address' adalah 'hangzhou'.

      curl --location-trusted -u admin:admin_123 -H "label:123" -H "columns: id,name,age,address,url" -H "where: address='hangzhou'" -H "expect:100-continue" -H "column_separator:," -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
    • Izinkan toleransi kesalahan 20%.

      Impor data dari file test.csv ke tabel test_table di database test_db, memungkinkan tingkat kesalahan 20%.

      curl --location-trusted -u admin:admin_123 -H "label:123" -H "max_filter_ratio:0.2" -H "expect:100-continue" -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
    • Gunakan mode ketat dan atur zona waktu.

      Saring data yang diimpor menggunakan mode ketat dan atur zona waktu ke Africa/Abidjan.

      curl --location-trusted -u admin:admin_123 -H "strict_mode: true" -H "timezone: Africa/Abidjan" -H "expect:100-continue" -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
    • Hapus data di SelectDB.

      Hapus data di SelectDB yang identik dengan data dalam file test.csv.

      curl --location-trusted -u admin:admin_123 -H "merge_type: DELETE" -H "expect:100-continue" -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
    • Hapus data yang tidak diinginkan dari file berdasarkan kondisi dan impor data yang tersisa ke SelectDB.

      Hapus baris dari file test.csv di mana kolom address adalah 'hangzhou', dan impor baris yang tersisa ke SelectDB.

      curl --location-trusted -u admin:admin_123 -H "expect:100-continue" -H "columns: id,name,age,address,url" -H "merge_type: MERGE" -H "delete: address='hangzhou'" -H "column_separator:," -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/testDb/testTbl/_stream_load

Contoh: Impor menggunakan kode Java

package com.selectdb.x2doris.connector.doris.writer;

import com.alibaba.fastjson2.JSON;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.protocol.RequestContent;
import org.apache.http.util.EntityUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.Map;

public class DorisLoadCase {
    public static void main(String[] args) throws Exception {

        // 1. Konfigurasi parameter.
        String loadUrl = "http://<Host:Port>/api/<DB>/<TABLE>/_stream_load?";
        String userName = "admin";
        String password = "****";

        // 2. Bangun httpclient. Perhatikan bahwa pengalihan (isRedirectable) harus diaktifkan.
        HttpClientBuilder httpClientBuilder = HttpClients.custom().setRedirectStrategy(new DefaultRedirectStrategy() {
            // Aktifkan pengalihan.
            @Override
            protected boolean isRedirectable(String method) {
                return true;
            }
        });
        httpClientBuilder.addInterceptorLast(new RequestContent(true));
        HttpClient httpClient = httpClientBuilder.build();

        // 3. Bangun objek permintaan httpPut.
        HttpPut httpPut = new HttpPut(loadUrl);

        // Atur httpHeader...
        String basicAuth = Base64.getEncoder().encodeToString(String.format("%s:%s", userName, password).getBytes(StandardCharsets.UTF_8));
        httpPut.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + basicAuth);
        httpPut.addHeader(HttpHeaders.EXPECT, "100-continue");
        httpPut.addHeader(HttpHeaders.CONTENT_TYPE, "text/plain; charset=UTF-8");

        RequestConfig reqConfig = RequestConfig.custom().setConnectTimeout(30000).build();
        httpPut.setConfig(reqConfig);

        // 4. Atur data yang akan dikirim. Di sini, kita menulis CSV.
        // Asumsikan ada tabel dengan field berikut:
        // field1,field2,field3,field4
        // Ini mensimulasikan tiga catatan CSV. Di Doris, pemisah baris default untuk CSV adalah \n, dan pemisah kolom adalah \t.
        // String data =
        //        "1\t2\t3\t4\n" +
        //        "11\t22\t33\t44\n" +
        //        "111\t222\t333\t444";
        // Baca semua baris.
         List<String> lines = Files.readAllLines(Paths.get("your_file.csv"));
        // Gabungkan semua baris dengan \n.
        String data = String.join("\n", lines);
        
        httpPut.setEntity(new StringEntity(data));

        // 5. Kirim permintaan dan proses hasilnya.
        HttpResponse httpResponse = httpClient.execute(httpPut);
        int httpStatus = httpResponse.getStatusLine().getStatusCode();
        String respContent = EntityUtils.toString(new BufferedHttpEntity(httpResponse.getEntity()), StandardCharsets.UTF_8);
        String respMsg = httpResponse.getStatusLine().getReasonPhrase();

        if (httpStatus == HttpStatus.SC_OK) {
            // Pilih komponen serialisasi JSON yang sesuai untuk meng-serialisasi nilai kembali.
            Map<String, String> respAsMap = JSON.parseObject(respContent, Map.class);
            // Dapatkan kode status yang dikembalikan oleh SelectDB...
            String dorisStatus = respAsMap.get("Status");
            // Jika SelectDB mengembalikan salah satu status berikut, artinya data berhasil ditulis.
            List<String> DORIS_SUCCESS_STATUS = Arrays.asList("Success", "Publish Timeout", "200");
            if (!DORIS_SUCCESS_STATUS.contains(dorisStatus) || !respMsg.equals("OK")) {
                throw new RuntimeException("StreamLoad gagal, status: " + dorisStatus + ", Response: " + respMsg);
            } else {
                System.out.println("berhasil....");
            }
        } else {
            throw new IOException("StreamLoad Response HTTP Status Error, httpStatus: "+ httpStatus +",  url: " + loadUrl + ", error: " + respMsg);
        }
    }
}

Impor data JSON

  1. Buat tabel tujuan untuk data.

    1. Hubungkan ke instans SelectDB. Untuk informasi selengkapnya, lihat Hubungkan ke instans ApsaraDB for SelectDB menggunakan DMS.

    2. Jalankan pernyataan berikut untuk membuat database.

      CREATE DATABASE test_db;
    3. Jalankan pernyataan berikut untuk membuat tabel.

      CREATE TABLE test_table
      (
          id int,
          name varchar(50),
          age int
      )
      UNIQUE KEY(`id`)
      DISTRIBUTED BY HASH(`id`) BUCKETS 16
      PROPERTIES("replication_num" = "1");

  2. Impor data.

    Penting

    Saat Anda mengimpor data dalam format JSON, kinerja untuk format non-array jauh lebih tinggi daripada format array.

    Impor data dalam format non-array

    1. Di terminal Stream Load, buat file bernama json.data. File harus berisi beberapa baris, dengan satu catatan JSON per baris. Berikut adalah konten contohnya:

      {"id":1,"name":"Emily","age":25}
      {"id":2,"name":"Benjamin","age":35}
      {"id":3,"name":"Olivia","age":28}
      {"id":4,"name":"Alexander","age":60}
      {"id":5,"name":"Ava","age":17}
    2. Impor data.

      Buka terminal dan jalankan perintah curl untuk memulai tugas Stream Load. Tugas ini mengimpor data dari file json.data ke tabel test_table di database test_db.

      curl --location-trusted -u admin:admin_123 -H "Expect:100-continue" -H "format:json" -H "read_json_by_line:true" -T json.data -XPUT http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load

    Impor data dalam format array

    1. Di terminal Stream Load, buat file data dalam format array JSON bernama json_array.data.

      [
      {"userid":1,"username":"Emily","userage":25},
      {"userid":2,"username":"Benjamin","userage":35},
      {"userid":3,"username":"Olivia","userage":28},
      {"userid":4,"username":"Alexander","userage":60},
      {"userid":5,"username":"Ava","userage":17}
      ]
    2. Impor data.

      Buka terminal dan jalankan perintah curl untuk memulai tugas Stream Load. Tugas ini mengimpor data dari file lokal json_array.data ke tabel test_table di database test_db.

      curl --location-trusted -u admin:admin_123 -H "Expect:100-continue" -H "format:json" -H "jsonpaths:[\"$.userid\", \"$.userage\", \"$.username\"]" -H "columns:id,age,name" -H "strip_outer_array:true" -T json_array.data -XPUT http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load

Mode HTTP Stream

Dalam Stream Load, Anda dapat menggunakan fitur Table Value Function (TVF) untuk menentukan parameter impor dengan ekspresi SQL. Fitur Stream Load ini disebut http_stream. Untuk informasi selengkapnya tentang cara menggunakan TVF, lihat TVF.

URL REST API untuk impor http_stream berbeda dari URL untuk impor Stream Load normal.

  • URL Stream Load normal: http://host:http_port/api/{db}/{table}/_stream_load.

  • URL menggunakan TVF http_stream: http://host:http_port/api/_http_stream .

Sintaks

Berikut adalah sintaks untuk mode HTTP Stream Stream Load.

curl --location-trusted -u <username>:<password> [-H "sql: ${load_sql}"...] -T <file_name> -XPUT http://host:http_port/api/_http_stream

Untuk deskripsi parameter HTTP Stream, lihat Parameter.

Contoh

Anda dapat menambahkan parameter SQL bernama load_sql ke header HTTP untuk menggantikan parameter seperti column_separator, line_delimiter, where, dan columns. Blok kode berikut menunjukkan contoh parameter SQL load_sql.

INSERT INTO db.table (col, ...) SELECT stream_col, ... FROM http_stream("property1"="value1");

Contoh lengkap:

curl  --location-trusted -u admin:admin_123 -T test.csv  -H "sql:insert into demo.example_tbl_1(user_id, age, cost) select c1, c4, c7 * 2 from http_stream(\"format\" = \"CSV\", \"column_separator\" = \",\" ) where age >= 30"  http://host:http_port/api/_http_stream

FAQ

P1: Apa yang harus saya lakukan jika muncul kesalahan "get table cloud commit lock timeout" selama impor?

Kesalahan ini menunjukkan bahwa penulisan data terlalu sering telah menyebabkan deadlock tabel. Kami sangat merekomendasikan agar Anda mengurangi frekuensi penulisan dan menggunakan pemrosesan batch untuk data Anda. Stream Load tunggal dapat menulis data sebesar beberapa ratus MB hingga 1 GB.

P2: Saat mengimpor file CSV, bagaimana cara menangani data yang berisi pemisah kolom atau baris?

Anda harus menentukan pemisah kolom dan baris baru serta memodifikasi data impor untuk memastikan bahwa data tidak bertentangan dengan pemisah tersebut. Hal ini memungkinkan data diurai dengan benar. Bagian berikut memberikan contoh:

Data berisi pemisah baris

Jika data yang diimpor berisi pemisah baris yang ditentukan, seperti pemisah baris default \n, Anda harus menentukan pemisah baris baru.

Misalnya, asumsikan file data Anda berisi konten berikut:

Zhang San\n,25,Shaanxi
Li Si\n,30,Beijing

Dalam skenario ini, \n dalam file adalah data, bukan pemisah baris. Namun, pemisah baris default juga \n. Untuk memastikan file diurai dengan benar, Anda harus menggunakan parameter line_delimiter untuk menentukan pemisah baris baru dan secara eksplisit menambahkan pemisah baru tersebut di akhir setiap baris data dalam file. Berikut adalah contohnya:

  1. Atur pemisah baris untuk impor.

    Misalnya, untuk mengganti pemisah baris default \n dengan \r\n, Anda harus mengatur -H "line_delimiter:\r\n" saat mengimpor data.

  2. Tambahkan pemisah baris yang ditentukan di akhir setiap baris data. Teks contoh harus dimodifikasi sebagai berikut:

    Zhang San\n,25,Shaanxi\r\n
    Li Si\n,30,Beijing\r\n

Data berisi pemisah kolom

Jika data yang diimpor berisi pemisah kolom yang ditentukan, seperti pemisah kolom default \t, Anda harus menentukan pemisah kolom baru.

Misalnya, asumsikan file data Anda berisi konten berikut:

Zhang San\t  25  Shaanxi
Li Si\t  30  Beijing

Dalam skenario ini, \t dalam file adalah data, bukan pemisah kolom. Namun, pemisah kolom default juga \t (karakter tab). Untuk memastikan file diurai dengan benar, Anda harus menggunakan parameter column_separator untuk menentukan pemisah kolom baru dan secara eksplisit menambahkan pemisah baru tersebut di antara kolom dalam file. Berikut adalah contohnya:

  1. Atur pemisah kolom untuk impor.

    Misalnya, untuk mengganti pemisah kolom default \t dengan koma (,), Anda harus mengatur -H "column_separator:," saat mengimpor data.

  2. Tambahkan pemisah kolom yang ditentukan di antara kolom data. Teks contoh harus dimodifikasi sebagai berikut:

    Zhang San\t,25,Shaanxi
    Li Si\t,30,Beijing