すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:Broker Load

最終更新日:Mar 27, 2026

ブローカーロードは、StarRocks の非同期インポートメソッドです。ブローカープロセスを使用して外部ストレージシステムからデータを読み取り、StarRocks 独自の計算リソースを使用してデータを前処理し、ロードします。ブローカーロードを使用すると、Hadoop 分散ファイルシステム (HDFS) または Alibaba Cloud Object Storage Service (OSS) から、ジョブあたり数十から数百 GB のデータをインポートできます。単一のブローカーロードジョブは、複数のターゲットテーブルをアトミックに処理できます。つまり、すべてのテーブルが成功するか、すべて失敗するかのいずれかです。

ブローカーロードの仕組み

ブローカーロードには、ご利用の StarRocks バージョンに応じて 2 つの動作モードがあります。

  • ブローカーベースのロード (StarRocks 2.5.8 より前のバージョン): StarRocks は、ブローカープロセスに依存して外部ストレージに接続します。ロードステートメントで WITH BROKER "<broker_name>" を使用して、ブローカー名を明示的に指定します。

  • ブローカーフリーのロード (StarRocks 2.5.8 以降): StarRocks は外部ストレージに直接接続します。ブローカー名は省略しますが、ロードステートメントの WITH BROKER キーワードは保持します。

E-MapReduce (EMR) で StarRocks クラスターを作成すると、ブローカープロセスはすべてのコアノードに自動的にデプロイされ、起動します。

サポートされているソースとフォーマット

ディメンション サポートされている値
データソース HDFS、OSS
ファイル形式 CSV (デフォルト)、ORC、Parquet (.parquet または .parq)

前提条件

開始する前に、以下が揃っていることを確認してください。

  • 少なくとも 1 つのコアノードを持つ EMR で実行中の StarRocks クラスター

  • HDFS または OSS のアクセス認証情報

  • StarRocks のターゲットデータベースとテーブル

ブローカーのクエリ

クラスターでブローカーが実行されていることを確認するには、以下を実行します。

SHOW PROC "/brokers"\G

出力には、各ブローカーの IP アドレス、ポート (デフォルト: 8000)、稼働状況、タイムスタンプが一覧表示されます。

*************************** 1. row ***************************
          Name: broker
            IP: 10.0.**.**
          Port: 8000
         Alive: true
 LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
        ErrMsg:

インポートジョブの作成

すべてのインポートジョブは LOAD LABEL ステートメントを使用します。構文は StarRocks のバージョンによって若干異なります。

StarRocks 2.5.8 より前のバージョン

LOAD LABEL db_name.label_name
    (data_desc, ...)
WITH BROKER broker_name broker_properties
    [PROPERTIES (key1=value1, ... )]

StarRocks 2.5.8 以降のバージョン

LOAD LABEL db_name.label_name
    (data_desc, ...)
WITH BROKER broker_properties
    [PROPERTIES (key1=value1, ... )]
EMR StarRocks クラスターでは、ブローカー名として broker を使用します。

完全な構文リファレンスを表示するには、HELP BROKER LOAD を実行します。

ラベル

各インポートジョブには一意のラベルが必要です。カスタムラベルを定義するか、システムに生成させることができます。ラベルには 2 つの目的があります。

  • SHOW LOAD WHERE label = '<label>' でジョブステータスを追跡する

  • 重複インポートを防止する — FINISHED 状態のジョブがすでに同じラベルを持っている場合、StarRocks は新しいジョブを拒否します

ジョブが FINISHED 状態に達すると、ラベルは有効期限切れになります。ジョブが CANCELLED 状態になると、同じラベルを再利用してジョブを再送信できます。

data_desc パラメーター

data_desc 句は、1 つのターゲットテーブルのソースデータを記述します。単一のジョブには、異なるテーブルをターゲットとする複数の data_desc 句を含めることができます。StarRocks は、1 つのジョブ内のすべてのテーブルにわたる原子性を保証します。

data_desc:
    DATA INFILE ('file_path', ...)
    [NEGATIVE]
    INTO TABLE tbl_name
    [PARTITION (p1, p2)]
    [COLUMNS TERMINATED BY column_separator]
    [FORMAT AS file_type]
    [(col1, ...)]
    [COLUMNS FROM PATH AS (colx, ...)]
    [SET (k1=f1(xx), k2=f2(xx))]
    [WHERE predicate]
