すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:Broker Load

最終更新日:Jan 11, 2025

Broker Load モードでは、StarRocks はブローカープロセスを使用して、Apache Hadoop Distributed File System(HDFS)や Alibaba Cloud Object Storage Service(OSS)などのデータソースからデータを読み取り、そのコンピューティングリソースを使用してデータを前処理およびインポートします。このトピックでは、Broker Load モードでデータをインポートする方法について説明します。

背景情報

Broker Load は非同期インポート方式です。MySQL プロトコルに基づいてインポートジョブを作成し、SHOW LOAD ステートメントを実行してインポート結果をクエリできます。StarRocks は、CSV、ORC、Parquet など、さまざまなファイル形式の外部ストレージシステムからのデータインポートをサポートしています。一度に数十~数百 GB のデータをインポートするために、各インポートジョブを実行することをお勧めします。

Broker Load モードでデータをインポートする

ブローカーをクエリする

E-MapReduce(EMR)で StarRocks クラスタを作成すると、ブローカーはすべてのコアノードで自動的に構築および起動されます。SHOW PROC ステートメントを実行して、クラスタのブローカーをクエリできます。次のコードは構文を示しています。

SHOW PROC "/brokers"\G

次の出力が返されます。

*************************** 1. row ***************************
          Name: broker
            IP: 10.0.**.**
          Port: 8000
         Alive: true
 LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
        ErrMsg:
*************************** 2. row ***************************
          Name: broker
            IP: 10.0.**.**
          Port: 8000
         Alive: true
 LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
        ErrMsg:
*************************** 3. row ***************************
          Name: broker
            IP: 10.0.**.**
          Port: 8000
         Alive: true
 LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
        ErrMsg:
*************************** 4. row ***************************
          Name: broker
            IP: 10.0.**.**
          Port: 8000
         Alive: true
 LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
        ErrMsg:
4 rows in set (0.00 sec)

