
ApsaraDB for SelectDB:Stream Load

Last Updated:Mar 28, 2026

Stream Load is a synchronous HTTP-based method for loading local files or data streams into ApsaraDB for SelectDB. Submit an HTTP PUT request, and the result returns immediately in the response body — no polling required. Supported formats: CSV, JSON, Parquet, and ORC.

Important

Always check the Status field in the response body to determine whether the import succeeded. An HTTP 200 response only means the request was received — the actual import result is in the body.

Prerequisites

Before you begin, ensure that you have:

  • Network connectivity between the machine running Stream Load and your SelectDB instance

  • Login credentials (username and password) for the SelectDB instance

Set up network access:

  1. If your machine is not in the same Virtual Private Cloud (VPC) as the SelectDB instance, apply for a public endpoint.

  2. Add the IP addresses of your machine to the instance whitelist.

  3. If your machine has an outbound whitelist, add the SelectDB instance's IP range to it.

Usage notes

A single Stream Load job can write several hundred MB to 1 GB of data. Loading small amounts of data at high frequency degrades instance performance and can cause table deadlocks. Use batch processing to reduce load frequency:

  • Application-side batching: Collect data in your application before sending a Stream Load request.

  • Server-side batching: Use Group Commit to let SelectDB batch incoming requests server-side.
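The application-side approach amounts to a buffer that accumulates rows and flushes once it reaches a size threshold. The following sketch is illustrative, not part of any SelectDB SDK: `flush_fn` stands in for whatever issues the Stream Load HTTP PUT, and the byte threshold is an example value.

```python
class StreamLoadBatcher:
    """Buffer rows client-side and send them in large batches."""

    def __init__(self, flush_fn, max_bytes=100 * 1024 * 1024):
        self.flush_fn = flush_fn      # called with one newline-joined batch
        self.max_bytes = max_bytes
        self.rows = []
        self.size = 0

    def add(self, row):
        self.rows.append(row)
        self.size += len(row.encode("utf-8")) + 1  # +1 for the row delimiter
        if self.size >= self.max_bytes:
            self.flush()

    def flush(self):
        if self.rows:
            self.flush_fn("\n".join(self.rows))
            self.rows, self.size = [], 0


# Demo with a tiny threshold so a flush triggers after two rows.
batches = []
b = StreamLoadBatcher(batches.append, max_bytes=20)
for row in ["1,yang,32", "2,wang,22", "3,xiao,23"]:
    b.add(row)
b.flush()
print(len(batches))  # → 2 requests instead of 3
```

In production the threshold would sit in the several-hundred-MB range recommended above, and `flush_fn` would perform the HTTP PUT.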

How Stream Load works

  1. Submit an HTTP PUT request with the data file attached.

  2. SelectDB synchronously processes the data and writes it to the target table.

  3. The response body contains the import result, including a Status field and row-level metrics.

Load data with Stream Load

Stream Load uses HTTP PUT requests. The examples below use curl, but any HTTP client works.

Syntax

curl --location-trusted \
  -u <username>:<password> \
  -H "expect:100-continue" \
  [-H "<header-key>:<header-value>"] \
  -T <file-path> \
  -XPUT http://<host>:<port>/api/<db_name>/<table_name>/_stream_load

Replace the placeholders with your actual values:

Placeholder | Description | Example
<username> | SelectDB username | admin
<password> | SelectDB password | your-password
<host> | VPC endpoint or public endpoint of the instance | selectdb-cn-xxx-fe.selectdbfe.rds.aliyuncs.com
<port> | HTTP port. Default: 8080 | 8080
<db_name> | Target database name | test_db
<table_name> | Target table name | test_table
<file-path> | Path to the local data file | ./data.csv
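The placeholder substitution can be sanity-checked with a small helper. This is a sketch; the host and object names below are the example values from the table, not real endpoints.

```python
def stream_load_url(host, db_name, table_name, port=8080):
    """Build the Stream Load endpoint URL from its parts."""
    return f"http://{host}:{port}/api/{db_name}/{table_name}/_stream_load"

url = stream_load_url(
    "selectdb-cn-xxx-fe.selectdbfe.rds.aliyuncs.com", "test_db", "test_table"
)
print(url)
# → http://selectdb-cn-xxx-fe.selectdbfe.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
```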

Choosing the right endpoint:

  • Same VPC: Use the VPC endpoint.

  • Different VPC or outside Alibaba Cloud: Use the public endpoint.

Both endpoints are listed on the instance details page in the SelectDB console.

Request headers

Set import options as HTTP headers using -H "key:value".

Header | Default | Description
label | System-generated | Unique ID for this import job. Use the same label for retries on the same data batch to prevent duplicate imports (at-most-once semantics). Labels can be reused once the corresponding job reaches CANCELLED status.
format | CSV | Data format. Supported values: CSV, JSON, PARQUET, ORC, csv_with_names (skips the first row), csv_with_names_and_types (skips the first two rows). See File formats for format-specific parameters.
column_separator | \t | Column delimiter. Supports multi-character delimiters. For non-printable characters, use hex with the \x prefix. For example, Hive's \x01 separator: -H "column_separator:\x01".
line_delimiter | \n | Row delimiter. On Windows, use \r\n. Supports multi-character delimiters.
compress_type | None | Compression format. Supported values: gz, lzo, bz2, lz4, lzop, deflate. Only supported for CSV and JSON files.
max_filter_ratio | 0 | Maximum fraction of rows that can fail data quality checks. Range: [0, 1]. The default 0 means zero tolerance: any bad row fails the job. Set a value greater than 0 to allow some rows to be skipped.
strict_mode | false | When true, applies strict type checking during column type conversion. Rows where a non-null source value converts to NULL are filtered out.
cloud_cluster | Instance default | Specifies which compute cluster handles the import. If no default cluster is set, SelectDB picks one automatically based on your permissions.
load_to_single_tablet | false | When true, writes all data to a single tablet within the target partition. Only applicable to Duplicate Key tables with random bucketing. Improves throughput for high-concurrency imports.
where | None | SQL filter condition. Rows that do not match are excluded from the import and counted in NumberUnselectedRows, but not in the filter ratio calculation.
partitions | None | Restricts the import to the specified partitions. Rows outside these partitions are excluded and counted in dpp.abnorm.ALL (reflected in NumberFilteredRows).
columns | None | Maps and transforms source columns. Supports column reordering and SQL expression transformations using the same syntax as SELECT expressions.
merge_type | APPEND | Data merge behavior. Options: APPEND (default, normal write), MERGE (use with delete to mark the Delete Flag column), DELETE (treat all rows as deletions). MERGE and DELETE are only supported on Unique Key tables.
delete | None | SQL condition for marking rows as deleted. Only used when merge_type is MERGE.
function_column.sequence_col | None | For Unique Key tables with sequence columns. Specifies which column (from the source data or the table schema) determines row replacement order.
exec_mem_limit | 2147483648 | Memory limit for the import job, in bytes. Default: 2 GiB.
timeout | 600 | Import timeout in seconds. Range: [1, 259200]. If the job exceeds this limit, it is automatically cancelled.
timezone | Asia/Shanghai | Time zone for time-related functions during the import. Uses IANA time zone names.
two_phase_commit | false | When true, data is written but stays invisible until you manually commit the transaction. The transaction status is PRECOMMITTED until committed. Useful for atomic, all-or-nothing import behavior.
jsonpaths | None | JSON field extraction pattern, for example ["$.status", "$.res.id"]. Used when the JSON structure does not map directly to table columns. Without this, keys must match column names (order can differ).
json_root | "" (entire object) | JSONPath expression that selects a child object as the root for parsing.
read_json_by_line | false | When true, treats each line as a separate JSON object. When false, the entire file is parsed as one JSON value or array.
strip_outer_array | false | When true, strips the outer array from a JSON array and imports each element as a separate row. When false, the entire array is imported as a single record. Non-array JSON format has significantly higher performance than array format.
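Since import options are plain HTTP key:value headers, a request builder only needs to turn an options dict into `-H` arguments. The helper and the label value below are illustrative, not part of any SDK.

```python
def curl_header_flags(options):
    """Render Stream Load options as curl -H arguments."""
    flags = []
    for key, value in options.items():
        flags += ["-H", f"{key}:{value}"]
    return flags

flags = curl_header_flags({
    "label": "batch-001",        # reuse the same label when retrying this batch
    "format": "csv",
    "column_separator": ",",
    "max_filter_ratio": "0.2",
})
print(" ".join(flags))
```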

Example

Import data.csv into test_table in test_db:

curl --location-trusted \
  -u admin:admin_123 \
  -T data.csv \
  -H "label:123" \
  -H "expect:100-continue" \
  http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load

Response

Stream Load returns the result synchronously. Check the Status field — not the HTTP status code — to determine success.

{
    "TxnId": 17,
    "Label": "707717c0-271a-44c5-be0b-4e71bfeacaa5",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 5,
    "NumberLoadedRows": 5,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 28,
    "LoadTimeMs": 27,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 2,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 3,
    "CommitAndPublishTimeMs": 18
}
Field | Description
TxnId | Transaction ID.
Label | Import label, custom or system-generated.
Status | Import result. Success: completed. Publish Timeout: completed, but data may be briefly invisible; do not retry. Label Already Exists: duplicate label; change it. Fail: import failed.
ExistingJobStatus | Status of the existing job for a duplicate label. Only present when Status is Label Already Exists. Values: RUNNING or FINISHED.
Message | Error message, if any.
NumberTotalRows | Total rows processed.
NumberLoadedRows | Rows successfully imported.
NumberFilteredRows | Rows filtered out due to data quality issues.
NumberUnselectedRows | Rows excluded by the where condition.
LoadBytes | Bytes imported.
LoadTimeMs | Total import time, in milliseconds.
BeginTxnTimeMs | Time to start the transaction on the frontend (FE), in milliseconds.
StreamLoadPutTimeMs | Time to get the execution plan from the FE, in milliseconds.
ReadDataTimeMs | Time spent reading the source data, in milliseconds.
WriteDataTimeMs | Time spent writing data, in milliseconds.
CommitAndPublishTimeMs | Time to commit and publish the transaction, in milliseconds.
ErrorURL | URL for viewing the rejected rows. Only present when there are data quality issues.
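Because an HTTP 200 can accompany a failed import, a client should branch on the Status field rather than the HTTP status code. One way to encode the rules described above (a sketch, operating on the response body parsed as JSON):

```python
def import_succeeded(body):
    """Interpret a Stream Load response body (a dict parsed from JSON).

    Success         -> import completed
    Publish Timeout -> committed; data may be briefly invisible, do not retry
    anything else   -> failure (inspect Message and ErrorURL)
    """
    return body.get("Status") in ("Success", "Publish Timeout")

print(import_succeeded({"Status": "Success", "Message": "OK"}))  # → True
print(import_succeeded({"Status": "Fail", "Message": "too many filtered rows"}))  # → False
print(import_succeeded({"Status": "Label Already Exists"}))  # → False
```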

Inspect import errors

If there are data quality issues, use ErrorURL to download the rejected rows:

curl "<ErrorURL>"
# or save to a file:
wget "<ErrorURL>" -O error_rows.txt

Cancel and view import tasks

Stream Load tasks cannot be manually cancelled after submission. The system automatically cancels a task if it exceeds the timeout or encounters an unrecoverable error.

To view completed Stream Load tasks, first enable Stream Load operation records, then connect to the instance using a MySQL client and run:

SHOW STREAM LOAD;

Import CSV data

Example: Import using a script

Set up the destination table

  1. Connect to the SelectDB instance using DMS.

  2. Create the database and table:

    CREATE DATABASE test_db;
    
    CREATE TABLE test_table
    (
        id int,
        name varchar(50),
        age int,
        address varchar(50),
        url varchar(500)
    )
    UNIQUE KEY(`id`, `name`)
    DISTRIBUTED BY HASH(id) BUCKETS 16
    PROPERTIES("replication_num" = "1");
  3. On the machine where you will run Stream Load, create a file named test.csv:

    1,yang,32,shanghai,http://example.com
    2,wang,22,beijing,http://example.com
    3,xiao,23,shenzhen,http://example.com
    4,jess,45,hangzhou,http://example.com
    5,jack,14,shanghai,http://example.com
    6,tomy,25,hangzhou,http://example.com
    7,lucy,45,shanghai,http://example.com
    8,tengyin,26,shanghai,http://example.com
    9,wangli,27,shenzhen,http://example.com
    10,xiaohua,37,shanghai,http://example.com

Deduplicate with a label and set a custom timeout

Import test.csv with a label to prevent duplicate imports and a custom timeout of 100 seconds:

curl --location-trusted \
  -u admin:admin_123 \
  -H "label:123" \
  -H "timeout:100" \
  -H "expect:100-continue" \
  -H "column_separator:," \
  -T test.csv \
  http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load

Filter rows by column value

Import only rows where address is hangzhou, using column mapping to specify field order:

curl --location-trusted \
  -u admin:admin_123 \
  -H "label:123" \
  -H "columns: id,name,age,address,url" \
  -H "where: address='hangzhou'" \
  -H "expect:100-continue" \
  -H "column_separator:," \
  -T test.csv \
  http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load

Allow a 20% error tolerance

Import with up to 20% of rows allowed to fail data quality checks:

curl --location-trusted \
  -u admin:admin_123 \
  -H "label:123" \
  -H "max_filter_ratio:0.2" \
  -H "expect:100-continue" \
  -T test.csv \
  http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
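The pass/fail rule behind max_filter_ratio can be written out explicitly: filtered rows are compared against the rows that actually went through quality checks, and rows excluded by `where` do not count. This is a sketch of the rule as described in the Request headers section, not SelectDB's implementation.

```python
def passes_filter_check(loaded, filtered, max_filter_ratio):
    """A job fails when filtered / (filtered + loaded) exceeds max_filter_ratio."""
    checked = loaded + filtered   # NumberUnselectedRows is excluded
    if checked == 0:
        return True
    return filtered / checked <= max_filter_ratio

# 10 rows checked, 2 bad: allowed at a 0.2 ratio, rejected at 0.1.
print(passes_filter_check(8, 2, 0.2))  # → True
print(passes_filter_check(8, 2, 0.1))  # → False
```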

Enable strict mode with a custom time zone

Apply strict type-checking and set the time zone to Africa/Abidjan:

curl --location-trusted \
  -u admin:admin_123 \
  -H "strict_mode: true" \
  -H "timezone: Africa/Abidjan" \
  -H "expect:100-continue" \
  -T test.csv \
  http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load

Delete matching rows in SelectDB

Delete all rows in test_table that match rows in test.csv:

curl --location-trusted \
  -u admin:admin_123 \
  -H "merge_type: DELETE" \
  -H "expect:100-continue" \
  -T test.csv \
  http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load

Delete rows by condition, import the rest

Delete rows where address is hangzhou and import the remaining rows:

curl --location-trusted \
  -u admin:admin_123 \
  -H "expect:100-continue" \
  -H "columns: id,name,age,address,url" \
  -H "merge_type: MERGE" \
  -H "delete: address='hangzhou'" \
  -H "column_separator:," \
  -T test.csv \
  http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load

Example: Import using Java code

The following example shows a complete Java implementation using Apache HttpClient. Check the Status field in the response body to determine success — not the HTTP status code.

package com.selectdb.x2doris.connector.doris.writer;

import com.alibaba.fastjson2.JSON;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.protocol.RequestContent;
import org.apache.http.util.EntityUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.Map;

public class DorisLoadCase {
    public static void main(String[] args) throws Exception {

        // 1. Configure parameters.
        String loadUrl = "http://<Host:Port>/api/<DB>/<TABLE>/_stream_load";
        String userName = "admin";
        String password = "****";

        // 2. Build the HTTP client. Redirection (isRedirectable) must be enabled.
        HttpClientBuilder httpClientBuilder = HttpClients.custom().setRedirectStrategy(new DefaultRedirectStrategy() {
            @Override
            protected boolean isRedirectable(String method) {
                return true;
            }
        });
        httpClientBuilder.addInterceptorLast(new RequestContent(true));
        HttpClient httpClient = httpClientBuilder.build();

        // 3. Build the PUT request.
        HttpPut httpPut = new HttpPut(loadUrl);

        // Set authentication and content headers.
        String basicAuth = Base64.getEncoder().encodeToString(
            String.format("%s:%s", userName, password).getBytes(StandardCharsets.UTF_8));
        httpPut.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + basicAuth);
        httpPut.addHeader(HttpHeaders.EXPECT, "100-continue");
        httpPut.addHeader(HttpHeaders.CONTENT_TYPE, "text/plain; charset=UTF-8");

        RequestConfig reqConfig = RequestConfig.custom().setConnectTimeout(30000).build();
        httpPut.setConfig(reqConfig);

        // 4. Read the CSV file and attach it as the request body.
        // Default CSV delimiters: row = \n, column = \t
        List<String> lines = Files.readAllLines(Paths.get("your_file.csv"));
        String data = String.join("\n", lines);
        httpPut.setEntity(new StringEntity(data));

        // 5. Send the request and check the result.
        HttpResponse httpResponse = httpClient.execute(httpPut);
        int httpStatus = httpResponse.getStatusLine().getStatusCode();
        String respContent = EntityUtils.toString(
            new BufferedHttpEntity(httpResponse.getEntity()), StandardCharsets.UTF_8);
        String respMsg = httpResponse.getStatusLine().getReasonPhrase();

        if (httpStatus == HttpStatus.SC_OK) {
            // HTTP 200 does not guarantee import success.
            // Always check the Status field returned by SelectDB.
            Map<String, String> respAsMap = JSON.parseObject(respContent, Map.class);
            String dorisStatus = respAsMap.get("Status");
            List<String> DORIS_SUCCESS_STATUS = Arrays.asList("Success", "Publish Timeout", "200");
            if (!DORIS_SUCCESS_STATUS.contains(dorisStatus) || !respMsg.equals("OK")) {
                throw new RuntimeException(
                    "StreamLoad failed, status: " + dorisStatus + ", Response: " + respMsg);
            } else {
                System.out.println("Import successful.");
            }
        } else {
            throw new IOException(
                "StreamLoad HTTP error: " + httpStatus + ", url: " + loadUrl + ", error: " + respMsg);
        }
    }
}

Import JSON data

Important

Non-array JSON format has significantly higher performance than array format. Use line-delimited JSON (read_json_by_line:true) when possible.

Set up the destination table

  1. Connect to the SelectDB instance using DMS.

  2. Create the database and table:

    CREATE DATABASE test_db;
    
    CREATE TABLE test_table
    (
        id int,
        name varchar(50),
        age int
    )
    UNIQUE KEY(`id`)
    DISTRIBUTED BY HASH(`id`) BUCKETS 16
    PROPERTIES("replication_num" = "1");

Import line-delimited JSON (recommended)

Create a file json.data with one JSON object per line:

{"id":1,"name":"Emily","age":25}
{"id":2,"name":"Benjamin","age":35}
{"id":3,"name":"Olivia","age":28}
{"id":4,"name":"Alexander","age":60}
{"id":5,"name":"Ava","age":17}

Import it with read_json_by_line:true:

curl --location-trusted \
  -u admin:admin_123 \
  -H "Expect:100-continue" \
  -H "format:json" \
  -H "read_json_by_line:true" \
  -T json.data \
  -XPUT http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load

Import a JSON array

Create a file json_array.data in JSON array format:

[
  {"userid":1,"username":"Emily","userage":25},
  {"userid":2,"username":"Benjamin","userage":35},
  {"userid":3,"username":"Olivia","userage":28},
  {"userid":4,"username":"Alexander","userage":60},
  {"userid":5,"username":"Ava","userage":17}
]

Import it with strip_outer_array:true and jsonpaths to map the non-matching field names:

curl --location-trusted \
  -u admin:admin_123 \
  -H "Expect:100-continue" \
  -H "format:json" \
  -H "jsonpaths:[\"$.userid\", \"$.userage\", \"$.username\"]" \
  -H "columns:id,age,name" \
  -H "strip_outer_array:true" \
  -T json_array.data \
  -XPUT http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
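The combined effect of strip_outer_array, jsonpaths, and columns can be emulated in a few lines. This is a simplified model that supports only top-level `$.field` paths, not SelectDB's full JSONPath handling.

```python
import json

def extract_rows(payload, jsonpaths, columns):
    """Strip the outer array, then pull each jsonpath into its mapped column."""
    rows = []
    for obj in json.loads(payload):                     # strip_outer_array:true
        fields = [obj[p.removeprefix("$.")] for p in jsonpaths]
        rows.append(dict(zip(columns, fields)))
    return rows

data = '[{"userid":1,"username":"Emily","userage":25},' \
       '{"userid":2,"username":"Benjamin","userage":35}]'
rows = extract_rows(data, ["$.userid", "$.userage", "$.username"],
                    ["id", "age", "name"])
print(rows[0])  # → {'id': 1, 'age': 25, 'name': 'Emily'}
```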

HTTP Stream mode

HTTP Stream mode (http_stream) lets you specify import parameters as a SQL expression in the request header, using the Table Value Function (TVF) feature. The API endpoint differs from standard Stream Load:

  • Standard Stream Load: http://host:http_port/api/{db}/{table}/_stream_load

  • HTTP Stream mode: http://host:http_port/api/_http_stream

Syntax

curl --location-trusted \
  -u <username>:<password> \
  -H "sql: ${load_sql}" \
  -T <file_name> \
  -XPUT http://host:http_port/api/_http_stream

The load_sql header replaces individual headers like column_separator, line_delimiter, where, and columns with a single SQL statement:

INSERT INTO db.table (col, ...) SELECT stream_col, ... FROM http_stream("property1"="value1");

Example

curl --location-trusted \
  -u admin:admin_123 \
  -T test.csv \
  -H "sql:insert into demo.example_tbl_1(user_id, age, cost) select c1, c4, c7 * 2 from http_stream(\"format\" = \"CSV\", \"column_separator\" = \",\" ) where age >= 30" \
  http://host:http_port/api/_http_stream

For more information about table value functions, see TVF.

Optional configuration

Optional: Enable Stream Load records

By default, the compute cluster does not record Stream Load operations. To enable recording:

  1. Set the backend parameter enable_stream_load_record to true.

  2. Restart the compute cluster.

Enabling this feature requires submitting a support ticket.

Optional: Increase the maximum file size

The default maximum file size for a single Stream Load job is 10,240 MB. To increase it, adjust the backend parameter streaming_load_max_mb. For instructions, see Configure parameters.

Optional: Adjust the default timeout

The default timeout is 600 seconds. Override it per job using the timeout header. To change the global default, set the frontend (FE) parameter stream_load_default_timeout_second and restart the instance.

Changing global parameters requires submitting a support ticket.

FAQ

What causes the "get table cloud commit lock timeout" error?

This error means writes are happening too frequently, causing a table lock. Reduce the write frequency and batch your data. A single Stream Load job should target several hundred MB to 1 GB of data per request. See Usage notes for batching strategies.

How do I handle CSV data that contains column or row delimiters?

Specify new delimiters and update the data file so the delimiter characters in your data do not conflict with the chosen delimiters.

Data contains the row delimiter

If your data contains the default row delimiter \n as a data value (not a row boundary), specify a different row delimiter.

Example — original file:

Zhang San\n,25,Shaanxi
Li Si\n,30,Beijing

Steps:

  1. Set a new row delimiter: add -H "line_delimiter:\r\n" to your request.

  2. Update the file to end each row with the new delimiter:

    Zhang San\n,25,Shaanxi\r\n
    Li Si\n,30,Beijing\r\n

Data contains the column delimiter

If your data contains the default column delimiter \t (tab) as a data value, specify a different column delimiter.

Example — original file:

Zhang San\t  25  Shaanxi
Li Si\t  30  Beijing

Steps:

  1. Set a new column delimiter: add -H "column_separator:," to your request.

  2. Update the file to separate columns with the new delimiter:

    Zhang San\t,25,Shaanxi
    Li Si\t,30,Beijing
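Both fixes follow the same logic: once the row delimiter is \r\n and the column delimiter is a comma, the \n and \t characters inside values no longer split anything. A quick model of the parse (a sketch, not SelectDB's parser):

```python
def parse_rows(data, line_delimiter, column_separator):
    """Split rows and columns the way Stream Load would with these delimiters."""
    rows = [r for r in data.split(line_delimiter) if r]
    return [row.split(column_separator) for row in rows]

# Values legitimately contain "\n" (the old row delimiter), but with
# "\r\n" as the row delimiter and "," as the column delimiter, nothing
# inside the values is split anymore.
data = "Zhang San\n,25,Shaanxi\r\nLi Si\n,30,Beijing\r\n"
rows = parse_rows(data, "\r\n", ",")
print(rows[0][0] == "Zhang San\n")  # → True
```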

What's next