ブローカーロードは、StarRocks の非同期インポートメソッドです。ブローカープロセスを使用して外部ストレージシステムからデータを読み取り、StarRocks 独自の計算リソースを使用してデータを前処理し、ロードします。ブローカーロードを使用すると、Hadoop 分散ファイルシステム (HDFS) または Alibaba Cloud Object Storage Service (OSS) から、ジョブあたり数十から数百 GB のデータをインポートできます。単一のブローカーロードジョブは、複数のターゲットテーブルをアトミックに処理できます。つまり、すべてのテーブルが成功するか、すべて失敗するかのいずれかです。
ブローカーロードの仕組み
ブローカーロードには、ご利用の StarRocks バージョンに応じて 2 つの動作モードがあります。
-
ブローカーベースのロード (StarRocks 2.5.8 より前のバージョン): StarRocks は、ブローカープロセスに依存して外部ストレージに接続します。ロードステートメントで
WITH BROKER "<broker_name>"を使用して、ブローカー名を明示的に指定します。 -
ブローカーフリーのロード (StarRocks 2.5.8 以降): StarRocks は外部ストレージに直接接続します。ブローカー名は省略しますが、ロードステートメントの
WITH BROKERキーワードは保持します。
E-MapReduce (EMR) で StarRocks クラスターを作成すると、ブローカープロセスはすべてのコアノードに自動的にデプロイされ、起動します。
サポートされているソースとフォーマット
| ディメンション | サポートされている値 |
|---|---|
| データソース | HDFS、OSS |
| ファイル形式 | CSV (デフォルト)、ORC、Parquet (.parquet または .parq) |
前提条件
開始する前に、以下が揃っていることを確認してください。
-
少なくとも 1 つのコアノードを持つ EMR で実行中の StarRocks クラスター
-
HDFS または OSS のアクセス認証情報
-
StarRocks のターゲットデータベースとテーブル
ブローカーのクエリ
クラスターでブローカーが実行されていることを確認するには、以下を実行します。
SHOW PROC "/brokers"\G
出力には、各ブローカーの IP アドレス、ポート (デフォルト: 8000)、稼働状況、タイムスタンプが一覧表示されます。
*************************** 1. row ***************************
Name: broker
IP: 10.0.**.**
Port: 8000
Alive: true
LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
ErrMsg:
インポートジョブの作成
すべてのインポートジョブは LOAD LABEL ステートメントを使用します。構文は StarRocks のバージョンによって若干異なります。
StarRocks 2.5.8 より前のバージョン
LOAD LABEL db_name.label_name
(data_desc, ...)
WITH BROKER broker_name broker_properties
[PROPERTIES (key1=value1, ... )]
StarRocks 2.5.8 以降のバージョン
LOAD LABEL db_name.label_name
(data_desc, ...)
WITH BROKER broker_properties
[PROPERTIES (key1=value1, ... )]
EMR StarRocks クラスターでは、ブローカー名として broker を使用します。
完全な構文リファレンスを表示するには、HELP BROKER LOAD を実行します。
ラベル
各インポートジョブには一意のラベルが必要です。カスタムラベルを定義するか、システムに生成させることができます。ラベルには 2 つの目的があります。
-
SHOW LOAD WHERE label = '<label>'でジョブステータスを追跡する -
重複インポートを防止する — FINISHED 状態のジョブがすでに同じラベルを持っている場合、StarRocks は新しいジョブを拒否します
ジョブが FINISHED 状態に達すると、ラベルは有効期限切れになります。ジョブが CANCELLED 状態になると、同じラベルを再利用してジョブを再送信できます。
data_desc パラメーター
data_desc 句は、1 つのターゲットテーブルのソースデータを記述します。単一のジョブには、異なるテーブルをターゲットとする複数の data_desc 句を含めることができます。StarRocks は、1 つのジョブ内のすべてのテーブルにわたる原子性を保証します。
data_desc:
DATA INFILE ('file_path', ...)
[NEGATIVE]
INTO TABLE tbl_name
[PARTITION (p1, p2)]
[COLUMNS TERMINATED BY column_separator]
[FORMAT AS file_type]
[(col1, ...)]
[COLUMNS FROM PATH AS (colx, ...)]
[SET (k1=f1(xx), k2=f2(xx))]
[WHERE predicate]
| パラメーター | 説明 |
|---|---|
file_path |
特定のファイルパス、またはワイルドカード (?、*、[]、{}、^) を使用したグロブパターン。たとえば、hdfs://hdfs_host:hdfs_port/user/data/tablename/*/ は、/tablename の下のすべてのパーティション内のすべてのファイルをインポートします。ワイルドカード構文については、FileSystem.globStatus をご参照ください。 |
NEGATIVE |
ソース行を削除としてマークします。これを使用して、SUM 型の集計列へのバッチインポートを取り消します。 |
PARTITION |
インポートをターゲットテーブルの指定されたパーティションに制限します。リストされたパーティションに属さない行はエラーとしてカウントされます。代わりに WHERE 述語を使用してそれらをフィルタリングします。 |
column_separator |
ソースファイルの列区切り文字。デフォルト: \t。非表示文字には、16 進数形式 (例: Hive のデフォルト \x01 の場合は \\x01) を使用します。 |
file_type |
ソースファイル形式: csv (デフォルト)、orc、または parquet。 |
COLUMNS FROM PATH AS |
ファイルパスからパーティションフィールド値を抽出します。たとえば、パス /path/col_name=col_value/dt=20210101/file1 を使用すると、col_value と 20210101 を col_name および dt テーブル列にインポートできます。 |
SET |
列型変換関数。ソースとターゲットの列型が異なる場合に必要です。 |
WHERE predicate |
列型変換後に列をフィルタリングします。フィルタリングされた行はエラー率計算から除外されます。同じテーブルに対する複数の WHERE 述語は AND で結合されます。 |
broker_properties とジョブレベルのプロパティ
broker_properties 句は、ストレージ認証情報と接続設定を渡します。オプションの PROPERTIES ブロックは、ジョブ実行動作を制御します。
broker_properties:
(key2=value2, ...)
これらは異なるスコープです。broker_properties (WITH BROKER 内) は、StarRocks がストレージシステムに接続する方法を構成します。PROPERTIES (外側のブロック) は、ジョブの実行方法 (タイムアウト、フィルター率、メモリ、モード) を制御します。
| パラメーター | 説明 |
|---|---|
timeout |
ジョブのタイムアウト (秒)。デフォルト: 14,400 (4 時間)。ジョブがこの期間内に完了しない場合、CANCELLED 状態に移行します。最小タイムアウトは、(総ファイルサイズ (MB) × ソースおよびロールアップテーブル数) / (30 × 同時実行数) で見積もります。たとえば、1 GB のファイルでソーステーブルが 1 つ、ロールアップテーブルが 2 つ、同時実行数が 3 の場合、少なくとも (1 × 1,024 × 3) / (10 × 3) = 102 秒 が必要です。ジョブがデフォルトを超えることが予想される場合にのみ、タイムアウトを設定してください。 |
max_filter_ratio |
許容最大エラー率 (0 から 1)。デフォルト: 0。エラー率がこの値を超えると、ジョブは失敗します。エラーのある行をスキップできるようにするには、0 より大きい値を設定します。比率は max_filter_ratio = dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL) として計算します。 |
load_mem_limit |
ジョブのメモリ制限 (バイト)。デフォルト: 0 (制限なし)。 |
strict_mode |
true に設定すると、NULL 以外のソース値が NULL に変換される行はエラーとして扱われ、フィルタリングされます。デフォルト: 無効。 PROPERTIES ("strict_mode" = "true") で有効にします。Strict モードは、関数生成列や、有効ではあるがターゲット列の範囲外の値には適用されません。 |
OSS からのデータロード
次の例では、OSS から TPC-H lineitem ファイルを StarRocks テーブルにロードします。
StarRocks 2.5.8 より前のバージョン
LOAD LABEL tpch.lineitem
(
DATA INFILE("oss://bucket/tpc_h/sf1/lineitem.tbl")
INTO TABLE `lineitem`
COLUMNS TERMINATED BY '|'
(l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice,
l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate,
l_receiptdate, l_shipinstruct, l_shipmode, l_comment)
)
StarRocks 2.5.8 以降のバージョン
LOAD LABEL tpch.lineitem
(
DATA INFILE("oss://bucket/tpc_h/sf1/lineitem.tbl")
INTO TABLE `lineitem`
COLUMNS TERMINATED BY '|'
(l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice,
l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate,
l_receiptdate, l_shipinstruct, l_shipmode, l_comment)
)
WITH BROKER broker
(
"fs.oss.accessKeyId" = "xxx",
"fs.oss.accessKeySecret" = "xxx",
"fs.oss.endpoint" = "oss-cn-beijing-internal.aliyuncs.com"
);
HDFS からのデータロード
認証
HDFS は 2 つの認証モードをサポートしています。
シンプル認証 — ユーザー ID は接続クライアントの OS によって決定されます。
| パラメーター | 説明 |
|---|---|
hadoop.security.authentication |
simple に設定します。 |
username |
HDFS ユーザー名。 |
password |
HDFS パスワード。 |
Kerberos 認証 — ユーザー ID は Kerberos 認証情報によって決定されます。
| パラメーター | 説明 |
|---|---|
hadoop.security.authentication |
kerberos に設定します。 |
kerberos_principal |
Kerberos プリンシパル。 |
kerberos_keytab |
Kerberos キータブファイルへのパス。ファイルはブローカープロセスと同じサーバーに存在する必要があります。 |
kerberos_keytab_content |
Base64 エンコードされたキータブファイルの内容。このパラメーターまたは kerberos_keytab のいずれかを構成し、両方は構成しないでください。 |
高可用性構成
NameNode 高可用性 (HA) を持つ HDFS クラスターの場合、StarRocks がアクティブな NameNode フェールオーバーを自動的に検出できるように、次のパラメーターを構成します。
| パラメーター | 説明 |
|---|---|
dfs.nameservices |
HDFS サービスのカスタム名 (例: my-ha)。 |
dfs.ha.namenodes.xxx |
コンマ区切りの NameNode 名。xxx を dfs.nameservices の値に置き換えます。 |
dfs.namenode.rpc-address.xxx.nn |
各 NameNode のリモートプロシージャコール (RPC) アドレス (Hostname:Port)。nn を各 NameNode 名に置き換えます。 |
dfs.client.failover.proxy.provider.xxx |
フェールオーバープロキシプロバイダー。デフォルト: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider。 |
シンプル認証を使用する HA HDFS クラスターの broker_properties の例:
(
"username" = "user",
"password" = "passwd",
"dfs.nameservices" = "my-ha",
"dfs.ha.namenodes.my-ha" = "my-namenode1,my-namenode2",
"dfs.namenode.rpc-address.my-ha.my-namenode1" = "nn1-host:rpc_port",
"dfs.namenode.rpc-address.my-ha.my-namenode2" = "nn2-host:rpc_port",
"dfs.client.failover.proxy.provider.my-ha" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
)
クラスターの HDFS 構成が hdfs-site.xml に保存されている場合は、ファイルパスと認証パラメーターのみを指定します。
HDFS ロード例
LOAD LABEL db1.label1
(
DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/hive/test.db/ml/file1")
INTO TABLE tbl1
COLUMNS TERMINATED BY ","
(tmp_c1, tmp_c2)
SET
(
id = tmp_c2,
name = tmp_c1
),
DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/hive/test.db/ml/file2")
INTO TABLE tbl2
COLUMNS TERMINATED BY ","
(col1, col2)
WHERE col1 > 1
)
WITH BROKER 'broker1'
(
"username" = "hdfs_username",
"password" = "hdfs_password"
)
PROPERTIES
(
"timeout" = "3600"
);
ジョブステータスのクエリ
ブローカーロードは非同期です。ジョブを送信した後、SHOW LOAD を使用して進行状況を追跡します。
SHOW LOAD WHERE label = 'label1'\G
完全な SHOW LOAD 構文を表示するには、HELP SHOW LOAD を実行します。
SHOW LOAD は非同期インポートメソッドでのみ機能します。Stream Load のような同期メソッドには適用されません。
出力は次のようになります。
*************************** 1. row ***************************
JobId: 7****
Label: label1
State: FINISHED
Progress: ETL:N/A; LOAD:100%
Type: BROKER
EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
TaskInfo: cluster:N/A; timeout(s):10800; max_filter_ratio:5.0E-5
ErrorMsg: N/A
CreateTime: 2019-07-27 11:46:42
EtlStartTime: 2019-07-27 11:46:44
EtlFinishTime: 2019-07-27 11:46:44
LoadStartTime: 2019-07-27 11:46:44
LoadFinishTime: 2019-07-27 11:50:16
URL: http://192.168.**.**:8040/api/_load_error_log?file=__shard_4/error_log_insert_stmt_...
JobDetails: {"Unfinished backends":{"9c3441027ff948a0-8287923329a2****":[10002]},"ScannedRows":2390016,"TaskNumber":1,"All backends":{"9c3441027ff948a0-8287923329a2****":[10002]},"FileNumber":1,"FileSize":1073741824}
| フィールド | 説明 |
|---|---|
JobId |
システム生成のジョブの一意の ID。再利用されることはありません。 |
Label |
指定した、または自動生成されたラベル。 |
State |
現在のジョブステータス: PENDING (待機中)、LOADING (実行中)、CANCELLED (失敗)、または FINISHED (成功)。 |
Progress |
ブローカーロードには ETL フェーズがないため、ETL は常に N/A と表示されます。LOADING の進行状況は (インポートされたテーブル数 / 総ソーステーブル数) × 100% です。すべてのテーブルがインポートされてジョブが最終処理中の場合、99% に達します。ジョブが完全に完了した後にのみ 100% になります。進行状況は線形ではありません。安定したパーセンテージは、ジョブが停止したことを意味しません。 |
Type |
ブローカーロードジョブの場合、常に BROKER です。 |
EtlInfo |
3 つのカウンター: unselected.rows (WHERE でフィルタリングされた行)、dpp.norm.ALL (正常にロードされた行)、dpp.abnorm.ALL (エラーのある行)。これらの合計は、ソースファイルの総行数に等しくなります。 |
TaskInfo |
構成したジョブパラメーター: クラスター、タイムアウト、および max_filter_ratio。 |
ErrorMsg |
ジョブが CANCELLED 状態の場合のエラー詳細。FINISHED 状態の場合は N/A。type フィールドは、USER-CANCEL、ETL-RUN-FAIL、ETL-QUALITY-UNSATISFIED、LOAD-RUN-FAIL、TIMEOUT、または UNKNOWN のいずれかです。 |
CreateTime / EtlStartTime / EtlFinishTime / LoadStartTime / LoadFinishTime |
各フェーズのタイムスタンプ。ブローカーロードには ETL フェーズがないため、EtlStartTime、EtlFinishTime、および LoadStartTime は同じ値になります。CreateTime のみが投入され、LoadStartTime が長期間 N/A のままである場合、多数のジョブがキューに登録されています。キューがクリアされるまで、新しいジョブの送信を停止してください。 |
URL |
エラーのある行のサンプルへのリンク。エラーがない場合は N/A。 |
JobDetails |
バックエンド、スキャンされた行 (5 秒ごとに更新)、タスク数、ファイル数、ファイルサイズに関する詳細。 |
インポートジョブのキャンセル
PENDING または LOADING 状態のジョブをキャンセルします。
CANCEL LOAD
[FROM db_name]
WHERE [LABEL = "load_label" | LABEL LIKE "label_pattern"];
完全な構文を表示するには、HELP CANCEL LOAD を実行します。
エンドツーエンドの例: OSS からのロード
この例では、OSS から StarRocks に TPC-H lineitem データをロードします。
ステップ 1: ターゲットテーブルを作成します。
CREATE TABLE lineitem (
l_orderkey bigint,
l_partkey bigint,
l_suppkey bigint,
l_linenumber int,
l_quantity double,
l_extendedprice double,
l_discount double,
l_tax double,
l_returnflag string,
l_linestatus string,
l_shipdate date,
l_commitdate date,
l_receiptdate date,
l_shipinstruct string,
l_shipmode string,
l_comment string
)
ENGINE = OLAP
DUPLICATE KEY(l_orderkey)
DISTRIBUTED BY HASH(l_orderkey) BUCKETS 96
PROPERTIES (
"replication_num" = "1"
);
ステップ 2: インポートジョブを送信します。
StarRocks 2.5.8 より前のバージョン
StarRocks 2.5.8 より前のバージョンでは、閉じ括弧の後に WITH BROKER broker ("fs.oss.accessKeyId" = "xxx", "fs.oss.accessKeySecret" = "xxx", "fs.oss.endpoint" = "xxx") を追加します。
StarRocks 2.5.8 以降のバージョン
StarRocks 2.5.8 以降のバージョン:
LOAD LABEL tpch.lineitem
(
DATA INFILE("oss://xxx/tpc_h/sf1/lineitem.tbl")
INTO TABLE `lineitem`
COLUMNS TERMINATED BY '|'
(l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice,
l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate,
l_receiptdate, l_shipinstruct, l_shipmode, l_comment)
)
ステップ 3: ジョブを監視します。
SHOW LOAD WHERE label = 'lineitem'\G
ジョブが成功すると、出力に State: FINISHED と Progress: ETL:100%; LOAD:100% が表示されます。
ステップ 4: データを検証します。
-- Check total row count
SELECT COUNT(*) FROM lineitem;
-- Preview the first two rows
SELECT * FROM lineitem LIMIT 2;
同時実行性
ブローカーロードジョブはタスクに分割され、さらにタスクはバックエンド全体に並行して分散されるインスタンスに分割されます。
タスク分割: ジョブは、異なるソースアドレスまたは異なるパーティションセットを指す data_desc 句ごとに 1 つのタスクを生成します。
インスタンス分割: 各タスクは、次の 3 つのフロントエンドパラメーターに基づいてインスタンスに分割されます。
| パラメーター | デフォルト | 説明 |
|---|---|---|
min_bytes_per_broker_scanner |
64 MB | インスタンスあたりの最小データ量。 |
max_broker_concurrency |
100 | タスクあたりの最大並列インスタンス数。 |
load_parallel_instance_num |
1 | バックエンドあたりの並列インスタンス数。 |
総インスタンス数 = min(ファイルサイズ / min_bytes_per_broker_scanner, max_broker_concurrency, load_parallel_instance_num × バックエンド数)
実際には、ほとんどのジョブには単一の data_desc 句があり、したがって単一のタスクがあります。そのタスクは、バックエンドの数と同じ数のインスタンスに分割され、各インスタンスは異なるバックエンドで実行されます。
トラブルシューティング
ジョブが CANCELLED 状態になる
SHOW LOAD 出力の ErrorMsg フィールドを確認してください。次の表は、各エラータイプとその原因、修正方法を示しています。
| ErrorMsg タイプ | 原因 | 修正 |
|---|---|---|
ETL-QUALITY-UNSATISFIED |
エラー率が max_filter_ratio を超過しています。 |
より多くのエラーのある行をスキップできるように、PROPERTIES の max_filter_ratio を増やすか、ソースデータの品質を修正します。 |
TIMEOUT |
ジョブがタイムアウト期間を超過しました。 | (総ファイルサイズ (MB) × ソースおよびロールアップテーブル数) / (30 × 同時実行数) を使用して最小タイムアウトを再計算し、PROPERTIES ("timeout" = "...") でより大きな値を設定します。 |
LOAD-RUN-FAIL |
LOADING フェーズ中にバックエンドが失敗しました。 | URL フィールドでエラーのある行のサンプルを確認してください。バックエンドログでハードウェアまたはネットワークの問題を調査します。 |
ETL-RUN-FAIL |
ETL フェーズでの失敗。 | URL フィールドでエラー詳細を確認してください。 |
USER-CANCEL |
ジョブが手動でキャンセルされました。 | 同じラベルで再送信します (CANCELLED ラベルは再利用できます)。 |
UNKNOWN |
予期しないエラーが発生しました。 | 詳細については、バックエンドおよびフロントエンドのログを確認してください。 |
LOADING の進行が停止する
SHOW LOAD に表示される進行状況は線形ではありません。完全なテーブルがインポートされたときにのみ更新されます。安定したパーセンテージは、ジョブが停止したことを意味しません。ジョブがハングしたと結論付ける前に、LoadFinishTime フィールドが投入されるのを待ってください。
ジョブ作成後、LoadStartTime が長期間 N/A のままである場合、多数のジョブがキューに登録されています。キューがクリアされるまで、新しいジョブの送信を停止してください。
次のステップ
-
高度な
LOAD LABELオプションとHELP BROKER LOADの使用法については、StarRocks ドキュメントをクエリしてください。 -
SHOW LOADで進行中のジョブを監視し、パフォーマンスチューニングのためにバックエンドログを確認してください。 -
クラスターのインポート同時実行数を調整するために、フロントエンドで
min_bytes_per_broker_scanner、max_broker_concurrency、およびload_parallel_instance_numを調整してください。