全部產品
Search
文件中心

ApsaraDB for SelectDB:Routine Load

更新時間:Oct 11, 2024

Routine Load支援提交一個常駐的匯入作業,不斷地從指定的資料來源讀取資料,將資料持續地匯入至ApsaraDB for SelectDB中。本文介紹如何通過Routine Load將Kafka中的資料匯入至ApsaraDB for SelectDB執行個體。

前提條件

  • 支援的資料來源:目前僅支援Kafka資料來源,可通過無認證方式或PLAIN/SSL/Kerberos等認證方式串連Kafka。

  • 支援的訊息格式:CSVJSON格式。CSV的格式,每一個Message為一行,且行尾不包含分行符號。

注意事項

預設支援Kafka 0.10.0.0(含)以上版本。如果使用Kafka 0.10.0.0以下版本(0.9.0,0.8.2,0.8.1,0.8.0),需要Kafka相容舊版本,具體操作有以下兩種方式:

  • 將BE的配置kafka_broker_version_fallback的值設定為要相容的舊版本。

  • 建立Routine Load的時候直接設定property.broker.version.fallback的值為要相容的舊版本。

說明

使用相容舊版本的代價在於,Routine Load的部分新特性可能無法使用,例如根據時間設定Kafka分區的offset。

建立匯入作業

使用Routine Load功能時,首先需建立一個Routine Load作業。該作業將通過例行調度持續發送任務,每個任務會消耗一定數量的Kafka訊息。

文法

CREATE ROUTINE LOAD [db.]job_name ON tbl_name
[merge_type]
[load_properties]
[job_properties]
FROM data_source [data_source_properties]

參數說明

參數名稱

參數說明

[db.]job_name

匯入作業的名稱。在同一個Database內,相同名稱的job只能運行一個。

tbl_name

指定匯入的表的名稱。

merge_type

指定資料合併類型。預設為APPEND,表示匯入的資料都是普通的追加寫操作。MERGEDELETE類型僅適用於Unique Key模型表。其中MERGE類型需要配合[DELETE ON]語句使用,以標註Delete Flag列。而DELETE類型則表示匯入的所有資料皆為刪除資料。

load_properties

指定匯入資料處理相關參數。詳細參數說明,請參見load_properties

job_properties

指定匯入作業相關參數。詳細參數說明,請參見job_properties參數說明

data_source_properties

指定資料來源的類型。詳細參數說明,請參見data_source_properties參數說明

load_properties參數說明

[column_separator],
[columns_mapping],
[preceding_filter],
[where_predicates],
[partitions],
[DELETE ON],
[ORDER BY]

參數名稱

樣本值

參數說明

column_separator

COLUMNS TERMINATED BY ","

指定資料行分隔符號,預設為\t

columns_mapping

(k1,k2,tmpk1,k3=tmpk1+1)

指定檔案列和表中列的映射關係,以及各種列轉換等。詳細說明請參見資料轉化

preceding_filter

指定過濾未經處理資料條件。詳細說明請參見資料轉化

where_predicates

WHERE k1>100 and k2=1000

指定條件對匯入的資料進行過濾。詳細說明請參見資料轉化

partitions

PARTITION(p1,p2,p3)

指定匯入目的表的哪些Partition中。如果不指定,則會自動匯入到對應的Partition中。

DELETE ON

DELETE ON v3>100

用於指定匯入資料中表示Delete Flag的列和計算關係。

說明

需配合MEREGE匯入模式一起使用,僅適用於Unique Key模型的表。

ORDER BY

用於指定匯入資料中表示Sequence Col的列。其功能為匯入資料時保證資料順序。

說明

僅適用於對Unique Key模型的表。

job_properties參數說明

PROPERTIES (
    "key1" = "val1",
    "key2" = "val2"
)
說明

這三個參數max_batch_interval、max_batch_rows和max_batch_size用於控制子任務的執行時間和處理量。當任意一個參數達到設定閾值時,任務將終止。

參數名稱

樣本值

參數說明

desired_concurrent_number

"desired_concurrent_number" = "3"

