すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:Routine Load

最終更新日:Jan 11, 2025

Routine Load はルーチンのインポート方式です。StarRocks では、この方式を使用して Apache Kafka から継続的にデータをインポートし、SQL ステートメントを使用してインポートジョブの一時停止、再開、および停止を制御できます。このトピックでは、Routine Load の基本原則、インポート例、および FAQ について説明します。

用語

  • RoutineLoadJob: 送信されるルーチンインポートジョブ
  • JobScheduler: RoutineLoadJob を複数のタスクにスケジュールおよび分割するために使用されるルーチンインポートジョブスケジューラ
  • Task: JobScheduler によってルールに基づいて RoutineLoadJob から分割されるタスク
  • TaskScheduler: タスクの実行をスケジュールするために使用されるタスクスケジューラ

基本原則

次の図は、Routine Load のインポートプロセスを示しています。Routine Load
次の手順は、Routine Load を使用してデータをインポートする方法を示しています。
  1. MySQL プロトコルをサポートするクライアントを使用して、Kafka インポートジョブをフロントエンドに送信します。
  2. フロントエンドは、インポートジョブを複数のタスクに分割します。各タスクは、指定されたデータの一部をインポートします。
  3. 各タスクは、実行するために指定されたバックエンドに割り当てられます。バックエンドでは、タスクは通常のインポートジョブと見なされ、Stream Load のインポートメカニズムに基づいてデータをインポートします。
  4. バックエンドでインポートプロセスが完了すると、バックエンドはインポート結果をフロントエンドに報告します。
  5. フロントエンドは、インポート結果に基づいて新しいタスクを生成し続けるか、失敗したタスクを再試行します。
  6. フロントエンドは継続的に新しいタスクを生成して、データの無停止インポートを実現します。
説明 このトピックの画像と一部の情報は、オープンソースの StarRocks のContinuously load data from Apache Kafkaからのものです。

環境要件

  • 認証なしまたは SSL 認証を使用した Kafka クラスタへのアクセスがサポートされています。
  • メッセージは、次のいずれかの形式にすることができます。
    • CSV 形式。この形式では、各メッセージが行として機能し、行の末尾には改行が含まれません。
    • JSON 形式。
  • Array 型はサポートされていません。
  • Kafka V0.10.0.0 以降のみがサポートされています。

