ルーチンロードを使用すると、常駐インポートジョブを送信して、特定のデータソースから ApsaraDB for SelectDB インスタンスにデータを継続的に読み込んでインポートできます。このトピックでは、ルーチンロードを使用して Kafka データソースから ApsaraDB for SelectDB インスタンスにデータをインポートする方法について説明します。
前提条件
データソースは Kafka データソースである必要があります。ルーチンロードジョブでは、認証なしの Kafka クラスタ、または PLAIN、SSL、または Kerberos 認証をサポートする Kafka クラスタにアクセスできます。
メッセージは
CSVまたはJSON形式である必要があります。CSV 形式では、各メッセージは行末に改行なしで 1 行として表示されます。
使用上の注意
デフォルトでは、Kafka 0.10.0.0 以降がサポートされています。 0.9.0、0.8.2、0.8.1、0.8.0 などの 0.10.0.0 より前のバージョンの Kafka を使用する場合、次のいずれかの方法を使用します。
バックエンド(BE)の構成で
kafka_broker_version_fallbackパラメータの値を使用する以前のバージョンの Kafka に設定できます。ルーチンロードジョブの作成時に、
property.broker.version.fallbackパラメータの値を以前のバージョンの Kafka に設定することもできます。
0.10.0.0 より前のバージョンの Kafka を使用する場合、ルーチンロードの一部の機能が使用できない場合があります。たとえば、Kafka パーティションに時間ベースのオフセットを設定することはできません。
ルーチンロードジョブの作成
ルーチンロードを使用するには、ルーチンロードジョブを作成する必要があります。ルーチンロードジョブは、ルーチン スケジューリングに基づいてタスクを継続的にスケジュールします。各タスクは、特定の数の Kafka メッセージを消費します。
構文
CREATE ROUTINE LOAD [db.]job_name ON tbl_name
[merge_type]
[load_properties]
[job_properties]
FROM data_source [data_source_properties]パラメータ
パラメータ | 説明 |
| ルーチンロードジョブの名前。データベース内で複数のジョブが同じ名前の場合、一度に実行できるジョブは 1 つだけです。 |
| データをインポートする宛先テーブルの名前。 |
| インポートのデータマージモード。デフォルト値: |
| インポートされたデータを処理するために使用されるパラメータ。詳細については、このトピックの load_properties のパラメータ セクションを参照してください。 |
| ルーチンロードジョブに関連するパラメータ。詳細については、このトピックの job_properties のパラメータ セクションを参照してください。 |
| データソースのタイプ。詳細については、このトピックの data_source_properties のパラメータ セクションを参照してください。 |
load_properties のパラメータ
[column_separator],
[columns_mapping],
[preceding_filter],
[where_predicates],
[partitions],
[DELETE ON],
[ORDER BY]パラメータ | 例 | 説明 |
| COLUMNS TERMINATED BY "," | 列の区切り文字。デフォルト値: |
| (k1,k2,tmpk1,k3=tmpk1+1) | インポートされたファイルの列と宛先テーブルの列の間のマッピング、およびさまざまな列変換操作。詳細については、ソースデータの変換 を参照してください。 |
| 該当なし | ソースデータをフィルタリングするための条件。詳細については、ソースデータの変換 を参照してください。 |
| WHERE k1>100 and k2=1000 | インポートされたデータをフィルタリングするための条件。詳細については、ソースデータの変換 を参照してください。 |
| PARTITION(p1,p2,p3) | 宛先テーブルでデータをインポートするパーティション。パーティションを指定しない場合、ソースデータは対応するパーティションに自動的にインポートされます。 |
| DELETE ON v3>100 | インポートされたデータの削除フラグ列と計算関係を指定するために使用されるステートメント。 説明 このパラメータは、merge_type パラメータが MERGE に設定されている場合に必須です。このパラメータは、一意キーモデルを使用するテーブルに対してのみ有効です。 |
| 該当なし | インポートされたデータのシーケンス列を指定するために使用されるステートメント。このパラメータは、インポート中に正しいデータの順序を維持するために使用されます。 説明 このパラメータは、一意キーモデルを使用するテーブルに対してのみ有効です。 |
ジョブプロパティのパラメーター
PROPERTIES (
"key1" = "val1",
"key2" = "val2"
)ルーチンロードジョブは複数のタスクに分割されます。 max_batch_interval パラメータは、タスクの最大実行時間を指定します。 max_batch_rows パラメータは、タスクが読み取ることができる最大行数を指定します。 max_batch_size パラメータは、タスクが読み取ることができる最大バイト数を指定します。 3 つのパラメータで指定されたしきい値のいずれかに達すると、タスクは終了します。
パラメータ | 例 | 説明 |
| "desired_concurrent_number" = "3" | 同時に実行できるタスクの最大数。値は 0 より大きい整数である必要があります。デフォルト値: 説明
|
| "max_batch_interval" = "20" | 各タスクの最大実行時間。単位: 秒。デフォルト値: |
| "max_batch_rows" = "300000" | 各タスクが読み取ることができる最大行数。デフォルト値: |
| "max_batch_size" = "209715200" | 各タスクが読み取ることができる最大バイト数。単位: バイト。デフォルト値: |
| "max_error_number"="3" | サンプリングウィンドウで許容されるエラー行の最大数。デフォルト値: サンプリングウィンドウは、 説明
|
| "strict_mode"="true" | 厳密モードを有効にするかどうかを指定します。デフォルト値:
|
| "timezone" = "Africa/Abidjan" | ルーチンロードジョブに使用されるタイムゾーン。デフォルトでは、セッションのタイムゾーンが使用されます。 説明 このパラメータは、ルーチンロードジョブに含まれるすべてのタイムゾーン関連関数の結果に影響します。 |
| "format" = "json" | インポートされたデータの形式。デフォルト値: |
| -H "jsonpaths:[\"$.k2\",\"$.k1\"]" | インポートされたデータが |
| -H "strip_outer_array:true" | インポートされたデータが |
| -H "json_root:$.RECORDS" | インポートされたデータが JSON 形式の場合、JSON データのルートノード。 |
| 該当なし | バッチ処理のためにデータを送信する同時ジョブの最大数。このパラメータの値が BE 構成の |
| 該当なし | パーティションの 1 つのタブレットのみにデータをインポートするかどうかを指定します。デフォルト値: false。このパラメータは、重複キーモデルを使用し、ランダムパーティションを含むテーブルにデータをインポートする場合にのみ使用できます。 |
厳密モードとインポートされるソースデータの関係
この例では、TINYLNT 型の列をインポートします。次の表は、システムが NULL 列値のインポートを許可する場合の厳密モードとソースデータの関係を示しています。
ソースデータ | 例 | STRING から INT へ | 厳密モード | 結果 |
NULL | \N | 該当なし | true または false | NULL |
NOT NULL | aaa または 2000 | NULL | true | 無効なデータ(フィルタリング済み) |
NOT NULL | aaa | NULL | false | NULL |
NOT NULL | 1 | 1 | true または false | 正しいデータ |
この例では、DECIMAL(1,0) 型の列をインポートします。次の表は、システムが NULL 列値のインポートを許可する場合の厳密モードとソースデータの関係を示しています。
ソースデータ | 例 | STRING から INT へ | 厳密モード | 結果 |
NULL | \N | 該当なし | true または false | NULL |
NOT NULL | aaa | NULL | true | 無効なデータ(フィルタリング済み) |
NOT NULL | aaa | NULL | false | NULL |
NOT NULL | 1 または 10 | 1 | true または false | 正しいデータ |
値 10 は、DECIMAL(1,0) 型の許容範囲を超えています。ただし、値 10 は DECIMAL 型の要件を満たしているため、厳密モードでは除外されません。値 10 は最終的に ETL(抽出、変換、ロード)プロセスで除外されます。
data_source_properties のパラメーター
FROM KAFKA
(
"key1" = "val1",
"key2" = "val2"
)パラメータ | 説明 |
| Kafkaクラスター内のブローカーへの接続に使用する構成。形式: 例: |
| サブスクライブする Kafka トピックです。 形式: |
| サブスクライブする Kafka パーティションと各パーティションの開始オフセット。特定の時点を指定した場合、データ消費はその時点以降の最新のオフセットから開始されます。 0 以上のオフセットを指定できます。または、kafka_offsets パラメーターを次のいずれかの値に設定します。
このパラメーターを指定しない場合、システムは 例: 重要 時刻形式とオフセット形式を混在させることはできません。 |
| カスタム Kafka パラメーター。このパラメーターは、Kafka シェルの --property パラメーターと同等です。 このパラメーターの値がファイルの場合、値の前にキーワード |
プロパティ パラメーター
SSL認証方式を使用してKafkaクラスターに接続する場合は、次のパラメーターを構成する必要があります。
"property.security.protocol" = "ssl", "property.ssl.ca.location" = "FILE:ca.pem", "property.ssl.certificate.location" = "FILE:client.pem", "property.ssl.key.location" = "FILE:client.key", "property.ssl.key.password" = "abcdefg"property.security.protocolパラメーターとproperty.ssl.ca.locationパラメーターは、Kafkaクラスターへの接続に使用される方式と認証局(CA)証明書の場所を指定するために必要です。Kafkaサーバーでクライアント認証モードが有効になっている場合は、次のパラメーターを構成する必要があります。
"property.ssl.certificate.location" "property.ssl.key.location" "property.ssl.key.password"上記のパラメーターは、クライアントの公開鍵証明書の場所、クライアントの秘密鍵ファイルの場所、およびクライアントの秘密鍵にアクセスするためのパスワードを指定するために使用されます。
Kafkaパーティションのデフォルトの開始オフセットを指定します。
デフォルトでは、
kafka_partitions/kafka_offsetsパラメーターが指定されていない場合、すべてのパーティションのデータが消費されます。この場合、kafka_default_offsetsパラメーターを構成して、各パーティションの開始オフセットを指定できます。デフォルト値:OFFSET_END。これは、パーティションが終了オフセットからサブスクライブされることを示します。"property.kafka_default_offsets" = "OFFSET_BEGINNING" // 開始オフセットを指定
サポートされているカスタムパラメーターの詳細については、「librdkafka の公式構成ドキュメントのクライアント構成項目」をご参照ください。たとえば、次のカスタムパラメーターを使用できます。
"property.client.id" = "12345",
"property.ssl.ca.location" = "FILE:ca.pem"例
ルーチンロードジョブの作成
ApsaraDB for SelectDBインスタンスにデータを読み込むテーブルを作成します。サンプルコード:
CREATE TABLE test_table ( id int, name varchar(50), age int, address varchar(50), url varchar(500) ) UNIQUE KEY(`id`, `name`) DISTRIBUTED BY HASH(id) BUCKETS 4 PROPERTIES("replication_num" = "1");次のサンプルコードのパラメーターを構成して、データを読み込みます。
example_dbデータベースのtest_tableテーブルに対して、test1という名前のRoutine Loadジョブを作成します。グループID、クライアントID、および列区切り文字を指定し、デフォルトですべてのパーティションのデータを自動的に使用するようにシステムを有効にし、データが使用可能な開始オフセットからパーティションをサブスクライブします。サンプルコード:
CREATE ROUTINE LOAD example_db.test1 ON test_table COLUMNS TERMINATED BY ",", COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100) PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "false" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "property.kafka_default_offsets" = "OFFSET_BEGINNING" );example_dbデータベースのtest_tableテーブルに対して、test2という名前のRoutine Loadジョブを作成します。ジョブに対して厳密モードを有効にします。サンプルコード:
CREATE ROUTINE LOAD example_db.test2 ON test_table COLUMNS TERMINATED BY ",", COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100) PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "true" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "property.kafka_default_offsets" = "OFFSET_BEGINNING" );指定した時点からパーティションのデータを使用します。サンプルコード:
CREATE ROUTINE LOAD example_db.test4 ON test_table PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "30", "max_batch_rows" = "300000", "max_batch_size" = "209715200" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092", "kafka_topic" = "my_topic", "property.kafka_default_offset" = "2024-01-21 10:00:00" );
JSON データのインポート
Routine Loadを使用して、次の2種類のJSONデータをインポートできます。
JSONデータには、データレコードが1つだけ含まれており、JSONオブジェクトです。
単一テーブルインポートモードを使用する場合、JSONデータは次の形式になります。このモードでは、ON TABLE_NAMEステートメントを実行することでテーブル名が指定されます。
{"key1":"value1","key2":"value2","key3":"value3"}動的インポートモードまたは複数テーブルインポートモードを使用する場合、JSONデータは次の形式になります。このモードでは、テーブル名は指定されません。
table_name|{"key1":"value1","key2":"value2","key3":"value3"}JSONデータは、複数のデータレコードを含む配列です。
単一テーブルインポートモードを使用する場合、JSONデータは次の形式になります。このモードでは、ON TABLE_NAMEステートメントを実行することでテーブル名が指定されます。
[ { "key1":"value11", "key2":"value12", "key3":"value13", "key4":14 }, { "key1":"value21", "key2":"value22", "key3":"value23", "key4":24 }, { "key1":"value31", "key2":"value32", "key3":"value33", "key4":34 } ]動的インポートモードまたは複数テーブルインポートモードを使用する場合、JSONデータは次の形式になります。このモードでは、テーブル名は指定されません。
table_name|[ { "key1":"value11", "key2":"value12", "key3":"value13", "key4":14 }, { "key1":"value21", "key2":"value22", "key3":"value23", "key4":24 }, { "key1":"value31", "key2":"value32", "key3":"value33", "key4":34 } ]
JSON データをインポートします。
ApsaraDB for SelectDBインスタンスにデータを読み込みたいテーブルを作成します。サンプルコード:
CREATE TABLE `example_tbl` ( `category` varchar(24) NULL COMMENT "", `author` varchar(24) NULL COMMENT "", `timestamp` bigint(20) NULL COMMENT "", `dt` int(11) NULL COMMENT "", `price` double REPLACE ) ENGINE=OLAP AGGREGATE KEY(`category`,`author`,`timestamp`,`dt`) COMMENT "OLAP" // OLAP PARTITION BY RANGE(`dt`) ( PARTITION p0 VALUES [("-2147483648"), ("20230509")), PARTITION p20200509 VALUES [("20230509"), ("20231010")), PARTITION p20200510 VALUES [("20231010"), ("20231211")), PARTITION p20200511 VALUES [("20231211"), ("20240512")) ) DISTRIBUTED BY HASH(`category`,`author`,`timestamp`) BUCKETS 4;上記の2種類のJSONデータをトピックにインポートします。サンプルコード:
{ "category":"value1331", "author":"value1233", "timestamp":1700346050, "price":1413 }[ { "category":"value13z2", "author":"vaelue13", "timestamp":1705645251, "price":14330 }, { "category":"lvalue211", "author":"lvalue122", "timestamp":1684448450, "price":24440 } ]JSONデータをさまざまなモードでインポートします。
シンプルモードでJSONデータをインポートします。サンプルコード:
CREATE ROUTINE LOAD example_db.test_json_label_1 ON example_tbl COLUMNS(category,price,author) PROPERTIES ( "desired_concurrent_number"="3", // 必要な同時実行数 "max_batch_interval" = "20", // 最大バッチ間隔 "max_batch_rows" = "300000", // 最大バッチ行数 "max_batch_size" = "209715200", // 最大バッチサイズ "strict_mode" = "false", // 厳格モード "format" = "json" // フォーマット ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", // Kafkaブローカーリスト "kafka_topic" = "my_topic", // Kafkaトピック "kafka_partitions" = "0,1,2", // Kafkaパーティション "kafka_offsets" = "0,0,0" // Kafkaオフセット );JSON データを正確にインポートします。サンプルコード:
CREATE ROUTINE LOAD example_db.test_json_label_3 ON example_tbl COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d')) PROPERTIES ( "desired_concurrent_number"="3", // 必要な同時実行数 "max_batch_interval" = "20", // 最大バッチ間隔 "max_batch_rows" = "300000", // 最大バッチ行数 "max_batch_size" = "209715200", // 最大バッチサイズ "strict_mode" = "false", // 厳格モード "format" = "json", // フォーマット "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]", // JSONパス "strip_outer_array" = "true" // 外側の配列を削除 ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", // Kafkaブローカーリスト "kafka_topic" = "my_topic", // Kafkaトピック "kafka_partitions" = "0,1,2", // Kafkaパーティション "kafka_offsets" = "0,0,0" // Kafkaオフセット );説明サンプルデータでは、テーブルのパーティションフィールド
dtは使用できません。 dt フィールドの値は、CREATE ROUTINE LOAD ステートメントのdt=from_unixtime(timestamp,'%Y%m%d')構成を使用して生成されます。
さまざまな認証方式を使用する Kafka クラスターへのアクセス
次の例は、Kafkaクラスターの認証方式に基づいてKafkaクラスターにアクセスする方法を示しています。
SSL認証方式が有効になっているKafkaクラスタにアクセスします。
SSL認証方式が有効になっているKafkaクラスタにアクセスするには、Kafkaブローカーの公開鍵を認証するために使用される証明書ファイル(ca.pem)を提供する必要があります。 Kafkaクラスタでクライアント認証モードが有効になっている場合は、クライアントの公開鍵証明書(client.pem)、秘密鍵ファイル(client.key)、および秘密鍵のパスワードも必要です。必要なファイルは、
CREATE FILEステートメントを実行して、事前にApsaraDB for SelectDBにアップロードする必要があります。カタログ名は kafka です。ファイルをアップロードします。サンプルコード:
CREATE FILE "ca.pem" PROPERTIES("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka"); CREATE FILE "client.key" PROPERTIES("url" = "https://example_urlkafka-key/client.key", "catalog" = "kafka"); CREATE FILE "client.pem" PROPERTIES("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");ルーチンロードジョブを作成します。サンプルコード:
CREATE ROUTINE LOAD db1.job1 on tbl1 PROPERTIES ( "desired_concurrent_number"="1" ) FROM KAFKA ( "kafka_broker_list"= "broker1:9091,broker2:9091", "kafka_topic" = "my_topic", "property.security.protocol" = "ssl", "property.ssl.ca.location" = "FILE:ca.pem", "property.ssl.certificate.location" = "FILE:client.pem", "property.ssl.key.location" = "FILE:client.key", "property.ssl.key.password" = "abcdefg" );説明ApsaraDB for SelectDBは、Kafka C++クライアントライブラリ
librdkafkaを使用してKafkaクラスタにアクセスします。 librdkafkaでサポートされているパラメータの詳細については、librdkafkaのConfiguration properties ドキュメントをご参照ください。
PLAIN認証方式が有効になっているKafkaクラスタにアクセスします。
PLAIN認証方式が有効になっているKafkaクラスタにアクセスするには、次の設定を追加する必要があります。
property.security.protocol=SASL_PLAINTEXT: Simple Authentication and Security Layer(SASL)プレーンテキスト認証方式を使用します。
property.sasl.mechanism=PLAIN: SASL認証方式をPLAINに設定します。
property.sasl.username=admin: SASLのユーザー名を指定します。
property.sasl.password=admin: SASLのパスワードを指定します。
ルーチンロードジョブを作成します。サンプルコード:
CREATE ROUTINE LOAD db1.job1 on tbl1 PROPERTIES ( "desired_concurrent_number"="1", ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092", "kafka_topic" = "my_topic", "property.security.protocol"="SASL_PLAINTEXT", "property.sasl.mechanism"="PLAIN", "property.sasl.username"="admin", "property.sasl.password"="admin" );Kerberos認証方式が有効になっているKafkaクラスタにアクセスします。
Kerberos認証方式が有効になっているKafkaクラスタにアクセスするには、次の設定を追加する必要があります。
security.protocol=SASL_PLAINTEXT: SASLプレーンテキスト認証方式を使用します。
sasl.kerberos.service.name=$SERVICENAME: ブローカーサービス名を指定します。
sasl.kerberos.keytab=/etc/security/keytabs/${CLIENT_NAME}.keytab: ローカルの.keytabファイルのパスを指定します。
sasl.kerberos.principal=${CLIENT_NAME}/${CLIENT_HOST}: ApsaraDB for SelectDBがKafkaクラスタへの接続に使用するKerberosプリンシパルを指定します。
ルーチンロードジョブを作成します。サンプルコード:
CREATE ROUTINE LOAD db1.job1 on tbl1 PROPERTIES ( "desired_concurrent_number"="1", ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092", "kafka_topic" = "my_topic", "property.security.protocol" = "SASL_PLAINTEXT", "property.sasl.kerberos.service.name" = "kafka", "property.sasl.kerberos.keytab" = "/etc/krb5.keytab", "property.sasl.kerberos.principal" = "id@your.com" );説明ApsaraDB for SelectDBがKerberos認証方式が有効になっているKafkaクラスタにアクセスできるようにするには、ApsaraDB for SelectDBクラスタのすべての実行ノードにKerberosクライアント kinit をデプロイし、krb5.confファイルを構成し、Key Distribution Center(KDC)サービス情報を指定する必要があります。
property.sasl.kerberos.keytabパラメータをローカルの.keytabファイルの絶対パスに設定し、ApsaraDB for SelectDBプロセスがローカルの.keytabファイルにアクセスできるようにします。
ルーチンロードジョブの変更
PAUSED 状態の既存のルーチンロードジョブを変更できます。
構文
ALTER ROUTINE LOAD FOR <job_name>
[job_properties]
FROM <data_source>
[data_source_properties]パラメーター
パラメーター | 説明 |
[db.]job_name | 変更するジョブの名前。 |
tbl_name | データのインポート先となるテーブルの名前。 |
job_properties | 変更するジョブパラメーター。変更できるパラメーターは次のとおりです。
|
data_source | データソースのタイプ。このパラメーターを |
data_source_properties | データソースのパラメーター。次のパラメーターのみがサポートされています。
説明
|
例
値を サンプルコード:
必要な同時実行数パラメーターを 1 に設定ALTER ROUTINE LOAD FOR db1.label1 PROPERTIES ( "desired_concurrent_number" = "1" );値を変更します サンプルコード:
必要な同時実行数パラメーターを 10 に設定し、パーティションオフセットとグループ ID を変更しますALTER ROUTINE LOAD FOR db1.label1 PROPERTIES ( "desired_concurrent_number" = "10" ) FROM kafka ( "kafka_partitions" = "0, 1, 2", "kafka_offsets" = "100, 200, 100", "property.group.id" = "new_group" );
ルーチンロードジョブの一時停止
PAUSE文を実行することでルーチンロードジョブを一時停止し、RESUME文を実行することで一時停止したジョブを再開できます。
構文
PAUSE [ALL] ROUTINE LOAD FOR <job_name>;パラメーター
パラメーター | 説明 |
[db.]job_name | 一時停止するジョブの名前。 |
例
test1という名前のルーチンロードジョブを一時停止するには、次の文を実行します:
PAUSE ROUTINE LOAD FOR test1;すべてのルーチンロードジョブを一時停止するには、次の文を実行します:
PAUSE ALL ROUTINE LOAD;
ルーチンロードジョブの再開
一時停止したルーチンロードジョブを再開できます。再開されたジョブは、最後に消費されたオフセットからデータの消費を続けます。
構文
RESUME [ALL] ROUTINE LOAD FOR <job_name>パラメーター
パラメーター | 説明 |
[db.]job_name | 再開するルーチンロードジョブの名前。 |
例
test1 という名前のルーチンロードジョブを再開するには、次のステートメントを実行します。
RESUME ROUTINE LOAD FOR test1;すべてのルーチンロードジョブを再開するには、次のステートメントを実行します。
RESUME ALL ROUTINE LOAD;
ルーチンロードジョブの停止
ルーチンロードジョブを停止できます。停止されたルーチンロードジョブは再起動できません。ルーチンロードジョブが停止された後、インポートされたデータはロールバックできません。
構文
STOP ROUTINE LOAD FOR <job_name>;パラメーター
パラメーター | 説明 |
[db.]job_name | 停止するジョブの名前。 |
例
test1 という名前のルーチンロードジョブを停止するには、次のステートメントを実行します。
STOP ROUTINE LOAD FOR test1;1 つ以上のルーチンロードジョブのクエリ
SHOW ROUTINE LOAD ステートメントを実行して、1 つ以上のルーチンロードジョブのステータスをクエリできます。
構文
SHOW [ALL] ROUTINE LOAD [FOR job_name];パラメーター
パラメーター | 説明 |
[db.]job_name | クエリするジョブの名前。 |
インポートされたデータの形式が無効な場合、詳細なエラー情報は ErrorLogUrls パラメーターの値に記録されます。 ErrorLogUrls パラメーターの値には複数の URL が含まれています。 URL の 1 つをコピーして、ブラウザーでエラー情報をクエリできます。
例
次のステートメントを実行して、停止およびキャンセルされたジョブを含む、test1 という名前のすべてのルーチンロードジョブをクエリします。 結果出力には、各ジョブが別々の行に表示され、ジョブの数に応じて 1 つ以上の行で構成される場合があります。
SHOW ALL ROUTINE LOAD FOR test1;次のステートメントを実行して、test1 という名前の進行中のルーチンロードジョブをクエリします。
SHOW ROUTINE LOAD FOR test1;次のステートメントを実行して、停止およびキャンセルされたジョブを含む、example_db データベース内のすべてのルーチンロードジョブをクエリします。 結果出力には、各ジョブが別々の行に表示され、ジョブの数に応じて 1 つ以上の行で構成される場合があります。
use example_db; SHOW ALL ROUTINE LOAD;次のステートメントを実行して、example_db データベース内のすべての進行中のルーチンロードジョブをクエリします。
use example_db; SHOW ROUTINE LOAD;次のステートメントを実行して、example_db データベース内の test1 という名前の進行中のルーチンロードジョブをクエリします。
SHOW ROUTINE LOAD FOR example_db.test1;次のステートメントを実行して、停止およびキャンセルされたジョブを含む、example_db データベース内の test1 という名前のすべてのルーチンロードジョブをクエリします。 結果出力には、各ジョブが別々の行に表示され、ジョブの数に応じて 1 つ以上の行で構成される場合があります。
SHOW ALL ROUTINE LOAD FOR example_db.test1;
関連システム構成
関連システム構成は、Routine Load の使用に影響します。
max_routine_load_task_concurrent_numフロントエンド(FE)パラメーターです。デフォルト値:5。実行時にパラメーターを変更できます。このパラメーターは、Routine Load ジョブに対して同時に実行できるタスクの最大数を指定します。デフォルト値を使用することをお勧めします。パラメーターに大きな値を設定すると、同時実行タスク数が過剰になり、クラスターリソースを大量に占有する可能性があります。
max_routine_load_task_num_per_beFE パラメーターです。デフォルト値:5。実行時にパラメーターを変更できます。このパラメーターは、各 BE ノードで同時に実行できるタスクの最大数を指定します。デフォルト値を使用することをお勧めします。パラメーターに大きな値を設定すると、同時実行タスク数が過剰になり、クラスターリソースを大量に占有する可能性があります。
max_routine_load_job_numFE パラメーターです。デフォルト値:100。実行時にパラメーターを変更できます。このパラメーターは、NEED_SCHEDULED、RUNNING、または PAUSED 状態のジョブを含む、送信できる Routine Load ジョブの最大数を指定します。送信した Routine Load ジョブの合計数が最大値に達すると、それ以上のジョブを送信できなくなります。
max_consumer_num_per_groupBE パラメーターです。デフォルト値:3。このパラメーターは、タスクのデータを使用するために生成できるコンシューマーの最大数を指定します。Kafka データソースの場合、コンシューマーは 1 つ以上の Kafka パーティションのデータを使用する場合があります。タスクが 6 つの Kafka パーティションのデータを使用する場合、3 つのコンシューマーが生成されます。各コンシューマーは 2 つのパーティションのデータを使用します。パーティションが 2 つしかない場合は、2 つのコンシューマーのみが生成され、各コンシューマーは 1 つのパーティションのデータを使用します。
max_tolerable_backend_down_numFE パラメーターです。デフォルト値:0。特定の条件が満たされると、ApsaraDB for SelectDB は PAUSED 状態のジョブを再スケジュールします。その後、再スケジュールされたジョブの状態は RUNNING に変わります。値 0 は、すべての BE ノードが稼働している場合にのみジョブを再スケジュールできることを示します。
period_of_auto_resume_minFE パラメーターです。デフォルト値は 5 分です。これは、ApsaraDB for SelectDB が 5 分以内にジョブを最大 3 回再スケジュールすることを示します。ジョブが 3 回再スケジュールに失敗した場合、ジョブはロックされ、再スケジュールされなくなります。ただし、手動で介入してジョブを再開できます。
その他の説明
Routine LoadジョブとALTER TABLE操作の関係
Routine Loadジョブは、SCHEMA CHANGE操作またはROLLUP操作をブロックしません。ただし、SCHEMA CHANGE操作後にソースデータの列がデスティネーションテーブルの列と一致しない場合、エラーデータレコードの数が増加し、最終的にジョブは一時停止します。この問題を防ぐために、Routine Loadジョブで列マッピングを明示的に指定し、NULLABLE列またはDEFAULT制約付きの列を使用することをお勧めします。
テーブルのパーティションを削除すると、パーティションが見つからないため、データのインポートに失敗する可能性があります。この場合、ジョブは一時停止します。
Routine LoadジョブとLOAD、DELETE、およびINSERT操作の関係
Routine Loadジョブは、LOAD操作またはINSERT操作と競合しません。
テーブルに対してDELETE操作を実行するには、対応するテーブルパーティションにデータがインポートされていないことを確認する必要があります。そのため、DELETE操作を実行する前に、Routine Loadジョブを一時停止し、割り当て済みのすべてのタスクが完了するまで待つ必要があります。
Routine LoadジョブとDROP DATABASEまたはDROP TABLE操作の関係
Routine Loadジョブがデータをインポートしているデータベースまたはテーブルが削除されると、ジョブは自動的にキャンセルされます。
KafkaクラスタのRoutine LoadジョブとKafkaトピックの関係
CREATE ROUTINE LOADステートメントで宣言されている
Kafka topicがKafkaクラスタに存在しない場合、Kafkaブローカーは auto.create.topics.enable パラメータの設定に基づいてトピックを自動的に作成できます。Kafkaブローカーの
auto.create.topics.enableパラメータがtrueに設定されている場合、トピックは自動的に作成されます。自動的に作成されるパーティションの数は、Kafkaブローカーの num.partitions パラメータによって決まります。 Routine Loadジョブは、トピックのデータを読み取り続けます。Kafkaブローカーの
auto.create.topics.enableパラメータがfalseに設定されている場合、トピックは自動的に作成されません。この場合、データが利用可能になるまで、Routine Loadジョブは一時停止されます。
そのため、トピックを自動的に作成する場合は、Kafkaクラスタのブローカーの
auto.create.topics.enableパラメータをtrueに設定します。環境におけるCIDRブロックとドメイン名解決の分離に関する考慮事項
Routine Loadジョブの作成時に指定するブローカーは、ApsaraDB for SelectDBからアクセスできる必要があります。
Kafkaで
advertised.listenersパラメータが設定されている場合、advertised.listenersパラメータの値のアドレスは、ApsaraDB for SelectDBからアクセスできる必要があります。
データ消費のパーティションとオフセットを指定する
ApsaraDB for SelectDBでは、データ消費のパーティション、オフセット、および時点を指定できます。次のセクションでは、パラメータについて説明します。
kafka_partitions: データが消費されるパーティション。例: "0,1,2,3"。kafka_offsets: 各パーティションの開始オフセット。このパラメータに指定するオフセットの数は、kafka_partitionsパラメータに指定するパーティションの数と同じである必要があります。例: "1000,1000,2000,2000"。property.kafka_default_offset: パーティションのデフォルトの開始オフセット。
Routine Loadジョブを作成するときは、次の表に示す5つの方法のいずれかを使用して、3つのパラメータを組み合わせることができます。
方法
kafka_partitions
kafka_offsets
property.kafka_default_offset
動作
1
いいえ
いいえ
いいえ
システムはKafkaトピックのすべてのパーティションを自動的に検索し、パーティションの終了オフセットからデータの消費を開始します。
2
いいえ
いいえ
はい
システムはKafkaトピックのすべてのパーティションを自動的に検索し、デフォルトのオフセットからデータの消費を開始します。
3
はい
いいえ
いいえ
システムは、指定されたパーティションの終了オフセットからデータの消費を開始します。
4
はい
はい
いいえ
システムは、指定されたパーティションの指定されたオフセットからデータの消費を開始します。
5
はい
いいえ
はい
システムは、指定されたパーティションのデフォルトのオフセットからデータの消費を開始します。
STOPPED状態とPAUSED状態の違い
FEは、定期的にSTOPPED状態のRoutine Loadジョブを自動的にクリアします。 PAUSED状態のRoutine Loadジョブは再開できます。