
ApsaraDB for SelectDB: Use Routine Load to import data

Last Updated: Apr 27, 2025

Routine Load allows you to submit a resident import job to continuously read and import data from a specific data source into an ApsaraDB for SelectDB instance. This topic describes how to use Routine Load to import data from a Kafka data source into an ApsaraDB for SelectDB instance.

Prerequisites

  • The data source must be Kafka. A Routine Load job can access Kafka clusters that require no authentication or that use PLAIN, SSL, or Kerberos authentication.

  • Messages must be in the CSV or JSON format. A CSV message is a single line of text without a trailing line feed.
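
    For example, a single Kafka message for a three-column table might look like one of the following lines (the values are placeholders):

    1001,Alice,30
    {"id":1001,"name":"Alice","age":30}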

Usage notes

By default, Kafka 0.10.0.0 and later are supported. If you want to use Kafka of a version that is earlier than 0.10.0.0, such as 0.9.0, 0.8.2, 0.8.1, or 0.8.0, use one of the following methods:

  • You can set the value of the kafka_broker_version_fallback parameter in the configurations of backends (BEs) to an earlier version of Kafka that you want to use.

  • You can also set the value of the property.broker.version.fallback parameter to an earlier version of Kafka when you create a Routine Load job.

Note

If you use Kafka of a version that is earlier than 0.10.0.0, some features of Routine Load may be unavailable. For example, you cannot set a time-based offset for a Kafka partition.
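
For example, the following sketch (the database, job, table, and broker names are placeholders) sets the property.broker.version.fallback parameter when creating a Routine Load job that consumes data from a Kafka 0.9.0 cluster:

CREATE ROUTINE LOAD example_db.legacy_kafka_job ON test_table
PROPERTIES
(
    "desired_concurrent_number" = "1"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092",
    "kafka_topic" = "my_topic",
    -- Fall back to the wire protocol of the earlier Kafka version.
    "property.broker.version.fallback" = "0.9.0"
);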

Create a Routine Load job

To use Routine Load, you must create a Routine Load job. The job continuously schedules tasks, and each task consumes a batch of Kafka messages.

Syntax

CREATE ROUTINE LOAD [db.]job_name ON tbl_name
[merge_type]
[load_properties]
[job_properties]
FROM data_source [data_source_properties]

Parameters

  • [db.]job_name: The name of the Routine Load job. Within a database, only one job with the same name can run at a time.

  • tbl_name: The name of the destination table into which data is imported.

  • merge_type: The data merging mode of the import. Default value: APPEND, which specifies that the import is a standard append operation. You can set this parameter to MERGE or DELETE only for tables that use the Unique Key model. If this parameter is set to MERGE, the DELETE ON clause must be used to specify the Delete Flag column. If this parameter is set to DELETE, all imported data is deleted from the destination table.

  • load_properties: The parameters that are used to process the imported data. For more information, see the Parameters of load_properties section of this topic.

  • job_properties: The parameters related to the Routine Load job. For more information, see the Parameters of job_properties section of this topic.

  • data_source_properties: The type and properties of the data source. For more information, see the Parameters of data_source_properties section of this topic.

Parameters of load_properties

[column_separator],
[columns_mapping],
[preceding_filter],
[where_predicates],
[partitions],
[DELETE ON],
[ORDER BY]

  • column_separator (example: COLUMNS TERMINATED BY ","): The column delimiter. Default value: \t.

  • columns_mapping (example: (k1,k2,tmpk1,k3=tmpk1+1)): The mappings between columns in the imported file and columns in the destination table, as well as column conversion operations. For more information, see Converting Source Data.

  • preceding_filter: The conditions for filtering the source data. For more information, see Converting Source Data.

  • where_predicates (example: WHERE k1>100 and k2=1000): The conditions for filtering the imported data. For more information, see Converting Source Data.

  • partitions (example: PARTITION(p1,p2,p3)): The partitions of the destination table into which data is imported. If you do not specify the partitions, the source data is automatically imported into the corresponding partitions.

  • DELETE ON (example: DELETE ON v3>100): Specifies the Delete Flag column in the imported data and the calculation relationship. This clause is required if the merge_type parameter is set to MERGE and is valid only for tables that use the Unique Key model. For an example that combines merge_type and DELETE ON, see the sketch after this list.

  • ORDER BY: Specifies the Sequence Col column in the imported data, which is used to maintain the correct order of data during the import. This clause is valid only for tables that use the Unique Key model.
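
A minimal sketch of how merge_type, DELETE ON, and column mappings can be combined for a Unique Key table. The job, table, and column names are placeholders, and the merge type is written as WITH MERGE in front of the load properties, following the syntax above:

CREATE ROUTINE LOAD example_db.test_merge ON test_unique_table
WITH MERGE
COLUMNS TERMINATED BY ",",
COLUMNS(k1, k2, v1, v2, v3),
-- Rows whose v3 value is greater than 100 are marked for deletion; other rows are upserted.
DELETE ON v3 > 100
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092",
    "kafka_topic" = "my_topic"
);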

Parameters of job_properties

PROPERTIES (
    "key1" = "val1",
    "key2" = "val2"
)
Note

A Routine Load job is divided into multiple tasks. The max_batch_interval parameter specifies the maximum execution duration of a task. The max_batch_rows parameter specifies the maximum number of rows that can be read by a task. The max_batch_size parameter specifies the maximum number of bytes that can be read by a task. If one of the thresholds specified by the three parameters is reached, the task ends.

  • desired_concurrent_number (example: "desired_concurrent_number" = "3"): The maximum number of tasks that can be concurrently run for the job. The value must be an integer that is greater than 0. Default value: 3.

    Note
      1. The actual number of concurrent tasks may not reach the specified value. It depends on the number of nodes in the cluster, the cluster load, and the data source.
      2. Increasing the concurrency can accelerate the Routine Load job on a distributed cluster. However, excessively high concurrency may produce a large number of small files. We recommend that you set this parameter to the number of cores in the cluster divided by 16.

  • max_batch_interval (example: "max_batch_interval" = "20"): The maximum execution duration of each task. Unit: seconds. Default value: 10. Valid values: 5 to 60.

  • max_batch_rows (example: "max_batch_rows" = "300000"): The maximum number of rows that each task can read. Default value: 200000. The value must be greater than or equal to 200000.

  • max_batch_size (example: "max_batch_size" = "209715200"): The maximum number of bytes that each task can read. Unit: bytes. Default value: 104857600, which is 100 MB. Valid values: 100 MB to 1 GB.

  • max_error_number (example: "max_error_number" = "3"): The maximum number of error rows that are allowed within the sampling window. The value must be an integer that is greater than or equal to 0. Default value: 0, which specifies that no error rows are allowed.

    The sampling window is ten times the value of the max_batch_rows parameter. If the number of error rows within the sampling window exceeds this threshold, the Routine Load job is paused and manual intervention is required to check the data quality.

    Note
    The rows that are filtered out by the WHERE conditions are not counted as error rows.

  • strict_mode (example: "strict_mode" = "true"): Specifies whether to enable the strict mode. Default value: false. If the strict mode is enabled, the data after column type conversion is strictly filtered during the import process based on the following rules:

      • Error data is filtered out after column type conversion. Error data refers to values that are NOT NULL in the source column but become NULL in the destination column after column type conversion.

      • The strict mode does not apply to destination columns whose values are generated by functions.

      • If a destination column restricts values to a specific range and a source value can be converted but the converted value falls outside that range, the strict mode does not apply. For example, if a source value is 10 and the destination column is of the DECIMAL(1,0) type, the value 10 can be converted but exceeds the range of the destination column. The value is not filtered out by the strict mode.

  • timezone (example: "timezone" = "Africa/Abidjan"): The time zone that is used for the Routine Load job. By default, the time zone of the session is used.

    Note
    This parameter affects the results of all time zone-related functions that are involved in the Routine Load job.

  • format (example: "format" = "json"): The format of the imported data. Default value: CSV. The JSON format is also supported.

  • jsonpaths (example: "jsonpaths" = "[\"$.k2\",\"$.k1\"]"): The fields to be extracted from the JSON data if the imported data is in the JSON format.

  • strip_outer_array (example: "strip_outer_array" = "true"): Specifies whether the JSON data is an outer array if the imported data is in the JSON format. If this parameter is set to true, the JSON data is treated as an array and each element is imported as a row of data. Default value: false.

  • json_root (example: "json_root" = "$.RECORDS"): The root node of the JSON data if the imported data is in the JSON format. ApsaraDB for SelectDB extracts and parses the elements of the root node that is specified by this parameter. By default, this parameter is left empty. For an example that combines the JSON-related parameters, see the sketch after this list.

  • send_batch_parallelism: The maximum parallelism that is used to send batched data. If the value of this parameter is greater than the value of the max_send_batch_parallelism_per_job parameter in the BE configuration, the value of max_send_batch_parallelism_per_job is used instead.

  • load_to_single_tablet: Specifies whether to import data into only one tablet of the corresponding partition. Default value: false. This parameter is available only when data is imported into a table that uses the Duplicate Key model with random bucketing.
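
The following sketch shows how the JSON-related parameters might be combined when each Kafka message wraps its records in an outer object such as {"RECORDS":[...]}. The database, table, topic, and field names are placeholders:

CREATE ROUTINE LOAD example_db.test_json_root ON example_tbl
COLUMNS(category, author, price)
PROPERTIES
(
    "format" = "json",
    -- Extract these three fields from each JSON record.
    "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\"]",
    -- The records are nested under the RECORDS key.
    "json_root" = "$.RECORDS",
    -- The value of RECORDS is a JSON array; import each element as one row.
    "strip_outer_array" = "true"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092",
    "kafka_topic" = "my_topic"
);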

Relationships between the strict mode and the source data to be imported

In this example, a column of the TINYINT type is to be imported. The following table describes the relationship between the strict mode and the source data when the destination column allows NULL values.

| Source data | Example | Converted value | Strict mode | Result |
| --- | --- | --- | --- | --- |
| NULL | \N | NULL | true or false | NULL |
| NOT NULL | aaa or 2000 | NULL | true | Invalid data (filtered out) |
| NOT NULL | aaa | NULL | false | NULL |
| NOT NULL | 1 | 1 | true or false | Correct data |

In this example, a column of the DECIMAL(1,0) type is to be imported. The following table describes the relationship between the strict mode and the source data when the destination column allows NULL values.

| Source data | Example | Converted value | Strict mode | Result |
| --- | --- | --- | --- | --- |
| NULL | \N | NULL | true or false | NULL |
| NOT NULL | aaa | NULL | true | Invalid data (filtered out) |
| NOT NULL | aaa | NULL | false | NULL |
| NOT NULL | 1 or 10 | 1 | true or false | Correct data |

Note

The value 10 exceeds the allowed range of the DECIMAL(1,0) type. However, the value 10 is not filtered out in strict mode because the value 10 meets the requirements of the DECIMAL type. The value 10 is eventually filtered out in an extract, transform, and load (ETL) process.

Parameters of data_source_properties

FROM KAFKA
(
    "key1" = "val1",
    "key2" = "val2"
)

  • kafka_broker_list: The broker connection information of the Kafka cluster, in the ip:port format. Separate multiple brokers with commas (,).

    Example: "kafka_broker_list" = "broker1:9092,broker2:9092".

  • kafka_topic: The Kafka topic to which you want to subscribe.

    Example: "kafka_topic" = "my_topic".

  • kafka_partitions/kafka_offsets: The Kafka partitions to which you want to subscribe and the start offset of each partition. If you specify a point in time, data consumption starts from the first offset at or after that point in time.

    You can specify an offset that is greater than or equal to 0. Alternatively, set the kafka_offsets parameter to one of the following values:

      • OFFSET_BEGINNING: Subscribe to the partitions from the earliest offset at which data is available.

      • OFFSET_END: Subscribe to the partitions from the end offset.

      • A point in time in the yyyy-MM-dd HH:mm:ss format. Example: 2021-05-22 11:00:00.

    If you do not specify this parameter, the system subscribes to all partitions in the topic from the end offset.

    Examples:

    "kafka_partitions" = "0,1,2,3",
    "kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END"

    "kafka_partitions" = "0,1,2,3",
    "kafka_offsets" = "2021-05-22 11:00:00,2021-05-22 11:00:00,2021-05-22 11:00:00,2021-05-22 11:00:00"

    Important
    The time format cannot be mixed with the offset format.

  • property: The custom Kafka parameters. This parameter is equivalent to the --property option in the Kafka shell. If the value of a property is a file, add the keyword FILE: before the value.

Property parameters

  • If you connect to a Kafka cluster by using the SSL authentication method, you must configure the following parameters:

    "property.security.protocol" = "ssl",
    "property.ssl.ca.location" = "FILE:ca.pem",
    "property.ssl.certificate.location" = "FILE:client.pem",
    "property.ssl.key.location" = "FILE:client.key",
    "property.ssl.key.password" = "abcdefg"

    The property.security.protocol and property.ssl.ca.location parameters are required to specify the method that is used to connect to the Kafka cluster and the location of the certificate authority (CA) certificate.

    If the client authentication mode is enabled for the Kafka server, you must configure the following parameters:

    "property.ssl.certificate.location"
    "property.ssl.key.location"
    "property.ssl.key.password"

    The preceding parameters are used to specify the location of the public key certificate of the client, the location of the private key file of the client, and the password for accessing the private key of the client.

  • Specify the default start offset of a Kafka partition.

    By default, if the kafka_partitions/kafka_offsets parameter is not specified, data of all partitions is consumed. In this case, you can configure the kafka_default_offsets parameter to specify the start offset of each partition. Default value: OFFSET_END, which indicates that partitions are subscribed to from the end offset.

    "property.kafka_default_offsets" = "OFFSET_BEGINNING"

For more information about supported custom parameters, see the client configuration items in the official configuration document of librdkafka. For example, the following custom parameters are available:

"property.client.id" = "12345",
"property.ssl.ca.location" = "FILE:ca.pem"

Examples

Create a Routine Load job

  1. Create a table into which you want to import data in an ApsaraDB for SelectDB instance. Sample code:

    CREATE TABLE test_table
    (
        id int,
        name varchar(50),
        age int,
        address varchar(50),
        url varchar(500)
    )
    UNIQUE KEY(`id`, `name`)
    DISTRIBUTED BY HASH(id) BUCKETS 4
    PROPERTIES("replication_num" = "1");
  2. Configure the parameters in the following sample code to import data.

    • Create a Routine Load job named test1 for the test_table table in the example_db database. Specify the column delimiter and column mappings, consume data from all partitions by default, and subscribe to the partitions from the offset at which data is available. Sample code:

      CREATE ROUTINE LOAD example_db.test1 ON test_table
      COLUMNS TERMINATED BY ",",
      COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)
      PROPERTIES
      (
          "desired_concurrent_number"="3",
          "max_batch_interval" = "20",
          "max_batch_rows" = "300000",
          "max_batch_size" = "209715200",
          "strict_mode" = "false"
      )
      FROM KAFKA
      (
          "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
          "kafka_topic" = "my_topic",
          "property.kafka_default_offsets" = "OFFSET_BEGINNING"
      );
    • Create a Routine Load job named test2 for the test_table table in the example_db database. Enable the strict mode for the job. Sample code:

      CREATE ROUTINE LOAD example_db.test2 ON test_table
      COLUMNS TERMINATED BY ",",
      COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)
      PROPERTIES
      (
          "desired_concurrent_number"="3",
          "max_batch_interval" = "20",
          "max_batch_rows" = "300000",
          "max_batch_size" = "209715200",
          "strict_mode" = "true"
      )
      FROM KAFKA
      (
          "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
          "kafka_topic" = "my_topic",
          "property.kafka_default_offsets" = "OFFSET_BEGINNING"
      );
    • Consume data of partitions from the specified point in time. Sample code:

      CREATE ROUTINE LOAD example_db.test4 ON test_table
      PROPERTIES
      (
          "desired_concurrent_number"="3",
          "max_batch_interval" = "30",
          "max_batch_rows" = "300000",
          "max_batch_size" = "209715200"
      ) FROM KAFKA
      (
          "kafka_broker_list" = "broker1:9092,broker2:9092",
          "kafka_topic" = "my_topic",
          "property.kafka_default_offset" = "2024-01-21 10:00:00"
      );

Import JSON data

You can use Routine Load to import JSON data of the following two types:

  • The JSON data contains only one data record and is a JSON object.

    If you use the single-table import mode, the JSON data is in the following format. In this mode, the table name is specified in the ON tbl_name clause of the CREATE ROUTINE LOAD statement.

    {"key1":"value1","key2":"value2","key3":"value3"}

    If you use the dynamic or multi-table import mode, the JSON data is in the following format. In this mode, table names are not specified.

    table_name|{"key1":"value1","key2":"value2","key3":"value3"}
  • The JSON data is an array that contains multiple data records.

    If you use the single-table import mode, the JSON data is in the following format. In this mode, the table name is specified in the ON tbl_name clause of the CREATE ROUTINE LOAD statement.

    [
        {   
            "key1":"value11",
            "key2":"value12",
            "key3":"value13",
            "key4":14
        },
        {
            "key1":"value21",
            "key2":"value22",
            "key3":"value23",
            "key4":24
        },
        {
            "key1":"value31",
            "key2":"value32",
            "key3":"value33",
            "key4":34
        }
    ]

    If you use the dynamic or multi-table import mode, the JSON data is in the following format. In this mode, table names are not specified.

    table_name|[
        {   
            "key1":"value11",
            "key2":"value12",
            "key3":"value13",
            "key4":14
        },
        {
            "key1":"value21",
            "key2":"value22",
            "key3":"value23",
            "key4":24
        },
        {
            "key1":"value31",
            "key2":"value32",
            "key3":"value33",
            "key4":34
        }
    ]

Perform the following steps to import JSON data:

  1. Create a table into which you want to import data in an ApsaraDB for SelectDB instance. Sample code:

    CREATE TABLE `example_tbl` (
       `category` varchar(24) NULL COMMENT "",
       `author` varchar(24) NULL COMMENT "",
       `timestamp` bigint(20) NULL COMMENT "",
       `dt` int(11) NULL COMMENT "",
       `price` double REPLACE
    ) ENGINE=OLAP
    AGGREGATE KEY(`category`,`author`,`timestamp`,`dt`)
    COMMENT "OLAP"
    PARTITION BY RANGE(`dt`)
    (
      PARTITION p0 VALUES [("-2147483648"), ("20230509")),
      PARTITION p20200509 VALUES [("20230509"), ("20231010")),
      PARTITION p20200510 VALUES [("20231010"), ("20231211")),
      PARTITION p20200511 VALUES [("20231211"), ("20240512"))
    )
    DISTRIBUTED BY HASH(`category`,`author`,`timestamp`) BUCKETS 4;
  2. Produce JSON data of the preceding two types to the Kafka topic. Sample data:

    {
        "category":"value1331",
        "author":"value1233",
        "timestamp":1700346050,
        "price":1413
    }
    [
        {
            "category":"value13z2",
            "author":"vaelue13",
            "timestamp":1705645251,
            "price":14330
        },
        {
            "category":"lvalue211",
            "author":"lvalue122",
            "timestamp":1684448450,
            "price":24440
        }
    ]
  3. Import JSON data in different modes.

    • Import JSON data in simple mode. Sample code:

      CREATE ROUTINE LOAD example_db.test_json_label_1 ON example_tbl
      COLUMNS(category,price,author)
      PROPERTIES
      (
          "desired_concurrent_number"="3",
          "max_batch_interval" = "20",
          "max_batch_rows" = "300000",
          "max_batch_size" = "209715200",
          "strict_mode" = "false",
          "format" = "json"
      )
      FROM KAFKA
      (
          "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
          "kafka_topic" = "my_topic",
          "kafka_partitions" = "0,1,2",
          "kafka_offsets" = "0,0,0"
       );
    • Accurately import JSON data. Sample code:

      CREATE ROUTINE LOAD example_db.test_json_label_3 ON example_tbl
      COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
      PROPERTIES
      (
          "desired_concurrent_number"="3",
          "max_batch_interval" = "20",
          "max_batch_rows" = "300000",
          "max_batch_size" = "209715200",
          "strict_mode" = "false",
          "format" = "json",
          "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
          "strip_outer_array" = "true"
      )
      FROM KAFKA
      (
          "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
          "kafka_topic" = "my_topic",
          "kafka_partitions" = "0,1,2",
          "kafka_offsets" = "0,0,0"
      );
      Note

      The partition column dt is not present in the sample data. Its values are generated by the dt=from_unixtime(timestamp, '%Y%m%d') expression in the CREATE ROUTINE LOAD statement.

Access Kafka clusters that use different authentication methods

The following examples show how to access Kafka clusters based on the authentication methods of the Kafka clusters.

  1. Access a Kafka cluster for which the SSL authentication method is enabled.

    To access a Kafka cluster for which the SSL authentication method is enabled, you must provide the certificate file (ca.pem) that is used to authenticate the public keys of Kafka brokers. If the client authentication mode is enabled for the Kafka cluster, the public key certificate (client.pem), private key file (client.key), and private key password of the client are also required. You need to upload the required files to ApsaraDB for SelectDB in advance by executing the CREATE FILE statement. The catalog is named kafka.

    1. Upload files. Sample code:

      CREATE FILE "ca.pem" PROPERTIES("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka");
      CREATE FILE "client.key" PROPERTIES("url" = "https://example_urlkafka-key/client.key", "catalog" = "kafka");
      CREATE FILE "client.pem" PROPERTIES("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");
    2. Create a Routine Load job. Sample code:

      CREATE ROUTINE LOAD db1.job1 on tbl1
      PROPERTIES
      (
          "desired_concurrent_number"="1"
      )
      FROM KAFKA
      (
          "kafka_broker_list"= "broker1:9091,broker2:9091",
          "kafka_topic" = "my_topic",
          "property.security.protocol" = "ssl",
          "property.ssl.ca.location" = "FILE:ca.pem",
          "property.ssl.certificate.location" = "FILE:client.pem",
          "property.ssl.key.location" = "FILE:client.key",
          "property.ssl.key.password" = "abcdefg"
      );
      Note

      ApsaraDB for SelectDB accesses Kafka clusters by using the Kafka C++ client library librdkafka. For more information about the parameters supported by librdkafka, see the Configuration properties document of librdkafka.

  2. Access a Kafka cluster for which the PLAIN authentication method is enabled.

    To access a Kafka cluster for which the PLAIN authentication method is enabled, you must add the following configurations:

    1. property.security.protocol=SASL_PLAINTEXT: Use the Simple Authentication and Security Layer (SASL) plaintext authentication method.

    2. property.sasl.mechanism=PLAIN: Set the SASL authentication method to PLAIN.

    3. property.sasl.username=admin: Specify the username of SASL.

    4. property.sasl.password=admin: Specify the password of SASL.

    Create a Routine Load job. Sample code:

    CREATE ROUTINE LOAD db1.job1 on tbl1
    PROPERTIES (
        "desired_concurrent_number"="1"
    )
    FROM KAFKA
    (
        "kafka_broker_list" = "broker1:9092,broker2:9092",
        "kafka_topic" = "my_topic",
        "property.security.protocol"="SASL_PLAINTEXT",
        "property.sasl.mechanism"="PLAIN",
        "property.sasl.username"="admin",
        "property.sasl.password"="admin"
    );
    
  3. Access a Kafka cluster for which the Kerberos authentication method is enabled.

    To access a Kafka cluster for which the Kerberos authentication method is enabled, you must add the following configurations:

    1. property.security.protocol=SASL_PLAINTEXT: Use the SASL plaintext authentication method.

    2. property.sasl.kerberos.service.name=$SERVICENAME: Specify the broker service name.

    3. property.sasl.kerberos.keytab=/etc/security/keytabs/${CLIENT_NAME}.keytab: Specify the path of the local .keytab file.

    4. property.sasl.kerberos.principal=${CLIENT_NAME}/${CLIENT_HOST}: Specify the Kerberos principal that ApsaraDB for SelectDB uses to connect to the Kafka cluster.

    Create a Routine Load job. Sample code:

    CREATE ROUTINE LOAD db1.job1 on tbl1
    PROPERTIES (
        "desired_concurrent_number"="1"
    )
    FROM KAFKA
    (
        "kafka_broker_list" = "broker1:9092,broker2:9092",
        "kafka_topic" = "my_topic",
        "property.security.protocol" = "SASL_PLAINTEXT",
        "property.sasl.kerberos.service.name" = "kafka",
        "property.sasl.kerberos.keytab" = "/etc/krb5.keytab",
        "property.sasl.kerberos.principal" = "id@your.com"
    );
    Note
    • To enable ApsaraDB for SelectDB to access a Kafka cluster for which the Kerberos authentication method is enabled, you need to install the Kerberos client (kinit) on all running nodes of the ApsaraDB for SelectDB cluster, configure the krb5.conf file, and specify the Key Distribution Center (KDC) service information (see the krb5.conf sketch following this note).

    • Set the property.sasl.kerberos.keytab parameter to the absolute path of the local .keytab file, and allow ApsaraDB for SelectDB processes to access the local .keytab file.
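
    A minimal krb5.conf sketch that the first note refers to. The realm name, KDC host, and admin server are placeholders:

    [libdefaults]
        # Default Kerberos realm used when none is specified.
        default_realm = EXAMPLE.COM

    [realms]
        EXAMPLE.COM = {
            kdc = kdc.example.com
            admin_server = kdc.example.com
        }

    [domain_realm]
        .example.com = EXAMPLE.COM
        example.com = EXAMPLE.COM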

Modify a Routine Load job

You can modify an existing Routine Load job that is in the PAUSED state.

Syntax

ALTER ROUTINE LOAD FOR <job_name>
[job_properties]
FROM <data_source>
[data_source_properties]

Parameters

  • [db.]job_name: The name of the job to be modified.

  • tbl_name: The name of the table into which data is imported.

  • job_properties: The job parameters to be modified. Only the following parameters can be modified:

      • desired_concurrent_number

      • max_error_number

      • max_batch_interval

      • max_batch_rows

      • max_batch_size

      • jsonpaths

      • json_root

      • strip_outer_array

      • strict_mode

      • timezone

      • num_as_string

      • fuzzy_parse

  • data_source: The type of the data source. Set this parameter to KAFKA.

  • data_source_properties: The parameters of the data source. Only the following parameters are supported:

      1. kafka_partitions

      2. kafka_offsets

      3. kafka_broker_list

      4. kafka_topic

      5. Custom properties, such as property.group.id.

    Note
    The kafka_partitions and kafka_offsets parameters are used to modify the offsets of the Kafka partitions to be consumed. You can modify only the partitions that are already being consumed. You cannot add partitions.

Examples

  • Change the value of the desired_concurrent_number parameter to 1. Sample code:

    ALTER ROUTINE LOAD FOR db1.label1
    PROPERTIES
    (
        "desired_concurrent_number" = "1"
    );
  • Change the value of the desired_concurrent_number parameter to 10, and modify the partition offset and group ID. Sample code:

    ALTER ROUTINE LOAD FOR db1.label1
    PROPERTIES
    (
        "desired_concurrent_number" = "10"
    )
    FROM kafka
    (
        "kafka_partitions" = "0, 1, 2",
        "kafka_offsets" = "100, 200, 100",
        "property.group.id" = "new_group"
    );

Pause a Routine Load job

You can pause a Routine Load job by executing the PAUSE statement, and resume the paused job by executing the RESUME statement.

Syntax

PAUSE [ALL] ROUTINE LOAD FOR <job_name>;

Parameters

  • [db.]job_name: The name of the job to be paused.

Examples

  • Execute the following statement to pause the Routine Load job named test1:

    PAUSE ROUTINE LOAD FOR test1;
  • Execute the following statement to pause all Routine Load jobs:

    PAUSE ALL ROUTINE LOAD;

Resume a Routine Load job

You can resume a paused Routine Load job. The resumed job continues to consume data from the last consumed offset.

Syntax

RESUME [ALL] ROUTINE LOAD FOR <job_name>

Parameters

  • [db.]job_name: The name of the Routine Load job to be resumed.

Examples

  • Execute the following statement to resume the Routine Load job named test1:

    RESUME ROUTINE LOAD FOR test1;
  • Execute the following statement to resume all Routine Load jobs:

    RESUME ALL ROUTINE LOAD;

Stop a Routine Load job

You can stop a Routine Load job. A stopped Routine Load job cannot be restarted. After the Routine Load job is stopped, the imported data cannot be rolled back.

Syntax

STOP ROUTINE LOAD FOR <job_name>;

Parameters

  • [db.]job_name: The name of the job to be stopped.

Examples

Execute the following statement to stop the Routine Load job named test1:

STOP ROUTINE LOAD FOR test1;

Query one or more Routine Load jobs

You can execute the SHOW ROUTINE LOAD statement to query the status of one or more Routine Load jobs.

Syntax

SHOW [ALL] ROUTINE LOAD [FOR job_name];

Parameters

  • [db.]job_name: The name of the job that you want to query.

Note

If the imported data is in an invalid format, detailed error information is recorded in the value of the ErrorLogUrls parameter. The value of the ErrorLogUrls parameter contains multiple URLs. You can copy one of the URLs to query the error information in a browser.

Examples

  • Execute the following statement to query all Routine Load jobs named test1, including stopped and canceled jobs. One row is returned for each matching job.

    SHOW ALL ROUTINE LOAD FOR test1;
  • Execute the following statement to query ongoing Routine Load jobs named test1:

    SHOW ROUTINE LOAD FOR test1;
  • Execute the following statements to query all Routine Load jobs in the example_db database, including stopped and canceled jobs. One row is returned for each matching job.

    use example_db;
    SHOW ALL ROUTINE LOAD;
  • Execute the following statements to query all ongoing Routine Load jobs in the example_db database:

    use example_db;
    SHOW ROUTINE LOAD;
  • Execute the following statement to query ongoing Routine Load jobs named test1 in the example_db database:

    SHOW ROUTINE LOAD FOR example_db.test1;
  • Execute the following statement to query all Routine Load jobs named test1 in the example_db database, including stopped and canceled jobs. One row is returned for each matching job.

    SHOW ALL ROUTINE LOAD FOR example_db.test1;

Related system configurations

The following system configurations affect the use of Routine Load. For a sketch of how to modify an FE parameter at runtime, see the example after this list.

  • max_routine_load_task_concurrent_num

    A frontend (FE) parameter. Default value: 5. You can modify the parameter at runtime. This parameter specifies the maximum number of tasks that can be concurrently run at a time for a Routine Load job. We recommend that you use the default value. If the parameter is set to an excessively large value, too many concurrent tasks may occupy a large amount of cluster resources.

  • max_routine_load_task_num_per_be

    An FE parameter. Default value: 5. You can modify the parameter at runtime. This parameter specifies the maximum number of tasks that can be concurrently run at a time on each BE node. We recommend that you use the default value. If the parameter is set to an excessively large value, too many concurrent tasks may occupy a large amount of cluster resources.

  • max_routine_load_job_num

    An FE parameter. Default value: 100. You can modify the parameter at runtime. This parameter specifies the maximum number of Routine Load jobs that you can submit, including jobs in the NEED_SCHEDULED, RUNNING, or PAUSED state. If the total number of Routine Load jobs that you submit reaches the maximum value, no more jobs can be submitted.

  • max_consumer_num_per_group

    A BE parameter. Default value: 3. This parameter specifies the maximum number of consumers that can be generated to consume data in a task. For a Kafka data source, a consumer may consume data of one or more Kafka partitions. If a task consumes data of six Kafka partitions, three consumers are generated. Each consumer consumes data of two partitions. If only two partitions exist, only two consumers are generated, and each consumer consumes data of one partition.

  • max_tolerable_backend_down_num

    An FE parameter. Default value: 0. If the number of unavailable BE nodes does not exceed this value, ApsaraDB for SelectDB reschedules the jobs in the PAUSED state, and the state of the rescheduled jobs changes to RUNNING. The value 0 indicates that jobs can be rescheduled only if all BE nodes are alive.

  • period_of_auto_resume_min

    An FE parameter. The default value is 5 minutes, which indicates that ApsaraDB for SelectDB reschedules a job up to three times within 5 minutes. If the job fails to be rescheduled three times, the job is locked and no longer rescheduled. However, manual intervention can be performed to resume the job.
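
FE parameters that support runtime modification, such as those listed above, can typically be adjusted with an ADMIN SET FRONTEND CONFIG statement. The following is a sketch; the value shown is only an example:

-- Temporarily raise the per-job task concurrency limit.
ADMIN SET FRONTEND CONFIG ("max_routine_load_task_concurrent_num" = "10");
-- Check the current value of the parameter.
ADMIN SHOW FRONTEND CONFIG LIKE "max_routine_load_task_concurrent_num";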

Other descriptions

  • Relationships between a Routine Load job and ALTER TABLE operations

    • A Routine Load job does not block SCHEMA CHANGE or ROLLUP operations. However, if columns in the source data cannot match columns in the destination table after a SCHEMA CHANGE operation, the number of error data records increases and the job is eventually paused. To prevent this issue, we recommend that you explicitly specify column mappings in a Routine Load job and use NULLABLE columns or columns with the DEFAULT constraint.

    • If you delete a partition of a table, data may fail to be imported because the partition cannot be found. In this case, the job is paused.

  • Relationships between a Routine Load job and LOAD, DELETE, and INSERT operations

    • A Routine Load job does not conflict with LOAD or INSERT operations.

    • To perform a DELETE operation on a table, you must make sure that no data is being imported into the corresponding table partition. Therefore, before you perform a DELETE operation, you must pause a Routine Load job and wait until all the tasks that have been assigned are complete.

  • Relationships between a Routine Load job and the DROP DATABASE or DROP TABLE operation

    If the database or table into which a Routine Load job is importing data is deleted, the job is automatically canceled.

  • Relationships between a Routine Load job for a Kafka cluster and a Kafka topic

    If the Kafka topic that is declared in the CREATE ROUTINE LOAD statement does not exist in the Kafka cluster, Kafka brokers can automatically create the topic based on the setting of the auto.create.topics.enable parameter.

    • If the auto.create.topics.enable parameter is set to true for a Kafka broker, the topic is automatically created. The number of partitions that are automatically created is determined by the num.partitions parameter of the Kafka broker. The Routine Load job continuously reads the data of the topic.

    • If the auto.create.topics.enable parameter is set to false for a Kafka broker, the topic is not automatically created. In this case, the Routine Load job is paused until data is available.

    Therefore, if you want a topic to be automatically created, set the auto.create.topics.enable parameter to true for brokers in the Kafka cluster.
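
    A minimal broker-side configuration sketch (in the Kafka server.properties file) that enables automatic topic creation; the partition count is only an example:

    # Allow topics to be created automatically on first use.
    auto.create.topics.enable=true
    # Number of partitions for automatically created topics.
    num.partitions=3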

  • Considerations for CIDR block and domain name resolution isolation in an environment

    • The brokers that are specified when you create a Routine Load job must be accessible to ApsaraDB for SelectDB.

    • If the advertised.listeners parameter is configured in Kafka, the address in the value of the advertised.listeners parameter must be accessible to ApsaraDB for SelectDB.

  • Specify the partitions and offsets for data consumption

    ApsaraDB for SelectDB allows you to specify the partitions, offsets, and points in time for data consumption. The following section describes the parameters.

    • kafka_partitions: the partitions whose data is to be consumed. Example: "0,1,2,3".

    • kafka_offsets: the start offset of each partition. The number of offsets that you specify for this parameter must be the same as the number of partitions that you specify for the kafka_partitions parameter. Example: "1000,1000,2000,2000".

    • property.kafka_default_offsets: the default start offset of the partitions.

    When you create a Routine Load job, you can combine the three parameters by using one of the five methods that are described in the following table.

    | Method | kafka_partitions | kafka_offsets | property.kafka_default_offsets | Behavior |
    | --- | --- | --- | --- | --- |
    | 1 | No | No | No | The system automatically finds all partitions of the Kafka topic and consumes them from the end offset. |
    | 2 | No | No | Yes | The system automatically finds all partitions of the Kafka topic and consumes them from the default offset. |
    | 3 | Yes | No | No | The system consumes the specified partitions from the end offset. |
    | 4 | Yes | Yes | No | The system consumes the specified partitions from the specified offsets. |
    | 5 | Yes | No | Yes | The system consumes the specified partitions from the default offset. |

  • Difference between the STOPPED and PAUSED states

    An FE automatically clears the Routine Load jobs in the STOPPED state on a regular basis. The Routine Load jobs in the PAUSED state can be resumed.