All Products
Search
Document Center

E-MapReduce:Routine Load

Last Updated:Apr 28, 2024

Routine Load allows you to submit a longtime import job. You can use Routine Load to continuously import data to Doris from the specified data source. This topic describes how Routine Load works, how to use Routine Load, and the best practices.

Limits

Routine Load allows you to import data only from Kafka.

  • Access to Kafka clusters with no authentication or SSL authentication is supported.

  • Messages can be in CSV or JSON format. In CSV format, each message is presented as one line, and the end of the line is not a line feed.

  • By default, Kafka 0.10.0.0 and later are supported. If you want to use a Kafka version earlier than 0.10.0.0, such as 0.9.0, 0.8.2, 0.8.1, or 0.8.0, modify the configurations of backends (BEs). You need to set the kafka_broker_version_fallback parameter to the earlier version that you want to use. Alternatively, you can set the property.broker.version.fallback parameter to the earlier version when you create a Routine Load job.

    Note

    If you use a Kafka version earlier than 0.10.0.0, some features of Routine Load may become unavailable. For example, you cannot set a time-based offset for a Kafka partition.

How it works

The following figure shows how a client submits a Routine Load job to a frontend (FE) and how the FE assigns the job to BEs.

+---------+
         |  Client |
         +----+----+
              |
+-----------------------------+
| FE          |               |
| +-----------v------------+  |
| |                        |  |
| |   Routine Load Job     |  |
| |                        |  |
| +---+--------+--------+--+  |
|     |        |        |     |
| +---v--+ +---v--+ +---v--+  |
| | task | | task | | task |  |
| +--+---+ +---+--+ +---+--+  |
|    |         |        |     |
+-----------------------------+
     |         |        |
     v         v        v
 +---+--+   +--+---+   ++-----+
 |  BE  |   |  BE  |   |  BE  |
 +------+   +------+   +------+
  1. The FE splits the Routine Load job into several tasks by using JobScheduler. Each task imports a specific part of data. Each task is assigned to the specified backend by TaskScheduler for execution.

  2. A BE considers a task as a regular import job and imports data by using Stream Load. After the import is complete, the BE reports the import result to the FE.

  3. The JobScheduler in the FE continues to generate new tasks or retry failed tasks based on the import results.

  4. The FE continuously generates new tasks to achieve uninterrupted data import.

Use Routine Load to import data from Kafka

This section describes how to use Routine Load to import data from Kafka and the best practices.

Create a Routine Load job

