すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:Stream Load

最終更新日:Nov 09, 2025

Stream Load を使用して、ローカルファイルまたはデータストリームを StarRocks にインポートできます。このトピックでは、Stream Load を使用したデータインポートのプロセスについて説明します。

背景情報

Stream Load は、HTTP リクエストを送信することでローカルファイルやデータストリームを StarRocks にインポートできる同期インポートメソッドです。HTTP 応答はインポート結果を返し、タスクが成功したかどうかを示します。Stream Load は CSV および JSON ファイル形式をサポートしています。1 回のインポートのデータサイズは 10 GB に制限されています。

インポートタスクの作成

Stream Load は HTTP プロトコルを使用してデータを送信および転送します。このトピックでは、curl コマンドを使用してインポートタスクを送信する方法を説明します。他の HTTP クライアントを使用することもできます。

構文

curl --location-trusted -u <username>:<password> -XPUT <url>
(
    data_desc
)
[opt_properties]        
説明
  • Expect HTTP リクエストヘッダーに 100-continue を指定することをお勧めします。例: "Expect:100-continue"。この方法により、サーバーがインポートタスクを拒否した場合の不要なデータ送信を防ぎ、リソースのオーバーヘッドを削減できます。

  • StarRocks では、一部の単語は SQL の予約キーワードであり、SQL 文で直接使用することはできません。SQL 文で予約キーワードを使用するには、バックティック (`) で囲む必要があります。予約キーワードの詳細については、「キーワード」をご参照ください。

パラメーター

  • <username>:<password>: StarRocks クラスターのユーザー名とパスワードです。このパラメーターは必須です。アカウントにパスワードがない場合は、ユーザー名の後にコロンを付けて指定します。例: <username>:

  • XPUT: HTTP リクエストメソッドです。このパラメーターは必須です。Stream Load の場合は、PUT メソッドを指定します。

  • <url>: Stream Load リクエストの URL です。このパラメーターは必須です。フォーマットは http://<fe_host>:<fe_http_port>/api/<database_name>/<table_name>/_stream_load です。

    次の表に、URL のパラメーターを示します。

    パラメーター

    必須

    説明

    <fe_host>

    はい

    StarRocks クラスターのフロントエンド (FE) ノードの IP アドレスです。

    <fe_http_port>

    はい

    StarRocks クラスターの FE ノードの HTTP ポートです。デフォルト値は 18030 です。

    StarRocks の [クラスターサービス] ページの [設定] タブで [http_port] パラメーターを検索してポート番号を表示できます。さらに、SHOW FRONTENDS コマンドを実行して、FE ノードの IP アドレスと HTTP ポート番号を表示することもできます。

    <database_name>

    はい

    宛先 StarRocks テーブルが配置されているデータベースの名前です。

    <table_name>

    はい

    宛先 StarRocks テーブルの名前です。

  • desc: ソースデータファイルのプロパティを指定します。これには、ファイル名、フォーマット、列区切り文字、行区切り文字、宛先パーティション、StarRocks テーブルへの列マッピングが含まれます。フォーマットは次のとおりです。

    -T <file_path>
    -H "format: CSV | JSON"
    -H "column_separator: <column_separator>"
    -H "row_delimiter: <row_delimiter>"
    -H "columns: <column1_name>[, <column2_name>, ... ]"
    -H "partitions: <partition1_name>[, <partition2_name>, ...]"
    -H "temporary_partitions: <temporary_partition1_name>[, <temporary_partition2_name>, ...]"
    -H "jsonpaths: [ \"<json_path1>\"[, \"<json_path2>\", ...] ]"
    -H "strip_outer_array: true | false"
    -H "json_root: <json_path>"
    -H "ignore_json_size: true | false"
    -H "compression: <compression_algorithm> | Content-Encoding: <compression_algorithm>"

    data_desc のパラメーターは、共通パラメーター、CSV 用パラメーター、JSON 用パラメーターの 3 つのカテゴリに分類されます。

    • 共通パラメーター

      パラメーター

      必須

      説明

      <file_path>

      はい

      ソースデータファイルが保存されているパスです。ファイル名にはオプションで拡張子を含めることができます。

      format

      いいえ

      インポートするデータのフォーマットです。有効な値は CSVJSON です。デフォルト値は CSV です。

      partitions

      いいえ

      データをインポートする特定のパーティションです。このパラメーターを指定しない場合、データはデフォルトで StarRocks テーブルのすべてのパーティションにインポートされます。

      temporary_partitions

      いいえ

      データをインポートする一時パーティションです。

      columns

      いいえ

      ソースデータファイルの列と StarRocks テーブルの列との間のマッピングです。

      • ソースデータファイルの列が StarRocks テーブルの列に順番に対応している場合、このパラメーターを指定する必要はありません。

      • ソースデータファイルがテーブルスキーマと一致しない場合は、このパラメーターを指定してデータ変換ルールを設定する必要があります。列には 2 つの形式があります。1 つはインポートされたファイルのフィールドに直接対応し、フィールド名で表すことができます。もう 1 つは計算によって導出する必要があります。

        • 例 1: テーブルには c1, c2, c3 の 3 つの列があります。ソースファイルの 3 つの列は、順番に c3,c2,c1 に対応します。-H "columns: c3, c2, c1" を指定する必要があります。

        • 例 2: テーブルには c1, c2, c3 の 3 つの列があります。ソースファイルの最初の 3 つの列はテーブルの列に対応しますが、余分な列が 1 つあります。-H "columns: c1, c2, c3, temp" を指定する必要があります。最後の列はプレースホルダーとして任意の名前を持つことができます。

        • 例 3: テーブルには year, month, day の 3 つの列があります。ソースファイルには 2018-06-01 01:02:03 形式の時刻列が 1 つしかありません。 -H "columns: col, year = year(col), month=month(col), day=day(col)" を指定してインポートを完了できます。

    • CSV 用パラメーター

      パラメーター

      必須

      説明

      column_separator

      いいえ

      ソースデータファイルの列区切り文字です。デフォルト値は \t です。

      非表示文字の場合は、\x プレフィックスを追加し、16 進数を使用してデリミタを表す必要があります。たとえば、Hive ファイルのデリミタ \x01 の場合は、-H "column_separator:\x01" を指定します。

      row_delimiter

      いいえ

      ソースデータファイルの行区切り文字です。デフォルト値は \n です。

      重要

      curl コマンドは \n を渡すことができません。改行を手動で \n として指定すると、シェルは改行文字 \n を直接渡すのではなく、バックスラッシュ (\) と n を渡します。

      Bash は別のエスケープ文字列構文をサポートしています。\n\t を渡すには、文字列をドル記号と一重引用符 ($') で始め、一重引用符 (') で終わらせます。例: -H $'row_delimiter:\n'

      skip_header

      いいえ

      CSV ファイルの先頭でスキップするヘッダー行の数です。値は整数である必要があります。デフォルト値は 0 です。

      一部の CSV ファイルでは、最初の数行が列名や型などのメタデータを定義するために使用されます。このパラメーターを設定することで、StarRocks はインポート中に CSV ファイルの最初の数行を無視できます。たとえば、このパラメーターを 1 に設定すると、StarRocks はインポート中に CSV ファイルの最初の行を無視します。

      ここで使用される行区切り文字は、インポートコマンドで設定したものと同じでなければなりません。

      where

      いいえ

      データの一部を抽出するために使用されます。不要なデータをフィルターで除外するには、このオプションを設定します。

      たとえば、k1 列が 20180601 に等しいデータのみをインポートするには、-H "where: k1 = 20180601" を指定します。

      max_filter_ratio

      いいえ

      データ品質などの問題によりフィルターで除外できる行の最大比率です。デフォルト値は 0 で、これはゼロトレランスを意味します。

      説明

      WHERE 句によってフィルターされたデータは含まれません。

      partitions

      いいえ

      このインポートに関係するパーティションです。

      データのパーティションを決定できる場合は、このパラメーターを指定することをお勧めします。指定されたパーティションに適合しないデータはフィルターで除外されます。たとえば、パーティション p1 と p2 にデータをインポートするには、-H "partitions: p1, p2" を指定します。

      timeout

      いいえ

      インポートのタイムアウト期間です。デフォルト値は 600 秒です。

      値の範囲は 1 から 259200 です。単位は秒です。

      strict_mode

      いいえ

      このインポートで厳格モードを有効にするかどうかを指定します。有効な値:

      • false (デフォルト): 厳格モードは無効です。

      • true: 厳格モードは有効です。有効にすると、インポートプロセス中に列の型変換に厳格なフィルタリングが適用されます。

      timezone

      いいえ

      このインポートのタイムゾーンです。デフォルトは UTC + 08:00 です。

      このパラメーターは、インポートに関係するすべてのタイムゾーン関連関数の結果に影響します。

      exec_mem_limit

      いいえ

      インポートのメモリ制限です。デフォルト値は 2 GB です。

    • JSON 用パラメーター

      パラメーター

      必須

      説明

      jsonpaths

      いいえ

      インポートするフィールドの名前です。このパラメーターは、マッチングモードで JSON 形式のデータをインポートする場合にのみ必須です。パラメーター値は JSON 形式である必要があります。

      strip_outer_array

      いいえ

      最も外側の配列構造をトリミングするかどうかを指定します。有効な値:

      • false (デフォルト): 外側の配列を削除せずに、JSON データの元の構造を保持します。JSON 配列全体が単一の値としてインポートされます。

        たとえば、サンプルデータ [{"k1" : 1, "k2" : 2},{"k1" : 3, "k2" : 4}] の場合、strip_outer_array が false に設定されていると、データは単一の配列として解析され、テーブルにインポートされます。

      • true: インポートされたデータが JSON 配列の場合、strip_outer_array を true に設定する必要があります。

        たとえば、サンプルデータ [{"k1" : 1, "k2" : 2},{"k1" : 3, "k2" : 4}] の場合、strip_outer_array が true に設定されていると、データは 2 つのレコードとして解析され、テーブルにインポートされます。

      json_root

      いいえ

      インポートする JSON データのルート要素です。このパラメーターは、マッチングモードで JSON 形式のデータをインポートする場合にのみ必須です。パラメーター値は有効な JsonPath 文字列である必要があります。デフォルト値は空で、これは JSON データファイル全体がインポートされることを意味します。

      ignore_json_size

      いいえ

      HTTP リクエストの JSON 本文のサイズをチェックするかどうかを指定します。

      説明

      デフォルトでは、HTTP リクエストの JSON 本文のサイズは 100 MB を超えることはできません。JSON 本文のサイズが 100 MB を超えると、"The size of this batch exceed the max size [104857600] of json type data data [8617627793]. Set ignore_json_size to skip check, although it may lead enormous memory consuming." エラーが報告されます。このエラーを回避するには、HTTP リクエストヘッダーに "ignore_json_size:true" 設定を追加して、JSON 本文サイズのチェックをスキップできます。

      compression, Content-Encoding

      いいえ

      Stream Load のデータ転送中に使用する圧縮アルゴリズムです。サポートされているアルゴリズムには、GZIP、BZIP2、LZ4_FRAME、Zstandard があります。

      たとえば、curl --location-trusted -u root: -v 'http://127.0.0.1:18030/api/db0/tbl_simple/_stream_load' \-X PUT -H "expect:100-continue" \-H 'format: json' -H 'compression: lz4_frame' -T ./b.json.lz4 です。

  • opt_properties: インポートのオプションパラメーターです。指定された設定はインポートタスク全体に適用されます。

    フォーマットは次のとおりです。

    -H "label: <label_name>"
    -H "where: <condition1>[, <condition2>, ...]"
    -H "max_filter_ratio: <num>"
    -H "timeout: <num>"
    -H "strict_mode: true | false"
    -H "timezone: <string>"
    -H "load_mem_limit: <num>"
    -H "partial_update: true | false"
    -H "partial_update_mode: row | column"
    -H "merge_condition: <column_name>"

    次の表に、パラメーターを示します。

    パラメーター

    必須

    説明

    label

    いいえ

    インポートタスクのラベルです。同じラベルのデータは複数回インポートできません。

    ラベルを指定して、重複したデータインポートを防ぐことができます。StarRocks は、正常に完了したタスクのラベルを過去 30 分間保持します。

    where

    いいえ

    フィルター条件です。このパラメーターを指定すると、StarRocks は指定された条件に基づいて変換されたデータをフィルターします。where 句で定義されたフィルター条件を満たすデータのみがインポートされます。

    たとえば、k1 列が 20180601 に等しいデータのみをインポートするには、-H "where: k1 = 20180601" を指定します。

    max_filter_ratio

    いいえ

    データ品質などの問題によりフィルターで除外できる行の最大比率です。デフォルト値は 0 で、これはゼロトレランスを意味します。

    説明

    where 句によってフィルターされたデータは含まれません。

    log_rejected_record_num

    いいえ

    データ品質の低さが原因でフィルターで除外できる行の最大数です。このパラメーターはバージョン 3.1 からサポートされています。有効な値: 0-1、または正の整数。デフォルト値は 0 です。

    • 0: フィルターされた行を記録しません。

    • -1: フィルターされたすべての行を記録します。

    • 正の整数 (例: n) は、各 BE (または CN) ノードで最大 n 個のフィルターされた行を記録します。

    timeout

    いいえ

    インポートのタイムアウト期間です。デフォルト値は 600 秒です。

    値の範囲は 1 から 259200 です。単位は秒です。

    strict_mode

    いいえ

    このインポートで厳格モードを有効にするかどうかを指定します。

    • false (デフォルト): 厳格モードは無効です。

    • true: 厳格モードは有効です。

    timezone

    いいえ

    このインポートのタイムゾーンです。デフォルトは UTC + 08:00 です。

    このパラメーターは、インポートに関係するすべてのタイムゾーン関連関数の結果に影響します。

    load_mem_limit

    いいえ

    インポートタスクのメモリ制限です。デフォルト値は 2 GB です。

    partial_update

    いいえ

    部分的な列更新を使用するかどうかを指定します。有効な値は TRUEFALSE です。デフォルト値は FALSE です。

    partial_update_mode

    いいえ

    部分更新のモードです。有効な値は rowcolumn です。

    • row (デフォルト): 部分更新に行モードを指定します。このモードは、小さなバッチで多くの列をリアルタイムに更新するのに適しています。

    • column: 部分更新に列モードを指定します。このモードは、多くの行にわたる少数の列の一括更新に適しています。このシナリオでは、列モードを有効にすると更新が高速になります。

      たとえば、100 列のテーブルで、すべての行に対して 10 列 (全体の 10%) を更新する場合、列モードを有効にすると更新パフォーマンスが 10 倍向上します。

    merge_condition

    いいえ

    更新が有効になる条件として使用される列の名前です。更新は、インポートされたデータのこの列の値が現在の値以上である場合にのみ有効になります。

    説明

    指定された列はプライマリキー以外の列でなければなりません。条件付き更新はプライマリキーテーブルでのみサポートされます。

この例では、StarRocks クラスターの `load_test` データベースの `example_table` テーブルに data.csv ファイルをインポートする方法を示します。完全な例については、「データインポートの完全な例」をご参照ください。

curl --location-trusted -u "root:" \
     -H "Expect:100-continue" \
     -H "label:label2" \
     -H "column_separator: ," \
     -T data.csv -XPUT \
     http://172.17.**.**:18030/api/load_test/example_table/_stream_load

戻り値

インポートが完了すると、インポートタスクの結果が JSON 形式で返されます。以下は応答のサンプルです。

{
    "TxnId": 9,
    "Label": "label2",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 4,
    "NumberLoadedRows": 4,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 45,
    "LoadTimeMs": 235,
    "BeginTxnTimeMs": 101,
    "StreamLoadPlanTimeMs": 102,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 11,
    "CommitAndPublishTimeMs": 19
}

次の表に、応答のパラメーターを示します。

パラメーター

説明

TxnId

インポートのトランザクション ID です。

Label

インポートのラベルです。

Status

インポートのステータスです。有効な値:

  • Success: データは正常にインポートされ、表示可能です。

  • Publish Timeout: インポートは正常に送信されましたが、データの表示に遅延が生じる可能性があります。インポートをリトライする必要はありません。

  • Label Already Exists: Label が重複しています。Label を変更する必要があります。

  • Fail: データインポートが失敗しました。

ExistingJobStatus

既存のラベルに対応するインポートタスクのステータスです。このフィールドは、StatusLabel Already Exists の場合にのみ表示されます。このステータスを使用して、既存のラベルに対応するインポートタスクのステータスを知ることができます。

  • RUNNING: タスクは実行中です。

  • FINISHED: タスクは成功しました。

Message

インポートタスクのステータスに関する詳細です。インポートが失敗した場合、失敗の具体的な理由が返されます。

NumberTotalRows

データストリームから読み取られた行の総数です。

NumberLoadedRows

インポートされた行数です。これは、インポートステータスが Success の場合にのみ有効です。

NumberFilteredRows

フィルターで除外された行数で、データ品質が不適格な行です。

NumberUnselectedRows

Where 条件によってフィルターで除外された行数です。

LoadBytes

インポートタスクのソースファイルのサイズです。

LoadTimeMs

インポートタスクにかかった時間 (ミリ秒) です。

BeginTxnTimeMs

インポートタスクのトランザクションを開始するのにかかった時間です。

StreamLoadPlanTimeMs

インポートタスクの実行計画を生成するのにかかった時間です。

ReadDataTimeMs

インポートタスクのデータを読み取るのにかかった時間です。

WriteDataTimeMs

インポートタスクのデータを書き込むのにかかった時間です。

ErrorURL

このパラメーターは、タスクがデータのインポートに失敗した場合に返されます。

ErrorURL を使用して、インポートプロセス中にデータ品質の低さが原因でフィルターで除外されたエラー行の詳細を表示できます。インポートタスクを送信するときに、オプションのパラメーター log_rejected_record_num を使用して、記録するエラー行の最大数を指定できます。

curl "url" コマンドを実行してエラー行情報を直接表示するか、wget "url" コマンドを実行してエラー行情報をエクスポートできます。

たとえば、エラー行情報をエクスポートするには:

wget "http://172.17.**.**:18040/api/_load_error_log?file=error_log_b74dccdcf0ceb4de_e82b2709c6c013ad"

エクスポートされたエラー行情報は、_load_error_log?file=error_log_b74dccdcf0ceb4de_e82b2709c6c013ad という名前のローカルファイルに保存されます。cat _load_error_log?file=error_log_b74dccdcf0ceb4de_e82b2709c6c013ad コマンドを実行してファイルの内容を表示できます。

エラーメッセージに基づいてインポートタスクを調整し、タスクを再送信できます。

インポートタスクのキャンセル

Stream Load タスクを手動でキャンセルすることはできません。タスクは、タイムアウトした場合やエラーが発生した場合に自動的にキャンセルされます。戻り値の ErrorURL を使用して、トラブルシューティングのためにエラーメッセージをダウンロードできます。

データインポートの完全な例

この例では、curl コマンドを使用してインポートタスクを作成します。

  1. インポートするデータ用のテーブルを作成します。

    1. Secure Shell (SSH) を使用して StarRocks クラスターのマスターノードにログオンします。詳細については、「クラスターへのログオン」をご参照ください。

    2. 次のコマンドを実行して、MySQL クライアントを使用して StarRocks クラスターに接続します。

      mysql -h127.0.0.1  -P 9030 -uroot
    3. 次のコマンドを実行して、データベースとテーブルを作成します。

      CREATE DATABASE IF NOT EXISTS load_test;
      USE load_test;
      
      CREATE TABLE IF NOT EXISTS example_table (
          id INT,
          name VARCHAR(50),
          age INT
      )
      DUPLICATE KEY(id)
      DISTRIBUTED BY HASH(id) BUCKETS 3
      PROPERTIES (
          "replication_num" = "1"  -- レプリカ数を 1 に設定します。
      );
      

      コマンドを実行した後、Ctrl+D を押して MySQL クライアントを終了します。

  2. テストデータを準備します。

    CSV データの準備

    たとえば、次の内容で data.csv という名前のファイルを作成します。

    id,name,age
    1,Alice,25
    2,Bob,30
    3,Charlie,35

    JSON データの準備

    たとえば、次の内容で json.data という名前のファイルを作成します。

    {"id":1,"name":"Emily","age":25}
    {"id":2,"name":"Benjamin","age":35}
    {"id":3,"name":"Olivia","age":28}
    {"id":4,"name":"Alexander","age":60}
    {"id":5,"name":"Ava","age":17}
  3. 次のコマンドを実行して、インポートタスクを作成します。

    CSV データのインポート

    curl --location-trusted -u "root:" \
         -H "Expect:100-continue" \
         -H "label:label1" \
         -H "column_separator: ," \
         -T data.csv -XPUT \
         http://172.17.**.**:18030/api/load_test/example_table/_stream_load

    JSON データのインポート

    curl --location-trusted -u "root:" \
         -H "Expect:100-continue" \
         -H "label:label2" \
         -H "format:json" \
         -T json.data -XPUT \
         http://172.17.**.**:18030/api/load_test/example_table/_stream_load

コード統合の例

  • Java 開発の例については、「stream_load」をご参照ください。

  • Stream Load と Spark を統合する方法の例については、「01_sparkStreaming2StarRocks」をご参照ください。