Anda dapat menggunakan Stream Load untuk mengimpor file lokal atau aliran data ke dalam instans ApsaraDB for SelectDB. Topik ini menjelaskan cara menggunakan Stream Load untuk mengimpor data ke ApsaraDB for SelectDB.
Informasi latar belakang
Stream Load adalah metode impor data sinkron. Anda dapat mengirim permintaan HTTP untuk mengimpor file lokal atau aliran data ke ApsaraDB for SelectDB. Stream Load segera mengembalikan hasil impor. Anda dapat memeriksa nilai kembali permintaan untuk menentukan apakah impor berhasil. Stream Load mendukung format data CSV (teks), JSON, PARQUET, dan ORC.
Stream Load menawarkan throughput tinggi, latensi rendah, serta fleksibel dan andal. Kami sangat merekomendasikan agar Anda menggunakan Stream Load sebagai metode impor data utama Anda.
Persiapan
Pastikan terminal yang digunakan untuk mengirim permintaan Stream Load dapat terhubung ke instans SelectDB melalui jaringan:
Ajukan titik akhir publik untuk instans ApsaraDB for SelectDB. Untuk informasi selengkapnya, lihat Ajukan dan lepas titik akhir publik.
Jika terminal yang mengirim permintaan Stream Load berada dalam VPC yang sama dengan instans ApsaraDB for SelectDB, Anda dapat melewati langkah ini.
Tambahkan alamat IP terminal yang mengirim permintaan Stream Load ke daftar putih instans ApsaraDB for SelectDB. Untuk informasi selengkapnya, lihat Konfigurasi daftar putih.
Jika terminal yang mengirim permintaan Stream Load memiliki daftar putih yang dikonfigurasi, tambahkan rentang alamat IP instans SelectDB ke daftar putih tersebut.
Untuk mendapatkan alamat IP instans SelectDB di VPC tempat instans SelectDB berada, Anda dapat melakukan operasi yang disediakan di Bagaimana cara melihat alamat IP di VPC tempat instans ApsaraDB SelectDB saya berada?
Untuk mendapatkan alamat IP publik instans SelectDB, Anda dapat menjalankan perintah ping untuk mengakses titik akhir publik instans SelectDB dan mendapatkan alamat IP instans tersebut.
Opsi: Ubah konfigurasi kluster komputasi (backend) untuk mengaktifkan pencatatan operasi Stream Load.
Secara default, kluster komputasi tidak mencatat operasi Stream Load.
Untuk melacak operasi Stream Load, atur enable_stream_load_record ke true dan mulai ulang kluster sebelum membuat tugas impor. Untuk mengaktifkan fitur ini, Anda harus mengajukan tiket untuk dukungan teknis.
Opsi: Ubah konfigurasi kluster komputasi untuk menyesuaikan ukuran impor maksimum untuk Stream Load.
Ukuran maksimum file default yang dapat diimpor menggunakan Stream Load adalah 10.240 MB.
Jika file sumber Anda melebihi ukuran ini, Anda dapat menyesuaikan parameter backend streaming_load_max_mb. Untuk informasi selengkapnya tentang cara mengubah parameter, lihat Konfigurasi parameter.
Opsi: Ubah konfigurasi frontend (FE) untuk menyesuaikan waktu tunggu impor.
Waktu tunggu default untuk tugas Stream Load adalah 600 detik. Jika tugas impor tidak selesai dalam batas waktu yang ditentukan, sistem akan membatalkan tugas tersebut dan mengubah statusnya menjadi CANCELLED.
Jika file sumber tidak dapat diimpor dalam batas waktu tersebut, Anda dapat menetapkan waktu tunggu tertentu dalam permintaan Stream Load atau menyesuaikan parameter FE stream_load_default_timeout_second dan memulai ulang instans untuk menetapkan waktu tunggu default global. Untuk melakukan penyesuaian ini, Anda harus mengajukan tiket untuk dukungan teknis.
Catatan penggunaan
Stream Load tunggal dapat menulis data sebesar beberapa ratus MB hingga 1 GB. Dalam beberapa skenario bisnis, sering menulis data dalam jumlah kecil dapat menurunkan kinerja instans secara signifikan dan bahkan menyebabkan deadlock tabel. Kami sangat merekomendasikan agar Anda mengurangi frekuensi penulisan dan menggunakan pemrosesan batch untuk data Anda.
Pemrosesan batch di sisi aplikasi: Kumpulkan data bisnis di sisi aplikasi, lalu kirim permintaan Stream Load ke SelectDB.
Pemrosesan batch di sisi server: Setelah SelectDB menerima permintaan Stream Load, server melakukan pemrosesan batch pada data tersebut. Untuk informasi selengkapnya, lihat Group Commit.
Buat tugas impor
Stream Load mengirimkan dan mentransfer data melalui protokol HTTP. Contoh berikut menunjukkan cara mengirimkan tugas impor menggunakan perintah curl. Anda dapat menjalankan perintah ini di terminal Linux atau macOS, atau di command prompt Windows. Anda juga dapat menggunakan klien HTTP lain untuk operasi Stream Load.
Sintaks
curl --location-trusted -u <username>:<password> [-H ""] -H "expect:100-continue" -T <file_path> -XPUT http://<host>:<port>/api/<db
_name>/<table_name>/_stream_loadParameter
Parameter | Wajib | Deskripsi |
| Ya | Jika autentikasi diperlukan, ini meneruskan |
| Ya | Tentukan nama pengguna dan kata sandi untuk instans SelectDB.
|
| Tidak | Tentukan konten header permintaan (Header) untuk permintaan impor Stream Load ini. Formatnya sebagai berikut:
Parameter umum sebagai berikut:
Untuk informasi selengkapnya tentang parameter header permintaan, lihat Parameter header permintaan. |
| Ya | Tentukan jalur file data yang akan diimpor. file_path: Jalur file objek. |
| Ya | Metode permintaan HTTP. Gunakan metode permintaan PUT untuk menentukan alamat impor data SelectDB. Parameter spesifiknya sebagai berikut:
|
Parameter header permintaan
Stream Load menggunakan protokol HTTP. Oleh karena itu, parameter yang terkait dengan tugas impor diatur dalam header permintaan. Tabel berikut menjelaskan parameter impor umum.
Parameter | Deskripsi |
| ID unik tugas impor.
Penting Kami merekomendasikan menggunakan |
| Menentukan format data impor.
Untuk informasi selengkapnya tentang persyaratan format file dan parameter terkait, lihat Format File. |
| Menentukan pemisah baris dalam file impor. Anda juga dapat menggunakan kombinasi beberapa karakter sebagai pemisah baris. Misalnya, di sistem Windows, gunakan \r\n sebagai pemisah baris.
|
| Menentukan pemisah kolom dalam file impor. Anda juga dapat menggunakan kombinasi beberapa karakter sebagai pemisah kolom. Misalnya, Anda dapat menggunakan garis vertikal ganda Untuk karakter tak terlihat, Anda perlu menambahkan
|
| Menentukan format kompresi file. Kompresi hanya didukung untuk file Format kompresi yang didukung: |
| Menentukan toleransi kesalahan maksimum untuk tugas impor. Jika tingkat kesalahan impor melebihi ambang batas ini, impor akan gagal. Untuk mengabaikan baris yang salah, Anda harus mengatur parameter ini ke nilai lebih besar dari 0 agar impor berhasil.
|
| Menentukan apakah akan mengaktifkan mode ketat.
|
| Menentukan kluster yang akan digunakan untuk impor. Secara default, kluster default instans digunakan. Jika instans tidak memiliki kluster default yang ditetapkan, kluster yang Anda miliki izinnya akan dipilih secara otomatis. |
| Menentukan apakah akan mengimpor data hanya ke satu tablet partisi yang sesuai. Parameter ini hanya dapat diatur saat mengimpor data ke tabel Duplicate Key dengan bucketing acak.
|
| Menentukan kondisi filter untuk tugas impor. Anda dapat menentukan pernyataan
|
| Menentukan informasi partisi untuk data yang akan diimpor. Jika data yang akan diimpor tidak termasuk dalam partisi yang ditentukan, data tersebut tidak akan diimpor. Baris data ini dihitung dalam dpp.abnorm.ALL.
|
| Menentukan konfigurasi transformasi fungsi untuk data yang akan diimpor. Metode transformasi fungsi yang didukung mencakup perubahan urutan kolom dan transformasi ekspresi. Metode untuk transformasi ekspresi sama seperti dalam pernyataan pencarian. |
| Menentukan jenis penggabungan data.
Penting
|
| Ini hanya berarti ketika |
| Ini hanya berlaku untuk model Unique Key. Untuk kolom kunci yang sama, ini memastikan bahwa kolom nilai diganti sesuai dengan kolom source_sequence. Kolom source_sequence dapat berupa kolom dari sumber data atau kolom dalam skema tabel. |
| Menentukan batas memori impor.
|
| Menentukan waktu tunggu impor.
|
| Menentukan zona waktu yang digunakan untuk impor ini. Parameter ini memengaruhi hasil semua fungsi terkait zona waktu yang terlibat dalam impor. Untuk informasi selengkapnya tentang zona waktu, lihat IANA Time Zone Database. Nilai default: |
| Menentukan apakah akan mengaktifkan mode komit dua fase.
|
jsonpaths | Ada dua cara untuk mengimpor data dalam format JSON:
|
json_root | Parameter Nilai default adalah "", yang berarti seluruh objek JSON dipilih sebagai node akar. |
read_json_by_line | Parameter penting dalam Stream Load untuk memproses data JSON. Parameter ini mengontrol cara mengurai file input yang berisi beberapa baris data JSON.
|
strip_outer_array | Parameter penting dalam Stream Load untuk memproses data JSON yang berisi array luar.
Penting Saat Anda mengimpor data dalam format JSON, kinerja untuk format non-array jauh lebih tinggi daripada format array. |
Contoh
Contoh ini menunjukkan cara mengimpor file CSV data.csv ke tabel test_table dalam database test_db. Titik akhir VPC instans adalah selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com. Ini hanya contoh perintah curl. Untuk contoh lengkap, lihat Contoh impor data lengkap.
curl --location-trusted -u admin:admin_123 -T data.csv -H "label:123" -H "expect:100-continue" http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_loadNilai kembali
Stream Load adalah metode impor sinkron. Hasil impor dikembalikan langsung dalam tanggapan terhadap permintaan pembuatan. Blok kode berikut menunjukkan contoh nilai kembali.
{
"TxnId": 17,
"Label": "707717c0-271a-44c5-be0b-4e71bfeacaa5",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 5,
"NumberLoadedRows": 5,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 28,
"LoadTimeMs": 27,
"BeginTxnTimeMs": 0,
"StreamLoadPutTimeMs": 2,
"ReadDataTimeMs": 0,
"WriteDataTimeMs": 3,
"CommitAndPublishTimeMs": 18
}Tabel berikut menjelaskan parameter dalam nilai kembali.
Parameter | Deskripsi |
| ID transaksi impor. |
| ID impor. Anda dapat menentukan ID kustom atau membiarkan sistem menghasilkannya. |
| Status impor. Nilai yang valid adalah:
|
| Status pekerjaan impor yang sesuai dengan Field ini hanya ditampilkan ketika Status adalah Anda dapat menggunakan status ini untuk menentukan keadaan tugas impor yang sesuai dengan Label yang sudah ada.
|
| Pesan kesalahan. |
| Jumlah total baris yang diproses. |
| Jumlah baris yang berhasil diimpor. |
| Jumlah baris dengan kualitas data yang tidak memenuhi syarat. |
| Jumlah baris yang disaring oleh kondisi |
| Jumlah byte yang diimpor. |
| Waktu yang dibutuhkan untuk impor. Satuan: milidetik. |
| Waktu yang dihabiskan untuk meminta FE memulai transaksi. Satuan: milidetik. |
| Waktu yang dihabiskan untuk meminta FE mendapatkan rencana eksekusi data impor. Satuan: milidetik. |
| Waktu yang dihabiskan untuk membaca data. Satuan: milidetik. |
| Waktu yang dihabiskan untuk melakukan operasi penulisan data. Satuan: milidetik. |
| Waktu yang dihabiskan untuk meminta FE melakukan komit dan publikasi transaksi. Satuan: milidetik. |
| Jika ada masalah kualitas data, Anda dapat mengakses URL ini untuk melihat baris kesalahan spesifik. |
Membatalkan tugas impor
Anda tidak dapat membatalkan tugas Stream Load secara manual setelah dibuat. Sistem hanya akan membatalkan tugas secara otomatis jika terjadi waktu tunggu habis atau dilaporkan kesalahan impor. Anda dapat menggunakan errorUrl dari nilai kembali untuk mengunduh pesan kesalahan dan memecahkan masalah tersebut.
Menampilkan tugas Stream Load
Jika Anda telah mengaktifkan pencatatan operasi Stream Load, Anda dapat menghubungkan ke instans ApsaraDB for SelectDB menggunakan klien MySQL dan menjalankan pernyataan show stream load untuk melihat tugas Stream Load yang telah selesai.
Contoh impor data lengkap
Persiapan
Sebelum memulai operasi impor, selesaikan persiapan.
Impor data CSV
Contoh: Impor menggunakan skrip
Buat tabel tujuan untuk data.
Hubungkan ke instans SelectDB. Untuk informasi selengkapnya, lihat Hubungkan ke instans ApsaraDB for SelectDB menggunakan DMS.
Jalankan pernyataan berikut untuk membuat database.
CREATE DATABASE test_db;Jalankan pernyataan berikut untuk membuat tabel.
CREATE TABLE test_table ( id int, name varchar(50), age int, address varchar(50), url varchar(500) ) UNIQUE KEY(`id`, `name`) DISTRIBUTED BY HASH(id) BUCKETS 16 PROPERTIES("replication_num" = "1");
Di perangkat tempat terminal Stream Load berada, buat file bernama
test.csvyang akan diimpor.1,yang,32,shanghai,http://example.com 2,wang,22,beijing,http://example.com 3,xiao,23,shenzhen,http://example.com 4,jess,45,hangzhou,http://example.com 5,jack,14,shanghai,http://example.com 6,tomy,25,hangzhou,http://example.com 7,lucy,45,shanghai,http://example.com 8,tengyin,26,shanghai,http://example.com 9,wangli,27,shenzhen,http://example.com 10,xiaohua,37,shanghai,http://example.comImpor data.
Buka terminal di perangkat target dan jalankan perintah curl untuk memulai tugas Stream Load dan mengimpor data.
Untuk sintaks dan deskripsi parameter pembuatan tugas impor, lihat Buat tugas impor. Contoh berikut menunjukkan skenario impor umum.
Gunakan
Labeluntuk menghapus duplikat dan menentukan waktu tunggu.Impor data dari file
test.csvke tabeltest_tabledi databasetest_db. Gunakan Label untuk menghindari impor batch data duplikat, dan atur waktu tunggu menjadi 100 detik.curl --location-trusted -u admin:admin_123 -H "label:123" -H "timeout:100" -H "expect:100-continue" -H "column_separator:," -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_loadGunakan
Labeluntuk menghapus duplikat dan gunakan kolom untuk menyaring data dari file.Impor data dari file
test.csvke tabeltest_tabledi databasetest_db. Gunakan Label untuk menghindari impor batch data duplikat, tentukan nama kolom dari file, dan impor hanya data di mana kolom 'address' adalah 'hangzhou'.curl --location-trusted -u admin:admin_123 -H "label:123" -H "columns: id,name,age,address,url" -H "where: address='hangzhou'" -H "expect:100-continue" -H "column_separator:," -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_loadIzinkan toleransi kesalahan 20%.
Impor data dari file
test.csvke tabeltest_tabledi databasetest_db, memungkinkan tingkat kesalahan 20%.curl --location-trusted -u admin:admin_123 -H "label:123" -H "max_filter_ratio:0.2" -H "expect:100-continue" -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_loadGunakan mode ketat dan atur zona waktu.
Saring data yang diimpor menggunakan mode ketat dan atur zona waktu ke
Africa/Abidjan.curl --location-trusted -u admin:admin_123 -H "strict_mode: true" -H "timezone: Africa/Abidjan" -H "expect:100-continue" -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_loadHapus data di SelectDB.
Hapus data di SelectDB yang identik dengan data dalam file test.csv.
curl --location-trusted -u admin:admin_123 -H "merge_type: DELETE" -H "expect:100-continue" -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_loadHapus data yang tidak diinginkan dari file berdasarkan kondisi dan impor data yang tersisa ke SelectDB.
Hapus baris dari file test.csv di mana kolom address adalah 'hangzhou', dan impor baris yang tersisa ke SelectDB.
curl --location-trusted -u admin:admin_123 -H "expect:100-continue" -H "columns: id,name,age,address,url" -H "merge_type: MERGE" -H "delete: address='hangzhou'" -H "column_separator:," -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/testDb/testTbl/_stream_load
Contoh: Impor menggunakan kode Java
package com.selectdb.x2doris.connector.doris.writer;
import com.alibaba.fastjson2.JSON;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.protocol.RequestContent;
import org.apache.http.util.EntityUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.Map;
public class DorisLoadCase {
public static void main(String[] args) throws Exception {
// 1. Konfigurasi parameter.
String loadUrl = "http://<Host:Port>/api/<DB>/<TABLE>/_stream_load?";
String userName = "admin";
String password = "****";
// 2. Bangun httpclient. Perhatikan bahwa pengalihan (isRedirectable) harus diaktifkan.
HttpClientBuilder httpClientBuilder = HttpClients.custom().setRedirectStrategy(new DefaultRedirectStrategy() {
// Aktifkan pengalihan.
@Override
protected boolean isRedirectable(String method) {
return true;
}
});
httpClientBuilder.addInterceptorLast(new RequestContent(true));
HttpClient httpClient = httpClientBuilder.build();
// 3. Bangun objek permintaan httpPut.
HttpPut httpPut = new HttpPut(loadUrl);
// Atur httpHeader...
String basicAuth = Base64.getEncoder().encodeToString(String.format("%s:%s", userName, password).getBytes(StandardCharsets.UTF_8));
httpPut.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + basicAuth);
httpPut.addHeader(HttpHeaders.EXPECT, "100-continue");
httpPut.addHeader(HttpHeaders.CONTENT_TYPE, "text/plain; charset=UTF-8");
RequestConfig reqConfig = RequestConfig.custom().setConnectTimeout(30000).build();
httpPut.setConfig(reqConfig);
// 4. Atur data yang akan dikirim. Di sini, kita menulis CSV.
// Asumsikan ada tabel dengan field berikut:
// field1,field2,field3,field4
// Ini mensimulasikan tiga catatan CSV. Di Doris, pemisah baris default untuk CSV adalah \n, dan pemisah kolom adalah \t.
// String data =
// "1\t2\t3\t4\n" +
// "11\t22\t33\t44\n" +
// "111\t222\t333\t444";
// Baca semua baris.
List<String> lines = Files.readAllLines(Paths.get("your_file.csv"));
// Gabungkan semua baris dengan \n.
String data = String.join("\n", lines);
httpPut.setEntity(new StringEntity(data));
// 5. Kirim permintaan dan proses hasilnya.
HttpResponse httpResponse = httpClient.execute(httpPut);
int httpStatus = httpResponse.getStatusLine().getStatusCode();
String respContent = EntityUtils.toString(new BufferedHttpEntity(httpResponse.getEntity()), StandardCharsets.UTF_8);
String respMsg = httpResponse.getStatusLine().getReasonPhrase();
if (httpStatus == HttpStatus.SC_OK) {
// Pilih komponen serialisasi JSON yang sesuai untuk meng-serialisasi nilai kembali.
Map<String, String> respAsMap = JSON.parseObject(respContent, Map.class);
// Dapatkan kode status yang dikembalikan oleh SelectDB...
String dorisStatus = respAsMap.get("Status");
// Jika SelectDB mengembalikan salah satu status berikut, artinya data berhasil ditulis.
List<String> DORIS_SUCCESS_STATUS = Arrays.asList("Success", "Publish Timeout", "200");
if (!DORIS_SUCCESS_STATUS.contains(dorisStatus) || !respMsg.equals("OK")) {
throw new RuntimeException("StreamLoad gagal, status: " + dorisStatus + ", Response: " + respMsg);
} else {
System.out.println("berhasil....");
}
} else {
throw new IOException("StreamLoad Response HTTP Status Error, httpStatus: "+ httpStatus +", url: " + loadUrl + ", error: " + respMsg);
}
}
}Impor data JSON
Buat tabel tujuan untuk data.
Hubungkan ke instans SelectDB. Untuk informasi selengkapnya, lihat Hubungkan ke instans ApsaraDB for SelectDB menggunakan DMS.
Jalankan pernyataan berikut untuk membuat database.
CREATE DATABASE test_db;Jalankan pernyataan berikut untuk membuat tabel.
CREATE TABLE test_table ( id int, name varchar(50), age int ) UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 16 PROPERTIES("replication_num" = "1");
Impor data.
PentingSaat Anda mengimpor data dalam format JSON, kinerja untuk format non-array jauh lebih tinggi daripada format array.
Impor data dalam format non-array
Di terminal Stream Load, buat file bernama
json.data. File harus berisi beberapa baris, dengan satu catatan JSON per baris. Berikut adalah konten contohnya:{"id":1,"name":"Emily","age":25} {"id":2,"name":"Benjamin","age":35} {"id":3,"name":"Olivia","age":28} {"id":4,"name":"Alexander","age":60} {"id":5,"name":"Ava","age":17}Impor data.
Buka terminal dan jalankan perintah curl untuk memulai tugas Stream Load. Tugas ini mengimpor data dari file
json.datake tabeltest_tabledi databasetest_db.curl --location-trusted -u admin:admin_123 -H "Expect:100-continue" -H "format:json" -H "read_json_by_line:true" -T json.data -XPUT http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
Impor data dalam format array
Di terminal Stream Load, buat file data dalam format array JSON bernama
json_array.data.[ {"userid":1,"username":"Emily","userage":25}, {"userid":2,"username":"Benjamin","userage":35}, {"userid":3,"username":"Olivia","userage":28}, {"userid":4,"username":"Alexander","userage":60}, {"userid":5,"username":"Ava","userage":17} ]Impor data.
Buka terminal dan jalankan perintah curl untuk memulai tugas Stream Load. Tugas ini mengimpor data dari file lokal
json_array.datake tabeltest_tabledi databasetest_db.curl --location-trusted -u admin:admin_123 -H "Expect:100-continue" -H "format:json" -H "jsonpaths:[\"$.userid\", \"$.userage\", \"$.username\"]" -H "columns:id,age,name" -H "strip_outer_array:true" -T json_array.data -XPUT http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
Mode HTTP Stream
Dalam Stream Load, Anda dapat menggunakan fitur Table Value Function (TVF) untuk menentukan parameter impor dengan ekspresi SQL. Fitur Stream Load ini disebut http_stream. Untuk informasi selengkapnya tentang cara menggunakan TVF, lihat TVF.
URL REST API untuk impor http_stream berbeda dari URL untuk impor Stream Load normal.
URL Stream Load normal:
http://host:http_port/api/{db}/{table}/_stream_load.URL menggunakan TVF http_stream:
http://host:http_port/api/_http_stream.
Sintaks
Berikut adalah sintaks untuk mode HTTP Stream Stream Load.
curl --location-trusted -u <username>:<password> [-H "sql: ${load_sql}"...] -T <file_name> -XPUT http://host:http_port/api/_http_streamUntuk deskripsi parameter HTTP Stream, lihat Parameter.
Contoh
Anda dapat menambahkan parameter SQL bernama load_sql ke header HTTP untuk menggantikan parameter seperti column_separator, line_delimiter, where, dan columns. Blok kode berikut menunjukkan contoh parameter SQL load_sql.
INSERT INTO db.table (col, ...) SELECT stream_col, ... FROM http_stream("property1"="value1");Contoh lengkap:
curl --location-trusted -u admin:admin_123 -T test.csv -H "sql:insert into demo.example_tbl_1(user_id, age, cost) select c1, c4, c7 * 2 from http_stream(\"format\" = \"CSV\", \"column_separator\" = \",\" ) where age >= 30" http://host:http_port/api/_http_streamFAQ
P1: Apa yang harus saya lakukan jika muncul kesalahan "get table cloud commit lock timeout" selama impor?
Kesalahan ini menunjukkan bahwa penulisan data terlalu sering telah menyebabkan deadlock tabel. Kami sangat merekomendasikan agar Anda mengurangi frekuensi penulisan dan menggunakan pemrosesan batch untuk data Anda. Stream Load tunggal dapat menulis data sebesar beberapa ratus MB hingga 1 GB.
P2: Saat mengimpor file CSV, bagaimana cara menangani data yang berisi pemisah kolom atau baris?
Anda harus menentukan pemisah kolom dan baris baru serta memodifikasi data impor untuk memastikan bahwa data tidak bertentangan dengan pemisah tersebut. Hal ini memungkinkan data diurai dengan benar. Bagian berikut memberikan contoh:
Data berisi pemisah baris
Jika data yang diimpor berisi pemisah baris yang ditentukan, seperti pemisah baris default \n, Anda harus menentukan pemisah baris baru.
Misalnya, asumsikan file data Anda berisi konten berikut:
Zhang San\n,25,Shaanxi
Li Si\n,30,BeijingDalam skenario ini, \n dalam file adalah data, bukan pemisah baris. Namun, pemisah baris default juga \n. Untuk memastikan file diurai dengan benar, Anda harus menggunakan parameter line_delimiter untuk menentukan pemisah baris baru dan secara eksplisit menambahkan pemisah baru tersebut di akhir setiap baris data dalam file. Berikut adalah contohnya:
Atur pemisah baris untuk impor.
Misalnya, untuk mengganti pemisah baris default
\ndengan\r\n, Anda harus mengatur-H "line_delimiter:\r\n"saat mengimpor data.Tambahkan pemisah baris yang ditentukan di akhir setiap baris data. Teks contoh harus dimodifikasi sebagai berikut:
Zhang San\n,25,Shaanxi\r\n Li Si\n,30,Beijing\r\n
Data berisi pemisah kolom
Jika data yang diimpor berisi pemisah kolom yang ditentukan, seperti pemisah kolom default \t, Anda harus menentukan pemisah kolom baru.
Misalnya, asumsikan file data Anda berisi konten berikut:
Zhang San\t 25 Shaanxi
Li Si\t 30 BeijingDalam skenario ini, \t dalam file adalah data, bukan pemisah kolom. Namun, pemisah kolom default juga \t (karakter tab). Untuk memastikan file diurai dengan benar, Anda harus menggunakan parameter column_separator untuk menentukan pemisah kolom baru dan secara eksplisit menambahkan pemisah baru tersebut di antara kolom dalam file. Berikut adalah contohnya:
Atur pemisah kolom untuk impor.
Misalnya, untuk mengganti pemisah kolom default
\tdengan koma(,), Anda harus mengatur-H "column_separator:,"saat mengimpor data.Tambahkan pemisah kolom yang ditentukan di antara kolom data. Teks contoh harus dimodifikasi sebagai berikut:
Zhang San\t,25,Shaanxi Li Si\t,30,Beijing