Routine Load is a routine import method. StarRocks allows you to use this method to continuously import data from Apache Kafka and control the pause, resumption, and stop of import jobs by using SQL statements. This topic describes the basic principles, import example, and FAQ of Routine Load.

Terms

  • RoutineLoadJob: a routine import job that is submitted
  • JobScheduler: the routine import job scheduler that is used to schedule and split a RoutineLoadJob into multiple tasks
  • Task: the task that is split from a RoutineLoadJob by JobScheduler based on rules
  • TaskScheduler: the task scheduler that is used to schedule the execution of a task

Basic principles

The following figure shows the import process of Routine Load. Routine Load
The following steps show how to import data by using Routine Load.
  1. You submit a Kafka import job to the frontend by using a client that supports the MySQL protocol.
  2. The frontend splits the import job into multiple tasks. Each task imports a specified part of data.
  3. Each task is assigned to the specified backend for execution. On the backend, a task is regarded as a regular import job and imports data based on the import mechanism of Stream Load.
  4. After the import process is completed on the backend, the backend reports the import result to the frontend.
  5. The frontend continues to generate new tasks or retry failed tasks based on the import result.
  6. The frontend continuously generates new tasks to achieve the uninterrupted import of data.
Note The images and some information in this topic are from Continuously load data from Apache Kafka of open source StarRocks.

Example

Environment requirements

  • Access to Kafka clusters without authentication or SSL authentication is supported.
  • A message can be in one of the following formats:
    • CSV format. In this format, each message serves as a line, and the end of the line does not contain a line feed.
    • JSON format.
  • The Array type is not supported.
  • Only Kafka V0.10.0.0 and later are supported.

