In Broker Load mode, StarRocks uses broker processes to read data from data sources, such as Apache Hadoop Distributed File System (HDFS) and Alibaba Cloud Object Storage Service (OSS), and uses its computing resources to preprocess and import the data. This topic describes how to import data in Broker Load mode.

Background information

Broker Load is an asynchronous import method. You can create an import job based on the MySQL protocol and execute the SHOW LOAD statement to query the import results. StarRocks supports data import from external storage systems in various file formats including CSV, ORC, and Parquet. We recommend that you run each import job to import tens to hundreds of GB of data at a time.

Import data in Broker Load mode

Query brokers

When you create a StarRocks cluster in E-MapReduce (EMR), brokers are automatically built and started on every core node. You can execute the SHOW PROC statement to query the brokers of a cluster. The following code shows the syntax:
SHOW PROC "/brokers"\G
The following output is returned:
*************************** 1. row ***************************
          Name: broker
            IP: 10.0.1.151
          Port: 8000
         Alive: true
 LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
        ErrMsg:
*************************** 2. row ***************************
          Name: broker
            IP: 10.0.1.154
          Port: 8000
         Alive: true
 LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
        ErrMsg:
*************************** 3. row ***************************
          Name: broker
            IP: 10.0.1.153
          Port: 8000
         Alive: true
 LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
        ErrMsg:
*************************** 4. row ***************************
          Name: broker
            IP: 10.0.1.152
          Port: 8000
         Alive: true
 LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
        ErrMsg:
4 rows in set (0.00 sec)

Create an import job

  • Syntax
    LOAD LABEL db_name.label_name
        (data_desc, ...)
    WITH BROKER broker_name broker_properties
        [PROPERTIES (key1=value1, ... )]
  • Parameters
    You can execute the HELP BROKER LOAD statement to view the syntax for creating an import job.
    • Label

      The identifier of the import job. Each import job has a unique label. You can define a custom label. Otherwise, a label is generated by the system. The label can be used to query the status of the import job and avoid importing duplicate data. After the state of the import job changes to FINISHED, the label becomes invalid. If the state of the import job changes to CANCELLED, you can use the label to submit the import job again.

    • data_desc

      A string that describes the data to be imported. You can specify multiple data_desc strings, each with information such as the data source address, extract, transform, and load (ETL) functions, destination table, and partitions.

      An import job in Broker Load mode allows you to import data to multiple tables at a time. Specify a data_desc string for each destination table. In each data_desc string, specify the data source address, which can contain multiple file_path strings that specify multiple source files. The Broker Load mode ensures the atomicity of importing data to multiple tables at a time. The following code shows the parameters that are usually specified in a data_desc string:
      data_desc:
          DATA INFILE ('file_path', ...)
          [NEGATIVE]
          INTO TABLE tbl_name
          [PARTITION (p1, p2)]
          [COLUMNS TERMINATED BY column_separator ]
          [FORMAT AS file_type]
          [(col1, ...)]
          [COLUMNS FROM PATH AS (colx, ...)]
          [SET (k1=f1(xx), k2=f2(xx))]
          [WHERE predicate]

      The following table describes the parameters.

      Parameter Description
      file_path Either a specific file path that points to a file, or a file path that contains asterisks (*) as wildcards and points to all files in a directory. The parent directories of the destination directory can also contain wildcards.

      The following special characters are supported as wildcards: ? * [] {} ^. For more information about how to use wildcards, see FileSystem.

      For example, if you set this parameter to hdfs://hdfs_host:hdfs_port/user/data/tablename/*/*, all files in the partitions of the /tablename directory are imported. If you set this parameter to hdfs://hdfs_host:hdfs_port/user/data/tablename/dt=202104/*, only files in the 202104 partition of the /tablename directory are imported.

      negative A flag used to indicate that the specified source data has been imported and will be deleted by this import job.

      This parameter is applicable if you want to cancel a batch of data that has been imported to aggregate columns of the SUM type in the destination table. After the import job is complete, the batch of data will be deleted from the aggregate columns.

      partition The partitions in the source files to be imported to the destination table.

      Only source data that belongs to the specified partitions is imported. Source data that does not belong to the specified partitions is determined as erroneous data. If you do not want such data to be determined as erroneous data, use a WHERE predicate to filter it out.

      column_separator The column delimiter that you want to use to separate data in the source files into columns. Default value: \t.

      If you want to specify an invisible character as the column delimiter, add \x as the prefix and set the delimiter in hexadecimal. For example, if the delimiter used in a source Hive file is \x01, set the column delimiter to \\x01.

      file_type The format of the source files. Valid values: parquet, orc, and csv. Default value: csv.

      Parquet files have the file name extension .parquet or .parq.

      COLUMNS FROM PATH AS The partition fields in the path of the source files.
      For example, the path of a source file is /path/col_name=col_value/dt=20210101/file1, and col_name and dt are table columns. The values col_value and 20210101 are imported to the destination columns that correspond to col_name and dt, as shown in the following code:
      (col1, col2)
      COLUMNS FROM PATH AS (col_name, dt)
      set column mapping The SET statement that contains functions for column type conversion.

      If source columns and destination columns are of different types, you must specify a SET statement for column type conversion.

      where predicate The WHERE predicate that you want to use for data filtering after column type conversion.

      Data that is filtered out is not counted for the calculation of the maximum filter ratio. If data_desc strings altogether contain multiple WHERE predicates for the same table, the predicates are combined by the AND operator.
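The parameters above can be combined in a single data_desc string. The following is a minimal sketch, in which the path, table, partition, and column names are hypothetical: it imports one Hive partition, uses Hive's \x01 delimiter, maps and converts columns, and filters rows.

```sql
DATA INFILE("hdfs://hdfs_host:hdfs_port/user/data/tablename/dt=202104/*")
INTO TABLE tbl_name
PARTITION (p202104)
COLUMNS TERMINATED BY "\\x01"
FORMAT AS "csv"
(k1, tmp_v)
COLUMNS FROM PATH AS (dt)
SET (v = tmp_v * 10)
WHERE k1 > 0
```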

    • broker_properties
      In the broker_properties string, configure property parameters that apply to the entire import job.
      broker_properties:
          (key2=value2, ...)

      The following table describes some of the property parameters in the broker_properties string.

      Parameter Description
      timeout The timeout period for the import job. Unit: seconds.

      You can specify a timeout period for each import job in the PROPERTIES clause. If the import job is not complete within the specified timeout period, the state of the import job changes to CANCELLED. The default timeout period for import jobs in Broker Load mode is 4 hours.

      Important You do not need to specify a timeout period for an import job unless you estimate that the import job will take longer than the default timeout period to complete.

      We recommend that you calculate the minimum timeout period in units of seconds by using the following formula: (Total file size in MB × Number of source tables and related rollup tables)/(30 × Concurrency of the import job).

      The number 30 in the formula indicates 30 MB/s, which is the average import speed on backends. For example, the total file size of source data is 1 GB, the source table has two rollup tables, and the concurrency of the import job is 3. The minimum timeout period is calculated by using the following formula: (1 × 1,024 × 3)/(30 × 3) ≈ 34 seconds.

      StarRocks clusters have varied machine environments and concurrent query jobs. You must estimate the slowest import speed of your StarRocks cluster based on the speed of historical import jobs.

      max_filter_ratio The maximum filter ratio of the import job. Valid values: 0 to 1. Default value: 0. If the error rate of the import job exceeds the maximum filter ratio, the import job fails. If you want erroneous data rows to be ignored, set this parameter to a value greater than 0 to ensure that the import job can succeed.

      Calculate an appropriate maximum filter ratio by using the following formula: max_filter_ratio = dpp.abnorm.ALL/(dpp.abnorm.ALL + dpp.norm.ALL).

      dpp.abnorm.ALL indicates the number of data rows that cannot be imported due to various reasons such as type mismatch, column quantity mismatch, and length mismatch. dpp.norm.ALL indicates the number of data rows that can be imported. You can execute the SHOW LOAD statement to query the amount of data imported by the import job.

      Number of data rows in the source files = dpp.abnorm.ALL + dpp.norm.ALL
      load_mem_limit The limit on the memory allocated to the import job. Default value: 0. A value of 0 specifies that the memory allocated to the import job is not limited.
      strict_mode Specifies whether to enable the strict mode for the import job. To enable the strict mode, set this parameter to true: properties ("strict_mode" = "true").

      By default, the strict mode is disabled.

      If the strict mode is enabled, erroneous data is filtered out after column type conversion. Erroneous data refers to the values that are not null in the source files but are converted to null values after column type conversion. Take note of the following items:
      • The strict mode does not apply to source columns whose values are generated based on functions.
      • If a destination column restricts values to a range and a value in the source column can be converted but the result value is out of the range, the strict mode does not apply to the source column. For example, a value in the source column is 10 and the destination column is of the DECIMAL(1,0) type. The value 10 can be converted, but the result value is out of the range, so the strict mode does not apply.
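The timeout and max_filter_ratio arithmetic described above can be sketched as follows. This is a minimal illustration: the inputs are hypothetical, and the constant 30 stands for the ~30 MB/s average import speed on backends.

```python
def recommended_timeout_s(total_size_mb: float, table_count: int, concurrency: int) -> float:
    # (Total file size in MB x Number of source tables and related rollup tables)
    # / (30 x Concurrency of the import job)
    return (total_size_mb * table_count) / (30 * concurrency)

def filter_ratio(abnorm_rows: int, norm_rows: int) -> float:
    # max_filter_ratio = dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL)
    return abnorm_rows / (abnorm_rows + norm_rows)

# 1 GB of source data, one source table with two rollup tables, concurrency 3:
timeout = recommended_timeout_s(1 * 1024, 3, 3)   # about 34 seconds
```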
  • Sample code for creating an import job to import data from Alibaba Cloud OSS
    Important In StarRocks clusters, you can use broker as the broker name.
    LOAD LABEL tpch.lineitem
    (
        DATA INFILE("oss://bucket/tpc_h/sf1/lineitem.tbl")
        INTO TABLE `lineitem`
        COLUMNS TERMINATED BY '|'
        (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment)
    )
    WITH BROKER broker
    (
        "fs.oss.accessKeyId" = "xxx",
        "fs.oss.accessKeySecret" = "xxx",
        "fs.oss.endpoint" = "oss-cn-beijing-internal.aliyuncs.com"
    );

Query the status of an import job

Data import in Broker Load mode is asynchronous. To query the status of an import job, specify the label of the job in the SHOW LOAD statement and then execute the statement. To view the syntax of this statement, execute the HELP SHOW LOAD statement.
Important The SHOW LOAD statement supports only asynchronous import jobs. For synchronous import jobs such as those in Stream Load mode, you cannot use the SHOW LOAD statement to query the status.
The following sample code provides an example on how to query the status of an import job:
show load where label = 'label1'\G
*************************** 1. row ***************************
         JobId: 76391
         Label: label1
         State: FINISHED
      Progress: ETL:N/A; LOAD:100%
          Type: BROKER
       EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
      TaskInfo: cluster:N/A; timeout(s):10800; max_filter_ratio:5.0E-5
      ErrorMsg: N/A
    CreateTime: 2019-07-27 11:46:42
  EtlStartTime: 2019-07-27 11:46:44
 EtlFinishTime: 2019-07-27 11:46:44
 LoadStartTime: 2019-07-27 11:46:44
LoadFinishTime: 2019-07-27 11:50:16
           URL: http://192.168.**.**:8040/api/_load_error_log?file=__shard_4/error_log_insert_stmt_4bb00753932c491a-a6da6e2725415317_4bb00753932c491a_a6da6e2725415317
    JobDetails: {"Unfinished backends":{"9c3441027ff948a0-8287923329a2b6a7":[10002]},"ScannedRows":2390016,"TaskNumber":1,"All backends":{"9c3441027ff948a0-8287923329a2b6a7":[10002]},"FileNumber":1,"FileSize":1073741824}
The following table describes the response parameters.
Parameter Description
JobId The unique ID of the import job. The job ID is automatically generated by the system. The ID of an import job can never be reused, whereas the label of an import job can be reused if the import job fails.
Label The identifier of the import job.
State The state of the import job. Valid values:
  • PENDING: The import job was waiting to be run.
  • LOADING: The import job was in progress.
  • CANCELLED: The import job failed.
  • FINISHED: The import job was successful.
Progress The progress information of the import job. The progress information describes the two import phases: ETL and LOADING. Data import in Broker Load mode involves only the LOADING phase. The ETL progress is displayed as N/A, and the LOADING progress can be 0 to 100%.

The LOADING progress is calculated by using the following formula: LOADING progress = Number of tables that have been imported/Total number of source tables in this import job × 100%.

When all the source tables have been imported and the import job is ready to be complete, the LOADING progress reaches 99%. The LOADING progress reaches 100% only after the import job is complete.
Important The import progress is not linear. If the progress remains unchanged within a period of time, the import job may still be in progress.
Type The type of the import job. The type of the import job in Broker Load mode is BROKER.
EtlInfo The data amount metrics of the import job: unselected.rows, dpp.norm.ALL, and dpp.abnorm.ALL.

unselected.rows indicates the number of data rows filtered out by the WHERE predicate. dpp.norm.ALL and dpp.abnorm.ALL help determine whether the error rate of the import job exceeds the maximum filter ratio. The sum of the three metrics equals the total number of data rows in the source files.

TaskInfo The parameters that you specified when you created the import job, including the cluster, timeout period, and maximum filter ratio.
ErrorMsg The message returned by the import job. If the import job is in the CANCELLED state, the value of this parameter indicates the cause of the import failure and contains two parts: type and msg. If the import job is in the FINISHED state, the value of this parameter is N/A. The type part has the following valid values:
  • USER-CANCEL: The import job was canceled.
  • ETL-RUN-FAIL: The import job failed in the ETL phase.
  • ETL-QUALITY-UNSATISFIED: The error rate exceeded the maximum filter ratio.
  • LOAD-RUN-FAIL: The import job failed in the LOADING phase.
  • TIMEOUT: The import job was not complete within the timeout period.
  • UNKNOWN: An unknown error occurred during the running of the import job.
CreateTime, EtlStartTime, EtlFinishTime, LoadStartTime, LoadFinishTime The times when the import job was created, when the ETL phase started and ended, when the LOADING phase started, and when the import job was complete.
  • Import jobs in Broker Load mode do not have the ETL phase. Therefore, the values of the EtlStartTime, EtlFinishTime, and LoadStartTime parameters are the same.
  • If, within an extended period of time, only the value of the CreateTime parameter is displayed and the value of the LoadStartTime parameter remains N/A, a large number of import jobs are waiting to be run. We recommend that you do not submit more import jobs for now.
    LoadFinishTime - CreateTime = Duration of the import job
    
    LoadFinishTime - LoadStartTime = Duration of the LOADING phase for the import job = Duration of the import job - Wait time of the import job
URL The URL of sample erroneous data during the import job. If the import job involves no erroneous data, the value of this parameter is N/A.
JobDetails The details of the import job, including the number of imported files and their total size in bytes, the number of tasks, the number of processed source data rows, the ID of the backend that runs the tasks, and the ID of the backend on which the waiting tasks reside.
{"Unfinished backends":{"9c3441027ff948a0-8287923329a2b6a7":[10002]},"ScannedRows":2390016,"TaskNumber":1,"All backends":{"9c3441027ff948a0-8287923329a2b6a7":[10002]},"FileNumber":1,"FileSize":1073741824}

The number of processed source data rows is updated every 5 seconds. This number indicates only the current progress and does not mean the total number of data rows that are processed after the import job is complete. The latter is shown by the EtlInfo parameter.
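The Progress and EtlInfo arithmetic above can be checked with a short sketch. The numbers below are copied from the sample SHOW LOAD output earlier in this section, where max_filter_ratio is 5.0E-5.

```python
# EtlInfo metrics from the sample output:
unselected, abnorm, norm = 4, 15, 28133376
total_source_rows = unselected + abnorm + norm  # sum of the three metrics

# The error rate is compared with the job's max_filter_ratio:
error_rate = abnorm / (abnorm + norm)
job_within_ratio = error_rate <= 5.0e-5

def loading_progress(imported_tables: int, total_tables: int) -> float:
    # LOADING progress = imported tables / total source tables x 100%
    return imported_tables / total_tables * 100
```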

Cancel an import job

Import jobs in Broker Load mode can be canceled when they are not in the CANCELLED or FINISHED state. To cancel an import job, specify its label in the CANCEL LOAD statement and then execute the statement. You can execute the HELP CANCEL LOAD statement to view the syntax for canceling an import job.

CANCEL LOAD
[FROM db_name]
WHERE [LABEL = "load_label" | LABEL like "label_pattern"];
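For example, assuming the import job with the label lineitem in the tpch database from the earlier sample is still in the PENDING or LOADING state, you can cancel it as follows:

```sql
CANCEL LOAD
FROM tpch
WHERE LABEL = "lineitem";
```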

Import data from HDFS

  • Syntax
    LOAD LABEL db1.label1
    (
        DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/hive/test.db/ml/file1")
        INTO TABLE tbl1
        COLUMNS TERMINATED BY ","
        (tmp_c1, tmp_c2)
        SET
        (
            id=tmp_c2,
            name=tmp_c1
        ),
    
        DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/hive/test.db/ml/file2")
        INTO TABLE tbl2
        COLUMNS TERMINATED BY ","
        (col1, col2)
        where col1 > 1
    )
    WITH BROKER 'broker1'
    (
        "username" = "hdfs_username",
        "password" = "hdfs_password"
    )
    PROPERTIES
    (
        "timeout" = "3600"
    );
  • HDFS authentication
    The community edition of HDFS supports two authentication modes: simple authentication and Kerberos authentication.
    • Simple authentication: User identities are determined by the operating system of the client that is connected to HDFS.
      The following table describes the parameters.
      Parameter Description
      hadoop.security.authentication The authentication mode. In this example, this parameter is set to simple.
      username The username that is used to log on to HDFS.
      password The password that is used to log on to HDFS.
    • Kerberos authentication: The user identity of a client is determined by its Kerberos credentials.
      The following table describes the parameters.
      Parameter Description
      hadoop.security.authentication The authentication mode. In this example, this parameter is set to kerberos.
      kerberos_principal The principal for Kerberos authentication.
      kerberos_keytab The path of the Kerberos keytab file. The file must reside on the same server as the broker processes.
      kerberos_keytab_content The Base64-encoded content of the Kerberos keytab file.
      Important You must configure either this parameter or the kerberos_keytab parameter.
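Put together, a broker_properties sketch for Kerberos authentication might look like the following. The principal and keytab path are hypothetical; replace them with the values for your cluster.

```sql
(
    "hadoop.security.authentication" = "kerberos",
    "kerberos_principal" = "starrocks@YOUR.REALM.COM",
    "kerberos_keytab" = "/etc/security/starrocks.keytab"
)
```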
  • HA configurations of HDFS
    After you configure high availability (HA) for NameNodes in an HDFS cluster, the new active NameNode can be automatically identified when a failover occurs. To access an HDFS cluster deployed in HA mode, configure the parameters that are described in the following table.
    Parameter Description
    dfs.nameservices The name of the HDFS service. You can set a custom name.

    For example, set the dfs.nameservices parameter to my_ha.

    dfs.ha.namenodes.xxx The custom name of the NameNode. Separate multiple names with commas (,). Replace xxx in this parameter name with the custom name that you set for the dfs.nameservices parameter.

    For example, set the dfs.ha.namenodes.my_ha parameter to my_nn.

    dfs.namenode.rpc-address.xxx.nn The address used by the NameNode for remote procedure calls (RPCs). Replace xxx in this parameter name with the custom name that you set for the dfs.nameservices parameter, and replace nn with a name that you set for the dfs.ha.namenodes.xxx parameter.

    For example, set the dfs.namenode.rpc-address.my_ha.my_nn parameter to a value in the Hostname:Port number format.

    dfs.client.failover.proxy.provider The provider that the client uses to connect to the NameNode. Default value: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.
    The following sample code provides an example:
    (
        "dfs.nameservices" = "my-ha",
        "dfs.ha.namenodes.my-ha" = "my-namenode1,my-namenode2",
        "dfs.namenode.rpc-address.my-ha.my-namenode1" = "nn1-host:rpc_port",
        "dfs.namenode.rpc-address.my-ha.my-namenode2" = "nn2-host:rpc_port",
        "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
    )
    You can use simple authentication or Kerberos authentication to access HDFS clusters deployed in HA mode. The following sample code provides an example on how to access an HA HDFS cluster by using simple authentication:
    (
        "username"="user",
        "password"="passwd",
        "dfs.nameservices" = "my-ha",
        "dfs.ha.namenodes.my-ha" = "my-namenode1,my-namenode2",
        "dfs.namenode.rpc-address.my-ha.my-namenode1" = "nn1-host:rpc_port",
        "dfs.namenode.rpc-address.my-ha.my-namenode2" = "nn2-host:rpc_port",
        "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
    )

    The configurations of an HDFS cluster can be written into the hdfs-site.xml file. If you use a broker process to read the configurations of the HDFS cluster, you only need to specify the file path and authentication information of the cluster.

Example

  1. Create a test table. The following sample code creates a table named lineitem in the tpch database:
    CREATE TABLE lineitem (
      l_orderkey bigint,
      l_partkey bigint,
      l_suppkey bigint,
      l_linenumber int,
      l_quantity double,
      l_extendedprice double,
      l_discount double,
      l_tax double,
      l_returnflag string,
      l_linestatus string,
      l_shipdate date,
      l_commitdate date,
      l_receiptdate date,
      l_shipinstruct string,
      l_shipmode string,
      l_comment string
    )
    ENGINE=OLAP
    DUPLICATE KEY(l_orderkey)
    DISTRIBUTED BY HASH(l_orderkey) BUCKETS 96
    PROPERTIES(
      "replication_num" = "1"
    );
  2. Create an import job.
    LOAD LABEL tpch.lineitem
    (
        DATA INFILE("oss://xxx/tpc_h/sf1/lineitem.tbl")
        INTO TABLE `lineitem`
        COLUMNS TERMINATED BY '|'
        (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment)
    )
    WITH BROKER broker
    (
        "fs.oss.accessKeyId" = "xxx",
        "fs.oss.accessKeySecret" = "xxx",
        "fs.oss.endpoint" = "xxx"
    );
  3. Query the status of the import job.
    show load where label = 'lineitem'\G
    
    *************************** 1. row ***************************
             JobId: 12826
             Label: lineitem
             State: FINISHED
          Progress: ETL:100%; LOAD:100%
              Type: BROKER
           EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=6001215
          TaskInfo: cluster:N/A; timeout(s):14400; max_filter_ratio:0.0
          ErrorMsg: NULL
        CreateTime: 2022-04-13 15:07:53
      EtlStartTime: 2022-04-13 15:07:56
     EtlFinishTime: 2022-04-13 15:07:56
     LoadStartTime: 2022-04-13 15:07:56
    LoadFinishTime: 2022-04-13 15:08:06
               URL: NULL
        JobDetails: {"Unfinished backends":{"97f1acd1-6e70-4699-9199-b1722020931a":[]},"ScannedRows":6001215,"TaskNumber":1,"All backends":{"97f1acd1-6e70-4699-9199-b1722020931a":[10002,10003,10004,10005]},"FileNumber":1,"FileSize":753862072}
    2 rows in set (0.00 sec)
  4. After the import job is complete, query data as needed.
    • Query the number of data rows in the lineitem table.
      select count(*) from lineitem;
      The following output is returned:
      +----------+
      | count(*) |
      +----------+
      |  6001215 |
      +----------+
      1 row in set (0.03 sec)
    • Query data in the first two rows of the lineitem table.
      select * from lineitem limit 2;
      The following output is returned:
      +------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+--------------------------------------------+
      | l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment                                  |
      +------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+--------------------------------------------+
      |         69 |    115209 |      7721 |            1 |         48 |         58761.6 |       0.01 |  0.07 | A            | F            | 1994-08-17 | 1994-08-11   | 1994-09-08    | NONE           | TRUCK      | regular epitaphs. carefully even ideas hag |
      |         69 |    104180 |      9201 |            2 |         32 |        37893.76 |       0.08 |  0.06 | A            | F            | 1994-08-24 | 1994-08-17   | 1994-08-31    | NONE           | REG AIR    | s sleep carefully bold,                    |
      +------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+--------------------------------------------+
      2 rows in set (0.01 sec)

Concurrency of import jobs

An import job consists of one or more tasks. Tasks are run in parallel. An import job can be split into tasks based on data_desc strings in the LOAD statement. Examples:
  • If an import job has multiple data_desc strings that specify source tables from different data source addresses, the import job is split into tasks, each with a data_desc string.
  • If an import job has multiple data_desc strings that specify different partitions of a source table, the import job is also split into tasks, each with a data_desc string.
Each task can contain one or more instances, which are evenly distributed to backends for parallel running. A task is split into instances based on the following frontend configurations:
  • min_bytes_per_broker_scanner: the minimum amount of data processed by each instance. By default, the minimum data amount is 64 MB.
  • max_broker_concurrency: the maximum number of parallel instances for each task. Default value: 100.
  • load_parallel_instance_num: the number of parallel instances on each backend. Default value: 1.

The total number of instances equals the minimum of the following values: the total size of imported files divided by the value of min_bytes_per_broker_scanner, the value of max_broker_concurrency, and the value of load_parallel_instance_num multiplied by the number of backends.

In most cases, an import job has only one data_desc string and therefore contains only one task. The task is split into instances, and the number of instances typically equals the number of backends. Each instance is assigned to a different backend for parallel running.
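The instance-count rule above can be sketched as follows, using hypothetical numbers: a 10 GB import into a cluster with 4 backends and the default frontend configurations.

```python
MB = 1024 * 1024
min_bytes_per_broker_scanner = 64 * MB   # default: 64 MB per instance
max_broker_concurrency = 100             # default: parallel instances per task
load_parallel_instance_num = 1           # default: instances per backend

def instance_count(total_bytes: int, backend_count: int) -> int:
    # Total instances = min(size / min_bytes_per_broker_scanner,
    #                       max_broker_concurrency,
    #                       load_parallel_instance_num x backend count)
    return min(
        total_bytes // min_bytes_per_broker_scanner,
        max_broker_concurrency,
        load_parallel_instance_num * backend_count,
    )

# 10 GB / 64 MB = 160 candidate splits, capped at 1 x 4 backends:
instances = instance_count(10 * 1024 * MB, 4)
```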

FAQ

  • What do I do if the ETL_QUALITY_UNSATISFIED; msg:quality not good enough to cancel error is reported?

    For more information about how to troubleshoot the error, see FAQ.

  • What do I do if the "failed to send batch" or "TabletWriter add batch with unknown id" error is reported?

    Modify the query_timeout and streaming_load_rpc_max_alive_time_sec parameters. For more information, see the backend configurations described in General system configurations.

  • What do I do if the LOAD-RUN-FAIL; msg:OrcScannerAdapter::init_include_columns. col name = xxx not found error is reported?
    If data is imported from a Parquet file or an ORC file, make sure that the column names in the file header are consistent with those in the destination StarRocks table. The following sample code provides an example:
    (tmp_c1,tmp_c2)
    SET
    (
       id=tmp_c2,
       name=tmp_c1
    )
    As shown in the code, the columns tmp_c1 and tmp_c2 in the Parquet or ORC file are mapped respectively to the columns id and name in the StarRocks table. If you do not specify a SET statement, the source columns are mapped to destination columns by name, and the error is reported when a column name in the file does not exist in the StarRocks table.
    Important If you import an ORC file that is generated in Hive of a specific version, the table header in the ORC file may not be Hive metadata but column names, such as (_col0, _col1, _col2, ...). In this case, the "Invalid Column Name" error may be reported. You must specify a SET statement for correct mapping.
  • How do I configure ViewFs in HDFS Federation?

    Copy the ViewFs configuration files core-site.xml and hdfs-site.xml to the broker/conf directory. If a custom file system is available, copy the JAR file to the broker/lib directory.

  • What do I do if the Can't get Kerberos realm error is reported when I access a cluster with Kerberos authentication?

    First, make sure that the /etc/krb5.conf file is configured on all physical machines of the brokers and then try again. If the error is still reported, add -Djava.security.krb5.conf=/etc/krb5.conf to the JAVA_OPTS variable in the broker startup script.

  • What do I do if an import job stays in the PENDING state for an extended period of time?

    Search for the keyword "error" in the log/be.INFO file on the backend to identify the cause.
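A minimal sketch of this search, assuming the default backend log location; the BE_LOG path is an assumption and should be adjusted to your backend's install directory.

```shell
# Hypothetical log path; override with BE_LOG=/path/to/log/be.INFO.
BE_LOG="${BE_LOG:-log/be.INFO}"
if [ -f "$BE_LOG" ]; then
    # Show the 20 most recent lines that mention "error", with line numbers.
    grep -in "error" "$BE_LOG" | tail -n 20
else
    echo "no log found at $BE_LOG"
fi
```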