For more information about the detailed syntax for creating a Routine Load job, see CREATE ROUTINE LOAD. You can also run the HELP ROUTINE LOAD; command to view the help information about the command. The following examples demonstrate how to create a Routine Load job.

  • Create a Routine Load job named test1 for the example_tbl table of the example_db database. Specify the column delimiter, group ID, and client ID. Configure the job to subscribe to all partitions by default and set the default offset to OFFSET_BEGINNING. OFFSET_BEGINNING indicates that data consumption starts from where data is available.

    CREATE ROUTINE LOAD example_db.test1 ON example_tbl
            COLUMNS TERMINATED BY ",",
            COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)
            PROPERTIES
            (
                "desired_concurrent_number"="3",
                "max_batch_interval" = "20",
                "max_batch_rows" = "300000",
                "max_batch_size" = "209715200",
                "strict_mode" = "false"
            )
            FROM KAFKA
            (
                "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
                "kafka_topic" = "my_topic",
                "property.group.id" = "xxx",
                "property.client.id" = "xxx",
                "property.kafka_default_offsets" = "OFFSET_BEGINNING"
            );
  • Create a Routine Load job named test1 for the example_tbl table of the example_db database in strict mode.

    CREATE ROUTINE LOAD example_db.test1 ON example_tbl
            COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
            WHERE k1 > 100 and k2 like "%doris%"
            PROPERTIES
            (
                "desired_concurrent_number"="3",
                "max_batch_interval" = "20",
                "max_batch_rows" = "300000",
                "max_batch_size" = "209715200",
                "strict_mode" = "true"
            )
            FROM KAFKA
            (
                "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
                "kafka_topic" = "my_topic",
                "kafka_partitions" = "0,1,2,3",
                "kafka_offsets" = "101,0,0,200"
            );
  • Example of importing JSON-formatted data

    Routine Load supports only the following two types of JSON-formatted data:

    A JSON object that contains only one record:

    {"category":"a9jadhx","author":"test","price":895}

    A JSON array that can contain multiple records:

    [
        {
            "category":"11",
            "author":"4avc",
            "price":895,
            "timestamp":1589191587
        },
        {
            "category":"22",
            "author":"2avc",
            "price":895,
            "timestamp":1589191487
        },
        {
            "category":"33",
            "author":"3avc",
            "price":342,
            "timestamp":1589191387
        }
    ]

    Create a Doris data table to which data is imported. Sample statement:

    CREATE TABLE `example_tbl` (
       `category` varchar(24) NULL COMMENT "",
       `author` varchar(24) NULL COMMENT "",
       `timestamp` bigint(20) NULL COMMENT "",
       `dt` int(11) NULL COMMENT "",
       `price` double REPLACE
    ) ENGINE=OLAP
    AGGREGATE KEY(`category`,`author`,`timestamp`,`dt`)
    COMMENT "OLAP"
    PARTITION BY RANGE(`dt`)
    (
      PARTITION p0 VALUES [("-2147483648"), ("20200509")),
        PARTITION p20200509 VALUES [("20200509"), ("20200510")),
        PARTITION p20200510 VALUES [("20200510"), ("20200511")),
        PARTITION p20200511 VALUES [("20200511"), ("20200512"))
    )
    DISTRIBUTED BY HASH(`category`,`author`,`timestamp`) BUCKETS 4
    PROPERTIES (
        "replication_num" = "1"
    );

    Import JSON-formatted data in which each record is stored as a JSON object. Sample statement:

    CREATE ROUTINE LOAD example_db.test_json_label_1 ON table1
    COLUMNS(category,price,author)
    PROPERTIES
    (
        "desired_concurrent_number"="3",
        "max_batch_interval" = "20",
        "max_batch_rows" = "300000",
        "max_batch_size" = "209715200",
        "strict_mode" = "false",
        "format" = "json"
    )
    FROM KAFKA
    (
        "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
        "kafka_topic" = "my_topic",
        "kafka_partitions" = "0,1,2",
        "kafka_offsets" = "0,0,0"
     );

    Import JSON-formatted data in which multiple records are stored in a JSON array. Sample statement:

    CREATE ROUTINE LOAD example_db.test1 ON example_tbl
    COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
    PROPERTIES
    (
        "desired_concurrent_number"="3",
        "max_batch_interval" = "20",
        "max_batch_rows" = "300000",
        "max_batch_size" = "209715200",
        "strict_mode" = "false",
        "format" = "json",
        "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
        "strip_outer_array" = "true"
    )
    FROM KAFKA
    (
        "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
        "kafka_topic" = "my_topic",
        "kafka_partitions" = "0,1,2",
        "kafka_offsets" = "0,0,0"
    );
    Note

    The partition field dt in the Doris data table is unavailable in the sample data. Therefore, the values of the dt field are generated by using dt=from_unixtime(timestamp, '%Y%m%d') in the Routine Load statement.

    • Relationships between the strict mode and source data

      • Relationships between the strict mode and source data if the type of the column in the table is TinyInt and the column allows NULL values to be imported

        Source data

        Source data example

        String to Int

        Strict mode

        Result

        NULL

        \N

        N/A

        true or false

        NULL

        NOT NULL

        aaa or 2000

        NULL

        true

        Invalid data (filtered)

        NOT NULL

        aaa

        NULL

        true

        NULL

        NOT NULL

        1

        1

        true or false

        Correct data

      • Relationships between the strict mode and source data if the type of the column in the table is Decimal(1,0) and the column allows NULL values to be imported

        Source data

        Source data example

        String to Int

        Strict mode

        Result

        NULL

        \N

        N/A

        true or false

        NULL

        NOT NULL

        aaa

        NULL

        true

        Invalid data (filtered)

        NOT NULL

        aaa

        NULL

        false

        NULL

        NOT NULL

        1 or 10

        1

        true or false

        Correct data

        Note

        The value 10 is not a valid value of the Decimal(1,0) type. However, the value 10 is not filtered out in strict mode because 10 meets the requirements of the Decimal type. The value 10 will be filtered out in an extract, transform, load (ETL) process.

    • Access a Kafka cluster with SSL authentication

      To access a Kafka cluster with SSL authentication, you must provide the public key certificate (ca.pem) of Kafka brokers. If client authentication is also enabled for the Kafka cluster, you must provide the public key certificate of the client (client.pem), key file (client.key), and password. You can run the CREATE FILE command to upload the files to Doris. In the command, set the catalog parameter to kafka. You can run the HELP CREATE FILE; command to view the help information about the CREATE FILE command. The following example shows you how to access a Kafka cluster with SSL authentication.

      1. Upload files.

        CREATE FILE "ca.pem" PROPERTIES("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka");
        CREATE FILE "client.key" PROPERTIES("url" = "https://example_urlkafka-key/client.key", "catalog" = "kafka");
        CREATE FILE "client.pem" PROPERTIES("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");
      2. Create a Routine Load job.

        CREATE ROUTINE LOAD db1.job1 on tbl1
        PROPERTIES
        (
            "desired_concurrent_number"="1"
        )
        FROM KAFKA
        (
            "kafka_broker_list"= "broker1:9091,broker2:9091",
            "kafka_topic" = "my_topic",
            "property.security.protocol" = "ssl",
            "property.ssl.ca.location" = "FILE:ca.pem",
            "property.ssl.certificate.location" = "FILE:client.pem",
            "property.ssl.key.location" = "FILE:client.key",
            "property.ssl.key.password" = "abcd***"
        );

      Doris accesses a Kafka cluster by calling the C++ API librdkafka of Kafka. For more information about the parameters supported by librdkafka, see librdkafka.