Create an import job

  • Syntax
    CREATE ROUTINE LOAD [database.][job_name] ON [table_name]
        [COLUMNS TERMINATED BY "column_separator" ,]
        [COLUMNS (col1, col2, ...) ,]
        [WHERE where_condition ,]
        [PARTITION (part1, part2, ...)]
        [PROPERTIES ("key" = "value", ...)]
        FROM [DATA_SOURCE]
        [(data_source_properties1 = 'value1',
        data_source_properties2 = 'value2',
        ...)]
    The following table describes the parameters.
    ParameterRequiredDescription
    job_nameYesThe name of the import job. The name of the import database can be placed in the front. The name is usually in the format of timestamp plus table name. The name of the job must be unique in a database.
    table_nameYesThe name of the destination table.
    COLUMNS TERMINATED clauseNoThe column delimiter in the source data file. Default value: \t.
    COLUMNS clauseNoThe mapping between columns in the source data file and columns in the destination table.
    • Mapped columns: For example, the destination table has three columns, col1, col2, and col3, whereas the source data file has four columns, and the first, second, and fourth columns in the destination table correspond to col2, col1, and col3 in the source data file. In this case, the clause can be written as COLUMNS (col2, col1, temp, col3). The temp column does not exist and is used to skip the third column in the source data file.
    • Derived columns: StarRocks can not only read the data in a column of the source data file but also provide processing operations on data columns. For example, a column col4 is added to the destination table, and the value of col4 is equal to the value of col1 plus the value of col2. In this case, the clause can be written as COLUMNS (col2, col1, temp, col3, col4 = col1 + col2).
    WHERE clauseNoThe filter conditions that you want to use to filter out the rows that you do not need. The filter conditions can be specified on mapped columns or derived columns.

    For example, if only rows with k1 greater than 100 and k2 equal to 1000 are imported, the clause can be written as WHERE k1 > 100 and k2 = 1000.

    PARTITION clauseNoThe partition of the destination table. If you do not specify the partition, the source data file is automatically imported to the corresponding partition.
    PROPERTIES clauseNoThe common parameters for the import job.
    desired_concurrent_numberNoThe maximum number of tasks into which the import job can be split. The value must be greater than 0. Default value: 3.
    max_batch_intervalNoThe maximum execution time of each task. Valid values: 5 to 60. Unit: seconds. Default value: 10.

    In V1.15 and later, this parameter specifies the scheduling time of the task. You can specify how often the task is executed. routine_load_task_consume_second in fe.conf specifies the amount of time required by the task to consume data. Default value: 3s. routine_load_task_timeout_second in fe.conf specifies the execution timeout period of the task. Default value: 15s.

    max_batch_rowsNoThe maximum number of rows that each task can read. The value must be greater than or equal to 200000. Default value: 200000.

    In V1.15 and later, this parameter is used only to define the range of the error detection window. The range of the window is 10 × max-batch-rows.

    max_batch_sizeNoThe maximum number of bytes that each task can read. Unit: bytes. Valid value: 100 MB to 1 GB. Default value: 100 MB.

    In V1.15 and later, this parameter is discarded. routine_load_task_consume_second in fe.conf specifies the amount of time required by the task to consume data. Default value: 3s.

    max_error_numberNoThe maximum number of error rows allowed in the sampling window. The value must be greater than or equal to 0. Default value: 0. No error row is allowed.
    Important Rows filtered out by the WHERE condition are not error rows.
    strict_modeNoSpecifies whether to enable the strict mode. By default, this mode is enabled.

    If the column type of non-empty raw data is changed to NULL after you enable the strict mode, the data is filtered out. To disable the strict mode, set this parameter to false.

    timezoneNoThe time zone that is used for the import job.

    By default, the value of the timezone parameter of the session is used. This parameter affects the results of all time zone-related functions involved in the import.

    DATA_SOURCEYesThe type of the data source. Set the value to KAFKA.
    data_source_propertiesNoThe information about the data source. The value includes the following fields:
    • kafka_broker_list: the connection information about the Kafka broker. Format: ip:host. Separate multiple brokers with commas (,).
    • kafka_topic: the Kafka topic to which you want to subscribe.
      Note The kafka_broker_list and kafka_topic fields are required.
    • kafka_partitions and kafka_offsets: the Kafka partitions to which you want to subscribe and the start offset of each partition.
    • property: Kafka-related properties. This field is equivalent to the --property parameter in Kafka Shell. You can run the HELP ROUTINE LOAD; command to view the more detailed syntax for creating an import job.
    Note You can run the HELP ROUTINE LOAD; command to view the more detailed syntax for creating an import job.
  • Example: Import data from a local Kafka cluster.
    CREATE ROUTINE LOAD routine_load_wikipedia ON routine_wiki_edit
    COLUMNS TERMINATED BY ",",
    COLUMNS (event_time, channel, user, is_anonymous, is_minor, is_new, is_robot, is_unpatrolled, delta, added, deleted)
    PROPERTIES
    (
        "desired_concurrent_number"="1",
        "max_error_number"="1000"
    )
    FROM KAFKA
    (
        "kafka_broker_list"= "localhost:9092",
        "kafka_topic" = "starrocks-load"
    );

View the status of a job

  • Run the following command to display all routine import jobs in a database, including the jobs that are stopped or canceled: The jobs are displayed in one or more rows.
    USE [database];
    SHOW ALL ROUTINE LOAD;
  • Run the following command to display the running routine import job named job_name in a database:
    SHOW ROUTINE LOAD FOR [database].[job_name];
Important StarRocks allows you to view only the running jobs. The completed jobs and the jobs that have not been run cannot be viewed.

You can run the HELP SHOW ROUTINE LOAD command to obtain specific commands and examples for viewing the status of the job. You can run the HELP SHOW ROUTINE LOAD TASK command to obtain specific commands and examples for viewing the running status of a job (including tasks).

You can run the SHOW ALL ROUTINE LOAD command to view all running Routine Load jobs. The following output is returned:
*************************** 1. row ***************************

                  Id: 14093
                Name: routine_load_wikipedia
          CreateTime: 2020-05-16 16:00:48
           PauseTime: N/A
             EndTime: N/A
              DbName: default_cluster:load_test
           TableName: routine_wiki_edit
               State: RUNNING
      DataSourceType: KAFKA
      CurrentTaskNum: 1
       JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
    CustomProperties: {}
           Statistic: {"receivedBytes":150821770,"errorRows":122,"committedTaskNum":12,"loadedRows":2399878,"loadRowsRate":199000,"abortedTaskNum":1,"totalRows":2400000,"unselectedRows":0,"receivedBytesRate":12523000,"taskExecuteTimeMs":12043}
            Progress: {"0":"13634667"}