インポートジョブを作成する

  • 構文

    2.5.8 より前の StarRocks バージョン

    LOAD LABEL db_name.label_name
        (data_desc, ...)
    WITH BROKER broker_name broker_properties
        [PROPERTIES (key1=value1, ... )]

    2.5.8 以降の StarRocks バージョン

    LOAD LABEL db_name.label_name
        (data_desc, ...)
    WITH BROKER broker_properties
        [PROPERTIES (key1=value1, ... )]
  • パラメータ

    HELP BROKER LOAD ステートメントを実行して、インポートジョブを作成するための構文を表示できます。

    • ラベル

      インポートジョブの識別子。各インポートジョブには一意のラベルがあります。カスタムラベルを定義できます。そうでない場合は、システムによってラベルが生成されます。ラベルを使用して、インポートジョブの状態をクエリし、重複データのインポートを回避できます。インポートジョブの状態が FINISHED に変更されると、ラベルは無効になります。インポートジョブの状態が CANCELLED に変更された場合は、ラベルを使用してインポートジョブを再送信できます。

    • data_desc

      インポートするデータを記述する文字列。複数の data_desc 文字列を指定でき、それぞれにデータソースアドレス、抽出、変換、ロード(ETL)関数、宛先テーブル、パーティションなどの情報が含まれます。

      Broker Load モードのインポートジョブでは、一度に複数のテーブルにデータをインポートできます。宛先テーブルごとに data_desc 文字列を指定できます。各 data_desc 文字列では、複数の file_path 文字列を含むデータソースアドレスを指定できます。 file_path 文字列は、複数のソースファイルを指定するために使用されます。Broker Load モードでは、一度に複数のテーブルにデータをインポートする際の成功の原子性が保証されます。次のコードは、通常 data_desc 文字列で構成されるパラメータを示しています。

      data_desc:
          DATA INFILE ('file_path', ...)
          [NEGATIVE]
          INTO TABLE tbl_name
          [PARTITION (p1, p2)]
          [COLUMNS TERMINATED BY column_separator ]
          [FORMAT AS file_type]
          [(col1, ...)]
          [COLUMNS FROM PATH AS (colx, ...)]
          [SET (k1=f1(xx), k2=f2(xx))]
          [WHERE predicate]

      次の表にパラメータを示します。

      パラメータ

      説明

      file_path

      ファイルを示す特定のファイルパス、またはワイルドカードとしてアスタリスク(*)を含み、ディレクトリ内のすべてのファイルを示すファイルパスのいずれか。宛先ディレクトリの親ディレクトリにもワイルドカードを含めることができます。

      次の特殊文字がワイルドカードとしてサポートされています:? * [] {} ^。ワイルドカードの使用方法の詳細については、「FileSystem」をご参照ください。

      たとえば、このパラメータを hdfs://hdfs_host:hdfs_port/user/data/tablename// に設定すると、/tablename ディレクトリのパーティション内のすべてのファイルがインポートされます。このパラメータを hdfs://hdfs_host:hdfs_port/user/data/tablename/dt=202104/ に設定すると、/tablename ディレクトリの 202104 パーティション内のファイルのみがインポートされます。

      negative

      指定されたソースデータがインポート済みであり、このインポートジョブによって削除されることを示すフラグ。

      このパラメータは、宛先テーブルの SUM タイプの集計列にインポートされたデータのバッチをキャンセルする場合に適用できます。インポートジョブが完了すると、データのバッチが集計列から削除されます。

      partition

      宛先テーブルにインポートするソースファイル内のパーティション。

      指定されたパーティションに属するソースデータのみがインポートされます。指定されたパーティションに属さないソースデータは、エラーデータとして判定されます。このようなデータをエラーデータとして判定したくない場合は、WHERE 述語を使用してフィルタリングします。

      column_separator

      ソースファイル内のデータを列に分割するために使用する列区切り文字。デフォルト値:\t。

      不可視文字を列区切り文字として指定する場合は、プレフィックスとして \x を追加し、16 進数で区切り文字を設定します。たとえば、ソース Hive ファイルで使用される区切り文字が \x01 の場合は、列区切り文字を \\x01 に設定します。

      file_type

      ソースファイルの形式。有効な値:parquetorccsv。デフォルト値:csv

      Parquet ファイルのファイル名拡張子は、.parquet または .parq です。

      COLUMNS FROM PATH AS

      ソースファイルのパス内のパーティションフィールド。

      たとえば、ソースファイルのパスが /path/col_name=col_value/dt=20210101/file1 で、col_name と dt がテーブル列であるとします。次のコードに示すように、値 col_value と 20210101 は、col_name と dt に対応する宛先列にインポートされます。

      (col1, col2)
      COLUMNS FROM PATH AS (col_name, dt)

      列マッピングの設定

      列型変換のための関数を含む SET ステートメント。

      ソース列と宛先列の型が異なる場合は、列型変換のための SET ステートメントを指定する必要があります。

      where 述語

      列型変換後にデータフィルタリングに使用する WHERE 述語。

      フィルタリングされたデータは、最大フィルタ比率の計算にはカウントされません。 data_desc 文字列に同じテーブルの複数の WHERE 述語が含まれている場合、述語は AND 演算子によって結合されます。

    • broker_properties

      broker_properties 文字列のインポートジョブのプロパティパラメータ。これらのプロパティパラメータは、インポートジョブに適用されます。

      broker_properties:
          (key2=value2, ...)

      次の表に、broker_properties 文字列のプロパティパラメータの一部を示します。

      パラメータ

      説明

      timeout

      インポートジョブのタイムアウト期間。単位:秒。

      opt_properties 文字列で各インポートジョブのタイムアウト期間を指定できます。指定されたタイムアウト期間内にインポートジョブが完了しない場合、インポートジョブの状態は CANCELLED に変更されます。Broker Load モードのインポートジョブのデフォルトのタイムアウト期間は 4 時間です。

      重要

      インポートジョブの完了にデフォルトのタイムアウト期間よりも長い時間がかかると推定されない限り、インポートジョブのタイムアウト期間を指定する必要はありません。

      次の式を使用して、秒単位で最小タイムアウト期間を計算することをお勧めします:(MB 単位のファイルの合計サイズ×ソーステーブルと関連するロールアップテーブルの数)/(30×インポートジョブの同時実行性)

      式の数字 30 は 30 MB/s を示し、これはバックエンドの平均インポート速度です。たとえば、ソースデータのファイルの合計サイズが 1 GB、1 つのソーステーブルに 2 つのロールアップテーブルがあり、インポートジョブの同時実行性が 3 であるとします。最小タイムアウト期間は、次の式を使用して計算されます:(1×1,024×3)/(10×3)= 102 秒。

      StarRocks クラスタには、さまざまなマシン環境と同時クエリジョブがあります。履歴インポートジョブの速度に基づいて、StarRocks クラスタの最も遅いインポート速度を推定する必要があります。

      max_filter_ratio

      インポートジョブの最大フィルタ比率。有効な値:0~1。デフォルト値:0。インポートジョブのエラー率が最大フィルタ比率を超えると、インポートジョブは失敗します。エラーデータ行を無視する場合は、このパラメータを 0 より大きい値に設定して、インポートジョブが成功するようにします。

      次の式を使用して、適切な最大フィルタ比率を計算します:max_filter_ratio = dpp.abnorm.ALL/(dpp.abnorm.ALL + dpp.norm.ALL)

      dpp.abnorm.ALL は、型の不一致、列数の不一致、長さの不一致など、さまざまな理由でインポートできないデータ行の数を示します。dpp.abnorm.ALL は、インポートできるデータ行の数を示します。SHOW LOAD ステートメントを実行して、インポートジョブによってインポートされたデータ量をクエリできます。

      ソースファイルのデータ行数 = dpp.abnorm.ALL + dpp.norm.ALL

      load_mem_limit

      インポートジョブに割り当てられるメモリの制限。デフォルト値:0。値 0 は、インポートジョブに割り当てられるメモリが制限されていないことを示します。

      strict_mode

      インポートジョブの厳密モードを有効にするかどうかを指定します。厳密モードを有効にするには、このパラメータを true に設定します:properties("strict_mode" = "true")

      デフォルトでは、厳密モードは無効になっています。

      厳密モードが有効になっている場合、列型変換後にエラーデータがフィルタリングされます。エラーデータとは、ソースファイルでは null ではないが、列型変換後に null 値に変換される値を指します。次の項目に注意してください。

      • 厳密モードは、関数を基に値が生成されるソース列には適用されません。

      • 宛先列が値を範囲に制限し、ソース列の値を変換できるが、結果の値が範囲外である場合、厳密モードはソース列には適用されません。たとえば、ソース列の値が 10 で、宛先列の型が DECIMAL(1,0)であるとします。値 10 は変換できますが、結果の値は範囲外です。厳密モードはソース列には適用されません。

  • Alibaba Cloud OSS からデータをインポートするためのインポートジョブを作成するサンプルコード

    重要
    • StarRocks クラスタでは、ブローカー名として broker を使用できます。

    • StarRocks のバージョンが 2.5.8 より前の場合は、次のサンプルコードを参照してインポートジョブを作成できます。StarRocks のバージョンが 2.5.8 以降の場合は、次のサンプルコードから WITH BROKER broker の部分を削除してください。

    2.5.8 より前の StarRocks バージョン

    LOAD LABEL tpch.lineitem
    (
        DATA INFILE("oss://bucket/tpc_h/sf1/lineitem.tbl")
        INTO TABLE `lineitem`
        COLUMNS TERMINATED BY '|'
        (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment)
    )
    WITH BROKER broker
    (
        "fs.oss.accessKeyId" = "xxx",
        "fs.oss.accessKeySecret" = "xxx",
        "fs.oss.endpoint" = "oss-cn-beijing-internal.aliyuncs.com"
    );

    2.5.8 以降の StarRocks バージョン

    LOAD LABEL tpch.lineitem
    (
        DATA INFILE("oss://bucket/tpc_h/sf1/lineitem.tbl")
        INTO TABLE `lineitem`
        COLUMNS TERMINATED BY '|'
        (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment)
    )