指定期望並發度。大於0,預設為3。一個例行匯入作業會被分成多個子任務執行。這個參數用於指定一個作業最多有多少任務可以同時執行。

說明
  1. 這個並發度並不是實際的並發度,實際的並發度,會通過叢集的節點數、負載情況,以及資料來源的情況綜合判斷。

  2. 適當提高並發可利用分布式叢集加速,但過高會導致大量小檔案寫入,建議取值為叢集核心數 / 16

max_batch_interval

"max_batch_interval" = "20"

指定每個子任務最大執行時間,單位是秒,預設為10,取值範圍為5~60秒。

max_batch_rows

"max_batch_rows" = "300000"

指定每個子任務最多讀取的行數。預設是200000,取值範圍大於等於200000。

max_batch_size

"max_batch_size" = "209715200"

指定每個子任務最多讀取的位元組數。單位是位元組,預設為104857600,即100 MB。取值範圍為100 MB~1 GB。

max_error_number

"max_error_number"="3"

指定採樣視窗內,允許的最大錯誤行數。預設為0,即不允許有錯誤行。取值範圍大於等於0。

採樣視窗為max_batch_rows*10。即如果在採樣視窗內,錯誤行數大於,則會導致例行作業被暫停,需要人工介入檢查資料品質問題。

說明

where條件過濾掉的行不算錯誤行。

strict_mode

"strict_mode"="true"

指定是否開啟strict 模式,預設為false。開啟後,非空未經處理資料的列類型變換如果結果為NULL,則會被過濾。指定方式為strict 模式時,即對於匯入處理程序中的列類型轉換進行嚴格過濾。嚴格過濾的策略如下:

  • 對於列類型轉換來說,如果strict mode為true,則錯誤的資料將被過濾。錯誤資料即未經處理資料並不為空白值,在參與列類型轉換後結果為空白值的這一類資料。

  • 對於匯入的某列由函數變換產生時,strict mode對其不產生影響。

  • 對於匯入的某列類型包含範圍限制的,如果未經處理資料能正常通過類型轉換,但無法通過範圍限制的,strict mode對其也不產生影響。例如:如果類型是decimal(1,0),未經處理資料為10,則屬於可以通過類型轉換但不在列聲明的範圍內。這種資料strict對其不產生影響。

timezone

"timezone" = "Africa/Abidjan"

指定匯入作業所使用的時區。預設為使用Session的時區作為參數。

說明

該參數會影響所有匯入涉及的和時區有關的函數結果。

format

"format" = "json"

指定匯入資料格式,預設為CSV,支援JSON格式。

jsonpaths

-H "jsonpaths:[\"$.k2\",\"$.k1\"]"

當匯入資料格式為JSON時,可通過jsonpaths指定抽取JSON資料中的欄位。

strip_outer_array

-H "strip_outer_array:true"

當匯入資料格式為JSON時,strip_outer_arraytrue表示JSON資料以數組的形式展現,資料中的每一個元素將被視為一行資料。預設為false

json_root

-H "json_root:$.RECORDS"

當匯入資料格式為JSON時,可以通過json_root指定JSON資料的根節點。SelectDB將通過json_root抽取根節點的元素進行解析。預設為空白。

send_batch_parallelism

指定設定發送批處理資料的並行度,如果並行度的值超過BE配置中的max_send_batch_parallelism_per_job,那麼作為協調點的BE將使用max_send_batch_parallelism_per_job的值。

load_to_single_tablet

指定是否只匯入資料到對應分區的一個tablet,預設值為false。該參數只允許向對帶有random分區的Duplicate表匯入資料的時設定。

strict 模式(strict mode)與未經處理資料(source data)的匯入關係

例如列類型為TinyInt。當表中的列允許匯入空值時匯入關係如下。

source data

source data example

string to int

strict_mode

result

空值

\N

N/A

true or false

NULL

not null

aaa or 2000

NULL

true

invalid data(filtered)

not null

aaa

NULL

false

NULL

not null

1

1

true or false

correct data

例如列類型為Decimal(1,0)。當表中的列允許匯入空值時匯入關係如下。

