Stream Load を使用して、ローカルファイルまたはデータストリームを StarRocks にインポートできます。このトピックでは、Stream Load を使用したデータインポートのプロセスについて説明します。
背景情報
Stream Load は、HTTP リクエストを送信することでローカルファイルやデータストリームを StarRocks にインポートできる同期インポートメソッドです。HTTP 応答はインポート結果を返し、タスクが成功したかどうかを示します。Stream Load は CSV および JSON ファイル形式をサポートしています。1 回のインポートのデータサイズは 10 GB に制限されています。
インポートタスクの作成
Stream Load は HTTP プロトコルを使用してデータを送信および転送します。このトピックでは、curl コマンドを使用してインポートタスクを送信する方法を説明します。他の HTTP クライアントを使用することもできます。
構文
curl --location-trusted -u <username>:<password> -XPUT <url>
(
data_desc
)
[opt_properties] ExpectHTTP リクエストヘッダーに100-continueを指定することをお勧めします。例:"Expect:100-continue"。この方法により、サーバーがインポートタスクを拒否した場合の不要なデータ送信を防ぎ、リソースのオーバーヘッドを削減できます。StarRocks では、一部の単語は SQL の予約キーワードであり、SQL 文で直接使用することはできません。SQL 文で予約キーワードを使用するには、バックティック (`) で囲む必要があります。予約キーワードの詳細については、「キーワード」をご参照ください。
パラメーター
<username>:<password>: StarRocks クラスターのユーザー名とパスワードです。このパラメーターは必須です。アカウントにパスワードがない場合は、ユーザー名の後にコロンを付けて指定します。例:<username>:。XPUT: HTTP リクエストメソッドです。このパラメーターは必須です。Stream Load の場合は、PUT メソッドを指定します。<url>: Stream Load リクエストの URL です。このパラメーターは必須です。フォーマットはhttp://<fe_host>:<fe_http_port>/api/<database_name>/<table_name>/_stream_loadです。次の表に、URL のパラメーターを示します。
パラメーター
必須
説明
<fe_host>はい
StarRocks クラスターのフロントエンド (FE) ノードの IP アドレスです。
<fe_http_port>はい
StarRocks クラスターの FE ノードの HTTP ポートです。デフォルト値は 18030 です。
StarRocks の [クラスターサービス] ページの [設定] タブで [http_port] パラメーターを検索してポート番号を表示できます。さらに、
SHOW FRONTENDSコマンドを実行して、FE ノードの IP アドレスと HTTP ポート番号を表示することもできます。<database_name>はい
宛先 StarRocks テーブルが配置されているデータベースの名前です。
<table_name>はい
宛先 StarRocks テーブルの名前です。
desc: ソースデータファイルのプロパティを指定します。これには、ファイル名、フォーマット、列区切り文字、行区切り文字、宛先パーティション、StarRocks テーブルへの列マッピングが含まれます。フォーマットは次のとおりです。-T <file_path> -H "format: CSV | JSON" -H "column_separator: <column_separator>" -H "row_delimiter: <row_delimiter>" -H "columns: <column1_name>[, <column2_name>, ... ]" -H "partitions: <partition1_name>[, <partition2_name>, ...]" -H "temporary_partitions: <temporary_partition1_name>[, <temporary_partition2_name>, ...]" -H "jsonpaths: [ \"<json_path1>\"[, \"<json_path2>\", ...] ]" -H "strip_outer_array: true | false" -H "json_root: <json_path>" -H "ignore_json_size: true | false" -H "compression: <compression_algorithm> | Content-Encoding: <compression_algorithm>"data_descのパラメーターは、共通パラメーター、CSV 用パラメーター、JSON 用パラメーターの 3 つのカテゴリに分類されます。共通パラメーター
パラメーター
必須
説明
<file_path>はい
ソースデータファイルが保存されているパスです。ファイル名にはオプションで拡張子を含めることができます。
formatいいえ
インポートするデータのフォーマットです。有効な値は
CSVとJSONです。デフォルト値はCSVです。partitionsいいえ
データをインポートする特定のパーティションです。このパラメーターを指定しない場合、データはデフォルトで StarRocks テーブルのすべてのパーティションにインポートされます。
temporary_partitionsいいえ
データをインポートする一時パーティションです。
columnsいいえ
ソースデータファイルの列と StarRocks テーブルの列との間のマッピングです。
ソースデータファイルの列が StarRocks テーブルの列に順番に対応している場合、このパラメーターを指定する必要はありません。
ソースデータファイルがテーブルスキーマと一致しない場合は、このパラメーターを指定してデータ変換ルールを設定する必要があります。列には 2 つの形式があります。1 つはインポートされたファイルのフィールドに直接対応し、フィールド名で表すことができます。もう 1 つは計算によって導出する必要があります。
例 1: テーブルには
c1, c2, c3の 3 つの列があります。ソースファイルの 3 つの列は、順番にc3,c2,c1に対応します。-H "columns: c3, c2, c1"を指定する必要があります。例 2: テーブルには
c1, c2, c3の 3 つの列があります。ソースファイルの最初の 3 つの列はテーブルの列に対応しますが、余分な列が 1 つあります。-H "columns: c1, c2, c3, temp"を指定する必要があります。最後の列はプレースホルダーとして任意の名前を持つことができます。例 3: テーブルには
year, month, dayの 3 つの列があります。ソースファイルには 2018-06-01 01:02:03 形式の時刻列が 1 つしかありません。-H "columns: col, year = year(col), month=month(col), day=day(col)"を指定してインポートを完了できます。
CSV 用パラメーター
パラメーター
必須
説明
column_separatorいいえ
ソースデータファイルの列区切り文字です。デフォルト値は
\tです。非表示文字の場合は、
\xプレフィックスを追加し、16 進数を使用してデリミタを表す必要があります。たとえば、Hive ファイルのデリミタ\x01の場合は、-H "column_separator:\x01"を指定します。row_delimiterいいえ
ソースデータファイルの行区切り文字です。デフォルト値は
\nです。重要curl コマンドは \n を渡すことができません。改行を手動で \n として指定すると、シェルは改行文字 \n を直接渡すのではなく、バックスラッシュ (\) と n を渡します。
Bash は別のエスケープ文字列構文をサポートしています。
\nと\tを渡すには、文字列をドル記号と一重引用符 ($') で始め、一重引用符 (') で終わらせます。例:-H $'row_delimiter:\n'。skip_headerいいえ
CSV ファイルの先頭でスキップするヘッダー行の数です。値は整数である必要があります。デフォルト値は 0 です。
一部の CSV ファイルでは、最初の数行が列名や型などのメタデータを定義するために使用されます。このパラメーターを設定することで、StarRocks はインポート中に CSV ファイルの最初の数行を無視できます。たとえば、このパラメーターを 1 に設定すると、StarRocks はインポート中に CSV ファイルの最初の行を無視します。
ここで使用される行区切り文字は、インポートコマンドで設定したものと同じでなければなりません。
whereいいえ
データの一部を抽出するために使用されます。不要なデータをフィルターで除外するには、このオプションを設定します。
たとえば、k1 列が 20180601 に等しいデータのみをインポートするには、
-H "where: k1 = 20180601"を指定します。max_filter_ratioいいえ
データ品質などの問題によりフィルターで除外できる行の最大比率です。デフォルト値は 0 で、これはゼロトレランスを意味します。
説明WHERE 句によってフィルターされたデータは含まれません。
partitionsいいえ
このインポートに関係するパーティションです。
データのパーティションを決定できる場合は、このパラメーターを指定することをお勧めします。指定されたパーティションに適合しないデータはフィルターで除外されます。たとえば、パーティション p1 と p2 にデータをインポートするには、
-H "partitions: p1, p2"を指定します。timeoutいいえ
インポートのタイムアウト期間です。デフォルト値は 600 秒です。
値の範囲は 1 から 259200 です。単位は秒です。
strict_modeいいえ
このインポートで厳格モードを有効にするかどうかを指定します。有効な値:
false (デフォルト): 厳格モードは無効です。
true: 厳格モードは有効です。有効にすると、インポートプロセス中に列の型変換に厳格なフィルタリングが適用されます。
timezoneいいえ
このインポートのタイムゾーンです。デフォルトは UTC + 08:00 です。
このパラメーターは、インポートに関係するすべてのタイムゾーン関連関数の結果に影響します。
exec_mem_limitいいえ
インポートのメモリ制限です。デフォルト値は 2 GB です。
JSON 用パラメーター
パラメーター
必須
説明
jsonpathsいいえ
インポートするフィールドの名前です。このパラメーターは、マッチングモードで JSON 形式のデータをインポートする場合にのみ必須です。パラメーター値は JSON 形式である必要があります。
strip_outer_arrayいいえ
最も外側の配列構造をトリミングするかどうかを指定します。有効な値:
false (デフォルト): 外側の配列を削除せずに、JSON データの元の構造を保持します。JSON 配列全体が単一の値としてインポートされます。
たとえば、サンプルデータ
[{"k1" : 1, "k2" : 2},{"k1" : 3, "k2" : 4}]の場合、strip_outer_arrayが false に設定されていると、データは単一の配列として解析され、テーブルにインポートされます。true: インポートされたデータが JSON 配列の場合、
strip_outer_arrayを true に設定する必要があります。たとえば、サンプルデータ
[{"k1" : 1, "k2" : 2},{"k1" : 3, "k2" : 4}]の場合、strip_outer_arrayが true に設定されていると、データは 2 つのレコードとして解析され、テーブルにインポートされます。
json_rootいいえ
インポートする JSON データのルート要素です。このパラメーターは、マッチングモードで JSON 形式のデータをインポートする場合にのみ必須です。パラメーター値は有効な JsonPath 文字列である必要があります。デフォルト値は空で、これは JSON データファイル全体がインポートされることを意味します。
ignore_json_sizeいいえ
HTTP リクエストの JSON 本文のサイズをチェックするかどうかを指定します。
説明デフォルトでは、HTTP リクエストの JSON 本文のサイズは 100 MB を超えることはできません。JSON 本文のサイズが 100 MB を超えると、
"The size of this batch exceed the max size [104857600] of json type data data [8617627793]. Set ignore_json_size to skip check, although it may lead enormous memory consuming."エラーが報告されます。このエラーを回避するには、HTTP リクエストヘッダーに"ignore_json_size:true"設定を追加して、JSON 本文サイズのチェックをスキップできます。compression, Content-Encodingいいえ
Stream Load のデータ転送中に使用する圧縮アルゴリズムです。サポートされているアルゴリズムには、GZIP、BZIP2、LZ4_FRAME、Zstandard があります。
たとえば、
curl --location-trusted -u root: -v 'http://127.0.0.1:18030/api/db0/tbl_simple/_stream_load' \-X PUT -H "expect:100-continue" \-H 'format: json' -H 'compression: lz4_frame' -T ./b.json.lz4です。
opt_properties: インポートのオプションパラメーターです。指定された設定はインポートタスク全体に適用されます。フォーマットは次のとおりです。
-H "label: <label_name>" -H "where: <condition1>[, <condition2>, ...]" -H "max_filter_ratio: <num>" -H "timeout: <num>" -H "strict_mode: true | false" -H "timezone: <string>" -H "load_mem_limit: <num>" -H "partial_update: true | false" -H "partial_update_mode: row | column" -H "merge_condition: <column_name>"次の表に、パラメーターを示します。
パラメーター
必須
説明
labelいいえ
インポートタスクのラベルです。同じラベルのデータは複数回インポートできません。
ラベルを指定して、重複したデータインポートを防ぐことができます。StarRocks は、正常に完了したタスクのラベルを過去 30 分間保持します。
whereいいえ
フィルター条件です。このパラメーターを指定すると、StarRocks は指定された条件に基づいて変換されたデータをフィルターします。
where句で定義されたフィルター条件を満たすデータのみがインポートされます。たとえば、k1 列が 20180601 に等しいデータのみをインポートするには、
-H "where: k1 = 20180601"を指定します。max_filter_ratioいいえ
データ品質などの問題によりフィルターで除外できる行の最大比率です。デフォルト値は 0 で、これはゼロトレランスを意味します。
説明where句によってフィルターされたデータは含まれません。log_rejected_record_numいいえ
データ品質の低さが原因でフィルターで除外できる行の最大数です。このパラメーターはバージョン 3.1 からサポートされています。有効な値:
0、-1、または正の整数。デフォルト値は0です。0: フィルターされた行を記録しません。-1: フィルターされたすべての行を記録します。正の整数 (例:
n) は、各 BE (または CN) ノードで最大n個のフィルターされた行を記録します。
timeoutいいえ
インポートのタイムアウト期間です。デフォルト値は 600 秒です。
値の範囲は 1 から 259200 です。単位は秒です。
strict_modeいいえ
このインポートで厳格モードを有効にするかどうかを指定します。
false (デフォルト): 厳格モードは無効です。
true: 厳格モードは有効です。
timezoneいいえ
このインポートのタイムゾーンです。デフォルトは UTC + 08:00 です。
このパラメーターは、インポートに関係するすべてのタイムゾーン関連関数の結果に影響します。
load_mem_limitいいえ
インポートタスクのメモリ制限です。デフォルト値は 2 GB です。
partial_updateいいえ
部分的な列更新を使用するかどうかを指定します。有効な値は
TRUEとFALSEです。デフォルト値はFALSEです。partial_update_modeいいえ
部分更新のモードです。有効な値は
rowとcolumnです。row(デフォルト): 部分更新に行モードを指定します。このモードは、小さなバッチで多くの列をリアルタイムに更新するのに適しています。column: 部分更新に列モードを指定します。このモードは、多くの行にわたる少数の列の一括更新に適しています。このシナリオでは、列モードを有効にすると更新が高速になります。たとえば、100 列のテーブルで、すべての行に対して 10 列 (全体の 10%) を更新する場合、列モードを有効にすると更新パフォーマンスが 10 倍向上します。
merge_conditionいいえ
更新が有効になる条件として使用される列の名前です。更新は、インポートされたデータのこの列の値が現在の値以上である場合にのみ有効になります。
説明指定された列はプライマリキー以外の列でなければなりません。条件付き更新はプライマリキーテーブルでのみサポートされます。
例
この例では、StarRocks クラスターの `load_test` データベースの `example_table` テーブルに data.csv ファイルをインポートする方法を示します。完全な例については、「データインポートの完全な例」をご参照ください。
curl --location-trusted -u "root:" \
-H "Expect:100-continue" \
-H "label:label2" \
-H "column_separator: ," \
-T data.csv -XPUT \
http://172.17.**.**:18030/api/load_test/example_table/_stream_load戻り値
インポートが完了すると、インポートタスクの結果が JSON 形式で返されます。以下は応答のサンプルです。
{
"TxnId": 9,
"Label": "label2",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 4,
"NumberLoadedRows": 4,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 45,
"LoadTimeMs": 235,
"BeginTxnTimeMs": 101,
"StreamLoadPlanTimeMs": 102,
"ReadDataTimeMs": 0,
"WriteDataTimeMs": 11,
"CommitAndPublishTimeMs": 19
}次の表に、応答のパラメーターを示します。
パラメーター | 説明 |
| インポートのトランザクション ID です。 |
| インポートのラベルです。 |
| インポートのステータスです。有効な値:
|
| 既存のラベルに対応するインポートタスクのステータスです。このフィールドは、
|
| インポートタスクのステータスに関する詳細です。インポートが失敗した場合、失敗の具体的な理由が返されます。 |
| データストリームから読み取られた行の総数です。 |
| インポートされた行数です。これは、インポートステータスが Success の場合にのみ有効です。 |
| フィルターで除外された行数で、データ品質が不適格な行です。 |
| Where 条件によってフィルターで除外された行数です。 |
| インポートタスクのソースファイルのサイズです。 |
| インポートタスクにかかった時間 (ミリ秒) です。 |
| インポートタスクのトランザクションを開始するのにかかった時間です。 |
| インポートタスクの実行計画を生成するのにかかった時間です。 |
| インポートタスクのデータを読み取るのにかかった時間です。 |
| インポートタスクのデータを書き込むのにかかった時間です。 |
| このパラメーターは、タスクがデータのインポートに失敗した場合に返されます。
たとえば、エラー行情報をエクスポートするには: エクスポートされたエラー行情報は、 エラーメッセージに基づいてインポートタスクを調整し、タスクを再送信できます。 |
インポートタスクのキャンセル
Stream Load タスクを手動でキャンセルすることはできません。タスクは、タイムアウトした場合やエラーが発生した場合に自動的にキャンセルされます。戻り値の ErrorURL を使用して、トラブルシューティングのためにエラーメッセージをダウンロードできます。
データインポートの完全な例
この例では、curl コマンドを使用してインポートタスクを作成します。
インポートするデータ用のテーブルを作成します。
Secure Shell (SSH) を使用して StarRocks クラスターのマスターノードにログオンします。詳細については、「クラスターへのログオン」をご参照ください。
次のコマンドを実行して、MySQL クライアントを使用して StarRocks クラスターに接続します。
mysql -h127.0.0.1 -P 9030 -uroot次のコマンドを実行して、データベースとテーブルを作成します。
CREATE DATABASE IF NOT EXISTS load_test; USE load_test; CREATE TABLE IF NOT EXISTS example_table ( id INT, name VARCHAR(50), age INT ) DUPLICATE KEY(id) DISTRIBUTED BY HASH(id) BUCKETS 3 PROPERTIES ( "replication_num" = "1" -- レプリカ数を 1 に設定します。 );コマンドを実行した後、
Ctrl+Dを押して MySQL クライアントを終了します。
テストデータを準備します。
CSV データの準備
たとえば、次の内容で
data.csvという名前のファイルを作成します。id,name,age 1,Alice,25 2,Bob,30 3,Charlie,35JSON データの準備
たとえば、次の内容で
json.dataという名前のファイルを作成します。{"id":1,"name":"Emily","age":25} {"id":2,"name":"Benjamin","age":35} {"id":3,"name":"Olivia","age":28} {"id":4,"name":"Alexander","age":60} {"id":5,"name":"Ava","age":17}次のコマンドを実行して、インポートタスクを作成します。
CSV データのインポート
curl --location-trusted -u "root:" \ -H "Expect:100-continue" \ -H "label:label1" \ -H "column_separator: ," \ -T data.csv -XPUT \ http://172.17.**.**:18030/api/load_test/example_table/_stream_loadJSON データのインポート
curl --location-trusted -u "root:" \ -H "Expect:100-continue" \ -H "label:label2" \ -H "format:json" \ -T json.data -XPUT \ http://172.17.**.**:18030/api/load_test/example_table/_stream_load
コード統合の例
Java 開発の例については、「stream_load」をご参照ください。
Stream Load と Spark を統合する方法の例については、「01_sparkStreaming2StarRocks」をご参照ください。