Routine Load はルーチンのインポート方式です。StarRocks では、この方式を使用して Apache Kafka から継続的にデータをインポートし、SQL ステートメントを使用してインポートジョブの一時停止、再開、および停止を制御できます。このトピックでは、Routine Load の基本原則、インポート例、および FAQ について説明します。
用語
- RoutineLoadJob: 送信されるルーチンインポートジョブ
- JobScheduler: RoutineLoadJob を複数のタスクにスケジュールおよび分割するために使用されるルーチンインポートジョブスケジューラ
- Task: JobScheduler によってルールに基づいて RoutineLoadJob から分割されるタスク
- TaskScheduler: タスクの実行をスケジュールするために使用されるタスクスケジューラ
基本原則

- MySQL プロトコルをサポートするクライアントを使用して、Kafka インポートジョブをフロントエンドに送信します。
- フロントエンドは、インポートジョブを複数のタスクに分割します。各タスクは、指定されたデータの一部をインポートします。
- 各タスクは、実行するために指定されたバックエンドに割り当てられます。バックエンドでは、タスクは通常のインポートジョブと見なされ、Stream Load のインポートメカニズムに基づいてデータをインポートします。
- バックエンドでインポートプロセスが完了すると、バックエンドはインポート結果をフロントエンドに報告します。
- フロントエンドは、インポート結果に基づいて新しいタスクを生成し続けるか、失敗したタスクを再試行します。
- フロントエンドは継続的に新しいタスクを生成して、データの無停止インポートを実現します。
例
環境要件
- 認証なしまたは SSL 認証を使用した Kafka クラスタへのアクセスがサポートされています。
- メッセージは、次のいずれかの形式にすることができます。
- CSV 形式。この形式では、各メッセージが行として機能し、行の末尾には改行が含まれません。
- JSON 形式。
- Array 型はサポートされていません。
- Kafka V0.10.0.0 以降のみがサポートされています。
インポートジョブの作成
- 構文
CREATE ROUTINE LOAD [database.][job_name] ON [table_name] [COLUMNS TERMINATED BY "column_separator" ,] [COLUMNS (col1, col2, ...) ,] [WHERE where_condition ,] [PARTITION (part1, part2, ...)] [PROPERTIES ("key" = "value", ...)] FROM [DATA_SOURCE] [(data_source_properties1 = 'value1', data_source_properties2 = 'value2', ...)]次の表に、パラメータを示します。パラメータ 必須 説明 job_name はい インポートジョブの名前。インポートデータベースの名前を先頭に付けることができます。名前は通常、タイムスタンプとテーブル名を組み合わせた形式です。ジョブの名前は、データベース内で一意である必要があります。 table_name はい 宛先テーブルの名前。 COLUMNS TERMINATED 句 いいえ ソースデータファイルの列区切り文字。デフォルト値: \t。 COLUMNS 句 いいえ ソースデータファイルの列と宛先テーブルの列間のマッピング。 - マップされた列: たとえば、宛先テーブルには col1、col2、col3 の 3 つの列がありますが、ソースデータファイルには 4 つの列があり、宛先テーブルの 1 番目、2 番目、4 番目の列は、ソースデータファイルの col2、col1、col3 に対応しています。この場合、句は
COLUMNS (col2, col1, temp, col3)と記述できます。 temp 列は存在せず、ソースデータファイルの 3 番目の列をスキップするために使用されます。 - 派生列: StarRocks は、ソースデータファイルの列のデータを読み取るだけでなく、データ列の処理操作も提供します。たとえば、宛先テーブルに col4 列が追加され、col4 の値は col1 の値と col2 の値の合計に等しくなります。この場合、句は
COLUMNS (col2, col1, temp, col3, col4 = col1 + col2)と記述できます。
WHERE 句 いいえ 不要な行を除外するために使用するフィルタ条件。フィルタ条件は、マップされた列または派生列で指定できます。 たとえば、k1 が 100 より大きく、k2 が 1000 に等しい行のみをインポートする場合、句は
WHERE k1 > 100 and k2 = 1000と記述できます。PARTITION 句 いいえ 宛先テーブルのパーティション。パーティションを指定しない場合、ソースデータファイルは対応するパーティションに自動的にインポートされます。 PROPERTIES 句 いいえ インポートジョブの共通パラメータ。 desired_concurrent_number いいえ インポートジョブを分割できるタスクの最大数。値は 0 より大きい必要があります。デフォルト値: 3。 max_batch_interval いいえ 各タスクの最大実行時間。有効な値: 5 ~ 60。単位: 秒。デフォルト値: 10。 V1.15 以降では、このパラメータはタスクのスケジュール時間を指定します。タスクを実行する頻度を指定できます。 fe.conf の routine_load_task_consume_second は、タスクがデータを消費するために必要な時間を指定します。デフォルト値: 3 秒。 fe.conf の routine_load_task_timeout_second は、タスクの実行タイムアウト期間を指定します。デフォルト値: 15 秒。
max_batch_rows いいえ 各タスクが読み取ることができる最大行数。値は 200000 以上である必要があります。デフォルト値: 200000。 V1.15 以降では、このパラメータはエラー検出ウィンドウの範囲を定義するためにのみ使用されます。ウィンドウの範囲は 10 × max-batch-rows です。
max_batch_size いいえ 各タスクが読み取ることができる最大バイト数。単位: バイト。有効な値: 100 MB ~ 1 GB。デフォルト値: 100 MB。 V1.15 以降では、このパラメータは破棄されます。 fe.conf の routine_load_task_consume_second は、タスクがデータを消費するために必要な時間を指定します。デフォルト値: 3 秒。
max_error_number いいえ サンプリングウィンドウで許容されるエラー行の最大数。値は 0 以上である必要があります。デフォルト値: 0。エラー行は許可されません。 重要 WHERE 条件によって除外された行はエラー行ではありません。strict_mode いいえ 厳密モードを有効にするかどうかを指定します。デフォルトでは、このモードは有効になっています。 厳密モードを有効にした後、空でない生データの列型が NULL に変更された場合、データは除外されます。厳密モードを無効にするには、このパラメータを false に設定します。
timezone いいえ インポートジョブに使用されるタイムゾーン。 デフォルトでは、セッションの timezone パラメータの値が使用されます。このパラメータは、インポートに関係するすべてのタイムゾーン関連関数の結果に影響します。
DATA_SOURCE はい データソースのタイプ。値を KAFKA に設定します。 data_source_properties いいえ データソースに関する情報。値には次のフィールドが含まれます。 - kafka_broker_list: Kafka ブローカーの接続情報。形式:
ip:host。複数のブローカーはコンマ (,) で区切ります。 - kafka_topic: サブスクライブする Kafka トピック。説明 kafka_broker_list フィールドと kafka_topic フィールドは必須です。
- kafka_partitions および kafka_offsets: サブスクライブする Kafka パーティションと各パーティションの開始オフセット。
- property: Kafka 関連のプロパティ。このフィールドは、Kafka Shell の
--propertyパラメータと同等です。HELP ROUTINE LOAD;コマンドを実行して、インポートジョブを作成するためのより詳細な構文を表示できます。
説明HELP ROUTINE LOAD;コマンドを実行して、インポートジョブを作成するためのより詳細な構文を表示できます。 - マップされた列: たとえば、宛先テーブルには col1、col2、col3 の 3 つの列がありますが、ソースデータファイルには 4 つの列があり、宛先テーブルの 1 番目、2 番目、4 番目の列は、ソースデータファイルの col2、col1、col3 に対応しています。この場合、句は
- 例: ローカル Kafka クラスタからデータをインポートします。
CREATE ROUTINE LOAD routine_load_wikipedia ON routine_wiki_edit COLUMNS TERMINATED BY ",", COLUMNS (event_time, channel, user, is_anonymous, is_minor, is_new, is_robot, is_unpatrolled, delta, added, deleted) PROPERTIES ( "desired_concurrent_number"="1", "max_error_number"="1000" ) FROM KAFKA ( "kafka_broker_list"= "localhost:9092", "kafka_topic" = "starrocks-load" );
ジョブのステータスの表示
- 次のコマンドを実行して、データベース内のすべてのルーチンインポートジョブ (停止またはキャンセルされたジョブを含む) を表示します。ジョブは 1 つ以上の行に表示されます。
USE [database]; SHOW ALL ROUTINE LOAD; - 次のコマンドを実行して、データベース内の job_name という名前の実行中のルーチンインポートジョブを表示します。
SHOW ROUTINE LOAD FOR [database].[job_name];
HELP SHOW ROUTINE LOAD コマンドを実行すると、ジョブのステータスを表示するための特定のコマンドと例を取得できます。 HELP SHOW ROUTINE LOAD TASK コマンドを実行すると、ジョブの実行ステータス (タスクを含む) を表示するための特定のコマンドと例を取得できます。
SHOW ALL ROUTINE LOAD コマンドを実行すると、すべての実行中の Routine Load ジョブを表示できます。次の出力が返されます。*************************** 1. row ***************************
Id: 14093
Name: routine_load_wikipedia
CreateTime: 2020-05-16 16:00:48
PauseTime: N/A
EndTime: N/A
DbName: default_cluster:load_test
TableName: routine_wiki_edit
State: RUNNING
DataSourceType: KAFKA
CurrentTaskNum: 1
JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
CustomProperties: {}
Statistic: {"receivedBytes":150821770,"errorRows":122,"committedTaskNum":12,"loadedRows":2399878,"loadRowsRate":199000,"abortedTaskNum":1,"totalRows":2400000,"unselectedRows":0,"receivedBytesRate":12523000,"taskExecuteTimeMs":12043}
Progress: {"0":"13634667"}
ReasonOfStateChanged:
ErrorLogUrls: http://172.26.**.**:9122/api/_load_error_log?file=__shard_53/error_log_insert_stmt_47e8a1d107ed4932-8f1ddf7b01ad2fee_47e8a1d107ed4932_8f1ddf7b01ad2fee, http://172.26.**.**:9122/api/_load_error_log?file=__shard_54/error_log_insert_stmt_e0c0c6b040c044fd-a162b16f6bad53e6_e0c0c6b040c044fd_a162b16f6bad53e6, http://172.26.**.**:9122/api/_load_error_log?file=__shard_55/error_log_insert_stmt_ce4c95f0c72440ef-a442bb300bd743c8_ce4c95f0c72440ef_a442bb300bd743c8
OtherMsg:
1 row in set (0.00 sec)| パラメータ | 説明 |
| State | インポートジョブのステータス。 RUNNING は、インポートジョブが継続的に実行されていることを示します。 |
| Statistic | ジョブが作成されてからのインポート情報を記録する進捗情報。 |
| receivedBytes | 受信データのサイズ。単位: バイト。 |
| errorRows | インポートされたエラー行の数。 |
| committedTaskNum | フロントエンドによって送信されたタスクの数。 |
| loadedRows | インポートされた行の数。 |
| loadRowsRate | データのインポートレート。単位: 行/秒。 |
| abortedTaskNum | バックエンドで失敗したタスクの数。 |
| totalRows | 受信した行の総数。 |
| unselectedRows | WHERE 条件によって除外された行の数。 |
| receivedBytesRate | データの受信レート。単位: バイト/秒。 |
| taskExecuteTimeMs | インポート時間。単位: ミリ秒。 |
| ErrorLogUrls | エラーメッセージログ。 URL を使用して、インポートプロセス中のエラーメッセージを表示できます。 |
インポートジョブの一時停止
PAUSE ステートメントを使用して、インポートジョブを PAUSED 状態にします。データのインポートは一時停止されますが、ジョブは終了しません。 RESUME ステートメントを使用してジョブを再開できます。
PAUSE ROUTINE LOAD FOR [job_name];HELP PAUSE ROUTINE LOAD コマンドを使用して、ヘルプ情報と例を表示できます。
インポートジョブが一時停止されると、ジョブのステータスは PAUSED に変更され、Statistic および Progress のインポート情報は更新されなくなります。この場合、ジョブは終了しません。 SHOW ROUTINE LOAD ステートメントを使用して、一時停止されたインポートジョブを表示できます。
インポートジョブの再開
RESUME ステートメントを使用して、ジョブを一時的に NEED_SCHEDULE 状態にします。ジョブは再スケジュールされています。しばらくすると、ジョブは RUNNING 状態に戻り、データのインポートが続行されます。
RESUME ROUTINE LOAD FOR [job_name];HELP RESUME ROUTINE LOAD コマンドを使用して、ヘルプ情報と例を表示できます。
SHOW ROUTINE LOAD コマンドを実行して、ジョブのステータスを表示します。次の出力が返されます。*************************** 1. row ***************************
Id: 14093
Name: routine_load_wikipedia
CreateTime: 2020-05-16 16:00:48
PauseTime: N/A
EndTime: N/A
DbName: default_cluster:load_test
TableName: routine_wiki_edit
State: NEED_SCHEDULE
DataSourceType: KAFKA
CurrentTaskNum: 0
JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
CustomProperties: {}
Statistic: {"receivedBytes":162767220,"errorRows":132,"committedTaskNum":13,"loadedRows":2589972,"loadRowsRate":115000,"abortedTaskNum":7,"totalRows":2590104,"unselectedRows":0,"receivedBytesRate":7279000,"taskExecuteTimeMs":22359}
Progress: {"0":"13824771"}
ReasonOfStateChanged:
ErrorLogUrls: http://172.26.**.**:9122/api/_load_error_log?file=__shard_54/error_log_insert_stmt_e0c0c6b040c044fd-a162b16f6bad53e6_e0c0c6b040c044fd_a162b16f6bad53e6, http://172.26.**.**:9122/api/_load_error_log?file=__shard_55/error_log_insert_stmt_ce4c95f0c72440ef-a442bb300bd743c8_ce4c95f0c72440ef_a442bb300bd743c8, http://172.26.**.**:9122/api/_load_error_log?file=__shard_56/error_log_insert_stmt_8753041cd5fb42d0-b5150367a5175391_8753041cd5fb42d0_b5150367a5175391
OtherMsg:
1 row in set (0.00 sec)SHOW ROUTINE LOAD コマンドを再度実行して、ジョブのステータスを表示します。次の出力が返されます。*************************** 1. row ***************************
Id: 14093
Name: routine_load_wikipedia
CreateTime: 2020-05-16 16:00:48
PauseTime: N/A
EndTime: N/A
DbName: default_cluster:load_test
TableName: routine_wiki_edit
State: RUNNING
DataSourceType: KAFKA
CurrentTaskNum: 1
JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
CustomProperties: {}
Statistic: {"receivedBytes":175337712,"errorRows":142,"committedTaskNum":14,"loadedRows":2789962,"loadRowsRate":118000,"abortedTaskNum":7,"totalRows":2790104,"unselectedRows":0,"receivedBytesRate":7279000,"taskExecuteTimeMs":23623}
Progress: {"0":"14024771"}
ReasonOfStateChanged:
ErrorLogUrls: http://172.26.**.**:9122/api/_load_error_log?file=__shard_55/error_log_insert_stmt_ce4c95f0c72440ef-a442bb300bd743c8_ce4c95f0c72440ef_a442bb300bd743c8, http://172.26.**.**:9122/api/_load_error_log?file=__shard_56/error_log_insert_stmt_8753041cd5fb42d0-b5150367a5175391_8753041cd5fb42d0_b5150367a5175391, http://172.26.**.**:9122/api/_load_error_log?file=__shard_57/error_log_insert_stmt_31304c87bb82431a-9f2baf7d5fd7f252_31304c87bb82431a_9f2baf7d5fd7f252
OtherMsg:
1 row in set (0.00 sec)
ERROR: No query specifiedインポートジョブの停止
STOP ステートメントを使用して、インポートジョブを STOP 状態にします。データのインポートは停止し、ジョブは終了します。データのインポートを再開することはできません。
STOP ROUTINE LOAD FOR [job_name];HELP STOP ROUTINE LOAD コマンドを使用して、ヘルプ情報と例を表示できます。
SHOW ROUTINE LOAD コマンドを実行して、ジョブのステータスを表示します。次の出力が返されます。*************************** 1. row ***************************
Id: 14093
Name: routine_load_wikipedia
CreateTime: 2020-05-16 16:00:48
PauseTime: N/A
EndTime: 2020-05-16 16:08:25
DbName: default_cluster:load_test
TableName: routine_wiki_edit
State: STOPPED
DataSourceType: KAFKA
CurrentTaskNum: 0
JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
CustomProperties: {}
Statistic: {"receivedBytes":325534440,"errorRows":264,"committedTaskNum":26,"loadedRows":5179944,"loadRowsRate":109000,"abortedTaskNum":18,"totalRows":5180208,"unselectedRows":0,"receivedBytesRate":6900000,"taskExecute0,"taskExecuteTimeMs":47173}
Progress: {"0":"16414875"}
ReasonOfStateChanged:
ErrorLogUrls: http://172.26.**.**:9122/api/_load_error_log?file=__shard_67/error_log_insert_stmt_79e9504cafee4fbd-b3981a65fb158cde_79e9504cafee4fbd_b3981a65fb158cde, http://172.26.**.**:9122/api/_load_error_log?file=__shard_68/error_log_insert_stmt_b6981319ce56421b-bf4486c2cd371353_b6981319ce56421b_bf4486c2cd371353, http://172.26.**.**:9122/api/_load_error_log?file=__shard_69/error_log_insert_stmt_1121400c1f6f4aed-866c381eb49c966e_1121400c1f6f4aed_866c381eb49c966e
OtherMsg:インポートジョブが停止されると、ジョブのステータスは STOPPED に変更され、Statistic および Progress のインポート情報は更新されなくなります。この場合、SHOW ROUTINE LOAD ステートメントを使用して停止されたインポートジョブを表示することはできません。