source data

source data example

string to int

strict_mode

result

空值

\N

N/A

true or false

NULL

not null

aaa

NULL

true

invalid data(filtered)

not null

aaa

NULL

false

NULL

not null

1 or 10

1

true or false

correct data

說明

10雖然是一個超過範圍的值,但是因為其類型符合Decimal的要求,所以strict mode對其不產生影響。10最後會在其他ETL處理流程中被過濾。但不會被strict mode過濾。

data_source_properties參數說明

FROM KAFKA
(
    "key1" = "val1",
    "key2" = "val2"
)

參數名稱

參數說明

kafka_broker_list

指定Kafka的Broker串連資訊。格式為ip:host。多個Broker之間以逗號分隔。

格式:"kafka_broker_list"="broker1:9092,broker2:9092"

kafka_topic

指定訂閱的Kafka的Topic。

格式:"kafka_topic"="my_topic"

kafka_partitions/kafka_offsets

指定訂閱的Kafka Partition,以及對應的每個Partition的起始Offset。如果指定時間,則會從大於等於該時間的最近一個Offset處開始消費。

Offset可以指定從大於等於0的具體Offset,或者:

  • OFFSET_BEGINNING:從有資料的位置開始訂閱。

  • OFFSET_END:從末尾開始訂閱。

  • 時間格式,如:"2021-05-22 11:00:00"

如果沒有指定,則預設從OFFSET_END開始訂閱Topic下的所有Partition。

樣本如下。

"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END"
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "2021-05-22 11:00:00,2021-05-22 11:00:00,2021-05-22 11:00:00"
重要

時間格式不能和OFFSET格式混用。

property

指定自訂Kafka參數。功能等同於Kafka shell中"--property"參數。

當參數的value為一個檔案時,需要在value前加上關鍵詞:"FILE:"。

Property參數說明

  • 使用SSL串連Kafka時,需要指定以下參數:

    "property.security.protocol" = "ssl",
    "property.ssl.ca.location" = "FILE:ca.pem",
    "property.ssl.certificate.location" = "FILE:client.pem",
    "property.ssl.key.location" = "FILE:client.key",
    "property.ssl.key.password" = "abcdefg"

    其中property.security.protocolproperty.ssl.ca.location為必選項,用於指明串連方式為SSL,以及CA認證的位置。

    如果Kafka Server端開啟了Client認證,則還需設定以下參數。

    "property.ssl.certificate.location"
    "property.ssl.key.location"
    "property.ssl.key.password"

    分別用於指定Client的public key、private key、private key的密碼。

  • 指定Kafka Partition的預設起始offset。

    沒有指定kafka_partitions/kafka_offsets,預設消費所有分區。此時可以通過kafka_default_offsets指定起始offset。預設為OFFSET_END,即從末尾開始訂閱。

    "property.kafka_default_offsets" = "OFFSET_BEGINNING"

更多支援的自訂參數,請參閱librdkafka的官方CONFIGURATION文檔中,Client端的配置項。例如以下參數。

"property.client.id" = "12345",
"property.ssl.ca.location" = "FILE:ca.pem"

使用樣本

