すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:Routine Load

最終更新日:Mar 26, 2026

Routine Load は、Apache Kafka から StarRocks へデータを継続的にインジェストします。SQL ステートメントを使用して、任意のタイミングでロードジョブの作成、一時停止、再開、および停止が可能です。

仕組み

Routine Load

Routine Load は、継続的なデータインジェストを管理するために、以下の 4 つの内部コンポーネントを使用します。

コンポーネント役割
RoutineLoadJob送信されたロードジョブ
JobSchedulerRoutineLoadJob をタスクに分割し、スケジュールする
TaskRoutineLoadJob から分割された作業単位
TaskScheduler個別のタスクをスケジュールおよび実行する

データインジェストの処理手順は以下のとおりです。

  1. MySQL 互換クライアントを使用して、Kafka ロードジョブをフロントエンド (FE) に送信します。

  2. FE がジョブをタスクに分割します。各タスクは、データの一部をロードします。

  3. 各タスクはバックエンド (BE) ノードに配信され、Stream Load メカニズムを使用して実行されます。

  4. BE がタスクの実行結果を FE に報告します。

  5. FE はその結果に基づいて、新しいタスクを生成したり、失敗したタスクを再試行したりします。

  6. このサイクルが継続的に繰り返され、データの流れが途切れることなく維持されます。

このトピックに含まれる図および一部の情報は、オープンソースの StarRocks ドキュメントの「Apache Kafka から継続的にデータをロードする」から引用しています。

前提条件

開始する前に、以下の条件を満たしていることを確認してください。

  • 認証なしまたは Secure Sockets Layer (SSL) 認証による Apache Kafka クラスターへのアクセス権限

  • Kafka トピック内のメッセージが CSV または JSON 形式であること。各 CSV メッセージは 1 行に収まり、末尾に改行が含まれてはなりません。Array 型はサポートされていません。

  • Apache Kafka 0.10.0.0 以降

ロードジョブの作成

CREATE ROUTINE LOAD <database>.<job_name> ON <table_name>
    [COLUMNS TERMINATED BY "column_separator",]
    [COLUMNS (col1, col2, ...),]
    [WHERE where_condition,]
    [PARTITION (part1, part2, ...)]
    [PROPERTIES ("key" = "value", ...)]
    FROM KAFKA
    (data_source_properties1 = 'value1',
    data_source_properties2 = 'value2',
    ...)

必須パラメーター

パラメーター説明
job_nameロードジョブの名前。データベース内で一意である必要があります。推奨される形式は <timestamp>_<table_name> です。
table_name送信先テーブル。
kafka_broker_listKafka ブローカーの接続文字列。形式: <ip>:<port>。複数のブローカーを指定する場合は、カンマで区切ります。
kafka_topicサブスクライブする Kafka トピック。

オプションパラメーター

データ整形

パラメーターデフォルト説明
COLUMNS TERMINATED BY\tソースデータ内のカラム区切り文字。
COLUMNSソースカラムと送信先カラムをマップします。スキップするカラムにはプレースホルダー名を指定し、派生カラムには式(例:col4 = col1 + col2)を使用します。
WHEREWHEREロード前の行フィルター条件。WHERE 句でフィルターされた行は、エラー行としてカウントされません。
PARTITION特定のパーティションにデータをロードします。省略した場合、StarRocks が自動的にデータをルーティングします。

ジョブの動作

パラメーターデフォルト有効値範囲説明
desired_concurrent_number3> 0ジョブが同時に実行できる最大タスク数。
max_batch_interval105~60(秒)タスクごとの最大実行時間。V1.15 以降では、タスクのスケジューリング間隔(タスクの配信頻度)を制御します。
max_batch_rows200000≥ 200000各タスクが読み取れる最大行数。V1.15 以降では、エラー検出ウィンドウサイズ(10 × max_batch_rows)を定義します。
max_batch_size100 MB100 MB~1 GB各タスクが読み取る最大データ量。V1.15 以降では非推奨です。代わりに、routine_load_task_consume_secondfe.conf で設定してください。
max_error_number0≥ 0サンプリングウィンドウ内で許容される最大エラー行数。0 の場合、エラーは一切許容されません。
strict_mode有効有効化すると、NULL ではないソースカラムが NULL にマップされる行がフィルターされます。false を指定すると無効化されます。
timezoneセッションのタイムゾーンロードジョブ内のすべてのタイムゾーン関連関数に影響します。