View the status of a Routine Load job

You can run the SHOW ROUTINE LOAD command to view the status of a Routine Load job. For more information about the command syntax and examples, run the HELP SHOW ROUTINE LOAD; command.

You can run the SHOW ROUTINE LOAD TASK command to view the status of tasks in a Routine Load job. For more information about the command syntax and examples, run the HELP SHOW ROUTINE LOAD TASK; command.

Note

You can view only the status of a running job. The status of completed jobs and jobs that have not been run cannot be viewed.

Modify the properties of a Routine Load job

You can run the ALTER ROUTINE LOAD command to modify the properties of a Routine Load job that has been created. For more information, see ALTER ROUTINE LOAD. You can also run the HELP ALTER ROUTINE LOAD; command to view the help information about the command.

Stop, pause, or resume a Routine Load job

You can run the STOP, PAUSE, or RESUME command to stop, pause, or resume a Routine Load job. For more information about the command syntax and examples, run the HELP STOP ROUTINE LOAD;, HELP PAUSE ROUTINE LOAD;, or HELP RESUME ROUTINE LOAD; command.

Additional information

  • Relationships between a Routine Load job and ALTER TABLE operations

    • A Routine Load job does not block SCHEMA CHANGE or ROLLUP operations.

      However, if columns in the source data cannot match columns in the destination table after a SCHEMA CHANGE operation, the number of incorrect data records increases and the job is eventually paused. We recommend that you explicitly specify column mappings in a Routine Load job and use NULLABLE columns or columns with the DEFAULT constraint to prevent this issue.

    • If you delete a partition of a table, data may fail to be imported because the partition cannot be found. In this case, the job is paused.

  • Relationships between a Routine Import job and LOAD, DELETE, and INSERT operations

    • A Routine Load job does not conflict with LOAD or INSERT operations.

    • To perform a DELETE operation on a table, you must make sure that no data is being imported to the corresponding table partition. Therefore, before you perform a DELETE operation, you must pause a Routine Load job and wait until all the tasks that have been assigned are complete.

  • Relationships between a Routine Load job and DROP DATABASE or DROP TABLE operations: If the database or table to which a Routine Load job is importing data is deleted, the job is canceled.

  • Relationships between a Routine Load job and a Kafka topic

    If the kafka topic that you specify for a Routine Load when you create the job does not exist in the Kafka cluster, Kafka brokers can automatically create the topic based on the setting of the auto.create.topics.enable parameter.

    • If the auto.create.topics.enable parameter is set to true for a Kafka broker, the topic is automatically created. The number of partitions that are automatically created is determined by the num.partitions parameter of the Kafka broker. The Routine Load job continuously reads the data of the topic as normal.

    • If the auto.create.topics.enable parameter is set to false for a Kafka broker, the topic is not automatically created. In this case, the Routine Load job is paused until data is available.

    Therefore, if you want a topic to be automatically created, set the auto.create.topics.enable parameter to true for brokers in the Kafka cluster.

  • Issues that may occur in a network-isolated environment

    In some environments, isolation measures for CIDR blocks or domain name resolution exist. In this case, take note of the following items:

    • Doris must be able to access the brokers in the broker list that is specified for a Routine Load job.

    • If advertised listeners are configured in a Kafka cluster, Doris must be able to access the addresses of the advertised listeners.

  • Specify the partitions and offsets for data consumption

    Doris allows you to specify the partitions and offsets for data consumption. In the new version, you can also specify a point in time to start data consumption. The following three parameters are involved:

    • kafka_partitions: the partitions whose data is to be consumed. Example: "0, 1, 2, 3".

    • kafka_offsets: the start offset of each partition. The number of offsets that you specify for this parameter must be the same as the number of partitions that you specify for the kafka_partitions parameter. Example: "1000, 1000, 2000, 2000".

    • property.kafka_default_offset: the default offset of the partitions.

    When you create a Routine Load job, you can combine the three parameters in the following ways.

    Combination

    kafka_partitions

    kafka_offsets

    property.kafka_default_offset

    Behavior

    1

    No

    No

    No

    The system automatically finds all partitions of the Kafka topic and starts data consumption from the end of the partitions.

    2

    No

    No

    Yes

    The system automatically finds all partitions of the Kafka topic and starts data consumption from the default offset.

    3

    Yes

    No

    No

    The system starts data consumption from the end of the specified partitions.

    4

    Yes

    Yes

    No

    The system starts data consumption from the specified offsets of the specified partitions.

    5

    Yes

    No

    Yes

    The system starts data consumption from the default offset of the specified partitions.

  • Difference between STOP and PAUSE

    An FE automatically clears Routine Load jobs in the STOPPED state on a regular basis. Routine Load jobs in the PAUSED state can be resumed.

