Routine Load は、Apache Kafka から StarRocks へデータを継続的にインジェストします。SQL ステートメントを使用して、任意のタイミングでロードジョブの作成、一時停止、再開、および停止が可能です。
仕組み

Routine Load は、継続的なデータインジェストを管理するために、以下の 4 つの内部コンポーネントを使用します。
| コンポーネント | 役割 |
|---|---|
RoutineLoadJob | 送信されたロードジョブ |
JobScheduler | RoutineLoadJob をタスクに分割し、スケジュールする |
Task | RoutineLoadJob から分割された作業単位 |
TaskScheduler | 個別のタスクをスケジュールおよび実行する |
データインジェストの処理手順は以下のとおりです。
MySQL 互換クライアントを使用して、Kafka ロードジョブをフロントエンド (FE) に送信します。
FE がジョブをタスクに分割します。各タスクは、データの一部をロードします。
各タスクはバックエンド (BE) ノードに配信され、Stream Load メカニズムを使用して実行されます。
BE がタスクの実行結果を FE に報告します。
FE はその結果に基づいて、新しいタスクを生成したり、失敗したタスクを再試行したりします。
このサイクルが継続的に繰り返され、データの流れが途切れることなく維持されます。
このトピックに含まれる図および一部の情報は、オープンソースの StarRocks ドキュメントの「Apache Kafka から継続的にデータをロードする」から引用しています。
前提条件
開始する前に、以下の条件を満たしていることを確認してください。
認証なしまたは Secure Sockets Layer (SSL) 認証による Apache Kafka クラスターへのアクセス権限
Kafka トピック内のメッセージが CSV または JSON 形式であること。各 CSV メッセージは 1 行に収まり、末尾に改行が含まれてはなりません。Array 型はサポートされていません。
Apache Kafka 0.10.0.0 以降
ロードジョブの作成
CREATE ROUTINE LOAD <database>.<job_name> ON <table_name>
[COLUMNS TERMINATED BY "column_separator",]
[COLUMNS (col1, col2, ...),]
[WHERE where_condition,]
[PARTITION (part1, part2, ...)]
[PROPERTIES ("key" = "value", ...)]
FROM KAFKA
(data_source_properties1 = 'value1',
data_source_properties2 = 'value2',
...)必須パラメーター
| パラメーター | 説明 |
|---|---|
job_name | ロードジョブの名前。データベース内で一意である必要があります。推奨される形式は <timestamp>_<table_name> です。 |
table_name | 送信先テーブル。 |
kafka_broker_list | Kafka ブローカーの接続文字列。形式: <ip>:<port>。複数のブローカーを指定する場合は、カンマで区切ります。 |
kafka_topic | サブスクライブする Kafka トピック。 |
オプションパラメーター
データ整形
| パラメーター | デフォルト | 説明 |
|---|---|---|
COLUMNS TERMINATED BY | \t | ソースデータ内のカラム区切り文字。 |
COLUMNS | — | ソースカラムと送信先カラムをマップします。スキップするカラムにはプレースホルダー名を指定し、派生カラムには式(例:col4 = col1 + col2)を使用します。 |
WHERE | — | WHEREロード前の行フィルター条件。WHERE 句でフィルターされた行は、エラー行としてカウントされません。 |
PARTITION | — | 特定のパーティションにデータをロードします。省略した場合、StarRocks が自動的にデータをルーティングします。 |
ジョブの動作
| パラメーター | デフォルト | 有効値範囲 | 説明 |
|---|---|---|---|
desired_concurrent_number | 3 | > 0 | ジョブが同時に実行できる最大タスク数。 |
max_batch_interval | 10 | 5~60(秒) | タスクごとの最大実行時間。V1.15 以降では、タスクのスケジューリング間隔(タスクの配信頻度)を制御します。 |
max_batch_rows | 200000 | ≥ 200000 | 各タスクが読み取れる最大行数。V1.15 以降では、エラー検出ウィンドウサイズ(10 × max_batch_rows)を定義します。 |
max_batch_size | 100 MB | 100 MB~1 GB | 各タスクが読み取る最大データ量。V1.15 以降では非推奨です。代わりに、routine_load_task_consume_second を fe.conf で設定してください。 |
max_error_number | 0 | ≥ 0 | サンプリングウィンドウ内で許容される最大エラー行数。0 の場合、エラーは一切許容されません。 |
strict_mode | 有効 | — | 有効化すると、NULL ではないソースカラムが NULL にマップされる行がフィルターされます。false を指定すると無効化されます。 |
timezone | セッションのタイムゾーン | — | ロードジョブ内のすべてのタイムゾーン関連関数に影響します。 |
Kafka ソース設定
| パラメーター | 説明 |
|---|---|
kafka_partitions | サブスクライブするパーティション。カンマ区切りのパーティション ID。 |
kafka_offsets | 各パーティションの開始オフセット。カンマ区切り。 |
property.kafka_default_offsets | デフォルトの開始オフセット。値には OFFSET_BEGINNING、OFFSET_END、または UNIX タイムスタンプを指定できます。 |
property.* | 追加の Kafka プロパティ。Kafka CLI の --property オプションと同等です。 |
フロントエンド (FE) 構成 (fe.conf)
| パラメーター | デフォルト | 説明 |
|---|---|---|
routine_load_task_consume_second | 3 秒 | 各タスクが Kafka からデータを消費する時間。 |
routine_load_task_timeout_second | 15 秒 | タスクの最大実行時間(タイムアウトまで)。 |
例:認証なしの Kafka から JSON データをロード
この例では、example_tbl2_ordertest という名前のロードジョブを作成し、ordertest2 トピックからメッセージを継続的に取得して、example_tbl テーブルにロードします。開始オフセットは、利用可能な最も古いオフセットからとします。pay_dt カラムは、pay_time の UNIX タイムスタンプを from_unixtime() 関数で変換して導出されます。
CREATE ROUTINE LOAD load_test.example_tbl2_ordertest ON example_tbl
COLUMNS(commodity_id, customer_name, country, pay_time, price, pay_dt=from_unixtime(pay_time, '%Y%m%d'))
PROPERTIES
(
"desired_concurrent_number" = "5",
"format" = "json",
"jsonpaths" = "[\"$.commodity_id\",\"$.customer_name\",\"$.country\",\"$.pay_time\",\"$.price\"]"
)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest2",
"kafka_partitions" = "0,1,2,3,4",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);SHOW ROUTINE LOAD FOR load_test.example_tbl2_ordertest\G を実行し、State が RUNNING であり、loadedRows(Statistic 内)が増加していることを確認してください。
例:SSL を使用したデータロード
SSL 認証を有効化するには、以下のプロパティを追加します。
-- セキュリティプロトコル
"property.security.protocol" = "ssl",
-- CA 証明書のパス
"property.ssl.ca.location" = "FILE:ca-cert",
-- クライアント証明書およびキー(Kafka サーバーがクライアント認証を強制する場合に必要)
"property.ssl.certificate.location" = "FILE:client.pem",
"property.ssl.key.location" = "FILE:client.key",
"property.ssl.key.password" = "******"CREATE FILE ステートメントを使用して証明書ファイルをアップロードする場合、Object Storage Service (OSS) の HTTP アドレスを URL として指定します。OSS のエンドポイントの一覧については、「IPv6 対応のエンドポイントを使用して OSS にアクセスする」をご参照ください。
CREATE FILE 文の構文については、CREATE FILE をご参照ください。
ロードジョブの監視
ジョブステータスの確認
データベース内のすべてのロードジョブ(停止およびキャンセル済みのジョブを含む)を表示するには:
USE load_test;
SHOW ALL ROUTINE LOAD;特定の実行中のジョブを表示するには:
SHOW ROUTINE LOAD FOR load_test.example_tbl2_ordertest;SHOW ROUTINE LOAD に ALL を指定しない場合、実行中のジョブのみが表示されます。SHOW ALL ROUTINE LOAD を使用すると、他の状態のジョブも表示されます。
また、EMR StarRocks Manager コンソールでもロードジョブのステータスを確認できます。EMR StarRocks Manager ページに移動し、左側のナビゲーションウィンドウから メタデータ管理 をクリックし、目的のデータベース名をクリックした後、タスク タブをクリックします。
出力例:
*************************** 1. 行 ***************************
Id: 14093
Name: routine_load_wikipedia
CreateTime: 2020-05-16 16:00:48
PauseTime: N/A
EndTime: N/A
DbName: default_cluster:load_test
TableName: routine_wiki_edit
State: RUNNING
DataSourceType: KAFKA
CurrentTaskNum: 1
JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
CustomProperties: {}
Statistic: {"receivedBytes":150821770,"errorRows":122,"committedTaskNum":12,"loadedRows":2399878,"loadRowsRate":199000,"abortedTaskNum":1,"totalRows":2400000,"unselectedRows":0,"receivedBytesRate":12523000,"taskExecuteTimeMs":12043}
Progress: {"0":"13634667"}
ReasonOfStateChanged:
ErrorLogUrls: http://172.26.**.**:9122/api/_load_error_log?file=__shard_53/error_log_insert_stmt_47e8a1d107ed4932-8f1ddf7b01ad2fee_47e8a1d107ed4932_8f1ddf7b01ad2fee, http://172.26.**.**:9122/api/_load_error_log?file=__shard_54/error_log_insert_stmt_e0c0c6b040c044fd-a162b16f6bad53e6_e0c0c6b040c044fd_a162b16f6bad53e6, http://172.26.**.**:9122/api/_load_error_log?file=__shard_55/error_log_insert_stmt_ce4c95f0c72440ef-a442bb300bd743c8_ce4c95f0c72440ef_a442bb300bd743c8
OtherMsg:
1 行が見つかりました (0.00 秒)主要な健全性指標:
State:RUNNINGである必要があります。その他の状態については、下記のジョブ状態表をご参照ください。loadedRowsおよびerrorRows(Statistic内):errorRowsがloadedRowsに対して増加傾向にある場合、ジョブが自動的に一時停止する可能性があります。ReasonOfStateChangedおよびErrorLogUrls内の URL を確認してください。Progress: 各パーティションの現在の Kafka オフセットを示します。定期的に確認し、進捗が継続的に更新されていることを確認してください。
`Statistic` 出力のフィールド:
| フィールド | 単位 | 説明 |
|---|---|---|
receivedBytes | バイト | ジョブ作成以降に受信した合計データ量 |
errorRows | 行 | ロードに失敗した行 |
committedTaskNum | タスク | FE によってコミットされたタスク |
loadedRows | 行 | 正常にロードされた行 |
loadRowsRate | 行/秒 | 現在のロードスループット |
abortedTaskNum | タスク | BE で失敗したタスク |
totalRows | 行 | 受信した合計行数(ロード済み + エラー + 未選択) |
unselectedRows | 行 | WHERE 句によってフィルターされた行 |
receivedBytesRate | バイト/秒 | データ受信レート |
taskExecuteTimeMs | ms | 合計タスク実行時間 |
ジョブ状態
| 状態 | 意味 | 次の操作 |
|---|---|---|
NEED_SCHEDULE | ジョブがスケジュール待ちの状態(新規作成直後または再開直後) | 待機してください。自動的に RUNNING 状態に遷移します。 |
RUNNING | ジョブがデータをロード中 | loadedRows および errorRows |
PAUSED | ジョブが一時停止中。再開可能です。 | RESUME ROUTINE LOAD |
STOPPED | ジョブが完全に停止済み | 再開できません。新しいジョブを作成してください。 |
ジョブが自動的に PAUSED 状態に遷移した場合、エラー行数が max_error_number を超えたことが原因です。ReasonOfStateChanged および ErrorLogUrls 内のエラーログ URL を確認し、原因を特定してください。
ロードジョブの管理
ロードジョブの一時停止
PAUSE ROUTINE LOAD FOR <job_name>;ジョブは PAUSED 状態になります。データインジェストは停止しますが、ジョブ自体は終了しません。Statistic および Progress の値の更新も停止します。RESUME ROUTINE LOAD を実行することで再開できます。
ロードジョブの再開
RESUME ROUTINE LOAD FOR <job_name>;ジョブはまず NEED_SCHEDULE 状態になり、その後再スケジュールされて RUNNING 状態に遷移します。
ロードジョブの変更
PAUSED 状態のジョブのみ変更可能です。
ALTER ROUTINE LOAD FOR <job_name>
PROPERTIES
(
"desired_concurrent_number" = "6"
);ロードジョブの停止
STOP ROUTINE LOAD FOR <job_name>;ジョブは STOPPED 状態になり、完全に終了します。Statistic および Progress の更新は停止し、SHOW ROUTINE LOAD の出力にも表示されなくなります。データインジェストを再開するには、新しいロードジョブを作成してください。