建立Routine Load簡單作業

  1. 建立待匯入的SelectDB資料表,樣本如下。

    CREATE TABLE test_table
    (
        id int,
        name varchar(50),
        age int,
        address varchar(50),
        url varchar(500)
    )
    UNIQUE KEY(`id`, `name`)
    DISTRIBUTED BY HASH(id) BUCKETS 4
    PROPERTIES("replication_num" = "1");
  2. 分別設定不同的參數匯入資料,樣本如下。

    • 為example_db的test_table建立一個名為test1的Kafka Routine Load任務。指定資料行分隔符號的group.id和client.id,設定自動預設消費所有分區,且從有資料的位置(OFFSET_BEGINNING)開始訂閱,樣本如下。

      CREATE ROUTINE LOAD example_db.test1 ON test_table
      COLUMNS TERMINATED BY ",",
      COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)
      PROPERTIES
      (
          "desired_concurrent_number"="3",
          "max_batch_interval" = "20",
          "max_batch_rows" = "300000",
          "max_batch_size" = "209715200",
          "strict_mode" = "false"
      )
      FROM KAFKA
      (
          "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
          "kafka_topic" = "my_topic",
          "property.kafka_default_offsets" = "OFFSET_BEGINNING"
      );
    • 為example_db的test_table建立一個名為test2的Kafka Routine Load任務。匯入任務為strict 模式,樣本如下。

      CREATE ROUTINE LOAD example_db.test2 ON test_table
      COLUMNS TERMINATED BY ",",
      COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)
      PROPERTIES
      (
          "desired_concurrent_number"="3",
          "max_batch_interval" = "20",
          "max_batch_rows" = "300000",
          "max_batch_size" = "209715200",
          "strict_mode" = "true"
      )
      FROM KAFKA
      (
          "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
          "kafka_topic" = "my_topic",
          "property.kafka_default_offsets" = "OFFSET_BEGINNING"
      );
    • 從指定的時間點開始消費,樣本如下。

      CREATE ROUTINE LOAD example_db.test4 ON test_table
      PROPERTIES
      (
          "desired_concurrent_number"="3",
          "max_batch_interval" = "30",
          "max_batch_rows" = "300000",
          "max_batch_size" = "209715200"
      ) FROM KAFKA
      (
          "kafka_broker_list" = "broker1:9092,broker2:9092",
          "kafka_topic" = "my_topic",
          "property.kafka_default_offset" = "2024-01-21 10:00:00"
      );

匯入JSON格式資料

Routine Load匯入的JSON格式僅支援以下兩種。

  • 只有一條記錄,且為JSON對象。

    當使用單表匯入(即通過ON TABLE_NAME指定表名)時,JSON資料格式如下。

    {"key1":"value1","key2":"value2","key3":"value3"}

    當使用動態或多表匯入Routine Load(即不指定具體的表名)時,JSON資料格式如下。

    table_name|{"key1":"value1","key2":"value2","key3":"value3"}
  • JSON數組,數組中可含多條記錄。

    當使用單表匯入(即通過ON TABLE_NAME指定表名)時,JSON資料格式如下。

    [
        {   
            "key1":"value11",
            "key2":"value12",
            "key3":"value13",
            "key4":14
        },
        {
            "key1":"value21",
            "key2":"value22",
            "key3":"value23",
            "key4":24
        },
        {
            "key1":"value31",
            "key2":"value32",
            "key3":"value33",
            "key4":34
        }
    ]

    當使用動態或多表匯入(即不指定具體的表名)時,JSON資料格式如下。

       table_name|[
        {   
            "key1":"value11",
            "key2":"value12",
            "key3":"value13",
            "key4":14
        },
        {
            "key1":"value21",
            "key2":"value22",
            "key3":"value23",
            "key4":24
        },
        {
            "key1":"value31",
            "key2":"value32",
            "key3":"value33",
            "key4":34
        }
    ]