インポートジョブの状態をクエリする

Broker Load モードでのデータインポートは非同期です。インポートジョブの状態をクエリするには、SHOW LOAD ステートメントでジョブのラベルを指定して、ステートメントを実行します。このステートメントの構文を表示するには、HELP SHOW LOAD ステートメントを実行します。

重要

SHOW LOAD ステートメントは、非同期インポートジョブのみをサポートしています。Stream Load モードなどの同期インポートジョブの場合、SHOW LOAD ステートメントを使用して状態をクエリすることはできません。

次のサンプルコードは、インポートジョブの状態をクエリする方法の例を示しています。

show load where label = 'label1'\G
*************************** 1. row ***************************
         JobId: 7****
         Label: label1
         State: FINISHED
      Progress: ETL:N/A; LOAD:100%
          Type: BROKER
       EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
      TaskInfo: cluster:N/A; timeout(s):10800; max_filter_ratio:5.0E-5
      ErrorMsg: N/A
    CreateTime: 2019-07-27 11:46:42
  EtlStartTime: 2019-07-27 11:46:44
 EtlFinishTime: 2019-07-27 11:46:44
 LoadStartTime: 2019-07-27 11:46:44
LoadFinishTime: 2019-07-27 11:50:16
           URL: http://192.168.**.**:8040/api/_load_error_log?file=__shard_4/error_log_insert_stmt_4bb00753932c491a-a6da6e2725415317_4bb00753932c491a_a6da6e272541****
    JobDetails: {"Unfinished backends":{"9c3441027ff948a0-8287923329a2****":[10002]},"ScannedRows":2390016,"TaskNumber":1,"All backends":{"9c3441027ff948a0-8287923329a2****":[10002]},"FileNumber":1,"FileSize":1073741824}