エンドツーエンドの例
この例では、Kafka から CSV データを StarRocks に Routine Load でインジェストするジョブを作成します。
ステップ 1:Kafka データの準備
トピックを作成します。
kafka-topics.sh --create \ --topic order_sr_topic \ --replication-factor 3 \ --partitions 10 \ --bootstrap-server core-1-1:9092,core-1-2:9092,core-1-3:9092コンソールプロデューサーを起動します。
kafka-console-producer.sh \ --broker-list core-1-1:9092 \ --topic order_sr_topic以下の各行を入力します。
2020050802,2020-05-08,Johann Georg Faust,Deutschland,male,895 2020050802,2020-05-08,Julien Sorel,France,male,893 2020050803,2020-05-08,Dorian Grey,UK,male,1262 2020051001,2020-05-10,Tess Durbeyfield,US,female,986 2020051101,2020-05-11,Edogawa Conan,japan,male,8924各行は 6 つのカラムで構成されています:
order_id、pay_dt、customer_name、nationality、gender(スキップ対象)、price。
ステップ 2:送信先テーブルの作成
カラム 1、2、3、4、6 に対応する StarRocks テーブルを作成し、gender カラムはスキップします。
CREATE TABLE load_test.routine_load_tbl_csv (
`order_id` bigint NOT NULL COMMENT "注文 ID",
`pay_dt` date NOT NULL COMMENT "購入日付",
`customer_name` varchar(26) NULL COMMENT "顧客名",
`nationality` varchar(26) NULL COMMENT "国",
`price` double NULL COMMENT "支払金額"
)
ENGINE = OLAP
PRIMARY KEY (order_id, pay_dt)
DISTRIBUTED BY HASH(`order_id`) BUCKETS 5;ステップ 3:ロードジョブの作成
COLUMNS 句では、6 つのソースカラムすべてをマップし、5 番目のカラムをスキップするためにプレースホルダー(temp_gender)を使用します。
CREATE ROUTINE LOAD load_test.routine_load_tbl_ordertest_csv ON routine_load_tbl_csv
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, temp_gender, price)
PROPERTIES
(
"desired_concurrent_number" = "5"
)
FROM KAFKA
(
"kafka_broker_list" = "192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092",
"kafka_topic" = "order_sr_topic",
"kafka_partitions" = "0,1,2,3,4",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);ステップ 4:ジョブの検証
SHOW ROUTINE LOAD FOR routine_load_tbl_ordertest_csv;ステータスが RUNNING の場合、ジョブは正常に実行されています。
以下の 3 つのフィールドを確認してください。
Stateの状態はRUNNINGです。DataSourcePropertiesには、"topic":"order_sr_topic"および正しいブローカーアドレスが表示されます。Statisticは、loadedRowsが増加しており、errorRowsが 0 になっていることを示しています。
送信先テーブルをクエリして、データが正しくロードされたことを確認します。
SELECT * FROM load_test.routine_load_tbl_csv LIMIT 5;期待される結果:
| order_id | pay_dt | customer_name | nationality | price |
|---|---|---|---|---|
| 2020050802 | 2020-05-08 | Johann Georg Faust | Deutschland | 895 |
| 2020050802 | 2020-05-08 | Julien Sorel | France | 893 |
| 2020050803 | 2020-05-08 | Dorian Grey | UK | 1262 |
| 2020051001 | 2020-05-10 | Tess Durbeyfield | US | 986 |
| 2020051101 | 2020-05-11 | Edogawa Conan | japan | 8924 |