匯入JSON格式資料,樣本如下。

  1. 建立待匯入的SelectDB資料表,樣本如下。

    CREATE TABLE `example_tbl` (
       `category` varchar(24) NULL COMMENT "",
       `author` varchar(24) NULL COMMENT "",
       `timestamp` bigint(20) NULL COMMENT "",
       `dt` int(11) NULL COMMENT "",
       `price` double REPLACE
    ) ENGINE=OLAP
    AGGREGATE KEY(`category`,`author`,`timestamp`,`dt`)
    COMMENT "OLAP"
    PARTITION BY RANGE(`dt`)
    (
      PARTITION p0 VALUES [("-2147483648"), ("20230509")),
        PARTITION p20200509 VALUES [("20230509"), ("20231010")),
        PARTITION p20200510 VALUES [("20231010"), ("20231211")),
        PARTITION p20200511 VALUES [("20231211"), ("20240512"))
    )
    DISTRIBUTED BY HASH(`category`,`author`,`timestamp`) BUCKETS 4;
  2. 分別使用兩個類型的JSON資料記錄到Topic裡:

    {
        "category":"value1331",
        "author":"value1233",
        "timestamp":1700346050,
        "price":1413
    }
    [
        {
            "category":"value13z2",
            "author":"vaelue13",
            "timestamp":1705645251,
            "price":14330
        },
        {
            "category":"lvalue211",
            "author":"lvalue122",
            "timestamp":1684448450,
            "price":24440
        }
    ]
  3. 以不同模式匯入JSON資料,樣本如下。

    • 以簡單模式匯入JSON資料。

      CREATE ROUTINE LOAD example_db.test_json_label_1 ON example_tbl
      COLUMNS(category,price,author)
      PROPERTIES
      (
          "desired_concurrent_number"="3",
          "max_batch_interval" = "20",
          "max_batch_rows" = "300000",
          "max_batch_size" = "209715200",
          "strict_mode" = "false",
          "format" = "json"
      )
      FROM KAFKA
      (
          "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
          "kafka_topic" = "my_topic",
          "kafka_partitions" = "0,1,2",
          "kafka_offsets" = "0,0,0"
       );
    • 精準匯入JSON格式資料。

      CREATE ROUTINE LOAD example_db.test_json_label_3 ON example_tbl
      COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
      PROPERTIES
      (
          "desired_concurrent_number"="3",
          "max_batch_interval" = "20",
          "max_batch_rows" = "300000",
          "max_batch_size" = "209715200",
          "strict_mode" = "false",
          "format" = "json",
          "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
          "strip_outer_array" = "true"
      )
      FROM KAFKA
      (
          "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
          "kafka_topic" = "my_topic",
          "kafka_partitions" = "0,1,2",
          "kafka_offsets" = "0,0,0"
      );
      說明

      表裡的分區欄位dt在樣本資料裡並沒有,而是在Routine Load語句裡通過dt=from_unixtime(timestamp,'%Y%m%d')轉換出來的。

訪問不同驗證方式的Kafka叢集

根據Kafka叢集驗證方式的不同,訪問的方式樣本如下。

  1. 訪問SSL認證的Kafka叢集。

    訪問SSL認證的Kafka叢集需要您提供用於認證Kafka Broker公開金鑰的認證檔案(ca.pem)。如果Kafka叢集同時開啟了用戶端認證,則還需提供用戶端的公開金鑰(client.pem)、密鑰檔案(client.key),以及密鑰密碼。這裡所需的檔案需要先通過CREATE FILE命令上傳到SelectDB中,並且Catalog名稱為kafka。

    1. 上傳檔案,樣本如下。

      CREATE FILE "ca.pem" PROPERTIES("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka");
      CREATE FILE "client.key" PROPERTIES("url" = "https://example_urlkafka-key/client.key", "catalog" = "kafka");
      CREATE FILE "client.pem" PROPERTIES("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");
    2. 建立Routine Load作業,樣本如下。

      CREATE ROUTINE LOAD db1.job1 on tbl1
      PROPERTIES
      (
          "desired_concurrent_number"="1"
      )
      FROM KAFKA
      (
          "kafka_broker_list"= "broker1:9091,broker2:9091",
          "kafka_topic" = "my_topic",
          "property.security.protocol" = "ssl",
          "property.ssl.ca.location" = "FILE:ca.pem",
          "property.ssl.certificate.location" = "FILE:client.pem",
          "property.ssl.key.location" = "FILE:client.key",
          "property.ssl.key.password" = "abcdefg"
      );
      說明

      SelectDB通過Kafka的C++ APIlibrdkafka來訪問Kafka叢集。librdkafka所支援的參數可以參閱Configuration properties

  2. 訪問PLAIN認證的Kafka叢集。

    訪問開啟PLAIN認證的Kafka叢集,需要增加配置如下。

    1. property.security.protocol=SASL_PLAINTEXT:使用SASL plaintext。

    2. property.sasl.mechanism=PLAIN:設定SASL的認證方式為PLAIN。

    3. property.sasl.username=admin:設定SASL的使用者名稱。

    4. property.sasl.password=admin:設定SASL的密碼。

    建立Routine Load作業,樣本如下。

    CREATE ROUTINE LOAD db1.job1 on tbl1
    PROPERTIES (
    "desired_concurrent_number"="1",
     )
    FROM KAFKA
    (
        "kafka_broker_list" = "broker1:9092,broker2:9092",
        "kafka_topic" = "my_topic",
        "property.security.protocol"="SASL_PLAINTEXT",
        "property.sasl.mechanism"="PLAIN",
        "property.sasl.username"="admin",
        "property.sasl.password"="admin"
    );
    
  3. 訪問Kerberos認證的Kafka叢集。

    訪問開啟kerberos認證的Kafka叢集,需要增加配置如下。

    1. security.protocol=SASL_PLAINTEXT:使用SASL plaintext。

    2. sasl.kerberos.service.name=$SERVICENAME:設定broker servicename。

    3. sasl.kerberos.keytab=/etc/security/keytabs/${CLIENT_NAME}.keytab:設定Keytab本地檔案路徑。

    4. sasl.kerberos.principal=${CLIENT_NAME}/${CLIENT_HOST}:設定SelectDB串連Kafka時使用的Kerberos主體。

    建立Routine Load作業,樣本如下。

    CREATE ROUTINE LOAD db1.job1 on tbl1
    PROPERTIES (
    "desired_concurrent_number"="1",
     )
    FROM KAFKA
    (
        "kafka_broker_list" = "broker1:9092,broker2:9092",
        "kafka_topic" = "my_topic",
        "property.security.protocol" = "SASL_PLAINTEXT",
        "property.sasl.kerberos.service.name" = "kafka",
        "property.sasl.kerberos.keytab" = "/etc/krb5.keytab",
        "property.sasl.kerberos.principal" = "id@your.com"
    );
    說明
    • 若要使SelectDB訪問開啟kerberos認證方式的Kafka叢集,需要在SelectDB叢集所有運行節點上部署Kerberos用戶端kinit,並配置krb5.conf,填寫KDC服務資訊等。

    • 配置property.sasl.kerberos.keytab的值需要指定Keytab本地檔案的絕對路徑,並允許SelectDB進程訪問該本地檔案。