次の表にパラメータを示します。

パラメータ

説明

JobId

インポートジョブの一意の ID。ジョブ ID はシステムによって自動的に生成されます。インポートジョブの ID は再利用できませんが、インポートジョブが失敗した場合、インポートジョブのラベルは再利用できます。

Label

インポートジョブの識別子。

State

インポートジョブの状態。有効な値:

  • PENDING:インポートジョブは実行待ちです。

  • LOADING:インポートジョブは実行中です。

  • CANCELLED:インポートジョブは失敗しました。

  • FINISHED:インポートジョブは成功しました。

Progress

インポートジョブの進捗情報。進捗情報は、ETL と LOADING の 2 つのインポートフェーズを記述します。Broker Load モードでのデータインポートには、LOADING フェーズのみが含まれます。ETL の進捗状況は N/A として表示され、LOADING の進捗状況は 0~100% になります。

LOADING の進捗状況は、次の式を使用して計算されます:LOADING の進捗状況 = インポート済みのテーブルの数/このインポートジョブのソーステーブルの合計数×100%

すべてのソーステーブルがインポートされ、インポートジョブが完了する準備ができると、LOADING の進捗状況は 99% に達します。LOADING の進捗状況は、インポートジョブが完了した後にのみ 100% に達します。

重要

インポートの進捗状況は線形ではありません。進捗状況が一定期間変更されない場合でも、インポートジョブは引き続き実行中である可能性があります。

Type

インポートジョブのタイプ。Broker Load モードのインポートジョブのタイプは BROKER です。

EtlInfo

インポートジョブのデータ量メトリック:unselected.rowsdpp.norm.ALLdpp.abnorm.ALL

unselected.rows は、WHERE 述語によってフィルタリングされたデータ行の数を示します。dpp.norm.ALLdpp.abnorm.ALL は、インポートジョブのエラー率が最大フィルタ比率を超えているかどうかを判断するのに役立ちます。3 つのメトリックの値の合計は、ソースファイルのデータ行の合計数と等しくなります。

TaskInfo

インポートジョブを作成したときに構成したパラメータ。クラスタ、タイムアウト期間、最大フィルタ比率などが含まれます。

ErrorMsg

インポートジョブによって返されたメッセージ。インポートジョブが CANCELLED 状態の場合、このパラメータの値はインポート失敗の原因を示し、type と msg の 2 つの部分を含みます。インポートジョブが FINISHED 状態の場合、このパラメータの値は N/A です。type 部分には、次の有効な値があります。

  • USER-CANCEL:インポートジョブはキャンセルされました。

  • ETL-RUN-FAIL:インポートジョブは ETL フェーズで失敗しました。

  • ETL-QUALITY-UNSATISFIED:エラー率が最大フィルタ比率を超えています。

  • LOAD-RUN-FAIL:インポートジョブは LOADING フェーズで失敗しました。

  • TIMEOUT:インポートジョブはタイムアウト期間内に完了しませんでした。

  • UNKNOWN:インポートジョブの実行中に不明なエラーが発生しました。

CreateTime

