Routine Load allows you to submit a resident import job that continuously reads data from a specified data source and imports the data into an ApsaraDB for SelectDB instance. This topic describes how to use Routine Load to import data from a Kafka data source into an ApsaraDB for SelectDB instance.
Prerequisites
The data source must be a Kafka data source. A Routine Load job allows you to access Kafka clusters without authentication or Kafka clusters that support PLAIN, SSL, or Kerberos authentication.
Messages must be in the CSV or JSON format. In the CSV format, each message is one line of text without a line feed at the end of the line.
Usage notes
By default, Kafka 0.10.0.0 and later are supported. If you want to use Kafka of a version that is earlier than 0.10.0.0, such as 0.9.0, 0.8.2, 0.8.1, or 0.8.0, use one of the following methods:
You can set the kafka_broker_version_fallback parameter in the configurations of backends (BEs) to the earlier Kafka version that you want to use.
You can set the property.broker.version.fallback parameter to the earlier Kafka version when you create a Routine Load job.
If you use Kafka of a version that is earlier than 0.10.0.0, some features of Routine Load may be unavailable. For example, you cannot set a time-based offset for a Kafka partition.
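For reference, the following sketch shows how the property.broker.version.fallback parameter might be set when you create a Routine Load job for an older Kafka cluster. The database, table, topic, and broker addresses are placeholders.
CREATE ROUTINE LOAD example_db.test_old_kafka ON test_table
PROPERTIES
(
    "desired_concurrent_number" = "1"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092",
    "kafka_topic" = "my_topic",
    -- Assumption: the Kafka cluster runs 0.9.0, so fall back to that protocol version.
    "property.broker.version.fallback" = "0.9.0"
);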
Create a Routine Load job
To use Routine Load, you must create a Routine Load job. A Routine Load job continuously schedules tasks, and each task consumes a specific number of Kafka messages.
Syntax
CREATE ROUTINE LOAD [db.]job_name ON tbl_name
[merge_type]
[load_properties]
[job_properties]
FROM data_source [data_source_properties]
Parameters
| Parameter | Description |
| --- | --- |
| [db.]job_name | The name of the Routine Load job. In a database, if multiple jobs have the same name, only one of the jobs can be run at a time. |
| tbl_name | The name of the destination table into which data is to be imported. |
| merge_type | The data merging mode of the import. Valid values: APPEND, DELETE, and MERGE. Default value: APPEND. |
| load_properties | The parameters that are used to process the imported data. For more information, see the Parameters of load_properties section of this topic. |
| job_properties | The parameters related to the Routine Load job. For more information, see the Parameters of job_properties section of this topic. |
| data_source [data_source_properties] | The type of the data source and its parameters. For more information, see the Parameters of data_source_properties section of this topic. |
Parameters of load_properties
[column_separator],
[columns_mapping],
[preceding_filter],
[where_predicates],
[partitions],
[DELETE ON],
[ORDER BY]

| Parameter | Example | Description |
| --- | --- | --- |
| column_separator | COLUMNS TERMINATED BY "," | The column delimiter. Default value: \t. |
| columns_mapping | (k1,k2,tmpk1,k3=tmpk1+1) | The mappings between columns in the imported file and columns in the destination table, and various column conversion operations. For more information, see Converting Source Data. |
| preceding_filter | N/A | The conditions for filtering the source data. For more information, see Converting Source Data. |
| where_predicates | WHERE k1>100 and k2=1000 | The conditions for filtering the imported data. For more information, see Converting Source Data. |
| partitions | PARTITION(p1,p2,p3) | The partitions of the destination table into which data is imported. If you do not specify the partitions, the source data is automatically imported into the corresponding partitions. |
| DELETE ON | DELETE ON v3>100 | The statement that specifies the Delete Flag column in the imported data and the calculation relationship. Note: This parameter is required if the merge_type parameter is set to MERGE. This parameter is valid only for tables that use the Unique Key model. |
| ORDER BY | N/A | The statement that specifies the Sequence Col column in the imported data. This parameter is used to maintain the correct order of data during the import. Note: This parameter is valid only for tables that use the Unique Key model. |
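For reference, the following sketch combines several of the preceding clauses in one job: the MERGE mode together with a DELETE ON condition and a WHERE filter. The Unique Key table uniq_tbl, its columns, the topic, and the broker address are hypothetical.
CREATE ROUTINE LOAD example_db.uniq_load ON uniq_tbl
WITH MERGE
COLUMNS TERMINATED BY ",",
COLUMNS(k1, k2, v1, v2),
WHERE k1 > 100,
-- Rows that match this condition are marked for deletion. Requires WITH MERGE and a Unique Key table.
DELETE ON v2 > 100
PROPERTIES
(
    "desired_concurrent_number" = "1"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092",
    "kafka_topic" = "my_topic"
);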
Parameters of job_properties
PROPERTIES (
"key1" = "val1",
"key2" = "val2"
)
A Routine Load job is divided into multiple tasks. The max_batch_interval parameter specifies the maximum execution duration of a task. The max_batch_rows parameter specifies the maximum number of rows that can be read by a task. The max_batch_size parameter specifies the maximum number of bytes that can be read by a task. If one of the thresholds specified by the three parameters is reached, the task ends.
| Parameter | Example | Description |
| --- | --- | --- |
| desired_concurrent_number | "desired_concurrent_number" = "3" | The maximum number of tasks that can be concurrently run. The value must be an integer that is greater than 0. Default value: 3. Note: The actual concurrency is also limited by the number of Kafka partitions, the number of alive BE nodes, and the max_routine_load_task_concurrent_num parameter. |
| max_batch_interval | "max_batch_interval" = "20" | The maximum execution duration of each task. Unit: seconds. Default value: 10. |
| max_batch_rows | "max_batch_rows" = "300000" | The maximum number of rows that can be read by each task. Default value: 200000. |
| max_batch_size | "max_batch_size" = "209715200" | The maximum number of bytes that can be read by each task. Unit: bytes. Default value: 104857600 (100 MB). |
| max_error_number | "max_error_number"="3" | The maximum number of error rows that are allowed in the sampling window. Default value: 0, which indicates that no error rows are allowed. The sampling window is ten times the value of the max_batch_rows parameter. If the number of error rows in the sampling window exceeds this value, the job is paused. Note: The rows that are filtered out by the WHERE conditions are not counted as error rows. |
| strict_mode | "strict_mode"="true" | Specifies whether to enable the strict mode. Default value: false. If the strict mode is enabled, a non-NULL source value that becomes NULL after column type conversion is filtered out. |
| timezone | "timezone" = "Africa/Abidjan" | The time zone that is used for the Routine Load job. By default, the time zone of the session is used. Note: This parameter affects the results of all time zone-related functions that are involved in the Routine Load job. |
| format | "format" = "json" | The format of the imported data. Default value: csv. Valid values: csv and json. |
| jsonpaths | "jsonpaths" = "[\"$.k2\",\"$.k1\"]" | The fields to be extracted from the JSON data. This parameter is valid only if the imported data is in the JSON format. |
| strip_outer_array | "strip_outer_array" = "true" | Specifies whether the imported JSON data is presented as an array. If this parameter is set to true, each element in the array is imported as a row. This parameter is valid only if the imported data is in the JSON format. Default value: false. |
| json_root | "json_root" = "$.RECORDS" | The root node of the JSON data if the imported data is in the JSON format. |
| send_batch_parallelism | N/A | The parallelism for sending data in batches. If the value of this parameter is greater than the value of the max_send_batch_parallelism_per_job parameter in the BE configurations, the value of max_send_batch_parallelism_per_job is used. |
| load_to_single_tablet | N/A | Specifies whether to import data into only one tablet of a partition. Default value: false. This parameter is available only if data is imported into a table that uses the Duplicate Key model and contains random partitions. |
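For reference, the following sketch shows how the JSON-related properties might be combined when the records are wrapped in a RECORDS array. The table, columns, topic, and broker address are placeholders.
CREATE ROUTINE LOAD example_db.json_props_demo ON demo_tbl
COLUMNS(category, author, price)
PROPERTIES
(
    "desired_concurrent_number" = "1",
    "format" = "json",
    -- Assumption: each Kafka message looks like {"RECORDS":[{...},{...}]}.
    "json_root" = "$.RECORDS",
    "strip_outer_array" = "true",
    "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\"]",
    "max_error_number" = "10"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092",
    "kafka_topic" = "my_topic"
);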
Relationships between the strict mode and the source data to be imported
In this example, a column of the TINYINT type is to be imported. The following table describes the relationships between the strict mode and the source data if the destination column allows NULL values.
| Source data | Example | STRING to INT | Strict mode | Result |
| --- | --- | --- | --- | --- |
| NULL | \N | N/A | true or false | NULL |
| NOT NULL | aaa or 2000 | NULL | true | Invalid data (filtered) |
| NOT NULL | aaa | NULL | false | NULL |
| NOT NULL | 1 | 1 | true or false | Correct data |
In this example, a column of the DECIMAL(1,0) type is to be imported. The following table describes the relationships between the strict mode and the source data if the destination column allows NULL values.
| Source data | Example | STRING to INT | Strict mode | Result |
| --- | --- | --- | --- | --- |
| NULL | \N | N/A | true or false | NULL |
| NOT NULL | aaa | NULL | true | Invalid data (filtered) |
| NOT NULL | aaa | NULL | false | NULL |
| NOT NULL | 1 or 10 | 1 | true or false | Correct data |
The value 10 exceeds the allowed range of the DECIMAL(1,0) type. However, the value 10 is not filtered out in strict mode because the value 10 meets the requirements of the DECIMAL type. The value 10 is eventually filtered out in an extract, transform, and load (ETL) process.
Parameters of data_source_properties
FROM KAFKA
(
"key1" = "val1",
"key2" = "val2"
)

| Parameter | Description |
| --- | --- |
| kafka_broker_list | The configurations that are used to connect to brokers in the Kafka cluster. Format: "broker_host1:broker_port1,broker_host2:broker_port2". Example: "kafka_broker_list" = "broker1:9092,broker2:9092". |
| kafka_topic | The Kafka topic to which you want to subscribe. Only one topic can be specified. Example: "kafka_topic" = "my_topic". |
| kafka_partitions/kafka_offsets | The Kafka partitions to which you want to subscribe and the start offset of each partition. If you specify a point in time, data consumption starts from the first offset whose timestamp is greater than or equal to this point in time. You can specify an offset that is greater than or equal to 0. Alternatively, set the kafka_offsets parameter to OFFSET_BEGINNING, which subscribes from the position where data is available, or OFFSET_END, which subscribes from the end offset. If you do not specify this parameter, the system subscribes to all partitions in the topic from the offset that is specified by the kafka_default_offsets property, which is OFFSET_END by default. Examples: "kafka_partitions" = "0,1,2,3", "kafka_offsets" = "1000,1000,2000,2000" or "kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END". Important: The time format cannot be mixed with the offset format. |
| property | The custom Kafka parameters. This parameter is equivalent to the --property parameter in the Kafka shell. If the value of this parameter is a file, you need to add the keyword FILE: before the value. For more information about how to create a file, see the CREATE FILE statement. |
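For reference, the following sketch specifies the subscribed partitions and a start offset for each of them. Numeric offsets and the OFFSET_BEGINNING or OFFSET_END keywords can be combined, but the time format cannot be mixed with the offset format. The job, table, topic, and broker addresses are placeholders.
CREATE ROUTINE LOAD example_db.offset_demo ON test_table
PROPERTIES
(
    "desired_concurrent_number" = "1"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092",
    "kafka_topic" = "my_topic",
    -- Partition 0 starts from offset 1000, partition 1 from the earliest available data,
    -- and partition 2 from the end offset.
    "kafka_partitions" = "0,1,2",
    "kafka_offsets" = "1000,OFFSET_BEGINNING,OFFSET_END"
);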
Property parameters
If you connect to a Kafka cluster by using the SSL authentication method, you must configure the following parameters:
"property.security.protocol" = "ssl", "property.ssl.ca.location" = "FILE:ca.pem", "property.ssl.certificate.location" = "FILE:client.pem", "property.ssl.key.location" = "FILE:client.key", "property.ssl.key.password" = "abcdefg"The
property.security.protocolandproperty.ssl.ca.locationparameters are required to specify the method that is used to connect to the Kafka cluster and the location of the certificate authority (CA) certificate.If the client authentication mode is enabled for the Kafka server, you must configure the following parameters:
"property.ssl.certificate.location" "property.ssl.key.location" "property.ssl.key.password"The preceding parameters are used to specify the location of the public key certificate of the client, the location of the private key file of the client, and the password for accessing the private key of the client.
Specify the default start offset of a Kafka partition.
If the kafka_partitions/kafka_offsets parameter is not specified, data of all partitions is consumed by default. In this case, you can configure the kafka_default_offsets parameter to specify the start offset of each partition. Default value: OFFSET_END, which indicates that partitions are subscribed to from the end offset. Example:
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
For more information about supported custom parameters, see the client configuration items in the official configuration document of librdkafka. For example, the following custom parameters are available:
"property.client.id" = "12345",
"property.ssl.ca.location" = "FILE:ca.pem"Examples
Create a Routine Load job
Create a table into which you want to import data in an ApsaraDB for SelectDB instance. Sample code:
CREATE TABLE test_table
(
    id int,
    name varchar(50),
    age int,
    address varchar(50),
    url varchar(500)
)
UNIQUE KEY(`id`, `name`)
DISTRIBUTED BY HASH(id) BUCKETS 4
PROPERTIES("replication_num" = "1");
Configure the parameters in the following sample code to import data.
Create a Routine Load job named test1 for the test_table table in the example_db database. Specify the group ID, client ID, and column delimiter, enable the system to automatically consume data of all partitions by default, and subscribe to the partitions from the start offset at which data is available. Sample code:
CREATE ROUTINE LOAD example_db.test1 ON test_table
COLUMNS TERMINATED BY ",",
COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)
PROPERTIES
(
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
Create a Routine Load job named test2 for the test_table table in the example_db database. Enable the strict mode for the job. Sample code:
CREATE ROUTINE LOAD example_db.test2 ON test_table
COLUMNS TERMINATED BY ",",
COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)
PROPERTIES
(
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "true"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
Consume data of partitions from the specified point in time. Sample code:
CREATE ROUTINE LOAD example_db.test4 ON test_table
PROPERTIES
(
    "desired_concurrent_number"="3",
    "max_batch_interval" = "30",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092",
    "kafka_topic" = "my_topic",
    "property.kafka_default_offsets" = "2024-01-21 10:00:00"
);
Import JSON data
You can use Routine Load to import JSON data of the following two types:
The JSON data contains only one data record and is a JSON object.
If you use the single-table import mode, the JSON data is in the following format. In this mode, the table name is specified by executing the ON TABLE_NAME statement.
{"key1":"value1","key2":"value2","key3":"value3"}If you use the dynamic or multi-table import mode, the JSON data is in the following format. In this mode, table names are not specified.
table_name|{"key1":"value1","key2":"value2","key3":"value3"}The JSON data is an array that contains multiple data records.
If you use the single-table import mode, the JSON data is in the following format. In this mode, the table name is specified by executing the ON TABLE_NAME statement.
[ { "key1":"value11", "key2":"value12", "key3":"value13", "key4":14 }, { "key1":"value21", "key2":"value22", "key3":"value23", "key4":24 }, { "key1":"value31", "key2":"value32", "key3":"value33", "key4":34 } ]If you use the dynamic or multi-table import mode, the JSON data is in the following format. In this mode, table names are not specified.
table_name|[ { "key1":"value11", "key2":"value12", "key3":"value13", "key4":14 }, { "key1":"value21", "key2":"value22", "key3":"value23", "key4":24 }, { "key1":"value31", "key2":"value32", "key3":"value33", "key4":34 } ]
Import JSON data.
Create a table into which you want to import data in an ApsaraDB for SelectDB instance. Sample code:
CREATE TABLE `example_tbl`
(
    `category` varchar(24) NULL COMMENT "",
    `author` varchar(24) NULL COMMENT "",
    `timestamp` bigint(20) NULL COMMENT "",
    `dt` int(11) NULL COMMENT "",
    `price` double REPLACE
) ENGINE=OLAP
AGGREGATE KEY(`category`,`author`,`timestamp`,`dt`)
COMMENT "OLAP"
PARTITION BY RANGE(`dt`)
(
    PARTITION p0 VALUES [("-2147483648"), ("20230509")),
    PARTITION p20200509 VALUES [("20230509"), ("20231010")),
    PARTITION p20200510 VALUES [("20231010"), ("20231211")),
    PARTITION p20200511 VALUES [("20231211"), ("20240512"))
)
DISTRIBUTED BY HASH(`category`,`author`,`timestamp`) BUCKETS 4;
Import JSON data of the preceding two types into a topic. Sample code:
{ "category":"value1331", "author":"value1233", "timestamp":1700346050, "price":1413 }[ { "category":"value13z2", "author":"vaelue13", "timestamp":1705645251, "price":14330 }, { "category":"lvalue211", "author":"lvalue122", "timestamp":1684448450, "price":24440 } ]Import JSON data in different modes.
Import JSON data in simple mode. Sample code:
CREATE ROUTINE LOAD example_db.test_json_label_1 ON example_tbl
COLUMNS(category,price,author)
PROPERTIES
(
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false",
    "format" = "json"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "kafka_partitions" = "0,1,2",
    "kafka_offsets" = "0,0,0"
);
Accurately import JSON data. Sample code:
CREATE ROUTINE LOAD example_db.test_json_label_3 ON example_tbl
COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
PROPERTIES
(
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false",
    "format" = "json",
    "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
    "strip_outer_array" = "true"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "kafka_partitions" = "0,1,2",
    "kafka_offsets" = "0,0,0"
);
Note
The partition field dt in the table is not included in the sample data. The values of the dt field are generated by using the configuration dt=from_unixtime(timestamp, '%Y%m%d') in the CREATE ROUTINE LOAD statement.
Access Kafka clusters that use different authentication methods
The following examples show how to access Kafka clusters based on the authentication methods of the Kafka clusters.
Access a Kafka cluster for which the SSL authentication method is enabled.
To access a Kafka cluster for which the SSL authentication method is enabled, you must provide the certificate file (ca.pem) that is used to authenticate the public keys of Kafka brokers. If the client authentication mode is enabled for the Kafka cluster, the public key certificate (client.pem), private key file (client.key), and private key password of the client are also required. You must upload the required files to ApsaraDB for SelectDB in advance by executing the CREATE FILE statement. In this example, the catalog is named kafka.
Upload files. Sample code:
CREATE FILE "ca.pem" PROPERTIES("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka"); CREATE FILE "client.key" PROPERTIES("url" = "https://example_urlkafka-key/client.key", "catalog" = "kafka"); CREATE FILE "client.pem" PROPERTIES("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");Create a Routine Load job. Sample code:
CREATE ROUTINE LOAD db1.job1 ON tbl1
PROPERTIES
(
    "desired_concurrent_number"="1"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9091,broker2:9091",
    "kafka_topic" = "my_topic",
    "property.security.protocol" = "ssl",
    "property.ssl.ca.location" = "FILE:ca.pem",
    "property.ssl.certificate.location" = "FILE:client.pem",
    "property.ssl.key.location" = "FILE:client.key",
    "property.ssl.key.password" = "abcdefg"
);
Note
ApsaraDB for SelectDB accesses Kafka clusters by using the Kafka C++ client library librdkafka. For more information about the parameters supported by librdkafka, see the Configuration properties document of librdkafka.
Access a Kafka cluster for which the PLAIN authentication method is enabled.
To access a Kafka cluster for which the PLAIN authentication method is enabled, you must add the following configurations:
property.security.protocol=SASL_PLAINTEXT: Use the Simple Authentication and Security Layer (SASL) plaintext authentication method.
property.sasl.mechanism=PLAIN: Set the SASL authentication method to PLAIN.
property.sasl.username=admin: Specify the username of SASL.
property.sasl.password=admin: Specify the password of SASL.
Create a Routine Load job. Sample code:
CREATE ROUTINE LOAD db1.job1 ON tbl1
PROPERTIES
(
    "desired_concurrent_number"="1"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092",
    "kafka_topic" = "my_topic",
    "property.security.protocol"="SASL_PLAINTEXT",
    "property.sasl.mechanism"="PLAIN",
    "property.sasl.username"="admin",
    "property.sasl.password"="admin"
);
Access a Kafka cluster for which the Kerberos authentication method is enabled.
To access a Kafka cluster for which the Kerberos authentication method is enabled, you must add the following configurations:
security.protocol=SASL_PLAINTEXT: Use the SASL plaintext authentication method.
sasl.kerberos.service.name=$SERVICENAME: Specify the broker service name.
sasl.kerberos.keytab=/etc/security/keytabs/${CLIENT_NAME}.keytab: Specify the path of the local .keytab file.
sasl.kerberos.principal=${CLIENT_NAME}/${CLIENT_HOST}: Specify the Kerberos principal that ApsaraDB for SelectDB uses to connect to the Kafka cluster.
Create a Routine Load job. Sample code:
CREATE ROUTINE LOAD db1.job1 ON tbl1
PROPERTIES
(
    "desired_concurrent_number"="1"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092",
    "kafka_topic" = "my_topic",
    "property.security.protocol" = "SASL_PLAINTEXT",
    "property.sasl.kerberos.service.name" = "kafka",
    "property.sasl.kerberos.keytab" = "/etc/krb5.keytab",
    "property.sasl.kerberos.principal" = "id@your.com"
);
Note
To enable ApsaraDB for SelectDB to access a Kafka cluster for which the Kerberos authentication method is enabled, you must deploy the Kerberos client kinit on all running nodes of the ApsaraDB for SelectDB cluster, configure the krb5.conf file, and specify the Key Distribution Center (KDC) service information.
Set the property.sasl.kerberos.keytab parameter to the absolute path of the local .keytab file, and make sure that ApsaraDB for SelectDB processes can access the local .keytab file.
Modify a Routine Load job
You can modify an existing Routine Load job that is in the PAUSED state.
Syntax
ALTER ROUTINE LOAD FOR <job_name>
[job_properties]
FROM <data_source>
[data_source_properties]
Parameters
| Parameter | Description |
| --- | --- |
| [db.]job_name | The name of the job to be modified. |
| tbl_name | The name of the table into which data is to be imported. |
| job_properties | The job parameters to be modified. Only some of the job parameters can be modified, such as desired_concurrent_number, max_error_number, max_batch_interval, max_batch_rows, and max_batch_size. |
| data_source | The type of the data source. Set this parameter to KAFKA. |
| data_source_properties | The parameters of the data source. The supported parameters include kafka_partitions, kafka_offsets, kafka_broker_list, kafka_topic, and custom properties such as property.group.id. Note: The kafka_partitions and kafka_offsets parameters can be used only to modify the offsets of partitions that are already consumed. You cannot use them to add new partitions. |
Examples
Change the value of the desired_concurrent_number parameter to 1. Sample code:
ALTER ROUTINE LOAD FOR db1.label1
PROPERTIES
(
    "desired_concurrent_number" = "1"
);
Change the value of the desired_concurrent_number parameter to 10, and modify the partition offsets and the group ID. Sample code:
ALTER ROUTINE LOAD FOR db1.label1
PROPERTIES
(
    "desired_concurrent_number" = "10"
)
FROM kafka
(
    "kafka_partitions" = "0, 1, 2",
    "kafka_offsets" = "100, 200, 100",
    "property.group.id" = "new_group"
);
Pause a Routine Load job
You can pause a Routine Load job by executing the PAUSE statement, and resume the paused job by executing the RESUME statement.
Syntax
PAUSE [ALL] ROUTINE LOAD FOR <job_name>;
Parameters
| Parameter | Description |
| --- | --- |
| [db.]job_name | The name of the job to be paused. |
Examples
Execute the following statement to pause the Routine Load job named test1:
PAUSE ROUTINE LOAD FOR test1;
Execute the following statement to pause all Routine Load jobs:
PAUSE ALL ROUTINE LOAD;
Resume a Routine Load job
You can resume a paused Routine Load job. The resumed job continues to consume data from the last consumed offset.
Syntax
RESUME [ALL] ROUTINE LOAD FOR <job_name>;
Parameters
| Parameter | Description |
| --- | --- |
| [db.]job_name | The name of the Routine Load job to be resumed. |
Examples
Execute the following statement to resume the Routine Load job named test1:
RESUME ROUTINE LOAD FOR test1;
Execute the following statement to resume all Routine Load jobs:
RESUME ALL ROUTINE LOAD;
Stop a Routine Load job
You can stop a Routine Load job. A stopped Routine Load job cannot be restarted. After the Routine Load job is stopped, the imported data cannot be rolled back.
Syntax
STOP ROUTINE LOAD FOR <job_name>;
Parameters
| Parameter | Description |
| --- | --- |
| [db.]job_name | The name of the job to be stopped. |
Examples
Execute the following statement to stop the Routine Load job named test1:
STOP ROUTINE LOAD FOR test1;
Query one or more Routine Load jobs
You can execute the SHOW ROUTINE LOAD statement to query the status of one or more Routine Load jobs.
Syntax
SHOW [ALL] ROUTINE LOAD [FOR job_name];
Parameters
| Parameter | Description |
| --- | --- |
| [db.]job_name | The name of the job that you want to query. |
If the imported data is in an invalid format, detailed error information is recorded in the value of the ErrorLogUrls parameter. The value of the ErrorLogUrls parameter contains multiple URLs. You can copy one of the URLs to query the error information in a browser.
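For example, you can run the following statement in a MySQL client and check the State and ErrorLogUrls fields in the output. The job name is a placeholder.
-- \G displays the result vertically in the MySQL client, which makes long fields easier to read.
SHOW ROUTINE LOAD FOR example_db.test1\G
-- Copy one of the URLs in the ErrorLogUrls field and open it in a browser to view the rows that failed to be imported.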
Examples
Execute the following statement to query all Routine Load jobs named test1, including stopped and canceled jobs. The result output displays each job on a separate row and may consist of one or more rows depending on the number of jobs.
SHOW ALL ROUTINE LOAD FOR test1;
Execute the following statement to query ongoing Routine Load jobs named test1:
SHOW ROUTINE LOAD FOR test1;
Execute the following statements to query all Routine Load jobs in the example_db database, including the stopped and canceled jobs. The result output displays each job on a separate row and may consist of one or more rows depending on the number of jobs.
use example_db;
SHOW ALL ROUTINE LOAD;
Execute the following statements to query all ongoing Routine Load jobs in the example_db database:
use example_db;
SHOW ROUTINE LOAD;
Execute the following statement to query ongoing Routine Load jobs named test1 in the example_db database:
SHOW ROUTINE LOAD FOR example_db.test1;
Execute the following statement to query all Routine Load jobs named test1 in the example_db database, including the stopped and canceled jobs. The result output displays each job on a separate row and may consist of one or more rows depending on the number of jobs.
SHOW ALL ROUTINE LOAD FOR example_db.test1;
Related system configurations
Related system configurations affect the use of Routine Load.
max_routine_load_task_concurrent_num
A frontend (FE) parameter. Default value: 5. You can modify this parameter at runtime. This parameter specifies the maximum number of tasks that can be concurrently run for a Routine Load job. We recommend that you use the default value. If you set this parameter to an excessively large value, the number of concurrent tasks may be too high and occupy a large amount of cluster resources.
max_routine_load_task_num_per_be
An FE parameter. Default value: 5. You can modify this parameter at runtime. This parameter specifies the maximum number of tasks that can be concurrently run on each BE node. We recommend that you use the default value. If you set this parameter to an excessively large value, the number of concurrent tasks may be too high and occupy a large amount of cluster resources.
max_routine_load_job_num
An FE parameter. Default value: 100. You can modify this parameter at runtime. This parameter specifies the maximum number of Routine Load jobs that you can submit, including jobs in the NEED_SCHEDULED, RUNNING, and PAUSED states. If the total number of Routine Load jobs reaches this value, no more jobs can be submitted.
max_consumer_num_per_group
A BE parameter. Default value: 3. This parameter specifies the maximum number of consumers that can be generated to consume data in a task. For a Kafka data source, a consumer may consume data of one or more Kafka partitions. For example, if a task consumes data of six Kafka partitions, three consumers are generated and each consumer consumes data of two partitions. If only two partitions exist, only two consumers are generated, and each consumer consumes data of one partition.
max_tolerable_backend_down_num
An FE parameter. Default value: 0. This parameter specifies the maximum number of BE nodes that are allowed to be down while ApsaraDB for SelectDB can still reschedule jobs in the PAUSED state. The state of a rescheduled job changes to RUNNING. The value 0 indicates that jobs can be rescheduled only if all BE nodes are alive.
period_of_auto_resume_min
An FE parameter. The default value is 5 minutes, which indicates that ApsaraDB for SelectDB reschedules a job at most three times within a 5-minute period. If the job fails to be rescheduled three times, the job is locked and no longer rescheduled. However, you can manually resume the job.
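For the FE parameters that can be modified at runtime, the following sketch shows how such an adjustment might be made. It assumes that you have administrator privileges and uses max_routine_load_task_concurrent_num as an example.
-- Query the current value of the FE parameter.
ADMIN SHOW FRONTEND CONFIG LIKE "max_routine_load_task_concurrent_num";
-- Modify the FE parameter at runtime. The change typically does not persist across FE restarts
-- unless it is also written to the FE configuration file.
ADMIN SET FRONTEND CONFIG ("max_routine_load_task_concurrent_num" = "10");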
Other descriptions
Relationships between a Routine Load job and ALTER TABLE operations
A Routine Load job does not block SCHEMA CHANGE or ROLLUP operations. However, if columns in the source data cannot match columns in the destination table after a SCHEMA CHANGE operation, the number of error data records increases and the job is eventually paused. To prevent this issue, we recommend that you explicitly specify column mappings in a Routine Load job and use NULLABLE columns or columns with the DEFAULT constraint.
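For reference, the following sketch explicitly lists the column mappings for the test_table table from the preceding examples, so that adding a NULLABLE column to the table later does not change how the consumed CSV fields are mapped. The topic and broker address are placeholders.
CREATE ROUTINE LOAD example_db.mapped_load ON test_table
COLUMNS TERMINATED BY ",",
-- Explicitly map the CSV fields to the existing columns of test_table.
COLUMNS(id, name, age, address, url)
PROPERTIES
(
    "desired_concurrent_number" = "1"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092",
    "kafka_topic" = "my_topic"
);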
If you delete a partition of a table, data may fail to be imported because the partition cannot be found. In this case, the job is paused.
Relationships between a Routine Load job and LOAD, DELETE, and INSERT operations
A Routine Load job does not conflict with LOAD or INSERT operations.
To perform a DELETE operation on a table, you must make sure that no data is being imported into the corresponding table partition. Therefore, before you perform a DELETE operation, you must pause a Routine Load job and wait until all the tasks that have been assigned are complete.
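The following sketch illustrates that sequence. The job name, table, and filter condition are placeholders.
-- Pause the Routine Load job and wait until all of its assigned tasks are complete.
PAUSE ROUTINE LOAD FOR example_db.test1;
-- Perform the DELETE operation on the destination table.
DELETE FROM test_table WHERE id = 3;
-- Resume the Routine Load job. It continues from the last consumed offset.
RESUME ROUTINE LOAD FOR example_db.test1;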
Relationships between a Routine Load job and the DROP DATABASE or DROP TABLE operation
If the database or table into which a Routine Load job is importing data is deleted, the job is automatically canceled.
Relationships between a Routine Load job for a Kafka cluster and a Kafka topic
If the Kafka topic that is declared in the CREATE ROUTINE LOAD statement does not exist in the Kafka cluster, Kafka brokers can automatically create the topic based on the setting of the auto.create.topics.enable parameter.
If the auto.create.topics.enable parameter is set to true for a Kafka broker, the topic is automatically created. The number of partitions that are created is determined by the num.partitions parameter of the Kafka broker. The Routine Load job continuously reads data from the topic.
If the auto.create.topics.enable parameter is set to false for a Kafka broker, the topic is not automatically created. In this case, the Routine Load job is paused until data is available.
Therefore, if you want a topic to be automatically created, set the auto.create.topics.enable parameter to true for brokers in the Kafka cluster.
Considerations for CIDR block and domain name resolution isolation in an environment
The brokers that are specified when you create a Routine Load job must be accessible to ApsaraDB for SelectDB.
If the advertised.listeners parameter is configured in Kafka, the addresses in the value of the advertised.listeners parameter must be accessible to ApsaraDB for SelectDB.
Specify the partitions and offsets for data consumption
ApsaraDB for SelectDB allows you to specify the partitions, offsets, and points in time for data consumption. The following section describes the parameters.
kafka_partitions: the partitions whose data is to be consumed. Example: "0,1,2,3".
kafka_offsets: the start offset of each partition. The number of offsets that you specify for this parameter must be the same as the number of partitions that you specify for the kafka_partitions parameter. Example: "1000,1000,2000,2000".
property.kafka_default_offsets: the default start offset of the partitions.
When you create a Routine Load job, you can combine the three parameters by using one of the five methods that are described in the following table.
| Method | kafka_partitions | kafka_offsets | property.kafka_default_offsets | Behavior |
| --- | --- | --- | --- | --- |
| 1 | No | No | No | The system automatically searches for all partitions of the Kafka topic and starts to consume data from the end offset of the partitions. |
| 2 | No | No | Yes | The system automatically searches for all partitions of the Kafka topic and starts to consume data from the default offset. |
| 3 | Yes | No | No | The system starts to consume data from the end offset of the specified partitions. |
| 4 | Yes | Yes | No | The system starts to consume data from the specified offsets of the specified partitions. |
| 5 | Yes | No | Yes | The system starts to consume data from the default offset of the specified partitions. |
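For example, the following sketch corresponds to method 5 in the preceding table: the partitions are specified, no offsets are specified, and the property.kafka_default_offsets property determines the start position. The job, table, topic, and broker addresses are placeholders.
CREATE ROUTINE LOAD example_db.method5_demo ON test_table
PROPERTIES
(
    "desired_concurrent_number" = "1"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092",
    "kafka_topic" = "my_topic",
    "kafka_partitions" = "0,1,2",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);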
Difference between the STOPPED and PAUSED states
An FE automatically clears the Routine Load jobs in the STOPPED state on a regular basis. The Routine Load jobs in the PAUSED state can be resumed.