修改匯入作業

修改已經建立的例行匯入作業。只能修改處於PAUSED狀態的作業。

文法

ALTER ROUTINE LOAD FOR <job_name>
[job_properties]
FROM <data_source>
[data_source_properties]

參數說明

參數名稱

參數說明

[db.]job_name

指定要修改的作業名稱。

tbl_name

指定需要匯入的表的名稱。

job_properties

指定需要修改的作業參數。目前僅支援修改的參數如下。

  • desired_concurrent_number

  • max_error_number

  • max_batch_interval

  • max_batch_rows

  • max_batch_size

  • jsonpaths

  • json_root

  • strip_outer_array

  • strict_mode

  • timezone

  • num_as_string

  • fuzzy_parse

data_source

指定資料來源的類型。當前支援:KAFKA

data_source_properties

指定資料來源的相關屬性。目前僅支援的屬性如下。

  1. kafka_partitions

  2. kafka_offsets

  3. kafka_broker_list

  4. kafka_topic

  5. 自訂property,如property.group.id

說明

kafka_partitionskafka_offsets用於修改待消費的kafka partition 的offset,僅能修改當前已經消費的partition。不能新增partition。

使用樣本

  • desired_concurrent_number修改為1,樣本如下。

    ALTER ROUTINE LOAD FOR db1.label1
    PROPERTIES
    (
        "desired_concurrent_number" = "1"
    );
  • desired_concurrent_number修改為10,修改partition的offset,修改group id,樣本如下。

    ALTER ROUTINE LOAD FOR db1.label1
    PROPERTIES
    (
        "desired_concurrent_number" = "10"
    )
    FROM kafka
    (
        "kafka_partitions" = "0, 1, 2",
        "kafka_offsets" = "100, 200, 100",
        "property.group.id" = "new_group"
    );

暫停匯入作業

暫停一個Routine Load作業。被暫停作業可以通過RESUME命令重新運行。

文法