インポートジョブが作成された時刻、ETL フェーズが開始された時刻、ETL フェーズが終了した時刻、LOADING フェーズが開始された時刻、インポートジョブが完了した時刻。

  • Broker Load モードのインポートジョブには ETL フェーズがありません。したがって、EtlStartTimeEtlFinishTimeLoadStartTime パラメータの値は同じです。

  • 長期間にわたって CreateTime パラメータの値のみが表示され、LoadStartTime パラメータの値が N/A のままの場合、多数のインポートジョブが実行待ちです。今のところ、これ以上のインポートジョブを送信しないことをお勧めします。

    LoadFinishTime - CreateTime = インポートジョブの期間
    
    LoadFinishTime - LoadStartTime = インポートジョブの LOADING フェーズの期間 = インポートジョブの期間 - インポートジョブの待ち時間

EtlStartTime

EtlFinishTime

LoadStartTime

LoadFinishTime

URL

インポートジョブ中のサンプルエラーデータの URL。インポートジョブにエラーデータがない場合、このパラメータの値は N/A です。

JobDetails

インポートジョブの詳細。インポートされたファイルの数とバイト単位の合計サイズ、タスクの数、処理されたソースデータ行の数、タスクを実行するバックエンドの ID、待機中のタスクが存在するバックエンドの ID などが含まれます。

{"Unfinished backends":{"9c3441027ff948a0-8287923329a2****":[10002]},"`ScannedRows":2390016,"TaskNumber":1,"All backends":{"9c3441027ff948a0-8287923329a2****":[10002]},"FileNumber":1,"FileSize":1073741824}

処理されたソースデータ行の数は 5 秒ごとに更新されます。この数は現在の進捗状況のみを示し、インポートジョブの完了後に処理されるデータ行の合計数を意味するものではありません。後者は EtlInfo パラメータによって示されます。

インポートジョブをキャンセルする

Broker Load モードのインポートジョブは、CANCELLED または FINISHED 状態ではない場合にキャンセルできます。インポートジョブをキャンセルするには、CANCEL LOAD ステートメントでラベルを指定して、ステートメントを実行します。HELP CANCEL LOAD ステートメントを実行して、インポートジョブをキャンセルするための構文を表示できます。

CANCEL LOAD
[FROM db_name]
WHERE [LABEL = "load_label" | LABEL like "label_pattern"];

