E-MapReduce:Stream Load

Last Updated:May 29, 2023

Stream Load is a synchronous import method. This method allows you to import local files or data streams to Doris by sending HTTP requests. Stream Load synchronously imports data and returns the import results. You can determine whether the import is successful based on the response body. This topic describes the basic principles, basic operations, system configurations, and best practices of Stream Load.

Scenarios

Stream Load is mainly suitable for importing local files or importing data from data streams by using programs.

Note

Some information in this topic is from Apache Doris. For more information, see Stream load.

Basic principles

The following flowchart shows the main workflow of Stream Load. Some import details are omitted.

                         ^      +
                         |      |
                         |      | 1A. User submits load to FE
                         |      |
                         |   +--v-----------+
                         |   | FE           |
5. Return result to user |   +--+-----------+
                         |      |
                         |      | 2. Redirect to BE
                         |      |
                         |   +--v-----------+
                         +---+Coordinator BE| 1B. User submits load to BE
                             +-+-----+----+-+
                               |     |    |
                         +-----+     |    +-----+
                         |           |          | 3. Distribute data
                         |           |          |
                       +-v-+       +-v-+      +-v-+
                       |BE |       |BE |      |BE |
                       +---+       +---+      +---+

In Stream Load mode, Doris selects a backend (BE) as the coordinator BE. The coordinator BE is responsible for receiving data and distributing the data to other BEs. You can submit an import job by sending an HTTP request. If you submit the request to the frontend (FE), the FE forwards the request to the coordinator BE by performing an HTTP redirect. You can also directly submit the request to the coordinator BE. Then, the coordinator BE returns the results of the import job to you.
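
For example, the following commands show the two submission paths in the flowchart. The first request is sent to the FE, and the second request is sent directly to a BE. The hosts, ports, labels, and file names are placeholders, and the BE HTTP port is assumed to be the webserver_port of the BE, which is 8040 by default in Apache Doris:

    # Submit the import job to the FE. The FE redirects the request to a coordinator BE.
    curl --location-trusted -u user:passwd -H "label:example_label_fe" -T data.file -XPUT http://fe_host:8030/api/{db}/{table}/_stream_load

    # Submit the import job directly to a BE over the HTTP port of the BE.
    curl --location-trusted -u user:passwd -H "label:example_label_be" -T data.file -XPUT http://be_host:8040/api/{db}/{table}/_stream_load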

Supported data formats

Stream Load supports two data formats: CSV (text) and JSON.

Basic operations

Create an import job

Stream Load submits and transfers data by using the HTTP protocol. In this example, the curl command is run to submit an import job. You can also use other HTTP clients to perform operations.

  • curl command

    curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load
    
    # The parameters that you can specify in the header are described in the following table. 
    # The format is -H "key1:value1".

    You can execute the HELP STREAM LOAD statement to view the detailed syntax for creating an import job. In Stream Load mode, all parameters related to an import job are specified in the header. The following table describes the parameters.

    Parameter

    Description

    Signature parameters

    user:passwd

    Stream Load uses the HTTP protocol to create an import job. A signature is generated for the import job by using basic access authentication. Doris verifies your identity and import permissions based on the signature.

    Import job parameters

    label

    The identifier of the import job.

    Each import job has a unique label within a single database. You can customize a label for your import job in the import command. After the import job is submitted, you can view the status of the import job based on the label. A unique label can also prevent you from repeatedly importing the same data. We recommend that you use the same label for the same batch of data. This way, repeated requests for the same batch of data are accepted only once to ensure the at-most-once semantics. If the state of the import job that corresponds to a label is CANCELLED, the label can be used again.

    column_separator

    The column delimiter of the file to be imported. Default value: \t.

    If a non-printable character is used as a column delimiter, the delimiter must be in the hexadecimal format and start with the \x prefix. For example, if the column delimiter of a Hive file is \x01, specify this parameter in the following format: -H "column_separator:\x01". You can use a combination of multiple characters as a column delimiter.

    line_delimiter

    The row delimiter of the file to be imported. Default value: \n. You can use a combination of multiple characters as a row delimiter.

    max_filter_ratio

    The maximum tolerance rate of the import job. Valid values: 0 to 1. Default value: 0.

    If the error rate of the import job exceeds the maximum tolerance rate, the import job fails. If you want to ignore erroneous data rows, set this parameter to a value greater than 0 to ensure that the import job can succeed. The error rate is calculated by using the following formula: dpp.abnorm.ALL/(dpp.abnorm.ALL + dpp.norm.ALL). If the error rate is greater than the value of max_filter_ratio, the import job fails. dpp.abnorm.ALL represents the number of data rows that cannot be imported due to reasons such as type mismatches, column mismatches, and length mismatches. dpp.norm.ALL represents the number of data rows that can be imported. You can execute the SHOW LOAD statement to query the number of data rows imported by the import job.

    Number of data rows in the source file = dpp.abnorm.ALL + dpp.norm.ALL
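
    For example, assume that a source file contains 1,000 rows and 5 of them cannot be parsed. The error rate is 5/1,000 = 0.005. To let the import job succeed while these rows are filtered out, set max_filter_ratio to a value greater than 0.005 in the header. The following command is for illustration only, and the label, host, database, and table names are placeholders:

    curl --location-trusted -u user:passwd -H "label:example_label_1" -H "max_filter_ratio:0.01" -T data.file -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load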

    where

    The filter conditions specified by the import job.

    Stream Load allows you to specify a WHERE clause to filter the source data. The filtered data is not imported or used to calculate the error rate of the import job, but is counted as the value of the NumberUnselectedRows parameter in the import results.
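
    For example, the following command imports only the rows that match the condition in the where header. The command is for illustration only, and the column name, label, host, database, and table names are placeholders:

    curl --location-trusted -u user:passwd -H "label:example_label_2" -H "where: event_day='2023-05-29'" -T data.file -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load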

    partitions

    The partition information of the table to be imported. If the data to be imported does not belong to the specified partitions, the data is not imported. The number of data rows that are not imported is included in dpp.abnorm.ALL.

    columns

    The function transformation configuration of the data to be imported. Stream Load supports column order change and expression transformation. The expression transformation method is the same as that you can use in a query statement.

    Note

    Example of column order change: The source table has three columns: src_c1, src_c2, and src_c3. The Doris table also has three columns: dst_c1, dst_c2, and dst_c3.

    • If the src_c1, src_c2, and src_c3 columns of the source table correspond to the dst_c1, dst_c2, and dst_c3 columns of the Doris table, specify the columns parameter in the following format: columns: dst_c1, dst_c2, dst_c3.

    • If the src_c1, src_c2, and src_c3 columns of the source table correspond to the dst_c2, dst_c3, and dst_c1 columns of the Doris table, specify the columns parameter in the following format: columns: dst_c2, dst_c3, dst_c1.

    • Example of expression transformation: The source table has two columns: tmp_c1 and tmp_c2, and the Doris table also has two columns: c1 and c2. The two columns of the source table need to be transformed by functions to correspond to the two columns of the Doris table. In this case, specify the columns parameter in the following format: columns: tmp_c1, tmp_c2, c1 = year(tmp_c1), c2 = month(tmp_c2). tmp_* is a placeholder that represents the two columns in the source file.

    exec_mem_limit

    The limit on the memory allocated to the import job. Unit: bytes. By default, the maximum memory allocated to an import job is 2 GB.
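
    For example, to increase the memory limit of a single import job to 4 GB, set exec_mem_limit to 4294967296 (4 × 1024 × 1024 × 1024 bytes) in the header. The following command is for illustration only, and the label, host, database, and table names are placeholders:

    curl --location-trusted -u user:passwd -H "label:example_label_3" -H "exec_mem_limit:4294967296" -T data.file -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load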

    strict_mode

    Specifies whether to enable the strict mode for the import job. By default, this mode is disabled.

    Stream Load allows you to enable the strict mode by setting the strict_mode parameter to true in the header. In strict mode, the data after column type conversion is strictly filtered in the import process based on the following rules:

    • If the strict mode is enabled, erroneous data is filtered out after column type conversion. Erroneous data refers to the values that are not null in the source file but are converted to null values after column type conversion.

    • The strict mode does not apply to columns whose null values are calculated by functions.

    • If an imported column type has a range limit and a source value can be converted to a non-null value after column type conversion but the converted value is out of the allowed range, the strict mode does not apply to this value. For example, if the column type is DECIMAL(1,0) and the source value is 10, the value can be converted based on the column type but is out of the allowed range. The strict mode does not apply to this value.

    merge_type

    The type of data merging. Default value: APPEND. Valid values:

    • APPEND: appends this batch of data to the existing data.

    • DELETE: deletes all data rows with the same key as this batch of data.

    • MERGE: needs to be used together with the DELETE condition. The data that meets the DELETE condition is processed based on the DELETE semantics, and the data that does not meet the DELETE condition is processed based on the APPEND semantics.
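
    For example, the following command is a sketch of a MERGE import. It assumes that the target Doris table uses the Unique Key model and that the DELETE condition is specified in the delete header, which Apache Doris uses together with merge_type. The flag column, label, host, database, and table names are placeholders:

    curl --location-trusted -u user:passwd -H "label:example_label_4" -H "merge_type: MERGE" -H "delete: flag=1" -T data.file -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load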

    two_phase_commit

    Specifies whether to enable the two-stage transaction commit mode for the import job. If this mode is enabled, the import results are returned to you after data is written. In this stage, the data is invisible and the transaction is in the PRECOMMITTED state. The data is visible only after you manually trigger a commit operation. By default, the two-stage transaction commit mode is disabled.

    To enable the two-stage transaction commit mode, set the disable_stream_load_2pc parameter to false in the be.conf file and set the two_phase_commit parameter to true in the header.

    Example:

    1. Initiate a Stream Load pre-commit operation.

      curl  --location-trusted -u user:passwd -H "two_phase_commit:true" -T test.txt http://fe_host:http_port/api/{db}/{table}/_stream_load
      {
          "TxnId": 18036,
          "Label": "55c8ffc9-1c40-4d51-b75e-f2265b36****",
          "TwoPhaseCommit": "true",
          "Status": "Success",
          "Message": "OK",
          "NumberTotalRows": 100,
          "NumberLoadedRows": 100,
          "NumberFilteredRows": 0,
          "NumberUnselectedRows": 0,
          "LoadBytes": 1031,
          "LoadTimeMs": 77,
          "BeginTxnTimeMs": 1,
          "StreamLoadPutTimeMs": 1,
          "ReadDataTimeMs": 0,
          "WriteDataTimeMs": 58,
          "CommitAndPublishTimeMs": 0
      }
    2. Trigger a commit operation on the transaction.
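
      The following command is a sketch of the commit request. It is based on the two-phase commit interface that is described in the Apache Doris documentation, and the transaction ID, host, database, and table names are examples only:

      curl -X PUT --location-trusted -u user:passwd -H "txn_id:18036" -H "txn_operation:commit" http://fe_host:http_port/api/{db}/{table}/_stream_load_2pc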

    3. Trigger an abort operation on the transaction.
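
      Similarly, the following sketch aborts the transaction by using the same interface. The transaction ID, host, database, and table names are examples only:

      curl -X PUT --location-trusted -u user:passwd -H "txn_id:18036" -H "txn_operation:abort" http://fe_host:http_port/api/{db}/{table}/_stream_load_2pc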

  • Sample command

    curl --location-trusted -u root -T date -H "label:123" http://abc.com:8030/api/test/date/_stream_load
  • Returned results

    Stream Load is a synchronous import method. Therefore, the results of the import job are directly returned to you as a response to your HTTP request. Example:

    {
        "TxnId": 1003,
        "Label": "b6f3bc78-0d2c-45d9-9e4c-faa0a014****",
        "Status": "Success",
        "ExistingJobStatus": "FINISHED", // optional
        "Message": "OK",
        "NumberTotalRows": 1000000,
        "NumberLoadedRows": 1000000,
        "NumberFilteredRows": 1,
        "NumberUnselectedRows": 0,
        "LoadBytes": 40888898,
        "LoadTimeMs": 2144,
        "BeginTxnTimeMs": 1,
        "StreamLoadPutTimeMs": 2,
        "ReadDataTimeMs": 325,
        "WriteDataTimeMs": 1933,
        "CommitAndPublishTimeMs": 106,
        "ErrorURL": "http://192.168.**.**:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bd****"
    }

    The following table describes the parameters in the results of an import job in Stream Load mode.

    Parameter

    Description

    TxnId

    The transaction ID of the import job. The transaction ID is automatically managed by the system, and you do not need to manage it.

    Label

    The label of the import job. You can specify the label, or the label can be automatically generated by the system.

    Status

    The state of the import job. Valid values:

    • Success: The import job is successful.

    • Publish Timeout: The import job is complete, but the data may become visible after a delay. You do not need to retry the import job.

    • Label Already Exists: The label already exists. You must change the label.

    • Fail: The import job fails.

    ExistingJobStatus

    The state of the import job that corresponds to the existing label. This parameter is displayed only if the value of the Status parameter is Label Already Exists. You can know the status of the import job that corresponds to the existing label based on the returned value. Valid values:

    • RUNNING: The import job is in progress.

    • FINISHED: The import job is successful.

    Message

    The error message returned for the import job.

    NumberTotalRows

    The total number of data rows processed by the import job.

    NumberLoadedRows

    The number of data rows that are successfully imported.

    NumberFilteredRows

    The number of data rows that cannot be imported.

    NumberUnselectedRows

    The number of data rows that are filtered out by the WHERE condition.

    LoadBytes

    The number of bytes imported by the import job.

    LoadTimeMs

    The duration of the import job. Unit: milliseconds.

    BeginTxnTimeMs

    The amount of time that is consumed to request the FE to start a transaction. Unit: milliseconds.

    StreamLoadPutTimeMs

    The amount of time that is consumed to request the FE to return an execution plan for the import job. Unit: milliseconds.

    ReadDataTimeMs

    The amount of time that is consumed to read data. Unit: milliseconds.

    WriteDataTimeMs

    The amount of time that is consumed to write data. Unit: milliseconds.

    CommitAndPublishTimeMs

    The amount of time that is consumed to request the FE to commit and publish the transaction. Unit: milliseconds.

    ErrorURL

    The URL that is used to view erroneous data rows.

    Important

    Stream Load is a synchronous import method. Therefore, import information is not recorded in Doris. You cannot asynchronously view the results of an import job in Stream Load mode by executing the SHOW LOAD statement. To view the import results, you must check the response to the HTTP request that you sent to submit the import job.

Cancel an import job

You cannot manually cancel import jobs in Stream Load mode. Import jobs in Stream Load mode are automatically canceled by the system if a timeout error or an import error occurs.

View an import job in Stream Load mode

You can execute the SHOW STREAM LOAD statement to query information about completed import jobs in Stream Load mode.

By default, BEs do not record information about import jobs in Stream Load mode. If you want to enable BEs to record such information, set the enable_stream_load_record parameter of BE configurations to true. For more information, see Configuration items of backend nodes.
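
For example, you can add the following line to the be.conf file of each BE and restart the BE. This is a sketch, and the exact steps depend on how you manage your cluster configurations:

    enable_stream_load_record = true

After the configuration takes effect, you can execute the SHOW STREAM LOAD statement in a MySQL client that is connected to Doris to view the recorded import jobs.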

Relevant system configurations

FE configurations

stream_load_default_timeout_second: the timeout period of an import job in Stream Load mode. Unit: seconds. If an import job is not complete within the specified timeout period, the system cancels the import job and the state of the import job changes to CANCELLED. The default timeout period is 600 seconds. If you estimate that the source file cannot be imported within the specified timeout period, you can set a separate timeout period in the HTTP request when you submit an import job in Stream Load mode or modify the stream_load_default_timeout_second parameter of FE configurations to specify the global default timeout period.
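
For example, the following lines show both methods. The values are for illustration only, and the timeout header parameter is the per-job timeout setting, in seconds, that Apache Doris Stream Load supports:

    # In the fe.conf file: change the global default timeout period to 1200 seconds.
    stream_load_default_timeout_second = 1200

    # In the HTTP request: set a timeout period of 1200 seconds for a single import job.
    curl --location-trusted -u user:passwd -H "label:example_label_5" -H "timeout:1200" -T data.file -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load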

BE configurations

streaming_load_max_mb: the maximum amount of data that can be imported by an import job in Stream Load mode. Default value: 10240. Unit: MB. If the size of your source file exceeds this threshold, you must modify the streaming_load_max_mb parameter of BE configurations.

Best practices

Scenarios

Stream Load is most suitable for scenarios in which the source file is stored in memory or on a disk. In addition, because Stream Load is a synchronous import method, you can use it if you want to obtain the import results in a synchronous manner.

Data amount

In Stream Load mode, BEs are responsible for distributing and importing data. Therefore, we recommend that you import 1 GB to 10 GB of data at a time. By default, the maximum amount of data that can be imported by an import job in Stream Load mode is 10 GB. If you want to import a file whose size exceeds 10 GB, you must modify the streaming_load_max_mb parameter of BE configurations.

For example, if the size of the file to be imported is 15 GB, set the streaming_load_max_mb parameter of BE configurations to 16000.

The default timeout period of an import job in Stream Load mode is 300 seconds. Based on the maximum import speed of Doris, you must modify the default timeout period if you want to import a file whose size exceeds 3 GB.

Timeout period of an import job = Amount of data to be imported/10 MB/s. In this formula, 10 MB/s is the default maximum import speed of Doris. If the average import speed of your cluster is different, use the actual speed in the calculation.

For example, if you want to import a file whose size is 10 GB and the import speed of your cluster is 10 MB/s, the timeout period of the import job is 1,000 seconds.

Complete example

Data information: You want to import about 15 GB of data from the /home/store_sales file on your local disk to the store_sales table in the bj_sales database.

Cluster information: The number of import jobs in Stream Load mode that can be concurrently processed is not affected by the cluster size.

Procedure

  1. Modify the be.conf file of BE configurations because the size of the file to be imported exceeds 10 GB.

    streaming_load_max_mb = 16000
  2. Determine whether the estimated import duration exceeds the default timeout period. In this example, the estimated import duration is 1,500 seconds, which is calculated based on the following formula: 15,000 MB/10 MB/s = 1,500s. Because this value exceeds the default timeout period, set the stream_load_default_timeout_second parameter to 1500 in the fe.conf file of FE configurations.
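
    For example, add the following line to the fe.conf file:

    stream_load_default_timeout_second = 1500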

  3. Run the following command to create an import job:

    curl --location-trusted -u user:password -T /home/store_sales -H "label:abc" http://abc.com:8030/api/bj_sales/store_sales/_stream_load