PAUSE [ALL] ROUTINE LOAD FOR <job_name>;

參數說明

參數名稱

參數說明

[db.]job_name

指定要暫停作業名稱。

使用樣本

  • 暫停名稱為test1的Routine Load作業,樣本如下。

    PAUSE ROUTINE LOAD FOR test1;
  • 暫停所有Routine Load作業,樣本如下。

    PAUSE ALL ROUTINE LOAD;

恢複匯入作業

恢複一個被暫停Routine Load作業。恢複的作業將繼續從之前已消費的offset繼續消費。

文法

RESUME [ALL] ROUTINE LOAD FOR <job_name>

參數說明

參數名稱

參數說明

[db.]job_name

指定要恢複的作業名稱。

使用樣本

  • 恢複名稱為test1的Routine Load作業,樣本如下。

    RESUME ROUTINE LOAD FOR test1;
  • 恢複所有Routine Load作業,樣本如下。

    RESUME ALL ROUTINE LOAD;

停止匯入作業

停止一個Routine Load作業。被停止的作業無法再重新運行。停止匯入後,已匯入資料不會復原。

文法

STOP ROUTINE LOAD FOR <job_name>;

參數說明

參數名稱

參數說明

[db.]job_name

指定要停止的作業名稱。

使用樣本

停止名稱為test1的Routine Load作業,樣本如下。

STOP ROUTINE LOAD FOR test1;

查看匯入作業

Routine Load作業運行狀態需要通過SHOW ROUTINE LOAD命令查看。

文法

SHOW [ALL] ROUTINE LOAD [FOR job_name];

參數說明

參數名稱

參數說明

[db.]job_name

指定要查看的作業名稱。

說明

如果匯入資料格式不合法,詳細的錯誤資訊會記錄在ErrorLogUrls中。注意其中包含多個連結,拷貝其中任意一個連結在瀏覽器中查看即可。

使用樣本

  • 展示名稱為test1的所有Routine Load作業(包括已停止或取消的作業)。結果為一行或多行。

    SHOW ALL ROUTINE LOAD FOR test1;
  • 展示名稱為test1的當前正在啟動並執行Routine Load作業。

    SHOW ROUTINE LOAD FOR test1;
  • 顯示example_db下,所有的Routine Load作業(包括已停止或取消的作業)。結果為一行或多行。

    use example_db;
    SHOW ALL ROUTINE LOAD;
  • 顯示example_db下,所有正在啟動並執行Routine Load作業。

    use example_db;
    SHOW ROUTINE LOAD;
  • 顯示example_db下,名稱為test1的當前正在啟動並執行Routine Load作業。

    SHOW ROUTINE LOAD FOR example_db.test1;
  • 顯示example_db下,名稱為test1的所有Routine Load作業(包括已停止或取消的作業)。結果為一行或多行。

    SHOW ALL ROUTINE LOAD FOR example_db.test1;

相關係統配置

相關係統配置參數會影響Routine Load的使用。

  • max_routine_load_task_concurrent_num

    FE配置項,預設為5,可以運行時修改。該參數限制了一個例行匯入作業最大的子任務並發數。建議維持預設值。設定過大,可能導致同時並發的任務數過多,佔用叢集資源。

  • max_routine_load_task_num_per_be

    FE配置項,預設為5,可以運行時修改。該參數限制了每個BE節點最多並發執行的子任務個數。建議維持預設值。如果設定過大,可能導致並發任務數過多,佔用叢集資源。

  • max_routine_load_job_num

    FE配置項,預設為100,可以運行時修改。該參數限制Routine Load作業的總數,包括NEED_SCHEDULED,RUNNING,PAUSE這些狀態。超過後,不能再提交新的作業。

  • max_consumer_num_per_group

    BE 配置項,預設為3。該參數表示一個子任務中最多產生幾個Consumer進行資料消費。對於Kafka資料來源,一個Consumer可能消費一個或多個Kafka Partition。假設一個任務需要消費6個Kafka Partition,則會產生3個Consumer,每個Consumer消費2個Partition。如果只有2個Partition,則只會產生2個Consumer,每個Consumer消費1個Partition。

  • max_tolerable_backend_down_num

    FE配置項,預設值是0。在滿足某些條件下,SelectDB可令PAUSED的任務重新調度,變成RUNNING。該參數為0代表只有所有BE節點是alive狀態才允許重新調度。

  • period_of_auto_resume_min

    FE配置項,預設是5分鐘。該項意味著當SelectDB重新調度任務時,只會在5分鐘這個周期內最多嘗試3次。如果3次都失敗則鎖定當前任務,後續不再進行調度,但可通過人為幹預,進行手動恢複。