インポートジョブの作成

  • 構文
    CREATE ROUTINE LOAD [database.][job_name] ON [table_name]
        [COLUMNS TERMINATED BY "column_separator" ,]
        [COLUMNS (col1, col2, ...) ,]
        [WHERE where_condition ,]
        [PARTITION (part1, part2, ...)]
        [PROPERTIES ("key" = "value", ...)]
        FROM [DATA_SOURCE]
        [(data_source_properties1 = 'value1',
        data_source_properties2 = 'value2',
        ...)]
    次の表に、パラメータを示します。
    パラメータ必須説明
    job_nameはいインポートジョブの名前。インポートデータベースの名前を先頭に付けることができます。名前は通常、タイムスタンプとテーブル名を組み合わせた形式です。ジョブの名前は、データベース内で一意である必要があります。
    table_nameはい宛先テーブルの名前。
    COLUMNS TERMINATED 句いいえソースデータファイルの列区切り文字。デフォルト値: \t。
    COLUMNS 句いいえソースデータファイルの列と宛先テーブルの列間のマッピング。
    • マップされた列: たとえば、宛先テーブルには col1、col2、col3 の 3 つの列がありますが、ソースデータファイルには 4 つの列があり、宛先テーブルの 1 番目、2 番目、4 番目の列は、ソースデータファイルの col2、col1、col3 に対応しています。この場合、句は COLUMNS (col2, col1, temp, col3) と記述できます。 temp 列は存在せず、ソースデータファイルの 3 番目の列をスキップするために使用されます。
    • 派生列: StarRocks は、ソースデータファイルの列のデータを読み取るだけでなく、データ列の処理操作も提供します。たとえば、宛先テーブルに col4 列が追加され、col4 の値は col1 の値と col2 の値の合計に等しくなります。この場合、句は COLUMNS (col2, col1, temp, col3, col4 = col1 + col2) と記述できます。
    WHERE 句いいえ不要な行を除外するために使用するフィルタ条件。フィルタ条件は、マップされた列または派生列で指定できます。

    たとえば、k1 が 100 より大きく、k2 が 1000 に等しい行のみをインポートする場合、句は WHERE k1 > 100 and k2 = 1000 と記述できます。

    PARTITION 句いいえ宛先テーブルのパーティション。パーティションを指定しない場合、ソースデータファイルは対応するパーティションに自動的にインポートされます。
    PROPERTIES 句いいえインポートジョブの共通パラメータ。
    desired_concurrent_numberいいえインポートジョブを分割できるタスクの最大数。値は 0 より大きい必要があります。デフォルト値: 3。
    max_batch_intervalいいえ各タスクの最大実行時間。有効な値: 5 ~ 60。単位: 秒。デフォルト値: 10。

    V1.15 以降では、このパラメータはタスクのスケジュール時間を指定します。タスクを実行する頻度を指定できます。 fe.confroutine_load_task_consume_second は、タスクがデータを消費するために必要な時間を指定します。デフォルト値: 3 秒。 fe.confroutine_load_task_timeout_second は、タスクの実行タイムアウト期間を指定します。デフォルト値: 15 秒。

    max_batch_rowsいいえ各タスクが読み取ることができる最大行数。値は 200000 以上である必要があります。デフォルト値: 200000。

    V1.15 以降では、このパラメータはエラー検出ウィンドウの範囲を定義するためにのみ使用されます。ウィンドウの範囲は 10 × max-batch-rows です。

    max_batch_sizeいいえ各タスクが読み取ることができる最大バイト数。単位: バイト。有効な値: 100 MB ~ 1 GB。デフォルト値: 100 MB。

    V1.15 以降では、このパラメータは破棄されます。 fe.confroutine_load_task_consume_second は、タスクがデータを消費するために必要な時間を指定します。デフォルト値: 3 秒。

    max_error_numberいいえサンプリングウィンドウで許容されるエラー行の最大数。値は 0 以上である必要があります。デフォルト値: 0。エラー行は許可されません。
    重要 WHERE 条件によって除外された行はエラー行ではありません。
    strict_modeいいえ厳密モードを有効にするかどうかを指定します。デフォルトでは、このモードは有効になっています。

    厳密モードを有効にした後、空でない生データの列型が NULL に変更された場合、データは除外されます。厳密モードを無効にするには、このパラメータを false に設定します。

    timezoneいいえインポートジョブに使用されるタイムゾーン。

    デフォルトでは、セッションの timezone パラメータの値が使用されます。このパラメータは、インポートに関係するすべてのタイムゾーン関連関数の結果に影響します。

    DATA_SOURCEはいデータソースのタイプ。値を KAFKA に設定します。
    data_source_propertiesいいえデータソースに関する情報。値には次のフィールドが含まれます。
    • kafka_broker_list: Kafka ブローカーの接続情報。形式: ip:host。複数のブローカーはコンマ (,) で区切ります。
    • kafka_topic: サブスクライブする Kafka トピック。
      説明 kafka_broker_list フィールドと kafka_topic フィールドは必須です。
    • kafka_partitions および kafka_offsets: サブスクライブする Kafka パーティションと各パーティションの開始オフセット。
    • property: Kafka 関連のプロパティ。このフィールドは、Kafka Shell の --property パラメータと同等です。 HELP ROUTINE LOAD; コマンドを実行して、インポートジョブを作成するためのより詳細な構文を表示できます。
    説明 HELP ROUTINE LOAD; コマンドを実行して、インポートジョブを作成するためのより詳細な構文を表示できます。
  • 例: ローカル Kafka クラスタからデータをインポートします。
    CREATE ROUTINE LOAD routine_load_wikipedia ON routine_wiki_edit
    COLUMNS TERMINATED BY ",",
    COLUMNS (event_time, channel, user, is_anonymous, is_minor, is_new, is_robot, is_unpatrolled, delta, added, deleted)
    PROPERTIES
    (
        "desired_concurrent_number"="1",
        "max_error_number"="1000"
    )
    FROM KAFKA
    (
        "kafka_broker_list"= "localhost:9092",
        "kafka_topic" = "starrocks-load"
    );

ジョブのステータスの表示

  • 次のコマンドを実行して、データベース内のすべてのルーチンインポートジョブ (停止またはキャンセルされたジョブを含む) を表示します。ジョブは 1 つ以上の行に表示されます。
    USE [database];
    SHOW ALL ROUTINE LOAD;
  • 次のコマンドを実行して、データベース内の job_name という名前の実行中のルーチンインポートジョブを表示します。
    SHOW ROUTINE LOAD FOR [database].[job_name];
重要 StarRocks では、実行中のジョブのみを表示できます。完了したジョブと実行されていないジョブは表示できません。

HELP SHOW ROUTINE LOAD コマンドを実行すると、ジョブのステータスを表示するための特定のコマンドと例を取得できます。 HELP SHOW ROUTINE LOAD TASK コマンドを実行すると、ジョブの実行ステータス (タスクを含む) を表示するための特定のコマンドと例を取得できます。