Kafka ソース設定

パラメーター説明
kafka_partitionsサブスクライブするパーティション。カンマ区切りのパーティション ID。
kafka_offsets各パーティションの開始オフセット。カンマ区切り。
property.kafka_default_offsetsデフォルトの開始オフセット。値には OFFSET_BEGINNINGOFFSET_END、または UNIX タイムスタンプを指定できます。
property.*追加の Kafka プロパティ。Kafka CLI の --property オプションと同等です。

フロントエンド (FE) 構成 (fe.conf)

パラメーターデフォルト説明
routine_load_task_consume_second3 秒各タスクが Kafka からデータを消費する時間。
routine_load_task_timeout_second15 秒タスクの最大実行時間(タイムアウトまで)。

例:認証なしの Kafka から JSON データをロード

この例では、example_tbl2_ordertest という名前のロードジョブを作成し、ordertest2 トピックからメッセージを継続的に取得して、example_tbl テーブルにロードします。開始オフセットは、利用可能な最も古いオフセットからとします。pay_dt カラムは、pay_time の UNIX タイムスタンプを from_unixtime() 関数で変換して導出されます。

CREATE ROUTINE LOAD load_test.example_tbl2_ordertest ON example_tbl
COLUMNS(commodity_id, customer_name, country, pay_time, price, pay_dt=from_unixtime(pay_time, '%Y%m%d'))
PROPERTIES
(
    "desired_concurrent_number" = "5",
    "format" = "json",
    "jsonpaths" = "[\"$.commodity_id\",\"$.customer_name\",\"$.country\",\"$.pay_time\",\"$.price\"]"
)
FROM KAFKA
(
    "kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
    "kafka_topic" = "ordertest2",
    "kafka_partitions" = "0,1,2,3,4",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

SHOW ROUTINE LOAD FOR load_test.example_tbl2_ordertest\G を実行し、StateRUNNING であり、loadedRowsStatistic 内)が増加していることを確認してください。

例:SSL を使用したデータロード

SSL 認証を有効化するには、以下のプロパティを追加します。

-- セキュリティプロトコル
"property.security.protocol" = "ssl",

-- CA 証明書のパス
"property.ssl.ca.location" = "FILE:ca-cert",

-- クライアント証明書およびキー(Kafka サーバーがクライアント認証を強制する場合に必要)
"property.ssl.certificate.location" = "FILE:client.pem",
"property.ssl.key.location" = "FILE:client.key",
"property.ssl.key.password" = "******"

CREATE FILE ステートメントを使用して証明書ファイルをアップロードする場合、Object Storage Service (OSS) の HTTP アドレスを URL として指定します。OSS のエンドポイントの一覧については、「IPv6 対応のエンドポイントを使用して OSS にアクセスする」をご参照ください。

CREATE FILE 文の構文については、CREATE FILE をご参照ください。

ロードジョブの監視

ジョブステータスの確認

データベース内のすべてのロードジョブ(停止およびキャンセル済みのジョブを含む)を表示するには:

USE load_test;
SHOW ALL ROUTINE LOAD;

特定の実行中のジョブを表示するには:

SHOW ROUTINE LOAD FOR load_test.example_tbl2_ordertest;
重要

SHOW ROUTINE LOADALL を指定しない場合、実行中のジョブのみが表示されます。SHOW ALL ROUTINE LOAD を使用すると、他の状態のジョブも表示されます。

また、EMR StarRocks Manager コンソールでもロードジョブのステータスを確認できます。EMR StarRocks Manager ページに移動し、左側のナビゲーションウィンドウから メタデータ管理 をクリックし、目的のデータベース名をクリックした後、タスク タブをクリックします。