パラメーター 説明
file_path 特定のファイルパス、またはワイルドカード (?*[]{}^) を使用したグロブパターン。たとえば、hdfs://hdfs_host:hdfs_port/user/data/tablename/*/ は、/tablename の下のすべてのパーティション内のすべてのファイルをインポートします。ワイルドカード構文については、FileSystem.globStatus をご参照ください。
NEGATIVE ソース行を削除としてマークします。これを使用して、SUM 型の集計列へのバッチインポートを取り消します。
PARTITION インポートをターゲットテーブルの指定されたパーティションに制限します。リストされたパーティションに属さない行はエラーとしてカウントされます。代わりに WHERE 述語を使用してそれらをフィルタリングします。
column_separator ソースファイルの列区切り文字。デフォルト: \t。非表示文字には、16 進数形式 (例: Hive のデフォルト \x01 の場合は \\x01) を使用します。
file_type ソースファイル形式: csv (デフォルト)、orc、または parquet
COLUMNS FROM PATH AS ファイルパスからパーティションフィールド値を抽出します。たとえば、パス /path/col_name=col_value/dt=20210101/file1 を使用すると、col_value20210101col_name および dt テーブル列にインポートできます。
SET 列型変換関数。ソースとターゲットの列型が異なる場合に必要です。
WHERE predicate 列型変換後に列をフィルタリングします。フィルタリングされた行はエラー率計算から除外されます。同じテーブルに対する複数の WHERE 述語は AND で結合されます。

broker_properties とジョブレベルのプロパティ

broker_properties 句は、ストレージ認証情報と接続設定を渡します。オプションの PROPERTIES ブロックは、ジョブ実行動作を制御します。

broker_properties:
    (key2=value2, ...)
重要

これらは異なるスコープです。broker_properties (WITH BROKER 内) は、StarRocks がストレージシステムに接続する方法を構成します。PROPERTIES (外側のブロック) は、ジョブの実行方法 (タイムアウト、フィルター率、メモリ、モード) を制御します。

パラメーター 説明
timeout ジョブのタイムアウト (秒)。デフォルト: 14,400 (4 時間)。ジョブがこの期間内に完了しない場合、CANCELLED 状態に移行します。最小タイムアウトは、(総ファイルサイズ (MB) × ソースおよびロールアップテーブル数) / (30 × 同時実行数) で見積もります。たとえば、1 GB のファイルでソーステーブルが 1 つ、ロールアップテーブルが 2 つ、同時実行数が 3 の場合、少なくとも (1 × 1,024 × 3) / (10 × 3) = 102 秒 が必要です。ジョブがデフォルトを超えることが予想される場合にのみ、タイムアウトを設定してください。
max_filter_ratio 許容最大エラー率 (0 から 1)。デフォルト: 0。エラー率がこの値を超えると、ジョブは失敗します。エラーのある行をスキップできるようにするには、0 より大きい値を設定します。比率は max_filter_ratio = dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL) として計算します。
load_mem_limit ジョブのメモリ制限 (バイト)。デフォルト: 0 (制限なし)。
strict_mode true に設定すると、NULL 以外のソース値が NULL に変換される行はエラーとして扱われ、フィルタリングされます。デフォルト: 無効。 PROPERTIES ("strict_mode" = "true") で有効にします。Strict モードは、関数生成列や、有効ではあるがターゲット列の範囲外の値には適用されません。

OSS からのデータロード

次の例では、OSS から TPC-H lineitem ファイルを StarRocks テーブルにロードします。

StarRocks 2.5.8 より前のバージョン

LOAD LABEL tpch.lineitem
(
    DATA INFILE("oss://bucket/tpc_h/sf1/lineitem.tbl")
    INTO TABLE `lineitem`
    COLUMNS TERMINATED BY '|'
    (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice,
     l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate,
     l_receiptdate, l_shipinstruct, l_shipmode, l_comment)
)

StarRocks 2.5.8 以降のバージョン