SHOW ALL ROUTINE LOAD コマンドを実行すると、すべての実行中の Routine Load ジョブを表示できます。次の出力が返されます。
*************************** 1. row ***************************

                  Id: 14093
                Name: routine_load_wikipedia
          CreateTime: 2020-05-16 16:00:48
           PauseTime: N/A
             EndTime: N/A
              DbName: default_cluster:load_test
           TableName: routine_wiki_edit
               State: RUNNING
      DataSourceType: KAFKA
      CurrentTaskNum: 1
       JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
    CustomProperties: {}
           Statistic: {"receivedBytes":150821770,"errorRows":122,"committedTaskNum":12,"loadedRows":2399878,"loadRowsRate":199000,"abortedTaskNum":1,"totalRows":2400000,"unselectedRows":0,"receivedBytesRate":12523000,"taskExecuteTimeMs":12043}
            Progress: {"0":"13634667"}
ReasonOfStateChanged:
        ErrorLogUrls: http://172.26.**.**:9122/api/_load_error_log?file=__shard_53/error_log_insert_stmt_47e8a1d107ed4932-8f1ddf7b01ad2fee_47e8a1d107ed4932_8f1ddf7b01ad2fee, http://172.26.**.**:9122/api/_load_error_log?file=__shard_54/error_log_insert_stmt_e0c0c6b040c044fd-a162b16f6bad53e6_e0c0c6b040c044fd_a162b16f6bad53e6, http://172.26.**.**:9122/api/_load_error_log?file=__shard_55/error_log_insert_stmt_ce4c95f0c72440ef-a442bb300bd743c8_ce4c95f0c72440ef_a442bb300bd743c8
            OtherMsg:
1 row in set (0.00 sec)
この例では、routine_load_wikipedia という名前のインポートジョブが作成されます。次の表に、パラメータを示します。
パラメータ説明
Stateインポートジョブのステータス。 RUNNING は、インポートジョブが継続的に実行されていることを示します。
Statisticジョブが作成されてからのインポート情報を記録する進捗情報。
receivedBytes受信データのサイズ。単位: バイト。
errorRowsインポートされたエラー行の数。
committedTaskNumフロントエンドによって送信されたタスクの数。
loadedRowsインポートされた行の数。
loadRowsRateデータのインポートレート。単位: 行/秒。
abortedTaskNumバックエンドで失敗したタスクの数。
totalRows受信した行の総数。
unselectedRowsWHERE 条件によって除外された行の数。
receivedBytesRateデータの受信レート。単位: バイト/秒。
taskExecuteTimeMsインポート時間。単位: ミリ秒。
ErrorLogUrlsエラーメッセージログ。 URL を使用して、インポートプロセス中のエラーメッセージを表示できます。

インポートジョブの一時停止

PAUSE ステートメントを使用して、インポートジョブを PAUSED 状態にします。データのインポートは一時停止されますが、ジョブは終了しません。 RESUME ステートメントを使用してジョブを再開できます。

たとえば、job_name という名前のルーチンインポートジョブを一時停止します。
PAUSE ROUTINE LOAD FOR [job_name];

HELP PAUSE ROUTINE LOAD コマンドを使用して、ヘルプ情報と例を表示できます。

インポートジョブが一時停止されると、ジョブのステータスは PAUSED に変更され、Statistic および Progress のインポート情報は更新されなくなります。この場合、ジョブは終了しません。 SHOW ROUTINE LOAD ステートメントを使用して、一時停止されたインポートジョブを表示できます。

インポートジョブの再開

RESUME ステートメントを使用して、ジョブを一時的に NEED_SCHEDULE 状態にします。ジョブは再スケジュールされています。しばらくすると、ジョブは RUNNING 状態に戻り、データのインポートが続行されます。

たとえば、job_name という名前のルーチンインポートジョブを再開します。
RESUME ROUTINE LOAD FOR [job_name];

HELP RESUME ROUTINE LOAD コマンドを使用して、ヘルプ情報と例を表示できます。

SHOW ROUTINE LOAD コマンドを実行して、ジョブのステータスを表示します。次の出力が返されます。
*************************** 1. row ***************************
                  Id: 14093
                Name: routine_load_wikipedia
          CreateTime: 2020-05-16 16:00:48
           PauseTime: N/A
             EndTime: N/A
              DbName: default_cluster:load_test
           TableName: routine_wiki_edit
               State: NEED_SCHEDULE
      DataSourceType: KAFKA
      CurrentTaskNum: 0
       JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
    CustomProperties: {}
           Statistic: {"receivedBytes":162767220,"errorRows":132,"committedTaskNum":13,"loadedRows":2589972,"loadRowsRate":115000,"abortedTaskNum":7,"totalRows":2590104,"unselectedRows":0,"receivedBytesRate":7279000,"taskExecuteTimeMs":22359}
            Progress: {"0":"13824771"}