HDFS からデータをインポートする

  • 構文

    LOAD LABEL db1.label1
    (
        DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/hive/test.db/ml/file1")
        INTO TABLE tbl1
        COLUMNS TERMINATED BY ","
        (tmp_c1, tmp_c2)
        SET
        (
            id=tmp_c2,
            name=tmp_c1
        ),
    
        DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/hive/test.db/ml/file2")
        INTO TABLE tbl2
        COLUMNS TERMINATED BY ","
        (col1, col2)
        where col1 > 1
    )
    WITH BROKER 'broker1'
    (
        "username" = "hdfs_username",
        "password" = "hdfs_password"
    )
    PROPERTIES
    (
        "timeout" = "3600"
    );
  • HDFS 認証

    HDFS のコミュニティエディションは、シンプル認証と Kerberos 認証の 2 つの認証モードをサポートしています。

    • シンプル認証:ユーザー ID は、HDFS に接続されているクライアントのオペレーティングシステムによって決定されます。

      次の表にパラメータを示します。

      パラメータ

      説明

      hadoop.security.authentication

      認証モード。この例では、このパラメータは simple に設定されています。

      username

      HDFS にログオンするために使用されるユーザー名。

      password

      HDFS にログオンするために使用されるパスワード。

    • Kerberos 認証:クライアントのユーザー ID は、その Kerberos 資格情報によって決定されます。

      次の表にパラメータを示します。

      パラメータ

      説明

      hadoop.security.authentication

      認証モード。この例では、このパラメータは kerberos に設定されています。

      kerberos_principal

      Kerberos 認証のプリンシパル。

      kerberos_keytab

      Kerberos keytab ファイルのパス。ファイルは、ブローカープロセスと同じサーバーに存在する必要があります。

      kerberos_keytab_content

      Kerberos keytab ファイルの Base64 エンコードされたコンテンツ。

      重要

      このパラメータまたは kerberos_keytab パラメータのいずれかを構成する必要があります。

  • HDFS の HA 構成

    HDFS クラスタの NameNode に高可用性(HA)を構成した後、アクティブな NameNode が別の NameNode に切り替えられた場合、新しいアクティブな NameNode を自動的に識別できます。HA モードでデプロイされた HDFS クラスタにアクセスするには、次の表に示すパラメータを構成します。

    パラメータ

    説明

    dfs.nameservices

    HDFS サービスの名前。カスタム名を設定できます。

    たとえば、dfs.nameservices パラメータを my_ha に設定します。

    dfs.ha.namenodes.xxx

    NameNode のカスタム名。複数の名前はカンマ(,)で区切ります。このパラメータ名の xxx は、dfs.nameservices パラメータに設定したカスタム名に置き換えます。

    たとえば、dfs.ha.namenodes.my_ha パラメータを my_nn に設定します。

    dfs.namenode.rpc-address.xxx.nn

    NameNode がリモートプロシージャコール(RPC)に使用するアドレス。このパラメータ名の nn は、dfs.ha.namenodes.xxx パラメータに設定した NameNode の名前に置き換えます。

    たとえば、dfs.namenode.rpc-address.my_ha.my_nn パラメータを Hostname:Port number 形式の値に設定します。

    dfs.client.failover.proxy.provider

    クライアントが NameNode に接続するために使用するプロバイダ。デフォルト値:org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider

    次のサンプルコードは例を示しています。

    (
        "dfs.nameservices" = "my-ha",
        "dfs.ha.namenodes.my-ha" = "my-namenode1,my-namenode2",
        "dfs.namenode.rpc-address.my-ha.my-namenode1" = "nn1-host:rpc_port",
        "dfs.namenode.rpc-address.my-ha.my-namenode2" = "nn2-host:rpc_port",
        "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
    )

    シンプル認証または Kerberos 認証を使用して、HA モードでデプロイされた HDFS クラスタにアクセスできます。次のサンプルコードは、シンプル認証を使用して HA HDFS クラスタにアクセスする方法の例を示しています。

    (
        "username"="user",
        "password"="passwd",
        "dfs.nameservices" = "my-ha",
        "dfs.ha.namenodes.my-ha" = "my_namenode1,my_namenode2",
        "dfs.namenode.rpc-address.my-ha.my-namenode1" = "nn1-host:rpc_port",
        "dfs.namenode.rpc-address.my-ha.my-namenode2" = "nn2-host:rpc_port",
        "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
    )

    HDFS クラスタの構成は、hdfs-site.xml ファイルに書き込むことができます。ブローカープロセスを使用して HDFS クラスタに関する情報を読み取る場合は、クラスタのファイルパスと認証情報のみを指定する必要があります。

  1. テストテーブルを作成します。次のサンプルコードを実行して、tpch データベースに lineitem という名前のテーブルを作成します。

    CREATE TABLE lineitem (
      l_orderkey bigint,
      l_partkey bigint,
      l_suppkey bigint,
      l_linenumber int,
      l_quantity double,
      l_extendedprice double,
      l_discount double,
      l_tax double,
      l_returnflag string,
      l_linestatus string,
      l_shipdate date,
      l_commitdate date,
      l_receiptdate date,
      l_shipinstruct string,
      l_shipmode string,
      l_comment string
    )
    ENGINE=OLAP
    DUPLICATE KEY(l_orderkey)
    DISTRIBUTED BY HASH(l_orderkey) BUCKETS 96
    PROPERTIES(
      "replication_num" = "1"
    );
  2. インポートジョブを作成します。

    重要

    StarRocks のバージョンが 2.5.8 より前の場合は、次のサンプルコードを参照してインポートジョブを作成できます。StarRocks のバージョンが 2.5.8 以降の場合は、次のサンプルコードから WITH BROKER broker の部分を削除してください。

    2.5.8 より前の StarRocks バージョン

    LOAD LABEL tpch.lineitem
    (
        DATA INFILE("oss://xxx/tpc_h/sf1/lineitem.tbl")
        INTO TABLE `lineitem`
        COLUMNS TERMINATED BY '|'
        (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment)
    )
    WITH BROKER broker
    (
        "fs.oss.accessKeyId" = "xxx",
        "fs.oss.accessKeySecret" = "xxx",
        "fs.oss.endpoint" = "xxx"
    );

    2.5.8 以降の StarRocks バージョン

    LOAD LABEL tpch.lineitem
    (
        DATA INFILE("oss://xxx/tpc_h/sf1/lineitem.tbl")
        INTO TABLE `lineitem`
        COLUMNS TERMINATED BY '|'
        (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment)
    )
  3. インポートジョブの状態をクエリします。

    show load where label = 'lineitem'\G;
    
    *************************** 1. row ***************************
             JobId: 1****
             Label: lineitem
             State: FINISHED
          Progress: ETL:100%; LOAD:100%
              Type: BROKER
           EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=6001215
          TaskInfo: cluster:N/A; timeout(s):14400; max_filter_ratio:0.0
          ErrorMsg: NULL
        CreateTime: 2022-04-13 15:07:53
      EtlStartTime: 2022-04-13 15:07:56
     EtlFinishTime: 2022-04-13 15:07:56
     LoadStartTime: 2022-04-13 15:07:56
    LoadFinishTime: 2022-04-13 15:08:06
               URL: NULL
        JobDetails: {"Unfinished backends":{"97f1acd1-6e70-4699-9199-b1722020****":[]},"ScannedRows":6001215,"TaskNumber":1,"All backends":{"97f1acd1-6e70-4699-9199-b1722020****":[10002,10003,10004,10005]},"FileNumber":1,"FileSize":753862072}
    2 rows in set (0.00 sec)
  4. インポートジョブの完了後、ビジネス要件に基づいてデータをクエリします。

    • lineitem テーブルのデータ行数をクエリします。

      select count(*) from lineitem;

      次の出力が返されます。

      +----------+
      | count(*) |
      +----------+
      |  6001215 |
      +----------+
      1 row in set (0.03 sec)
    • lineitem テーブルの最初の 2 行のデータをクエリします。

      select * from lineitem limit 2;

      次の出力が返されます。

      +------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+--------------------------------------------+
      | l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment                                  |
      +------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+--------------------------------------------+
      |         69 |    115209 |      7721 |            1 |         48 |         58761.6 |       0.01 |  0.07 | A            | F            | 1994-08-17 | 1994-08-11   | 1994-09-08    | NONE           | TRUCK      | regular epitaphs. carefully even ideas hag |
      |         69 |    104180 |      9201 |            2 |         32 |        37893.76 |       0.08 |  0.06 | A            | F            | 1994-08-24 | 1994-08-17   | 1994-08-31    | NONE           | REG AIR    | s sleep carefully bold,                    |
      +------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+--------------------------------------------+
      2 rows in set (0.01 sec)