出力例:

*************************** 1. 行 ***************************

                  Id: 14093
                Name: routine_load_wikipedia
          CreateTime: 2020-05-16 16:00:48
           PauseTime: N/A
             EndTime: N/A
              DbName: default_cluster:load_test
           TableName: routine_wiki_edit
               State: RUNNING
      DataSourceType: KAFKA
      CurrentTaskNum: 1
       JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
    CustomProperties: {}
           Statistic: {"receivedBytes":150821770,"errorRows":122,"committedTaskNum":12,"loadedRows":2399878,"loadRowsRate":199000,"abortedTaskNum":1,"totalRows":2400000,"unselectedRows":0,"receivedBytesRate":12523000,"taskExecuteTimeMs":12043}
            Progress: {"0":"13634667"}
ReasonOfStateChanged:
        ErrorLogUrls: http://172.26.**.**:9122/api/_load_error_log?file=__shard_53/error_log_insert_stmt_47e8a1d107ed4932-8f1ddf7b01ad2fee_47e8a1d107ed4932_8f1ddf7b01ad2fee, http://172.26.**.**:9122/api/_load_error_log?file=__shard_54/error_log_insert_stmt_e0c0c6b040c044fd-a162b16f6bad53e6_e0c0c6b040c044fd_a162b16f6bad53e6, http://172.26.**.**:9122/api/_load_error_log?file=__shard_55/error_log_insert_stmt_ce4c95f0c72440ef-a442bb300bd743c8_ce4c95f0c72440ef_a442bb300bd743c8
            OtherMsg:
1 行が見つかりました (0.00 秒)

主要な健全性指標:

  • StateRUNNING である必要があります。その他の状態については、下記のジョブ状態表をご参照ください。

  • loadedRows および errorRowsStatistic 内): errorRowsloadedRows に対して増加傾向にある場合、ジョブが自動的に一時停止する可能性があります。ReasonOfStateChanged および ErrorLogUrls 内の URL を確認してください。

  • Progress: 各パーティションの現在の Kafka オフセットを示します。定期的に確認し、進捗が継続的に更新されていることを確認してください。

`Statistic` 出力のフィールド:

フィールド単位説明
receivedBytesバイトジョブ作成以降に受信した合計データ量
errorRowsロードに失敗した行
committedTaskNumタスクFE によってコミットされたタスク
loadedRows正常にロードされた行
loadRowsRate行/秒現在のロードスループット
abortedTaskNumタスクBE で失敗したタスク
totalRows受信した合計行数(ロード済み + エラー + 未選択)
unselectedRowsWHERE 句によってフィルターされた行
receivedBytesRateバイト/秒データ受信レート
taskExecuteTimeMsms合計タスク実行時間

ジョブ状態

状態意味次の操作
NEED_SCHEDULEジョブがスケジュール待ちの状態(新規作成直後または再開直後)待機してください。自動的に RUNNING 状態に遷移します。
RUNNINGジョブがデータをロード中loadedRows および errorRows
PAUSEDジョブが一時停止中。再開可能です。RESUME ROUTINE LOAD
STOPPEDジョブが完全に停止済み再開できません。新しいジョブを作成してください。

ジョブが自動的に PAUSED 状態に遷移した場合、エラー行数が max_error_number を超えたことが原因です。ReasonOfStateChanged および ErrorLogUrls 内のエラーログ URL を確認し、原因を特定してください。

ロードジョブの管理

ロードジョブの一時停止

PAUSE ROUTINE LOAD FOR <job_name>;

ジョブは PAUSED 状態になります。データインジェストは停止しますが、ジョブ自体は終了しません。Statistic および Progress の値の更新も停止します。RESUME ROUTINE LOAD を実行することで再開できます。

ロードジョブの再開

RESUME ROUTINE LOAD FOR <job_name>;

ジョブはまず NEED_SCHEDULE 状態になり、その後再スケジュールされて RUNNING 状態に遷移します。