其他說明

  • Routine Load作業和ALTER TABLE操作的關係。

    • Routine Load不會阻塞Schema變更和ROLLUP操作。但Schema變更完成後,列映射關係無法匹配,則會導致作業的錯誤資料激增,最終導致作業暫停。建議通過在Routine Load作業中顯式指定列映射關係,並且通過增加Nullable列或帶Default值的列來減少這些問題。

    • 刪除表的Partition可能會導致匯入資料無法找到對應的Partition,導致作業進入暫停狀態。

  • Routine Load作業和其他匯入作業的關係(LOAD,DELETE,INSERT)。

    • Routine Load和其他LOAD作業以及INSERT操作沒有衝突。

    • 當執行DELETE操作時,對應表分區不能有任何正在執行的匯入任務。因此在執行DELETE操作前,需要暫停Routine Load作業,並等待已下發的任務全部完成後,方可以執行DELETE。

  • Routine Load作業和DROP DATABASE或DROP TABLE操作的關係。

    當Routine Load對應的database或table被刪除後,作業會自動CANCEL。

  • Kafka類型的Routine Load作業和Kafka topic的關係。

    當建立例行匯入聲明的kafka_topic在Kafka叢集中不存在時:

    • Kafka叢集的Broker設定了auto.create.topics.enable=true,則topic會先被自動建立,自動建立的Partition個數是由您的Kafka叢集中的Broker配置num.partitions決定的。例行作業會不斷讀取該topic的資料。

    • Kafka叢集的Broker設定了auto.create.topics.enable=false,則topic不會被自動建立,例行作業會在沒有讀取任何資料之前就被暫停,狀態置為PAUSED。

    所以,如果您希望當Kafka topic不存在的時候,被例行作業自動建立的話,只需要將Kafka叢集中的Broker設定auto.create.topics.enable=true即可。

  • 當環境中存在網段和網域名稱解析的隔離措施,因此需要注意以下問題。

    • 建立Routine Load任務中指定的Broker list必須能夠被SelectDB服務訪問。

    • Kafka中如果配置了advertised.listeners,advertised.listeners中的地址必須能夠被SelectDB服務訪問。

  • 指定消費的Partition和Offset。

    SelectDB支援指定Partition,Offset和時間點進行消費。這裡說明下對應參數的配置關係。

    三個相關參數如下。

    • kafka_partitions:指定待消費的Partition列表,如:"0,1,2,3"。

    • kafka_offsets:指定每個分區的起始offset,必須和kafka_partitions列表個數對應。如:"1000,1000,2000,2000"

    • property.kafka_default_offset:指定分區預設的起始Offset。

    在建立匯入作業時,這三個參數可以有以下五種組合方式。

    方式

    kafka_partitions

    kafka_offsets

    property.kafka_default_offset

    行為

    1

    No

    No

    No

    系統會自動尋找Topic對應的所有分區並從OFFSET_END開始消費。

    2

    No

    No

    Yes

    系統會自動尋找Topic對應的所有分區並從default offset指定的位置開始消費。

    3

    Yes

    No

    No

    系統會從指定分區的OFFSET_END開始消費。

    4

    Yes

    Yes

    No

    系統會從指定分區的指定Offset處開始消費。

    5

    Yes

    No

    Yes

    系統會從指定分區,default Offset指定的位置開始消費。

  • STOP和PAUSE的區別。

    FE會自動定期清理STOP狀態的Routine Load,而PAUSE狀態的則可以再次被恢複啟用。