LOAD LABEL tpch.lineitem
(
    DATA INFILE("oss://bucket/tpc_h/sf1/lineitem.tbl")
    INTO TABLE `lineitem`
    COLUMNS TERMINATED BY '|'
    (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice,
     l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate,
     l_receiptdate, l_shipinstruct, l_shipmode, l_comment)
)
WITH BROKER broker
(
    "fs.oss.accessKeyId" = "xxx",
    "fs.oss.accessKeySecret" = "xxx",
    "fs.oss.endpoint" = "oss-cn-beijing-internal.aliyuncs.com"
);

HDFS からのデータロード

認証

HDFS は 2 つの認証モードをサポートしています。

シンプル認証 — ユーザー ID は接続クライアントの OS によって決定されます。

パラメーター 説明
hadoop.security.authentication simple に設定します。
username HDFS ユーザー名。
password HDFS パスワード。

Kerberos 認証 — ユーザー ID は Kerberos 認証情報によって決定されます。

パラメーター 説明
hadoop.security.authentication kerberos に設定します。
kerberos_principal Kerberos プリンシパル。
kerberos_keytab Kerberos キータブファイルへのパス。ファイルはブローカープロセスと同じサーバーに存在する必要があります。
kerberos_keytab_content Base64 エンコードされたキータブファイルの内容。このパラメーターまたは kerberos_keytab のいずれかを構成し、両方は構成しないでください。

高可用性構成

NameNode 高可用性 (HA) を持つ HDFS クラスターの場合、StarRocks がアクティブな NameNode フェールオーバーを自動的に検出できるように、次のパラメーターを構成します。

パラメーター 説明
dfs.nameservices HDFS サービスのカスタム名 (例: my-ha)。
dfs.ha.namenodes.xxx コンマ区切りの NameNode 名。xxxdfs.nameservices の値に置き換えます。
dfs.namenode.rpc-address.xxx.nn 各 NameNode のリモートプロシージャコール (RPC) アドレス (Hostname:Port)。nn を各 NameNode 名に置き換えます。
dfs.client.failover.proxy.provider.xxx フェールオーバープロキシプロバイダー。デフォルト: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider

シンプル認証を使用する HA HDFS クラスターの broker_properties の例:

(
    "username" = "user",
    "password" = "passwd",
    "dfs.nameservices" = "my-ha",
    "dfs.ha.namenodes.my-ha" = "my-namenode1,my-namenode2",
    "dfs.namenode.rpc-address.my-ha.my-namenode1" = "nn1-host:rpc_port",
    "dfs.namenode.rpc-address.my-ha.my-namenode2" = "nn2-host:rpc_port",
    "dfs.client.failover.proxy.provider.my-ha" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
)

クラスターの HDFS 構成が hdfs-site.xml に保存されている場合は、ファイルパスと認証パラメーターのみを指定します。

HDFS ロード例

LOAD LABEL db1.label1
(
    DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/hive/test.db/ml/file1")
    INTO TABLE tbl1
    COLUMNS TERMINATED BY ","
    (tmp_c1, tmp_c2)
    SET
    (
        id = tmp_c2,
        name = tmp_c1
    ),

    DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/hive/test.db/ml/file2")
    INTO TABLE tbl2
    COLUMNS TERMINATED BY ","
    (col1, col2)
    WHERE col1 > 1
)
WITH BROKER 'broker1'
(
    "username" = "hdfs_username",
    "password" = "hdfs_password"
)
PROPERTIES
(
    "timeout" = "3600"
);

ジョブステータスのクエリ

ブローカーロードは非同期です。ジョブを送信した後、SHOW LOAD を使用して進行状況を追跡します。

SHOW LOAD WHERE label = 'label1'\G

完全な SHOW LOAD 構文を表示するには、HELP SHOW LOAD を実行します。

SHOW LOAD は非同期インポートメソッドでのみ機能します。Stream Load のような同期メソッドには適用されません。

出力は次のようになります。

*************************** 1. row ***************************
         JobId: 7****
         Label: label1
         State: FINISHED
      Progress: ETL:N/A; LOAD:100%
          Type: BROKER
       EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
      TaskInfo: cluster:N/A; timeout(s):10800; max_filter_ratio:5.0E-5
      ErrorMsg: N/A
    CreateTime: 2019-07-27 11:46:42
  EtlStartTime: 2019-07-27 11:46:44
 EtlFinishTime: 2019-07-27 11:46:44
 LoadStartTime: 2019-07-27 11:46:44