ロードジョブの変更

PAUSED 状態のジョブのみ変更可能です。

ALTER ROUTINE LOAD FOR <job_name>
PROPERTIES
(
    "desired_concurrent_number" = "6"
);

ロードジョブの停止

STOP ROUTINE LOAD FOR <job_name>;

ジョブは STOPPED 状態になり、完全に終了します。Statistic および Progress の更新は停止し、SHOW ROUTINE LOAD の出力にも表示されなくなります。データインジェストを再開するには、新しいロードジョブを作成してください。

stop

エンドツーエンドの例

この例では、Kafka から CSV データを StarRocks に Routine Load でインジェストするジョブを作成します。

ステップ 1:Kafka データの準備

  1. トピックを作成します。

    kafka-topics.sh --create \
      --topic order_sr_topic \
      --replication-factor 3 \
      --partitions 10 \
      --bootstrap-server core-1-1:9092,core-1-2:9092,core-1-3:9092
  2. コンソールプロデューサーを起動します。

    kafka-console-producer.sh \
      --broker-list core-1-1:9092 \
      --topic order_sr_topic
  3. 以下の各行を入力します。

    2020050802,2020-05-08,Johann Georg Faust,Deutschland,male,895
    2020050802,2020-05-08,Julien Sorel,France,male,893
    2020050803,2020-05-08,Dorian Grey,UK,male,1262
    2020051001,2020-05-10,Tess Durbeyfield,US,female,986
    2020051101,2020-05-11,Edogawa Conan,japan,male,8924

    各行は 6 つのカラムで構成されています:order_idpay_dtcustomer_namenationalitygender(スキップ対象)、price

ステップ 2:送信先テーブルの作成

カラム 1、2、3、4、6 に対応する StarRocks テーブルを作成し、gender カラムはスキップします。

CREATE TABLE load_test.routine_load_tbl_csv (
    `order_id`       bigint      NOT NULL COMMENT "注文 ID",
    `pay_dt`         date        NOT NULL COMMENT "購入日付",
    `customer_name`  varchar(26) NULL     COMMENT "顧客名",
    `nationality`    varchar(26) NULL     COMMENT "国",
    `price`          double      NULL     COMMENT "支払金額"
)
ENGINE = OLAP
PRIMARY KEY (order_id, pay_dt)
DISTRIBUTED BY HASH(`order_id`) BUCKETS 5;

ステップ 3:ロードジョブの作成

COLUMNS 句では、6 つのソースカラムすべてをマップし、5 番目のカラムをスキップするためにプレースホルダー(temp_gender)を使用します。

CREATE ROUTINE LOAD load_test.routine_load_tbl_ordertest_csv ON routine_load_tbl_csv
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, temp_gender, price)
PROPERTIES
(
    "desired_concurrent_number" = "5"
)
FROM KAFKA
(
    "kafka_broker_list" = "192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092",
    "kafka_topic" = "order_sr_topic",
    "kafka_partitions" = "0,1,2,3,4",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

ステップ 4:ジョブの検証

SHOW ROUTINE LOAD FOR routine_load_tbl_ordertest_csv;

ステータスが RUNNING の場合、ジョブは正常に実行されています。

以下の 3 つのフィールドを確認してください。

  1. State の状態は RUNNING です。

  2. DataSourceProperties には、"topic":"order_sr_topic" および正しいブローカーアドレスが表示されます。

  3. Statistic は、loadedRows が増加しており、errorRows が 0 になっていることを示しています。

送信先テーブルをクエリして、データが正しくロードされたことを確認します。

SELECT * FROM load_test.routine_load_tbl_csv LIMIT 5;

期待される結果:

order_idpay_dtcustomer_namenationalityprice
20200508022020-05-08Johann Georg FaustDeutschland895
20200508022020-05-08Julien SorelFrance893
20200508032020-05-08Dorian GreyUK1262
20200510012020-05-10Tess DurbeyfieldUS986
20200511012020-05-11Edogawa Conanjapan8924