インポートジョブの同時実行性

インポートジョブは、1 つ以上のタスクで構成されます。タスクは並列に実行されます。インポートジョブは、LOAD ステートメントの data_desc 文字列に基づいてタスクに分割できます。例:

  • インポートジョブに、異なるデータソースアドレスからのソーステーブルを指定する複数の data_desc 文字列がある場合、インポートジョブはタスクに分割され、各タスクには data_desc 文字列が含まれます。

  • インポートジョブに、ソーステーブルの異なるパーティションを指定する複数の data_desc 文字列がある場合、インポートジョブもタスクに分割され、各タスクには data_desc 文字列が含まれます。

各タスクには、並列実行のためにバックエンドに均等に分散される 1 つ以上のインスタンスを含めることができます。タスクは、次のフロントエンド構成に基づいてインスタンスに分割されます。

  • min_bytes_per_broker_scanner:各インスタンスによって処理されるデータの最小量。デフォルトでは、最小データ量は 64 MB です。

  • max_broker_concurrency:各タスクの並列インスタンスの最大数。デフォルト値:100。

  • load_parallel_instance_num:各バックエンドの並列インスタンスの数。デフォルト値:1。

インスタンスの総数は、次の値の最小値と等しくなります。インポートされたファイルの合計サイズを min_bytes_per_broker_scanner の値で割った値、max_broker_concurrency の値、load_parallel_instance_num の値にバックエンドの数を掛けた値。

ほとんどの場合、インポートジョブには data_desc 文字列が 1 つしかないため、タスクは 1 つだけ含まれます。タスクはインスタンスに分割されます。インスタンスの数はバックエンドの数と等しくなります。各インスタンスは、並列実行のために異なるバックエンドに割り当てられます。