ReasonOfStateChanged:
        ErrorLogUrls: http://172.26.**.**:9122/api/_load_error_log?file=__shard_53/error_log_insert_stmt_47e8a1d107ed4932-8f1ddf7b01ad2fee_47e8a1d107ed4932_8f1ddf7b01ad2fee, http://172.26.**.**:9122/api/_load_error_log?file=__shard_54/error_log_insert_stmt_e0c0c6b040c044fd-a162b16f6bad53e6_e0c0c6b040c044fd_a162b16f6bad53e6, http://172.26.**.**:9122/api/_load_error_log?file=__shard_55/error_log_insert_stmt_ce4c95f0c72440ef-a442bb300bd743c8_ce4c95f0c72440ef_a442bb300bd743c8
            OtherMsg:
1 row in set (0.00 sec)
In this example, an import job named routine_load_wikipedia is created. The following table describes the parameters.
ParameterDescription
StateThe status of the import job. RUNNING indicates that the import job is continuously running.
StatisticThe progress information, which records the import information since the job is created.
receivedBytesThe size of the received data. Unit: bytes.
errorRowsThe number of imported error rows.
committedTaskNumThe number of tasks submitted by the frontend.
loadedRowsThe number of imported rows.
loadRowsRateThe rate at which data is imported. Unit: row/s.
abortedTaskNumThe number of failed tasks on the backend.
totalRowsThe total number of received rows.
unselectedRowsThe number of rows that are filtered out by the WHERE condition.
receivedBytesRateThe rate at which data is received. Unit: bytes/s.
taskExecuteTimeMsThe import duration. Unit: milliseconds.
ErrorLogUrlsThe error message log. You can use the URL to view the error message during the import process.

Pause the import job

Use the PAUSE statement to make the import job enter the PAUSED state. Data import is paused, but the job is not terminated. You can use the RESUME statement to resume the job.

For example, pause a routine import job named job_name.
PAUSE ROUTINE LOAD FOR [job_name];

You can use the HELP PAUSE ROUTINE LOAD command to view the help information and examples.

After the import job is paused, the status of the job changes to PAUSED, and the import information in Statistic and Progress stops being updated. In this case, the job is not terminated. You can use the SHOW ROUTINE LOAD statement to view the paused import job.

Resume the import job

Use the RESUME statement to make the job temporarily enter the NEED_SCHEDULE state. The job is being rescheduled. After a period of time, the job returns to the RUNNING state, and data import continues.

For example, resume a routine import job named job_name.
RESUME ROUTINE LOAD FOR [job_name];

You can use the HELP RESUME ROUTINE LOAD command to view the help information and examples.

Run the SHOW ROUTINE LOAD command to view the status of the job. The following output is returned:
*************************** 1. row ***************************
                  Id: 14093
                Name: routine_load_wikipedia
          CreateTime: 2020-05-16 16:00:48
           PauseTime: N/A
             EndTime: N/A
              DbName: default_cluster:load_test
           TableName: routine_wiki_edit
               State: NEED_SCHEDULE
      DataSourceType: KAFKA
      CurrentTaskNum: 0
       JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
    CustomProperties: {}
           Statistic: {"receivedBytes":162767220,"errorRows":132,"committedTaskNum":13,"loadedRows":2589972,"loadRowsRate":115000,"abortedTaskNum":7,"totalRows":2590104,"unselectedRows":0,"receivedBytesRate":7279000,"taskExecuteTimeMs":22359}
            Progress: {"0":"13824771"}
ReasonOfStateChanged:
        ErrorLogUrls: http://172.26.**.**:9122/api/_load_error_log?file=__shard_54/error_log_insert_stmt_e0c0c6b040c044fd-a162b16f6bad53e6_e0c0c6b040c044fd_a162b16f6bad53e6, http://172.26.**.**:9122/api/_load_error_log?file=__shard_55/error_log_insert_stmt_ce4c95f0c72440ef-a442bb300bd743c8_ce4c95f0c72440ef_a442bb300bd743c8, http://172.26.**.**:9122/api/_load_error_log?file=__shard_56/error_log_insert_stmt_8753041cd5fb42d0-b5150367a5175391_8753041cd5fb42d0_b5150367a5175391
            OtherMsg:
1 row in set (0.00 sec)
Run the SHOW ROUTINE LOAD command again to view the status of the job. The following output is returned:
*************************** 1. row ***************************
                  Id: 14093
                Name: routine_load_wikipedia
          CreateTime: 2020-05-16 16:00:48
           PauseTime: N/A
             EndTime: N/A
              DbName: default_cluster:load_test
           TableName: routine_wiki_edit
               State: RUNNING
      DataSourceType: KAFKA
      CurrentTaskNum: 1
       JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
    CustomProperties: {}
           Statistic: {"receivedBytes":175337712,"errorRows":142,"committedTaskNum":14,"loadedRows":2789962,"loadRowsRate":118000,"abortedTaskNum":7,"totalRows":2790104,"unselectedRows":0,"receivedBytesRate":7422000,"taskExecuteTimeMs":23623}
            Progress: {"0":"14024771"}
ReasonOfStateChanged:
        ErrorLogUrls: http://172.26.**.**:9122/api/_load_error_log?file=__shard_55/error_log_insert_stmt_ce4c95f0c72440ef-a442bb300bd743c8_ce4c95f0c72440ef_a442bb300bd743c8, http://172.26.**.**:9122/api/_load_error_log?file=__shard_56/error_log_insert_stmt_8753041cd5fb42d0-b5150367a5175391_8753041cd5fb42d0_b5150367a5175391, http://172.26.**.**:9122/api/_load_error_log?file=__shard_57/error_log_insert_stmt_31304c87bb82431a-9f2baf7d5fd7f252_31304c87bb82431a_9f2baf7d5fd7f252
            OtherMsg:
1 row in set (0.00 sec)
ERROR: No query specified
Note When you query the job for the first time, the status of the job changes to NEED_SCHEDULE. The job is being rescheduled. When you query the job for the second time, the status of the job changes to RUNNING. At the same time, the import information in Statistic and Progress starts to be updated, and data import continues.

Stop the import job

Use the STOP statement to make the import job enter the STOP state. Data import is stopped, and the job is terminated. Data import cannot be resumed.

For example, stop a routine import job named job_name.
STOP ROUTINE LOAD FOR [job_name];

You can use the HELP STOP ROUTINE LOAD command to view the help information and examples.

Run the SHOW ROUTINE LOAD command to view the status of the job. The following output is returned:
*************************** 1. row ***************************
                  Id: 14093
                Name: routine_load_wikipedia
          CreateTime: 2020-05-16 16:00:48
           PauseTime: N/A
             EndTime: 2020-05-16 16:08:25
              DbName: default_cluster:load_test
           TableName: routine_wiki_edit
               State: STOPPED
      DataSourceType: KAFKA
      CurrentTaskNum: 0
       JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
    CustomProperties: {}
           Statistic: {"receivedBytes":325534440,"errorRows":264,"committedTaskNum":26,"loadedRows":5179944,"loadRowsRate":109000,"abortedTaskNum":18,"totalRows":5180208,"unselectedRows":0,"receivedBytesRate":6900000,"taskExecuteTimeMs":47173}
            Progress: {"0":"16414875"}
ReasonOfStateChanged:
        ErrorLogUrls: http://172.26.**.**:9122/api/_load_error_log?file=__shard_67/error_log_insert_stmt_79e9504cafee4fbd-b3981a65fb158cde_79e9504cafee4fbd_b3981a65fb158cde, http://172.26.**.**:9122/api/_load_error_log?file=__shard_68/error_log_insert_stmt_b6981319ce56421b-bf4486c2cd371353_b6981319ce56421b_bf4486c2cd371353, http://172.26.**.**:9122/api/_load_error_log?file=__shard_69/error_log_insert_stmt_1121400c1f6f4aed-866c381eb49c966e_1121400c1f6f4aed_866c381eb49c966e
            OtherMsg:

After the import job is stopped, the status of the job changes to STOPPED, and the import information in Statistic and Progress is no longer updated. In this case, you cannot view the stopped import job by using the SHOW ROUTINE LOAD statement.