You can use Stream Load to import local files or data streams into StarRocks. This topic describes the process of importing data using Stream Load.
Background
Stream Load is a synchronous import method that lets you import local files or data streams into StarRocks by sending an HTTP request. The HTTP response returns the import result, which indicates whether the task was successful. Stream Load supports the CSV and JSON file formats. The data size for a single import is limited to 10 GB.
Create an import task
Stream Load submits and transfers data using the HTTP protocol. This topic uses the curl command to demonstrate how to submit an import task. You can also use other HTTP clients.
Syntax
```
curl --location-trusted -u <username>:<password> -XPUT <url>
(
    data_desc
)
[opt_properties]
```

Note:

- We recommend that you specify `100-continue` in the `Expect` HTTP request header, for example, `"Expect:100-continue"`. This practice prevents unnecessary data transmission and reduces resource overhead if the server rejects the import task.
- In StarRocks, some words are reserved SQL keywords and cannot be used directly in SQL statements. To use a reserved keyword in an SQL statement, you must enclose it in a pair of backticks (`` ` ``). For more information about reserved keywords, see Keywords.
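As an illustration of the backtick rule, the following sketch quotes a reserved keyword when creating a table from the MySQL client. It is not part of the original example: the `demo_table` name and its columns are hypothetical, and the connection settings mirror the complete example at the end of this topic.

```bash
# Hypothetical example: "key" is a reserved keyword in StarRocks SQL, so it is
# enclosed in backticks wherever it is used as a column name in the statement.
mysql -h127.0.0.1 -P 9030 -uroot -e '
CREATE DATABASE IF NOT EXISTS load_test;
CREATE TABLE IF NOT EXISTS load_test.demo_table (
    `key`   INT,
    `value` VARCHAR(50)
)
DUPLICATE KEY(`key`)
DISTRIBUTED BY HASH(`key`) BUCKETS 1
PROPERTIES ("replication_num" = "1");
'
```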
Parameters
- `<username>:<password>`: The username and password for the StarRocks cluster. This parameter is required. If the account has no password, specify only the username followed by a colon, such as `<username>:`.
- `-XPUT`: The HTTP request method. This parameter is required. For Stream Load, specify the PUT method.
- `<url>`: The URL for the Stream Load request. This parameter is required. The format is `http://<fe_host>:<fe_http_port>/api/<database_name>/<table_name>/_stream_load`. The following table describes the parameters in the URL.

| Parameter | Required | Description |
| --- | --- | --- |
| `<fe_host>` | Yes | The IP address of the frontend (FE) node in the StarRocks cluster. |
| `<fe_http_port>` | Yes | The HTTP port of the FE node in the StarRocks cluster. The default value is 18030.<br>You can search for the `http_port` parameter on the Configuration tab of the StarRocks Cluster Service page to view the port number. You can also run the `SHOW FRONTENDS` command to view the IP address and HTTP port number of the FE node, as shown in the sketch after this table. |
| `<database_name>` | Yes | The name of the database where the destination StarRocks table is located. |
| `<table_name>` | Yes | The name of the destination StarRocks table. |
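One way to look up the FE host and HTTP port is to run `SHOW FRONTENDS` from a MySQL client connected to the cluster. A minimal sketch, assuming the connection settings used in the complete example at the end of this topic (replace them with your own values):

```bash
# List the FE nodes; the host and HTTP port columns of the output give the
# values for <fe_host> and <fe_http_port> in the Stream Load URL.
mysql -h127.0.0.1 -P 9030 -uroot -e 'SHOW FRONTENDS\G'
```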
- `data_desc`: Specifies the properties of the source data file, including the file name, format, column delimiter, row delimiter, destination partitions, and the column mapping to the StarRocks table. The format is as follows.

```
-T <file_path>
-H "format: CSV | JSON"
-H "column_separator: <column_separator>"
-H "row_delimiter: <row_delimiter>"
-H "columns: <column1_name>[, <column2_name>, ... ]"
-H "partitions: <partition1_name>[, <partition2_name>, ...]"
-H "temporary_partitions: <temporary_partition1_name>[, <temporary_partition2_name>, ...]"
-H "jsonpaths: [ \"<json_path1>\"[, \"<json_path2>\", ...] ]"
-H "strip_outer_array: true | false"
-H "json_root: <json_path>"
-H "ignore_json_size: true | false"
-H "compression: <compression_algorithm> | Content-Encoding: <compression_algorithm>"
```

The parameters in `data_desc` fall into three categories: common parameters, parameters for CSV, and parameters for JSON.

Common parameters
| Parameter | Required | Description |
| --- | --- | --- |
| `<file_path>` | Yes | The path where the source data file is stored. The file name can optionally include an extension. |
| `format` | No | The format of the data to import. Valid values are `CSV` and `JSON`. The default value is `CSV`. |
| `partitions` | No | The specific partitions to which you want to import data. If you do not specify this parameter, data is imported into all partitions of the StarRocks table by default. |
| `temporary_partitions` | No | The temporary partitions to which you want to import data. |
| `columns` | No | The mapping between the columns in the source data file and the columns in the StarRocks table.<br>If the columns in the source data file correspond to the columns in the StarRocks table in order, you do not need to specify this parameter. If the source data file does not match the table schema, you must specify this parameter to configure data transformation rules. A column can take one of two forms: it either directly corresponds to a field in the imported file and is represented by the field name, or it is derived through calculation.<br>Example 1: The table has three columns `c1`, `c2`, and `c3`. The three columns in the source file correspond to `c3`, `c2`, and `c1` in order. You must specify `-H "columns: c3, c2, c1"`.<br>Example 2: The table has three columns `c1`, `c2`, and `c3`. The first three columns in the source file correspond to the table columns, but the file has an extra column. You must specify `-H "columns: c1, c2, c3, temp"`, where the last column can have any name as a placeholder.<br>Example 3: The table has three columns `year`, `month`, and `day`. The source file has only one time column in the `2018-06-01 01:02:03` format. You can specify `-H "columns: col, year = year(col), month=month(col), day=day(col)"` to complete the import. A curl sketch based on this example follows this table. |
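The following curl sketch expands on Example 3. It is illustrative only: the `load_test` database, a hypothetical `t_date` table with columns `year`, `month`, and `day`, the source file `dates.csv`, and the FE address are assumptions to replace with your own values.

```bash
# Illustrative only: map the single source column "col" to the table columns
# year, month, and day by deriving them with the year()/month()/day() functions.
curl --location-trusted -u "root:" \
    -H "Expect:100-continue" \
    -H "label:columns_demo_1" \
    -H "column_separator: ," \
    -H "columns: col, year = year(col), month = month(col), day = day(col)" \
    -T dates.csv -XPUT \
    http://<fe_host>:18030/api/load_test/t_date/_stream_load
```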
Parameters for CSV
The following table describes the CSV-specific parameters. A curl sketch that combines several of them follows the table.

| Parameter | Required | Description |
| --- | --- | --- |
| `column_separator` | No | The column delimiter in the source data file. The default value is `\t`.<br>For invisible characters, you must add the `\x` prefix and use hexadecimal to represent the delimiter. For example, for the Hive file delimiter `\x01`, specify `-H "column_separator:\x01"`. |
| `row_delimiter` | No | The row delimiter in the source data file. The default value is `\n`.<br>Important: The curl command cannot pass `\n` directly. When you manually specify the line feed as `\n`, the shell passes a backslash (`\`) followed by the letter `n` instead of the line feed character. Bash supports an escaped string syntax for this: to pass `\n` or `\t`, start the string with a dollar sign and a single quote (`$'`) and end it with a single quote (`'`). For example, `-H $'row_delimiter:\n'`. |
| `skip_header` | No | The number of header rows to skip at the beginning of the CSV file. The value must be an integer. The default value is 0.<br>In some CSV files, the first few rows define metadata such as column names and types. By setting this parameter, you can make StarRocks ignore those rows during the import. For example, if you set this parameter to 1, StarRocks ignores the first row of the CSV file.<br>The row delimiter used in these header rows must be the same as the one you set in the import command. |
| `where` | No | The filter condition used to import only a portion of the data. To filter out unwanted data, set this parameter. For example, to import only the rows where the `k1` column is equal to 20180601, specify `-H "where: k1 = 20180601"`. |
| `max_filter_ratio` | No | The maximum ratio of rows that can be filtered out due to issues such as data quality. The default value is 0, which means zero tolerance.<br>Note: Data filtered out by the `where` clause is not counted. |
| `partitions` | No | The partitions involved in this import. If you can determine the partitions for the data, we recommend that you specify this parameter. Data that does not belong to the specified partitions is filtered out. For example, to import data into partitions `p1` and `p2`, specify `-H "partitions: p1, p2"`. |
| `timeout` | No | The timeout period for the import. The default value is 600 seconds. The value ranges from 1 to 259200. The unit is seconds. |
| `strict_mode` | No | Specifies whether to enable strict mode for this import. Valid values:<br>`false` (default): Strict mode is disabled.<br>`true`: Strict mode is enabled. When enabled, strict filtering is applied to column type conversions during the import. |
| `timezone` | No | The time zone for this import. The default value is UTC+8. This parameter affects the results of all time zone-related functions involved in the import. |
| `exec_mem_limit` | No | The memory limit for the import. The default value is 2 GB. |
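The following sketch combines several of the CSV parameters above. It is illustrative only: it reuses `load_test.example_table` from the complete example at the end of this topic, assumes a hypothetical source file `data_with_header.csv` whose first row is a header, and uses a placeholder FE address.

```bash
# Illustrative only: skip the header row, split columns on commas, pass the
# line feed row delimiter with Bash $'...' quoting, keep only rows where
# age >= 18, tolerate up to 10% filtered rows, and extend the timeout.
curl --location-trusted -u "root:" \
    -H "Expect:100-continue" \
    -H "label:csv_demo_1" \
    -H "column_separator: ," \
    -H $'row_delimiter:\n' \
    -H "skip_header: 1" \
    -H "where: age >= 18" \
    -H "max_filter_ratio: 0.1" \
    -H "timeout: 1200" \
    -T data_with_header.csv -XPUT \
    http://<fe_host>:18030/api/load_test/example_table/_stream_load
```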
Parameters for JSON
The following table describes the JSON-specific parameters. A matching-mode curl sketch follows the table.

| Parameter | Required | Description |
| --- | --- | --- |
| `jsonpaths` | No | The names of the fields to import. This parameter is required only when you import JSON-formatted data in matching mode. The parameter value must be in JSON format. |
| `strip_outer_array` | No | Specifies whether to strip the outermost array structure. Valid values:<br>`false` (default): Retains the original structure of the JSON data without stripping the outer array. The entire JSON array is imported as a single value. For example, for the sample data `[{"k1" : 1, "k2" : 2},{"k1" : 3, "k2" : 4}]`, if `strip_outer_array` is set to `false`, the data is parsed as a single array and imported into the table as one record.<br>`true`: When the imported data is a JSON array, you must set `strip_outer_array` to `true`. For the same sample data, if `strip_outer_array` is set to `true`, the data is parsed as two records and imported into the table. |
| `json_root` | No | The root element of the JSON data to import. This parameter is required only when you import JSON-formatted data in matching mode. The parameter value must be a valid JsonPath string. The default value is empty, which means the entire JSON data file is imported. |
| `ignore_json_size` | No | Specifies whether to skip the size check of the JSON body in the HTTP request.<br>Note: By default, the size of the JSON body in an HTTP request cannot exceed 100 MB. If it does, the error `The size of this batch exceed the max size [104857600] of json type data data [8617627793]. Set ignore_json_size to skip check, although it may lead enormous memory consuming.` is reported. To avoid this error, add `"ignore_json_size:true"` to the HTTP request header to skip the check of the JSON body size. |
| `compression`, `Content-Encoding` | No | The compression algorithm to use during data transmission for Stream Load. Supported algorithms include GZIP, BZIP2, LZ4_FRAME, and Zstandard. For example: `curl --location-trusted -u root: -v 'http://127.0.0.1:18030/api/db0/tbl_simple/_stream_load' -X PUT -H "expect:100-continue" -H 'format: json' -H 'compression: lz4_frame' -T ./b.json.lz4` |
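The JSON parameters above are typically combined in matching mode. The following sketch is illustrative only: it assumes a hypothetical file `events.json` whose records sit in a top-level `RECORDS` array, reuses `load_test.example_table` from the complete example at the end of this topic, and uses a placeholder FE address.

```bash
# Illustrative only: events.json is assumed to look like
#   {"RECORDS": [{"id":1,"name":"Emily","age":25}, {"id":2,"name":"Benjamin","age":35}]}
# json_root points at the RECORDS array, strip_outer_array splits it into rows,
# and jsonpaths/columns map the JSON fields to the table columns.
curl --location-trusted -u "root:" \
    -H "Expect:100-continue" \
    -H "label:json_demo_1" \
    -H 'format: json' \
    -H 'json_root: $.RECORDS' \
    -H 'strip_outer_array: true' \
    -H 'jsonpaths: ["$.id", "$.name", "$.age"]' \
    -H 'columns: id, name, age' \
    -T events.json -XPUT \
    http://<fe_host>:18030/api/load_test/example_table/_stream_load
```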
- `opt_properties`: Optional parameters for the import. The specified settings apply to the entire import task. The format is as follows.

```
-H "label: <label_name>"
-H "where: <condition1>[, <condition2>, ...]"
-H "max_filter_ratio: <num>"
-H "timeout: <num>"
-H "strict_mode: true | false"
-H "timezone: <string>"
-H "load_mem_limit: <num>"
-H "partial_update: true | false"
-H "partial_update_mode: row | column"
-H "merge_condition: <column_name>"
```

The following table describes the parameters. A curl sketch that uses the partial update properties follows the table.
| Parameter | Required | Description |
| --- | --- | --- |
| `label` | No | The label for the import task. Data with the same label cannot be imported multiple times. You can specify a label to prevent duplicate data imports. StarRocks retains the labels of successfully completed tasks for the last 30 minutes. |
| `where` | No | The filter conditions. If you specify this parameter, StarRocks filters the transformed data based on the specified conditions. Only data that meets the filter conditions defined in the `where` clause is imported. For example, to import only the rows where the `k1` column is equal to 20180601, specify `-H "where: k1 = 20180601"`. |
| `max_filter_ratio` | No | The maximum ratio of rows that can be filtered out due to issues such as data quality. The default value is 0, which means zero tolerance.<br>Note: Data filtered out by the `where` clause is not counted. |
| `log_rejected_record_num` | No | The maximum number of rows filtered out due to poor data quality that can be logged. This parameter is supported from version 3.1. Valid values: `0`, `-1`, or a positive integer. The default value is `0`.<br>`0`: Filtered rows are not logged.<br>`-1`: All filtered rows are logged.<br>A positive integer `n`: A maximum of `n` filtered rows are logged on each BE (or CN) node. |
| `timeout` | No | The timeout period for the import. The default value is 600 seconds. The value ranges from 1 to 259200. The unit is seconds. |
| `strict_mode` | No | Specifies whether to enable strict mode for this import. Valid values:<br>`false` (default): Strict mode is disabled.<br>`true`: Strict mode is enabled. |
| `timezone` | No | The time zone for this import. The default value is UTC+8. This parameter affects the results of all time zone-related functions involved in the import. |
| `load_mem_limit` | No | The memory limit for the import task. The default value is 2 GB. |
| `partial_update` | No | Specifies whether to use partial column updates. Valid values are `TRUE` and `FALSE`. The default value is `FALSE`. |
| `partial_update_mode` | No | The mode for partial updates. Valid values:<br>`row` (default): Row mode. This mode is suitable for real-time updates of many columns in small batches.<br>`column`: Column mode. This mode is suitable for batch updates of a few columns across many rows and provides faster updates in that scenario. For example, in a table with 100 columns, if you update 10 columns (10% of the total) for all rows, enabling column mode can improve update performance by 10 times. |
| `merge_condition` | No | The name of the column used as the condition for the update to take effect. The update takes effect only when the value of this column in the imported data is greater than or equal to the current value.<br>Note: The specified column must be a non-primary-key column. Conditional updates are supported only for Primary Key tables. |
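The following sketch shows how the partial update properties can appear in a request. It is illustrative only and not from the original example: it assumes a hypothetical Primary Key table `load_test.user_profile` whose key is `id` and that has `city` and `update_time` columns, a hypothetical delta file `profile_delta.csv` carrying only those three columns, and a placeholder FE address.

```bash
# Illustrative only: update just the "city" column of existing rows, and only
# when the incoming update_time is greater than or equal to the stored value.
curl --location-trusted -u "root:" \
    -H "Expect:100-continue" \
    -H "label:partial_update_demo_1" \
    -H "column_separator: ," \
    -H "partial_update: true" \
    -H "columns: id, city, update_time" \
    -H "merge_condition: update_time" \
    -T profile_delta.csv -XPUT \
    http://<fe_host>:18030/api/load_test/user_profile/_stream_load
```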
Example
This example shows how to import the data.csv file into the `example_table` table of the `load_test` database in a StarRocks cluster. For a complete example, see Complete example of data import.
curl --location-trusted -u "root:" \
-H "Expect:100-continue" \
-H "label:label2" \
-H "column_separator: ," \
-T data.csv -XPUT \
http://172.17.**.**:18030/api/load_test/example_table/_stream_loadReturn value
After the import is complete, the result of the import task is returned in JSON format. The following is a sample response.
```json
{
    "TxnId": 9,
    "Label": "label2",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 4,
    "NumberLoadedRows": 4,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 45,
    "LoadTimeMs": 235,
    "BeginTxnTimeMs": 101,
    "StreamLoadPlanTimeMs": 102,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 11,
    "CommitAndPublishTimeMs": 19
}
```

The following table describes the parameters in the response.
| Parameter | Description |
| --- | --- |
| `TxnId` | The transaction ID of the import. |
| `Label` | The label of the import. |
| `Status` | The import status. Valid values: `Success`, `Publish Timeout`, `Label Already Exists`, and `Fail`. |
| `ExistingJobStatus` | The status of the import task corresponding to the existing label. This field is displayed only when the `Status` is `Label Already Exists`. |
| `Message` | Details about the import task status. If the import fails, the specific reason for the failure is returned. |
| `NumberTotalRows` | The total number of rows read from the data stream. |
| `NumberLoadedRows` | The number of rows imported. This value is valid only when the import status is `Success`. |
| `NumberFilteredRows` | The number of rows filtered out because of unqualified data quality. |
| `NumberUnselectedRows` | The number of rows filtered out by the `where` condition. |
| `LoadBytes` | The size of the source file for the import task. |
| `LoadTimeMs` | The time taken by the import task, in milliseconds. |
| `BeginTxnTimeMs` | The time taken to start the transaction for the import task. |
| `StreamLoadPlanTimeMs` | The time taken to generate the execution plan for the import task. |
| `ReadDataTimeMs` | The time taken to read data for the import task. |
| `WriteDataTimeMs` | The time taken to write data for the import task. |
| `CommitAndPublishTimeMs` | The time taken to commit and publish the transaction for the import task. |
| `ErrorURL` | This parameter is returned if the task fails to import some rows. It is the URL from which you can export the error row information, for example, by downloading it with a tool such as wget or curl to a local file. You can adjust the import task based on the error messages and then resubmit the task. A scripted status check that uses this field is sketched after this table. |
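Because Stream Load is synchronous, a script can act on the returned JSON directly. The following sketch is illustrative only: it reuses the CSV import from the example above, generates a unique label, and, when the status is not `Success`, extracts `ErrorURL` (if present) and downloads the error rows for troubleshooting; the FE address and file name are placeholders.

```bash
# Illustrative only: submit the import, keep the JSON response, and check Status.
RESPONSE=$(curl -s --location-trusted -u "root:" \
    -H "Expect:100-continue" \
    -H "label:load_$(date +%s)" \
    -H "column_separator: ," \
    -T data.csv -XPUT \
    http://<fe_host>:18030/api/load_test/example_table/_stream_load)
echo "$RESPONSE"

# If the import did not succeed, download the error rows from ErrorURL (if any)
# so they can be inspected before the task is adjusted and resubmitted.
if ! printf '%s' "$RESPONSE" | grep -q '"Status": *"Success"'; then
    ERROR_URL=$(printf '%s' "$RESPONSE" | sed -n 's/.*"ErrorURL": *"\([^"]*\)".*/\1/p')
    if [ -n "$ERROR_URL" ]; then
        curl -s "$ERROR_URL" -o error_rows.txt
    fi
fi
```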
Cancel an import task
You cannot manually cancel a Stream Load task. A task is automatically canceled if it times out or an error occurs. You can use the ErrorURL from the return value to download the error message for troubleshooting.
Complete example of data import
This example uses the curl command to create an import task.
Create a table for the data to be imported.
Log on to the master node of the StarRocks cluster using Secure Shell (SSH). For more information, see Log on to a cluster.
Run the following command to connect to the StarRocks cluster using a MySQL client.
```bash
mysql -h127.0.0.1 -P 9030 -uroot
```

Run the following commands to create a database and a table.
```sql
CREATE DATABASE IF NOT EXISTS load_test;

USE load_test;

CREATE TABLE IF NOT EXISTS example_table (
    id INT,
    name VARCHAR(50),
    age INT
)
DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 3
PROPERTIES (
    "replication_num" = "1" -- Set the number of replicas to 1.
);
```

After you run the commands, press `Ctrl+D` to exit the MySQL client.
Prepare the test data.
Prepare CSV data
For example, create a file named `data.csv` with the following content.

```
id,name,age
1,Alice,25
2,Bob,30
3,Charlie,35
```

Prepare JSON data
For example, create a file named `json.data` with the following content.

```json
{"id":1,"name":"Emily","age":25}
{"id":2,"name":"Benjamin","age":35}
{"id":3,"name":"Olivia","age":28}
{"id":4,"name":"Alexander","age":60}
{"id":5,"name":"Ava","age":17}
```

Run the following command to create an import task.
Import CSV data
```bash
curl --location-trusted -u "root:" \
    -H "Expect:100-continue" \
    -H "label:label1" \
    -H "column_separator: ," \
    -T data.csv -XPUT \
    http://172.17.**.**:18030/api/load_test/example_table/_stream_load
```

Import JSON data
```bash
curl --location-trusted -u "root:" \
    -H "Expect:100-continue" \
    -H "label:label2" \
    -H "format:json" \
    -T json.data -XPUT \
    http://172.17.**.**:18030/api/load_test/example_table/_stream_load
```
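After both tasks finish, you can verify the result from the MySQL client. A minimal check, assuming the database, table, and connection settings from the steps above:

```bash
# Count the imported rows and preview them, ordered by id.
mysql -h127.0.0.1 -P 9030 -uroot -e '
USE load_test;
SELECT COUNT(*) AS row_count FROM example_table;
SELECT id, name, age FROM example_table ORDER BY id;
'
```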
Code integration examples
For a Java development example, see stream_load.
For an example of how to integrate Stream Load with Spark, see 01_sparkStreaming2StarRocks.