ReasonOfStateChanged:
        ErrorLogUrls: http://172.26.**.**:9122/api/_load_error_log?file=__shard_54/error_log_insert_stmt_e0c0c6b040c044fd-a162b16f6bad53e6_e0c0c6b040c044fd_a162b16f6bad53e6, http://172.26.**.**:9122/api/_load_error_log?file=__shard_55/error_log_insert_stmt_ce4c95f0c72440ef-a442bb300bd743c8_ce4c95f0c72440ef_a442bb300bd743c8, http://172.26.**.**:9122/api/_load_error_log?file=__shard_56/error_log_insert_stmt_8753041cd5fb42d0-b5150367a5175391_8753041cd5fb42d0_b5150367a5175391
            OtherMsg:
1 row in set (0.00 sec)
SHOW ROUTINE LOAD コマンドを再度実行して、ジョブのステータスを表示します。次の出力が返されます。
*************************** 1. row ***************************
                  Id: 14093
                Name: routine_load_wikipedia
          CreateTime: 2020-05-16 16:00:48
           PauseTime: N/A
             EndTime: N/A
              DbName: default_cluster:load_test
           TableName: routine_wiki_edit
               State: RUNNING
      DataSourceType: KAFKA
      CurrentTaskNum: 1
       JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
    CustomProperties: {}
           Statistic: {"receivedBytes":175337712,"errorRows":142,"committedTaskNum":14,"loadedRows":2789962,"loadRowsRate":118000,"abortedTaskNum":7,"totalRows":2790104,"unselectedRows":0,"receivedBytesRate":7279000,"taskExecuteTimeMs":23623}
            Progress: {"0":"14024771"}
ReasonOfStateChanged:
        ErrorLogUrls: http://172.26.**.**:9122/api/_load_error_log?file=__shard_55/error_log_insert_stmt_ce4c95f0c72440ef-a442bb300bd743c8_ce4c95f0c72440ef_a442bb300bd743c8, http://172.26.**.**:9122/api/_load_error_log?file=__shard_56/error_log_insert_stmt_8753041cd5fb42d0-b5150367a5175391_8753041cd5fb42d0_b5150367a5175391, http://172.26.**.**:9122/api/_load_error_log?file=__shard_57/error_log_insert_stmt_31304c87bb82431a-9f2baf7d5fd7f252_31304c87bb82431a_9f2baf7d5fd7f252
            OtherMsg:
1 row in set (0.00 sec)
ERROR: No query specified
説明 ジョブを初めてクエリすると、ジョブのステータスは NEED_SCHEDULE に変更されます。ジョブは再スケジュールされています。ジョブを 2 回目にクエリすると、ジョブのステータスは RUNNING に変更されます。同時に、Statistic および Progress のインポート情報の更新が開始され、データのインポートが続行されます。

インポートジョブの停止

STOP ステートメントを使用して、インポートジョブを STOP 状態にします。データのインポートは停止し、ジョブは終了します。データのインポートを再開することはできません。

たとえば、job_name という名前のルーチンインポートジョブを停止します。
STOP ROUTINE LOAD FOR [job_name];

HELP STOP ROUTINE LOAD コマンドを使用して、ヘルプ情報と例を表示できます。

SHOW ROUTINE LOAD コマンドを実行して、ジョブのステータスを表示します。次の出力が返されます。
*************************** 1. row ***************************
                  Id: 14093
                Name: routine_load_wikipedia
          CreateTime: 2020-05-16 16:00:48
           PauseTime: N/A
             EndTime: 2020-05-16 16:08:25
              DbName: default_cluster:load_test
           TableName: routine_wiki_edit
               State: STOPPED
      DataSourceType: KAFKA
      CurrentTaskNum: 0
       JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
    CustomProperties: {}
           Statistic: {"receivedBytes":325534440,"errorRows":264,"committedTaskNum":26,"loadedRows":5179944,"loadRowsRate":109000,"abortedTaskNum":18,"totalRows":5180208,"unselectedRows":0,"receivedBytesRate":6900000,"taskExecute0,"taskExecuteTimeMs":47173}
            Progress: {"0":"16414875"}
ReasonOfStateChanged:
        ErrorLogUrls: http://172.26.**.**:9122/api/_load_error_log?file=__shard_67/error_log_insert_stmt_79e9504cafee4fbd-b3981a65fb158cde_79e9504cafee4fbd_b3981a65fb158cde, http://172.26.**.**:9122/api/_load_error_log?file=__shard_68/error_log_insert_stmt_b6981319ce56421b-bf4486c2cd371353_b6981319ce56421b_bf4486c2cd371353, http://172.26.**.**:9122/api/_load_error_log?file=__shard_69/error_log_insert_stmt_1121400c1f6f4aed-866c381eb49c966e_1121400c1f6f4aed_866c381eb49c966e
            OtherMsg:

インポートジョブが停止されると、ジョブのステータスは STOPPED に変更され、Statistic および Progress のインポート情報は更新されなくなります。この場合、SHOW ROUTINE LOAD ステートメントを使用して停止されたインポートジョブを表示することはできません。