You can use Stream Load to import local files or data streams into an ApsaraDB for SelectDB instance. This topic describes how to use Stream Load to import data into ApsaraDB for SelectDB.
Background information
Stream Load is a synchronous data import method. You can send HTTP requests to import local files or data streams into ApsaraDB for SelectDB. Stream Load returns the import result immediately. You can check the request's return value to determine whether the import was successful. Stream Load supports the CSV (text), JSON, PARQUET, and ORC data formats.
Stream Load offers high throughput and low latency, and it is flexible and reliable. We strongly recommend that you use Stream Load as your primary data import method.
Preparations
Ensure that the terminal used to send Stream Load requests can connect to the SelectDB instance over the network:
Apply for a public endpoint for the ApsaraDB for SelectDB instance. For more information, see Apply for and release a public endpoint.
If the terminal that sends Stream Load requests is in the same VPC as the ApsaraDB for SelectDB instance, you can skip this step.
Add the IP addresses of the terminal that sends Stream Load requests to the whitelist of the ApsaraDB for SelectDB instance. For more information, see Configure a whitelist.
If the terminal that sends Stream Load requests has a whitelist configured, add the IP address range of the SelectDB instance to that whitelist.
To obtain the IP address of the SelectDB instance in the VPC to which the SelectDB instance belongs, you can perform the operations provided in How do I view the IP addresses in the VPC to which my ApsaraDB SelectDB instance belongs?
To obtain the public IP address of the SelectDB instance, run the ping command against the public endpoint of the instance and record the IP address that is resolved, as shown in the following example.
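A minimal sketch, with <public_endpoint> standing in for the public endpoint that you applied for:
# Resolve the public endpoint; the IP address shown in the output is the public IP address of the instance.
ping <public_endpoint>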
Optional: Modify the compute cluster (backend) configuration to enable Stream Load operation records.
By default, the compute cluster does not record Stream Load operations.
To track Stream Load operations, set enable_stream_load_record to true and restart the cluster before you create an import task. To enable this feature, you must submit a ticket for technical support.
Optional: Modify the compute cluster configuration to adjust the maximum import size for Stream Load.
The default maximum size of a file that can be imported using Stream Load is 10,240 MB.
If your source file exceeds this size, you can adjust the backend parameter streaming_load_max_mb. For more information about how to modify parameters, see Configure parameters.
Optional: Modify the frontend (FE) configuration to adjust the import timeout.
The default timeout for a Stream Load task is 600 seconds. If an import task does not finish within the specified timeout, the system cancels the task and changes its status to CANCELLED.
If the source file cannot be imported within the time limit, you can set a specific timeout in the Stream Load request or adjust the FE parameter stream_load_default_timeout_second and restart the instance to set a global default timeout. To make this adjustment, you must submit a ticket for technical support.
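For example, the timeout request header overrides the timeout for a single task without changing the global default. A minimal sketch that reuses the placeholders from the Syntax section of this topic and sets an 1,800-second timeout:
# Set an 1800-second timeout for this Stream Load task only.
curl --location-trusted -u <username>:<password> -H "expect:100-continue" -H "timeout:1800" -T <file_path> -XPUT http://<host>:<port>/api/<db_name>/<table_name>/_stream_load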
Usage notes
A single Stream Load can write several hundred MB to 1 GB of data. In some business scenarios, frequently writing small amounts of data can significantly degrade instance performance and even cause table deadlocks. We strongly recommend that you reduce the write frequency and use batch processing for your data.
Application-side batching: Collect business data on the application side and then send a Stream Load request to SelectDB.
Server-side batching: After SelectDB receives a Stream Load request, the server performs batch processing on the data. For more information, see Group Commit.
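The following is a minimal sketch of server-side batching over Stream Load. It assumes that your instance supports the group_commit request header described in Group Commit; async_mode buffers small writes on the server and commits them in batches:
# Enable server-side batching for this request (assumes group_commit support on the instance).
curl --location-trusted -u <username>:<password> -H "expect:100-continue" -H "group_commit:async_mode" -T <file_path> -XPUT http://<host>:<port>/api/<db_name>/<table_name>/_stream_load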
Create an import task
Stream Load submits and transfers data over the HTTP protocol. The following example shows how to submit an import task using the curl command. You can run this command in a Linux or macOS terminal, or in a Windows command prompt. You can also use other HTTP clients for Stream Load operations.
Syntax
curl --location-trusted -u <username>:<password> [-H ""] -H "expect:100-continue" -T <file_path> -XPUT http://<host>:<port>/api/<db_name>/<table_name>/_stream_load
Parameters
Parameter | Required | Description |
--location-trusted | Yes | If authentication is required, the username and password are passed to the servers to which the request is redirected. |
-u | Yes | Specifies the username and password of the SelectDB instance in the format <username>:<password>. |
-H | No | Specifies a request header (Header) of the Stream Load import request in the format -H "key:value". Common parameters include the label, the data format, and the column delimiter. For more information about request header parameters, see Request header parameters. |
-T | Yes | Specifies the data file to import. file_path: the path of the source file. |
-XPUT | Yes | Specifies the HTTP request method. The PUT method is required, and the data import URL of SelectDB must be specified in the format http://<host>:<port>/api/<db_name>/<table_name>/_stream_load. |
Request header parameters
Stream Load uses the HTTP protocol. Therefore, parameters related to the import task are set in the request header. The following table describes common import parameters.
Parameter | Description |
label | The unique label of the import task. Important: We recommend that you use the same label for the same batch of data, so that repeated requests for that batch are accepted only once and duplicate imports are avoided. |
format | Specifies the format of the imported data. Supported formats include csv, json, parquet, and orc. For more information about file format requirements and related parameters, see File Formats. |
line_delimiter | Specifies the row delimiter of the import file. Default value: \n. You can use a combination of multiple characters as the row delimiter. For example, in Windows systems, use \r\n as the row delimiter. |
column_separator | Specifies the column delimiter of the import file. Default value: \t. You can use a combination of multiple characters as the column delimiter, such as a double vertical bar (||). For invisible characters, add the \x prefix and use the hexadecimal value of the character, for example, \x01. |
compress_type | Specifies the compression format of the file. Only CSV files can be compressed. Supported compression formats: gz, lzo, bz2, lz4, lzop, and deflate. |
max_filter_ratio | Specifies the maximum error tolerance of the import task. Default value: 0. If the error rate of the imported data exceeds this threshold, the import fails. To ignore error rows, set this parameter to a value greater than 0 so that the import can succeed. |
strict_mode | Specifies whether to enable strict mode. Default value: false. In strict mode, rows whose column values fail type conversion are filtered out. |
cloud_cluster | Specifies the cluster used for the import. By default, the default cluster of the instance is used. If the instance does not have a default cluster, a cluster on which you have permissions is automatically selected. |
load_to_single_tablet | Specifies whether to import data into only one tablet of the corresponding partition. Default value: false. This parameter can be set only when you import data into a Duplicate Key table with random bucketing. |
where | Specifies the filter conditions of the import task. You can specify a WHERE clause to filter out unneeded rows. Filtered rows are not imported and are not counted toward the error tolerance, but they are counted in NumberUnselectedRows. |
partitions | Specifies the partitions into which the data is imported. If the imported data does not belong to the specified partitions, the data is not imported. These rows are counted in dpp.abnorm.ALL. |
columns | Specifies the function transformations of the data to be imported. Supported transformations include changes to the column order and expression transformations. The syntax of expression transformations is the same as that in query statements. |
merge_type | Specifies the data merge type. Default value: APPEND, which indicates that this import is a normal append write. The MERGE and DELETE types are available only for Unique Key tables. Important: The MERGE type must be used together with the delete parameter to mark the rows to delete. |
delete | Meaningful only when merge_type is set to MERGE. Specifies the condition that marks the rows to be deleted. |
function_column.sequence_col | Applicable only to the Unique Key model. For rows with the same key columns, this parameter ensures that the value columns are replaced according to the specified source_sequence column. The source_sequence can be a column from the data source or a column in the table schema. |
exec_mem_limit | Specifies the memory limit of the import. Unit: bytes. Default value: 2147483648 (2 GB). |
timeout | Specifies the import timeout. Unit: seconds. Default value: 600. Valid values: 1 to 259200. |
timezone | Specifies the time zone used for this import. Default value: Asia/Shanghai. This parameter affects the results of all time zone-related functions involved in the import. For more information about time zones, see the IANA Time Zone Database. |
two_phase_commit | Specifies whether to enable two-phase commit (2PC) mode. Default value: false. If 2PC is enabled, the import result is returned after the data is written, but the data is invisible until the transaction is committed in the second phase. |
jsonpaths | There are two ways to import data in JSON format: Simple mode: if jsonpaths is not specified, the keys in the JSON data must match the column names of the table. Matched mode: if the JSON data is relatively complex, specify jsonpaths to map the required fields to table columns. |
json_root | Specifies the root node from which JSON parsing starts. The default value is "", which means the entire JSON document is selected as the root node. |
read_json_by_line | An important parameter in Stream Load for processing JSON data. It controls how an input file that contains multiple lines of JSON data is parsed. If this parameter is set to true, each line of the file is parsed as a separate JSON object. |
strip_outer_array | An important parameter in Stream Load for processing JSON data. It controls how JSON data that contains an outer array is parsed. If this parameter is set to true, the outer array is stripped and each element of the array is parsed as a separate row of data. Important: When you import data in JSON format, the performance of the non-array format is significantly higher than that of the array format. |
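As an illustration of how the two_phase_commit header is used, the following sketch follows the Apache Doris two-phase commit convention; the _stream_load_2pc endpoint and the txn_id and txn_operation headers are assumptions based on that convention:
# Phase 1: load the data with two-phase commit enabled and note the TxnId in the response.
curl --location-trusted -u <username>:<password> -H "expect:100-continue" -H "two_phase_commit:true" -T <file_path> -XPUT http://<host>:<port>/api/<db_name>/<table_name>/_stream_load
# Phase 2: commit (or abort) the transaction by its TxnId to make the data visible.
curl --location-trusted -u <username>:<password> -H "txn_id:<TxnId>" -H "txn_operation:commit" -XPUT http://<host>:<port>/api/<db_name>/_stream_load_2pc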
Example
This example shows how to import the CSV file data.csv into the test_table table in the test_db database. The instance's VPC endpoint is selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com. This is only a sample curl command. For a complete example, see Complete data import examples.
curl --location-trusted -u admin:admin_123 -T data.csv -H "label:123" -H "expect:100-continue" http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
Return values
Stream Load is a synchronous import method. The import result is returned directly in the response to the creation request. The following code block shows a sample return value.
{
"TxnId": 17,
"Label": "707717c0-271a-44c5-be0b-4e71bfeacaa5",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 5,
"NumberLoadedRows": 5,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 28,
"LoadTimeMs": 27,
"BeginTxnTimeMs": 0,
"StreamLoadPutTimeMs": 2,
"ReadDataTimeMs": 0,
"WriteDataTimeMs": 3,
"CommitAndPublishTimeMs": 18
}
The following table describes the parameters in the return value.
Parameter | Description |
TxnId | The transaction ID of the import. |
Label | The label of the import task. You can specify a custom label or have one generated by the system. |
Status | The import status. Valid values: Success: the import succeeded. Publish Timeout: the import transaction was committed, but the data may not be visible yet. Label Already Exists: the label already exists, so the new task is rejected; check ExistingJobStatus for the state of the earlier task. Fail: the import failed. |
ExistingJobStatus | The status of the import job that corresponds to the existing label. This field is displayed only when Status is Label Already Exists. RUNNING indicates that the task that corresponds to the existing label is still running, and FINISHED indicates that it succeeded. |
Message | The error message. |
NumberTotalRows | The total number of rows processed. |
NumberLoadedRows | The number of rows that were imported. |
NumberFilteredRows | The number of rows that failed data quality checks. |
NumberUnselectedRows | The number of rows that were filtered out by the where clause. |
LoadBytes | The number of bytes imported. |
LoadTimeMs | The time taken by the import. Unit: milliseconds. |
BeginTxnTimeMs | The time taken to request the FE to start a transaction. Unit: milliseconds. |
StreamLoadPutTimeMs | The time taken to request the import execution plan from the FE. Unit: milliseconds. |
ReadDataTimeMs | The time taken to read the data. Unit: milliseconds. |
WriteDataTimeMs | The time taken to write the data. Unit: milliseconds. |
CommitAndPublishTimeMs | The time taken to request the FE to commit and publish the transaction. Unit: milliseconds. |
ErrorURL | If there are data quality issues, you can access this URL to view the specific error rows. |
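Because the result is returned synchronously as JSON, a script can check the Status field directly. A minimal sketch that assumes the jq tool is installed and reuses the placeholders from the Syntax section:
# Run the import and stop with an error if Status is neither Success nor Publish Timeout.
STATUS=$(curl -s --location-trusted -u <username>:<password> -H "expect:100-continue" -T <file_path> -XPUT http://<host>:<port>/api/<db_name>/<table_name>/_stream_load | jq -r '.Status')
[ "$STATUS" = "Success" ] || [ "$STATUS" = "Publish Timeout" ] || { echo "Stream Load failed: $STATUS"; exit 1; }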
Cancel an import task
You cannot manually cancel a Stream Load task after it is created. The system automatically cancels a task only if the task times out or an import error occurs. You can download the error details from the ErrorURL in the return value to troubleshoot the problem.
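For example, the ErrorURL in the return value can be downloaded with curl; the output file name below is only an illustration:
# Download the error rows reported for the import (replace <ErrorURL> with the URL from your response).
curl -o stream_load_errors.txt "<ErrorURL>"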
View a Stream Load task
If you have enabled Stream Load operation records, you can connect to the ApsaraDB for SelectDB instance by using a MySQL client and run the SHOW STREAM LOAD statement to view completed Stream Load tasks.
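A minimal sketch, run from a shell with a MySQL client installed; the statement form follows the Apache Doris SHOW STREAM LOAD syntax and assumes that operation records have been enabled:
# List the 10 most recent Stream Load records for the test_db database.
mysql -h <host> -P <mysql_port> -u <username> -p -e "SHOW STREAM LOAD FROM test_db LIMIT 10;"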
Complete data import examples
Preparations
Before you start the import operation, complete the preparations.
Import CSV data
Example: Import using a script
Create the destination table for the data.
Connect to the SelectDB instance. For more information, see Connect to an ApsaraDB for SelectDB instance using DMS.
Run the following statement to create a database.
CREATE DATABASE test_db;
Run the following statement to create a table.
CREATE TABLE test_table ( id int, name varchar(50), age int, address varchar(50), url varchar(500) ) UNIQUE KEY(`id`, `name`) DISTRIBUTED BY HASH(id) BUCKETS 16 PROPERTIES("replication_num" = "1");
On the device where the Stream Load terminal is located, create a file named test.csv with the following content to be imported.
1,yang,32,shanghai,http://example.com
2,wang,22,beijing,http://example.com
3,xiao,23,shenzhen,http://example.com
4,jess,45,hangzhou,http://example.com
5,jack,14,shanghai,http://example.com
6,tomy,25,hangzhou,http://example.com
7,lucy,45,shanghai,http://example.com
8,tengyin,26,shanghai,http://example.com
9,wangli,27,shenzhen,http://example.com
10,xiaohua,37,shanghai,http://example.com
Import the data.
Open the terminal on the target device and run the curl command to start a Stream Load task and import the data.
For the syntax and parameter descriptions for creating an import task, see Create an import task. The following examples show common import scenarios.
Use a Label to remove duplicates and specify a timeout.
Import data from the file test.csv into the table test_table in the database test_db. Use a Label to avoid importing duplicate batches of data, and set the timeout to 100 seconds.
curl --location-trusted -u admin:admin_123 -H "label:123" -H "timeout:100" -H "expect:100-continue" -H "column_separator:," -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
Use a Label to remove duplicates and use columns to filter data from the file.
Import data from the file test.csv into the table test_table in the database test_db. Use a Label to avoid importing duplicate batches of data, specify the column names from the file, and import only the data where the address column is hangzhou.
curl --location-trusted -u admin:admin_123 -H "label:123" -H "columns: id,name,age,address,url" -H "where: address='hangzhou'" -H "expect:100-continue" -H "column_separator:," -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
Allow a 20% error tolerance.
Import data from the file test.csv into the table test_table in the database test_db, allowing a 20% error rate.
curl --location-trusted -u admin:admin_123 -H "label:123" -H "max_filter_ratio:0.2" -H "expect:100-continue" -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
Use strict mode and set a time zone.
Filter the imported data by using strict mode and set the time zone to Africa/Abidjan.
curl --location-trusted -u admin:admin_123 -H "strict_mode: true" -H "timezone: Africa/Abidjan" -H "expect:100-continue" -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
Delete data in SelectDB.
Delete data in SelectDB that is identical to the data in the test.csv file.
curl --location-trusted -u admin:admin_123 -H "merge_type: DELETE" -H "expect:100-continue" -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
Delete unwanted data from the file based on a condition and import the remaining data into SelectDB.
Delete rows from the test.csv file where the address column is hangzhou, and import the remaining rows into SelectDB.
curl --location-trusted -u admin:admin_123 -H "expect:100-continue" -H "columns: id,name,age,address,url" -H "merge_type: MERGE" -H "delete: address='hangzhou'" -H "column_separator:," -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
Example: Import using Java code
package com.selectdb.x2doris.connector.doris.writer;
import com.alibaba.fastjson2.JSON;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.protocol.RequestContent;
import org.apache.http.util.EntityUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.Map;
public class DorisLoadCase {
public static void main(String[] args) throws Exception {
// 1. Configure parameters.
String loadUrl = "http://<Host:Port>/api/<DB>/<TABLE>/_stream_load?";
String userName = "admin";
String password = "****";
// 2. Build the httpclient. Note that redirection (isRedirectable) must be enabled.
HttpClientBuilder httpClientBuilder = HttpClients.custom().setRedirectStrategy(new DefaultRedirectStrategy() {
// Enable redirection.
@Override
protected boolean isRedirectable(String method) {
return true;
}
});
httpClientBuilder.addInterceptorLast(new RequestContent(true));
HttpClient httpClient = httpClientBuilder.build();
// 3. Build the httpPut request object.
HttpPut httpPut = new HttpPut(loadUrl);
// Set httpHeader...
String basicAuth = Base64.getEncoder().encodeToString(String.format("%s:%s", userName, password).getBytes(StandardCharsets.UTF_8));
httpPut.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + basicAuth);
httpPut.addHeader(HttpHeaders.EXPECT, "100-continue");
httpPut.addHeader(HttpHeaders.CONTENT_TYPE, "text/plain; charset=UTF-8");
RequestConfig reqConfig = RequestConfig.custom().setConnectTimeout(30000).build();
httpPut.setConfig(reqConfig);
// 4. Set the data to send. Here, we write a CSV.
// Assume there is a table with the following fields:
// field1,field2,field3,field4
// This simulates three CSV records. In Doris, the default row delimiter for CSV is \n, and the column delimiter is \t.
// String data =
// "1\t2\t3\t4\n" +
// "11\t22\t33\t44\n" +
// "111\t222\t333\t444";
// Read all lines.
List<String> lines = Files.readAllLines(Paths.get("your_file.csv"));
// Join all lines with \n.
String data = String.join("\n", lines);
httpPut.setEntity(new StringEntity(data));
// 5. Send the request and process the result.
HttpResponse httpResponse = httpClient.execute(httpPut);
int httpStatus = httpResponse.getStatusLine().getStatusCode();
String respContent = EntityUtils.toString(new BufferedHttpEntity(httpResponse.getEntity()), StandardCharsets.UTF_8);
String respMsg = httpResponse.getStatusLine().getReasonPhrase();
if (httpStatus == HttpStatus.SC_OK) {
// Choose a suitable JSON serialization component to serialize the return value.
Map<String, String> respAsMap = JSON.parseObject(respContent, Map.class);
// Get the status code returned by SelectDB...
String dorisStatus = respAsMap.get("Status");
// If SelectDB returns any of the following statuses, it means the data was written successfully.
List<String> DORIS_SUCCESS_STATUS = Arrays.asList("Success", "Publish Timeout", "200");
if (!DORIS_SUCCESS_STATUS.contains(dorisStatus) || !respMsg.equals("OK")) {
throw new RuntimeException("StreamLoad failed, status: " + dorisStatus + ", Response: " + respMsg);
} else {
System.out.println("successful....");
}
} else {
throw new IOException("StreamLoad Response HTTP Status Error, httpStatus: "+ httpStatus +", url: " + loadUrl + ", error: " + respMsg);
}
}
}
Import JSON data
Create the destination table for the data.
Connect to the SelectDB instance. For more information, see Connect to an ApsaraDB for SelectDB instance using DMS.
Run the following statement to create a database.
CREATE DATABASE test_db;
Run the following statement to create a table.
CREATE TABLE test_table ( id int, name varchar(50), age int ) UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 16 PROPERTIES("replication_num" = "1");
Import the data.
Important: When you import data in JSON format, the performance of the non-array format is significantly higher than that of the array format.
Import data in non-array format
On the Stream Load terminal, create a file named json.data. The file must contain multiple lines, with one JSON record per line. The following is the sample content:
{"id":1,"name":"Emily","age":25}
{"id":2,"name":"Benjamin","age":35}
{"id":3,"name":"Olivia","age":28}
{"id":4,"name":"Alexander","age":60}
{"id":5,"name":"Ava","age":17}
Import the data.
Open the terminal and run the curl command to start a Stream Load task. This task imports data from the json.data file into the table test_table in the database test_db.
curl --location-trusted -u admin:admin_123 -H "Expect:100-continue" -H "format:json" -H "read_json_by_line:true" -T json.data -XPUT http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
Import data in array format
On the Stream Load terminal, create a data file in JSON array format named json_array.data.
[
    {"userid":1,"username":"Emily","userage":25},
    {"userid":2,"username":"Benjamin","userage":35},
    {"userid":3,"username":"Olivia","userage":28},
    {"userid":4,"username":"Alexander","userage":60},
    {"userid":5,"username":"Ava","userage":17}
]
Import the data.
Open the terminal and run the curl command to start a Stream Load task. This task imports data from the local file json_array.data into the table test_table in the database test_db.
curl --location-trusted -u admin:admin_123 -H "Expect:100-continue" -H "format:json" -H "jsonpaths:[\"$.userid\", \"$.userage\", \"$.username\"]" -H "columns:id,age,name" -H "strip_outer_array:true" -T json_array.data -XPUT http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
HTTP Stream mode
In Stream Load, you can use the Table Value Function (TVF) feature to specify import parameters with SQL expressions. This Stream Load feature is called http_stream. For more information about how to use TVFs, see TVF.
The REST API URL for an http_stream import is different from the URL for a normal Stream Load import.
Normal Stream Load URL: http://host:http_port/api/{db}/{table}/_stream_load
URL using the http_stream TVF: http://host:http_port/api/_http_stream
Syntax
The following is the syntax for the HTTP Stream mode of Stream Load.
curl --location-trusted -u <username>:<password> [-H "sql: ${load_sql}"...] -T <file_name> -XPUT http://host:http_port/api/_http_stream
For a description of HTTP Stream parameters, see Parameters.
Example
You can add the SQL statement as a parameter named sql in the HTTP header to replace parameters such as column_separator, line_delimiter, where, and columns. The following code block shows the format of the SQL statement (load_sql).
INSERT INTO db.table (col, ...) SELECT stream_col, ... FROM http_stream("property1"="value1");
Complete example:
curl --location-trusted -u admin:admin_123 -T test.csv -H "sql:insert into demo.example_tbl_1(user_id, age, cost) select c1, c4, c7 * 2 from http_stream(\"format\" = \"CSV\", \"column_separator\" = \",\" ) where age >= 30" http://host:http_port/api/_http_stream
FAQ
Q1: What do I do if the "get table cloud commit lock timeout" error is reported during import?
This error indicates that writing data too frequently has caused a table deadlock. We strongly recommend that you reduce the write frequency and use batch processing for your data. A single Stream Load can write several hundred MB to 1 GB of data.
Q2: When importing a CSV file, how do I handle data that contains column or row delimiters?
You must specify new column and row delimiters and modify the import data to ensure that the data does not conflict with the delimiters. This allows the data to be parsed correctly. The following sections provide examples:
Data contains the row delimiter
If the imported data contains the specified row delimiter, such as the default row delimiter \n, you must specify a new row delimiter.
For example, assume that your data file contains the following content:
Zhang San\n,25,Shaanxi
Li Si\n,30,Beijing
In this scenario, the \n in the file is data, not a row delimiter. However, the default row delimiter is also \n. To ensure that the file is parsed correctly, you must use the line_delimiter parameter to specify a new row delimiter and explicitly add the new delimiter to the end of each data line in the file. The following is an example:
Set the row delimiter for the import.
For example, to replace the default row delimiter \n with \r\n, set -H "line_delimiter:\r\n" when you import the data.
Add the specified row delimiter to the end of each data line. The sample text must be modified as follows:
Zhang San\n,25,Shaanxi\r\n
Li Si\n,30,Beijing\r\n
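A sketch of the matching import command, reusing the placeholders from the Syntax section; the data above uses commas between columns, so the column delimiter is also set explicitly:
# Use \r\n as the row delimiter so that the \n characters inside the data are not treated as row breaks.
curl --location-trusted -u <username>:<password> -H "expect:100-continue" -H "line_delimiter:\r\n" -H "column_separator:," -T <file_path> -XPUT http://<host>:<port>/api/<db_name>/<table_name>/_stream_load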
Data contains the column delimiter
If the imported data contains the specified column delimiter, such as the default column delimiter \t, you must specify a new column delimiter.
For example, assume that your data file contains the following content:
Zhang San\t 25 Shaanxi
Li Si\t 30 Beijing
In this scenario, the \t in the file is data, not a column delimiter. However, the default column delimiter is also \t (the tab character). To ensure that the file is parsed correctly, you must use the column_separator parameter to specify a new column delimiter and explicitly add the new delimiter between columns in the file. The following is an example:
Set the column delimiter for the import.
For example, to replace the default column delimiter \t with a comma (,), set -H "column_separator:," when you import the data.
Add the specified column delimiter between the data columns. The sample text must be modified as follows:
Zhang San\t,25,Shaanxi
Li Si\t,30,Beijing
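A sketch of the matching import command, reusing the placeholders from the Syntax section:
# Use a comma as the column delimiter so that the \t characters inside the data are not treated as column breaks.
curl --location-trusted -u <username>:<password> -H "expect:100-continue" -H "column_separator:," -T <file_path> -XPUT http://<host>:<port>/api/<db_name>/<table_name>/_stream_load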