LoadFinishTime: 2019-07-27 11:50:16
           URL: http://192.168.**.**:8040/api/_load_error_log?file=__shard_4/error_log_insert_stmt_...
    JobDetails: {"Unfinished backends":{"9c3441027ff948a0-8287923329a2****":[10002]},"ScannedRows":2390016,"TaskNumber":1,"All backends":{"9c3441027ff948a0-8287923329a2****":[10002]},"FileNumber":1,"FileSize":1073741824}
フィールド 説明
JobId システム生成のジョブの一意の ID。再利用されることはありません。
Label 指定した、または自動生成されたラベル。
State 現在のジョブステータス: PENDING (待機中)、LOADING (実行中)、CANCELLED (失敗)、または FINISHED (成功)。
Progress ブローカーロードには ETL フェーズがないため、ETL は常に N/A と表示されます。LOADING の進行状況は (インポートされたテーブル数 / 総ソーステーブル数) × 100% です。すべてのテーブルがインポートされてジョブが最終処理中の場合、99% に達します。ジョブが完全に完了した後にのみ 100% になります。進行状況は線形ではありません。安定したパーセンテージは、ジョブが停止したことを意味しません。
Type ブローカーロードジョブの場合、常に BROKER です。
EtlInfo 3 つのカウンター: unselected.rows (WHERE でフィルタリングされた行)、dpp.norm.ALL (正常にロードされた行)、dpp.abnorm.ALL (エラーのある行)。これらの合計は、ソースファイルの総行数に等しくなります。
TaskInfo 構成したジョブパラメーター: クラスター、タイムアウト、および max_filter_ratio
ErrorMsg ジョブが CANCELLED 状態の場合のエラー詳細。FINISHED 状態の場合は N/Atype フィールドは、USER-CANCELETL-RUN-FAILETL-QUALITY-UNSATISFIEDLOAD-RUN-FAILTIMEOUT、または UNKNOWN のいずれかです。
CreateTime / EtlStartTime / EtlFinishTime / LoadStartTime / LoadFinishTime 各フェーズのタイムスタンプ。ブローカーロードには ETL フェーズがないため、EtlStartTimeEtlFinishTime、および LoadStartTime は同じ値になります。CreateTime のみが投入され、LoadStartTime が長期間 N/A のままである場合、多数のジョブがキューに登録されています。キューがクリアされるまで、新しいジョブの送信を停止してください。
URL エラーのある行のサンプルへのリンク。エラーがない場合は N/A
JobDetails バックエンド、スキャンされた行 (5 秒ごとに更新)、タスク数、ファイル数、ファイルサイズに関する詳細。

インポートジョブのキャンセル

PENDING または LOADING 状態のジョブをキャンセルします。

CANCEL LOAD
[FROM db_name]
WHERE [LABEL = "load_label" | LABEL LIKE "label_pattern"];

完全な構文を表示するには、HELP CANCEL LOAD を実行します。

エンドツーエンドの例: OSS からのロード

この例では、OSS から StarRocks に TPC-H lineitem データをロードします。

ステップ 1: ターゲットテーブルを作成します。

CREATE TABLE lineitem (
  l_orderkey      bigint,
  l_partkey       bigint,
  l_suppkey       bigint,
  l_linenumber    int,
  l_quantity      double,
  l_extendedprice double,
  l_discount      double,
  l_tax           double,
  l_returnflag    string,
  l_linestatus    string,
  l_shipdate      date,
  l_commitdate    date,
  l_receiptdate   date,
  l_shipinstruct  string,
  l_shipmode      string,
  l_comment       string
)
ENGINE = OLAP
DUPLICATE KEY(l_orderkey)
DISTRIBUTED BY HASH(l_orderkey) BUCKETS 96
PROPERTIES (
  "replication_num" = "1"
);

ステップ 2: インポートジョブを送信します。

StarRocks 2.5.8 より前のバージョン

StarRocks 2.5.8 より前のバージョンでは、閉じ括弧の後に WITH BROKER broker ("fs.oss.accessKeyId" = "xxx", "fs.oss.accessKeySecret" = "xxx", "fs.oss.endpoint" = "xxx") を追加します。

StarRocks 2.5.8 以降のバージョン