Parameters

The following table describes the system parameters that can affect the use of Routine Load.

Parameter

FE or BE

Default value

Description

max_routine_load_task_concurrent_num

FE

5

The maximum number of tasks into which a Routine Load job can be split. You can modify this parameter when Routine Load jobs are running. We recommend that you use the default value. If the parameter is set to a large value, the number of concurrent tasks may be excessive and occupy a large amount of cluster resources.

max_routine_load_task_num_per_be

FE

5

The maximum number of tasks that can be concurrently run on each BE. You can modify this parameter when Routine Load jobs are running. We recommend that you use the default value. If the parameter is set to a large value, the number of concurrent tasks may be excessive and occupy a large amount of cluster resources.

max_routine_load_job_num

FE

100

The maximum number of Routine Load jobs, including the jobs in the NEED_SCHEDULED, RUNNING, and PAUSED states. You can modify this parameter when Routine Load jobs are running. If the total number of Routine Load jobs reaches the maximum value, no more jobs can be submitted.

max_consumer_num_per_group

BE

3

The maximum number of consumers that can be generated in a task to consume data. For a Kafka data source, a consumer may consume data from one or more partitions. For example, if a job consumes data from six partitions, a task generates three consumers, and each consumer consumes data from two partitions. If a job consumes data from only two partitions, a task generates two consumers, and each consumer consumes data from one partition.

push_write_mbytes_per_sec

BE

10 MB/s

The common parameter for all import jobs, including Routine Load jobs. This parameter specifies the maximum speed at which data can be written to disks. For high-performance storage devices such as SSDs, you can set this parameter to a greater value.

max_tolerable_backend_down_num

FE

0

The maximum number of failed BEs that can be tolerated. This parameter specifies a condition that must be met before Doris can reschedule jobs in the PAUSED state. After a paused job is rescheduled, the state of the job becomes RUNNING. A value of 0 indicates that rescheduling is allowed only when all BEs are in the Alive state.

period_of_auto_resume_min

FE

5 minutes

The time period in which Doris can attempt to reschedule jobs. Doris attempts to reschedule a job up to three times within the time period, which is 5 minutes by default. If all three attempts fail, the job is locked and no further scheduling is performed. However, you can manually resume the job.