StarRocks 2.5.8 以降のバージョン:

LOAD LABEL tpch.lineitem
(
    DATA INFILE("oss://xxx/tpc_h/sf1/lineitem.tbl")
    INTO TABLE `lineitem`
    COLUMNS TERMINATED BY '|'
    (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice,
     l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate,
     l_receiptdate, l_shipinstruct, l_shipmode, l_comment)
)

ステップ 3: ジョブを監視します。

SHOW LOAD WHERE label = 'lineitem'\G

ジョブが成功すると、出力に State: FINISHEDProgress: ETL:100%; LOAD:100% が表示されます。

ステップ 4: データを検証します。

-- Check total row count
SELECT COUNT(*) FROM lineitem;

-- Preview the first two rows
SELECT * FROM lineitem LIMIT 2;

同時実行性

ブローカーロードジョブはタスクに分割され、さらにタスクはバックエンド全体に並行して分散されるインスタンスに分割されます。

タスク分割: ジョブは、異なるソースアドレスまたは異なるパーティションセットを指す data_desc 句ごとに 1 つのタスクを生成します。

インスタンス分割: 各タスクは、次の 3 つのフロントエンドパラメーターに基づいてインスタンスに分割されます。

パラメーター デフォルト 説明
min_bytes_per_broker_scanner 64 MB インスタンスあたりの最小データ量。
max_broker_concurrency 100 タスクあたりの最大並列インスタンス数。
load_parallel_instance_num 1 バックエンドあたりの並列インスタンス数。

総インスタンス数 = min(ファイルサイズ / min_bytes_per_broker_scanner, max_broker_concurrency, load_parallel_instance_num × バックエンド数)

実際には、ほとんどのジョブには単一の data_desc 句があり、したがって単一のタスクがあります。そのタスクは、バックエンドの数と同じ数のインスタンスに分割され、各インスタンスは異なるバックエンドで実行されます。

トラブルシューティング

ジョブが CANCELLED 状態になる

SHOW LOAD 出力の ErrorMsg フィールドを確認してください。次の表は、各エラータイプとその原因、修正方法を示しています。

ErrorMsg タイプ 原因 修正
ETL-QUALITY-UNSATISFIED エラー率が max_filter_ratio を超過しています。 より多くのエラーのある行をスキップできるように、PROPERTIESmax_filter_ratio を増やすか、ソースデータの品質を修正します。
TIMEOUT ジョブがタイムアウト期間を超過しました。 (総ファイルサイズ (MB) × ソースおよびロールアップテーブル数) / (30 × 同時実行数) を使用して最小タイムアウトを再計算し、PROPERTIES ("timeout" = "...") でより大きな値を設定します。
LOAD-RUN-FAIL LOADING フェーズ中にバックエンドが失敗しました。 URL フィールドでエラーのある行のサンプルを確認してください。バックエンドログでハードウェアまたはネットワークの問題を調査します。
ETL-RUN-FAIL ETL フェーズでの失敗。 URL フィールドでエラー詳細を確認してください。
USER-CANCEL ジョブが手動でキャンセルされました。 同じラベルで再送信します (CANCELLED ラベルは再利用できます)。
UNKNOWN 予期しないエラーが発生しました。 詳細については、バックエンドおよびフロントエンドのログを確認してください。

LOADING の進行が停止する

SHOW LOAD に表示される進行状況は線形ではありません。完全なテーブルがインポートされたときにのみ更新されます。安定したパーセンテージは、ジョブが停止したことを意味しません。ジョブがハングしたと結論付ける前に、LoadFinishTime フィールドが投入されるのを待ってください。

ジョブ作成後、LoadStartTime が長期間 N/A のままである場合、多数のジョブがキューに登録されています。キューがクリアされるまで、新しいジョブの送信を停止してください。

次のステップ

  • 高度な LOAD LABEL オプションと HELP BROKER LOAD の使用法については、StarRocks ドキュメントをクエリしてください。

  • SHOW LOAD で進行中のジョブを監視し、パフォーマンスチューニングのためにバックエンドログを確認してください。

  • クラスターのインポート同時実行数を調整するために、フロントエンドで min_bytes_per_broker_scannermax_broker_concurrency、および load_parallel_instance_num を調整してください。