Broker Load モードでは、StarRocks はブローカープロセスを使用して、Apache Hadoop Distributed File System(HDFS)や Alibaba Cloud Object Storage Service(OSS)などのデータソースからデータを読み取り、そのコンピューティングリソースを使用してデータを前処理およびインポートします。このトピックでは、Broker Load モードでデータをインポートする方法について説明します。
背景情報
Broker Load は非同期インポート方式です。MySQL プロトコルに基づいてインポートジョブを作成し、SHOW LOAD ステートメントを実行してインポート結果をクエリできます。StarRocks は、CSV、ORC、Parquet など、さまざまなファイル形式の外部ストレージシステムからのデータインポートをサポートしています。一度に数十~数百 GB のデータをインポートするために、各インポートジョブを実行することをお勧めします。
Broker Load モードでデータをインポートする
ブローカーをクエリする
E-MapReduce(EMR)で StarRocks クラスタを作成すると、ブローカーはすべてのコアノードで自動的に構築および起動されます。SHOW PROC ステートメントを実行して、クラスタのブローカーをクエリできます。次のコードは構文を示しています。
SHOW PROC "/brokers"\G次の出力が返されます。
*************************** 1. row ***************************
Name: broker
IP: 10.0.**.**
Port: 8000
Alive: true
LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
ErrMsg:
*************************** 2. row ***************************
Name: broker
IP: 10.0.**.**
Port: 8000
Alive: true
LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
ErrMsg:
*************************** 3. row ***************************
Name: broker
IP: 10.0.**.**
Port: 8000
Alive: true
LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
ErrMsg:
*************************** 4. row ***************************
Name: broker
IP: 10.0.**.**
Port: 8000
Alive: true
LastStartTime: 2022-04-13 11:38:46
LastUpdateTime: 2022-04-13 15:26:44
ErrMsg:
4 rows in set (0.00 sec)インポートジョブを作成する
構文
2.5.8 より前の StarRocks バージョン
LOAD LABEL db_name.label_name (data_desc, ...) WITH BROKER broker_name broker_properties [PROPERTIES (key1=value1, ... )]2.5.8 以降の StarRocks バージョン
LOAD LABEL db_name.label_name (data_desc, ...) WITH BROKER broker_properties [PROPERTIES (key1=value1, ... )]パラメータ
HELP BROKER LOADステートメントを実行して、インポートジョブを作成するための構文を表示できます。ラベル
インポートジョブの識別子。各インポートジョブには一意のラベルがあります。カスタムラベルを定義できます。そうでない場合は、システムによってラベルが生成されます。ラベルを使用して、インポートジョブの状態をクエリし、重複データのインポートを回避できます。インポートジョブの状態が FINISHED に変更されると、ラベルは無効になります。インポートジョブの状態が CANCELLED に変更された場合は、ラベルを使用してインポートジョブを再送信できます。
data_desc
インポートするデータを記述する文字列。複数の data_desc 文字列を指定でき、それぞれにデータソースアドレス、抽出、変換、ロード(ETL)関数、宛先テーブル、パーティションなどの情報が含まれます。
Broker Load モードのインポートジョブでは、一度に複数のテーブルにデータをインポートできます。宛先テーブルごとに data_desc 文字列を指定できます。各 data_desc 文字列では、複数の file_path 文字列を含むデータソースアドレスを指定できます。 file_path 文字列は、複数のソースファイルを指定するために使用されます。Broker Load モードでは、一度に複数のテーブルにデータをインポートする際の成功の原子性が保証されます。次のコードは、通常 data_desc 文字列で構成されるパラメータを示しています。
data_desc: DATA INFILE ('file_path', ...) [NEGATIVE] INTO TABLE tbl_name [PARTITION (p1, p2)] [COLUMNS TERMINATED BY column_separator ] [FORMAT AS file_type] [(col1, ...)] [COLUMNS FROM PATH AS (colx, ...)] [SET (k1=f1(xx), k2=f2(xx))] [WHERE predicate]次の表にパラメータを示します。
パラメータ
説明
file_path
ファイルを示す特定のファイルパス、またはワイルドカードとしてアスタリスク(*)を含み、ディレクトリ内のすべてのファイルを示すファイルパスのいずれか。宛先ディレクトリの親ディレクトリにもワイルドカードを含めることができます。
次の特殊文字がワイルドカードとしてサポートされています:? * [] {} ^。ワイルドカードの使用方法の詳細については、「FileSystem」をご参照ください。
たとえば、このパラメータを hdfs://hdfs_host:hdfs_port/user/data/tablename// に設定すると、/tablename ディレクトリのパーティション内のすべてのファイルがインポートされます。このパラメータを hdfs://hdfs_host:hdfs_port/user/data/tablename/dt=202104/ に設定すると、/tablename ディレクトリの 202104 パーティション内のファイルのみがインポートされます。
negative
指定されたソースデータがインポート済みであり、このインポートジョブによって削除されることを示すフラグ。
このパラメータは、宛先テーブルの SUM タイプの集計列にインポートされたデータのバッチをキャンセルする場合に適用できます。インポートジョブが完了すると、データのバッチが集計列から削除されます。
partition
宛先テーブルにインポートするソースファイル内のパーティション。
指定されたパーティションに属するソースデータのみがインポートされます。指定されたパーティションに属さないソースデータは、エラーデータとして判定されます。このようなデータをエラーデータとして判定したくない場合は、WHERE 述語を使用してフィルタリングします。
column_separator
ソースファイル内のデータを列に分割するために使用する列区切り文字。デフォルト値:\t。
不可視文字を列区切り文字として指定する場合は、プレフィックスとして \x を追加し、16 進数で区切り文字を設定します。たとえば、ソース Hive ファイルで使用される区切り文字が \x01 の場合は、列区切り文字を \\x01 に設定します。
file_type
ソースファイルの形式。有効な値:parquet、orc、csv。デフォルト値:csv。
Parquet ファイルのファイル名拡張子は、.parquet または .parq です。
COLUMNS FROM PATH AS
ソースファイルのパス内のパーティションフィールド。
たとえば、ソースファイルのパスが /path/col_name=col_value/dt=20210101/file1 で、col_name と dt がテーブル列であるとします。次のコードに示すように、値 col_value と 20210101 は、col_name と dt に対応する宛先列にインポートされます。
(col1, col2) COLUMNS FROM PATH AS (col_name, dt)列マッピングの設定
列型変換のための関数を含む SET ステートメント。
ソース列と宛先列の型が異なる場合は、列型変換のための SET ステートメントを指定する必要があります。
where 述語
列型変換後にデータフィルタリングに使用する WHERE 述語。
フィルタリングされたデータは、最大フィルタ比率の計算にはカウントされません。 data_desc 文字列に同じテーブルの複数の WHERE 述語が含まれている場合、述語は AND 演算子によって結合されます。
broker_properties
broker_properties 文字列のインポートジョブのプロパティパラメータ。これらのプロパティパラメータは、インポートジョブに適用されます。
broker_properties: (key2=value2, ...)次の表に、broker_properties 文字列のプロパティパラメータの一部を示します。
パラメータ
説明
timeout
インポートジョブのタイムアウト期間。単位:秒。
opt_properties 文字列で各インポートジョブのタイムアウト期間を指定できます。指定されたタイムアウト期間内にインポートジョブが完了しない場合、インポートジョブの状態は CANCELLED に変更されます。Broker Load モードのインポートジョブのデフォルトのタイムアウト期間は 4 時間です。
重要インポートジョブの完了にデフォルトのタイムアウト期間よりも長い時間がかかると推定されない限り、インポートジョブのタイムアウト期間を指定する必要はありません。
次の式を使用して、秒単位で最小タイムアウト期間を計算することをお勧めします:
(MB 単位のファイルの合計サイズ×ソーステーブルと関連するロールアップテーブルの数)/(30×インポートジョブの同時実行性)。式の数字 30 は 30 MB/s を示し、これはバックエンドの平均インポート速度です。たとえば、ソースデータのファイルの合計サイズが 1 GB、1 つのソーステーブルに 2 つのロールアップテーブルがあり、インポートジョブの同時実行性が 3 であるとします。最小タイムアウト期間は、次の式を使用して計算されます:(1×1,024×3)/(10×3)= 102 秒。
StarRocks クラスタには、さまざまなマシン環境と同時クエリジョブがあります。履歴インポートジョブの速度に基づいて、StarRocks クラスタの最も遅いインポート速度を推定する必要があります。
max_filter_ratio
インポートジョブの最大フィルタ比率。有効な値:0~1。デフォルト値:0。インポートジョブのエラー率が最大フィルタ比率を超えると、インポートジョブは失敗します。エラーデータ行を無視する場合は、このパラメータを 0 より大きい値に設定して、インポートジョブが成功するようにします。
次の式を使用して、適切な最大フィルタ比率を計算します:
max_filter_ratio = dpp.abnorm.ALL/(dpp.abnorm.ALL + dpp.norm.ALL)。dpp.abnorm.ALLは、型の不一致、列数の不一致、長さの不一致など、さまざまな理由でインポートできないデータ行の数を示します。dpp.abnorm.ALLは、インポートできるデータ行の数を示します。SHOW LOADステートメントを実行して、インポートジョブによってインポートされたデータ量をクエリできます。ソースファイルのデータ行数 = dpp.abnorm.ALL + dpp.norm.ALLload_mem_limit
インポートジョブに割り当てられるメモリの制限。デフォルト値:0。値 0 は、インポートジョブに割り当てられるメモリが制限されていないことを示します。
strict_mode
インポートジョブの厳密モードを有効にするかどうかを指定します。厳密モードを有効にするには、このパラメータを true に設定します:
properties("strict_mode" = "true")。デフォルトでは、厳密モードは無効になっています。
厳密モードが有効になっている場合、列型変換後にエラーデータがフィルタリングされます。エラーデータとは、ソースファイルでは null ではないが、列型変換後に null 値に変換される値を指します。次の項目に注意してください。
厳密モードは、関数を基に値が生成されるソース列には適用されません。
宛先列が値を範囲に制限し、ソース列の値を変換できるが、結果の値が範囲外である場合、厳密モードはソース列には適用されません。たとえば、ソース列の値が 10 で、宛先列の型が DECIMAL(1,0)であるとします。値 10 は変換できますが、結果の値は範囲外です。厳密モードはソース列には適用されません。
Alibaba Cloud OSS からデータをインポートするためのインポートジョブを作成するサンプルコード
重要StarRocks クラスタでは、ブローカー名として broker を使用できます。
StarRocks のバージョンが 2.5.8 より前の場合は、次のサンプルコードを参照してインポートジョブを作成できます。StarRocks のバージョンが 2.5.8 以降の場合は、次のサンプルコードから
WITH BROKER brokerの部分を削除してください。
2.5.8 より前の StarRocks バージョン
LOAD LABEL tpch.lineitem ( DATA INFILE("oss://bucket/tpc_h/sf1/lineitem.tbl") INTO TABLE `lineitem` COLUMNS TERMINATED BY '|' (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment) ) WITH BROKER broker ( "fs.oss.accessKeyId" = "xxx", "fs.oss.accessKeySecret" = "xxx", "fs.oss.endpoint" = "oss-cn-beijing-internal.aliyuncs.com" );2.5.8 以降の StarRocks バージョン
LOAD LABEL tpch.lineitem ( DATA INFILE("oss://bucket/tpc_h/sf1/lineitem.tbl") INTO TABLE `lineitem` COLUMNS TERMINATED BY '|' (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment) )
インポートジョブの状態をクエリする
Broker Load モードでのデータインポートは非同期です。インポートジョブの状態をクエリするには、SHOW LOAD ステートメントでジョブのラベルを指定して、ステートメントを実行します。このステートメントの構文を表示するには、HELP SHOW LOAD ステートメントを実行します。
SHOW LOAD ステートメントは、非同期インポートジョブのみをサポートしています。Stream Load モードなどの同期インポートジョブの場合、SHOW LOAD ステートメントを使用して状態をクエリすることはできません。
次のサンプルコードは、インポートジョブの状態をクエリする方法の例を示しています。
show load where label = 'label1'\G
*************************** 1. row ***************************
JobId: 7****
Label: label1
State: FINISHED
Progress: ETL:N/A; LOAD:100%
Type: BROKER
EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
TaskInfo: cluster:N/A; timeout(s):10800; max_filter_ratio:5.0E-5
ErrorMsg: N/A
CreateTime: 2019-07-27 11:46:42
EtlStartTime: 2019-07-27 11:46:44
EtlFinishTime: 2019-07-27 11:46:44
LoadStartTime: 2019-07-27 11:46:44
LoadFinishTime: 2019-07-27 11:50:16
URL: http://192.168.**.**:8040/api/_load_error_log?file=__shard_4/error_log_insert_stmt_4bb00753932c491a-a6da6e2725415317_4bb00753932c491a_a6da6e272541****
JobDetails: {"Unfinished backends":{"9c3441027ff948a0-8287923329a2****":[10002]},"ScannedRows":2390016,"TaskNumber":1,"All backends":{"9c3441027ff948a0-8287923329a2****":[10002]},"FileNumber":1,"FileSize":1073741824}次の表にパラメータを示します。
パラメータ | 説明 |
JobId | インポートジョブの一意の ID。ジョブ ID はシステムによって自動的に生成されます。インポートジョブの ID は再利用できませんが、インポートジョブが失敗した場合、インポートジョブのラベルは再利用できます。 |
Label | インポートジョブの識別子。 |
State | インポートジョブの状態。有効な値:
|
Progress | インポートジョブの進捗情報。進捗情報は、ETL と LOADING の 2 つのインポートフェーズを記述します。Broker Load モードでのデータインポートには、LOADING フェーズのみが含まれます。ETL の進捗状況は N/A として表示され、LOADING の進捗状況は 0~100% になります。 LOADING の進捗状況は、次の式を使用して計算されます: すべてのソーステーブルがインポートされ、インポートジョブが完了する準備ができると、LOADING の進捗状況は 99% に達します。LOADING の進捗状況は、インポートジョブが完了した後にのみ 100% に達します。 重要 インポートの進捗状況は線形ではありません。進捗状況が一定期間変更されない場合でも、インポートジョブは引き続き実行中である可能性があります。 |
Type | インポートジョブのタイプ。Broker Load モードのインポートジョブのタイプは BROKER です。 |
EtlInfo | インポートジョブのデータ量メトリック:unselected.rows、dpp.norm.ALL、dpp.abnorm.ALL。 unselected.rows は、WHERE 述語によってフィルタリングされたデータ行の数を示します。dpp.norm.ALL と dpp.abnorm.ALL は、インポートジョブのエラー率が最大フィルタ比率を超えているかどうかを判断するのに役立ちます。3 つのメトリックの値の合計は、ソースファイルのデータ行の合計数と等しくなります。 |
TaskInfo | インポートジョブを作成したときに構成したパラメータ。クラスタ、タイムアウト期間、最大フィルタ比率などが含まれます。 |
ErrorMsg | インポートジョブによって返されたメッセージ。インポートジョブが CANCELLED 状態の場合、このパラメータの値はインポート失敗の原因を示し、type と msg の 2 つの部分を含みます。インポートジョブが FINISHED 状態の場合、このパラメータの値は N/A です。type 部分には、次の有効な値があります。
|
CreateTime | インポートジョブが作成された時刻、ETL フェーズが開始された時刻、ETL フェーズが終了した時刻、LOADING フェーズが開始された時刻、インポートジョブが完了した時刻。
|
EtlStartTime | |
EtlFinishTime | |
LoadStartTime | |
LoadFinishTime | |
URL | インポートジョブ中のサンプルエラーデータの URL。インポートジョブにエラーデータがない場合、このパラメータの値は N/A です。 |
JobDetails | インポートジョブの詳細。インポートされたファイルの数とバイト単位の合計サイズ、タスクの数、処理されたソースデータ行の数、タスクを実行するバックエンドの ID、待機中のタスクが存在するバックエンドの ID などが含まれます。 処理されたソースデータ行の数は 5 秒ごとに更新されます。この数は現在の進捗状況のみを示し、インポートジョブの完了後に処理されるデータ行の合計数を意味するものではありません。後者は EtlInfo パラメータによって示されます。 |
インポートジョブをキャンセルする
Broker Load モードのインポートジョブは、CANCELLED または FINISHED 状態ではない場合にキャンセルできます。インポートジョブをキャンセルするには、CANCEL LOAD ステートメントでラベルを指定して、ステートメントを実行します。HELP CANCEL LOAD ステートメントを実行して、インポートジョブをキャンセルするための構文を表示できます。
CANCEL LOAD
[FROM db_name]
WHERE [LABEL = "load_label" | LABEL like "label_pattern"];HDFS からデータをインポートする
構文
LOAD LABEL db1.label1 ( DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/hive/test.db/ml/file1") INTO TABLE tbl1 COLUMNS TERMINATED BY "," (tmp_c1, tmp_c2) SET ( id=tmp_c2, name=tmp_c1 ), DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/hive/test.db/ml/file2") INTO TABLE tbl2 COLUMNS TERMINATED BY "," (col1, col2) where col1 > 1 ) WITH BROKER 'broker1' ( "username" = "hdfs_username", "password" = "hdfs_password" ) PROPERTIES ( "timeout" = "3600" );HDFS 認証
HDFS のコミュニティエディションは、シンプル認証と Kerberos 認証の 2 つの認証モードをサポートしています。
シンプル認証:ユーザー ID は、HDFS に接続されているクライアントのオペレーティングシステムによって決定されます。
次の表にパラメータを示します。
パラメータ
説明
hadoop.security.authentication
認証モード。この例では、このパラメータは simple に設定されています。
username
HDFS にログオンするために使用されるユーザー名。
password
HDFS にログオンするために使用されるパスワード。
Kerberos 認証:クライアントのユーザー ID は、その Kerberos 資格情報によって決定されます。
次の表にパラメータを示します。
パラメータ
説明
hadoop.security.authentication
認証モード。この例では、このパラメータは kerberos に設定されています。
kerberos_principal
Kerberos 認証のプリンシパル。
kerberos_keytab
Kerberos keytab ファイルのパス。ファイルは、ブローカープロセスと同じサーバーに存在する必要があります。
kerberos_keytab_content
Kerberos keytab ファイルの Base64 エンコードされたコンテンツ。
重要このパラメータまたは kerberos_keytab パラメータのいずれかを構成する必要があります。
HDFS の HA 構成
HDFS クラスタの NameNode に高可用性(HA)を構成した後、アクティブな NameNode が別の NameNode に切り替えられた場合、新しいアクティブな NameNode を自動的に識別できます。HA モードでデプロイされた HDFS クラスタにアクセスするには、次の表に示すパラメータを構成します。
パラメータ
説明
dfs.nameservices
HDFS サービスの名前。カスタム名を設定できます。
たとえば、dfs.nameservices パラメータを my_ha に設定します。
dfs.ha.namenodes.xxx
NameNode のカスタム名。複数の名前はカンマ(,)で区切ります。このパラメータ名の xxx は、dfs.nameservices パラメータに設定したカスタム名に置き換えます。
たとえば、dfs.ha.namenodes.my_ha パラメータを my_nn に設定します。
dfs.namenode.rpc-address.xxx.nn
NameNode がリモートプロシージャコール(RPC)に使用するアドレス。このパラメータ名の nn は、dfs.ha.namenodes.xxx パラメータに設定した NameNode の名前に置き換えます。
たとえば、dfs.namenode.rpc-address.my_ha.my_nn パラメータを Hostname:Port number 形式の値に設定します。
dfs.client.failover.proxy.provider
クライアントが NameNode に接続するために使用するプロバイダ。デフォルト値:org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider。
次のサンプルコードは例を示しています。
( "dfs.nameservices" = "my-ha", "dfs.ha.namenodes.my-ha" = "my-namenode1,my-namenode2", "dfs.namenode.rpc-address.my-ha.my-namenode1" = "nn1-host:rpc_port", "dfs.namenode.rpc-address.my-ha.my-namenode2" = "nn2-host:rpc_port", "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" )シンプル認証または Kerberos 認証を使用して、HA モードでデプロイされた HDFS クラスタにアクセスできます。次のサンプルコードは、シンプル認証を使用して HA HDFS クラスタにアクセスする方法の例を示しています。
( "username"="user", "password"="passwd", "dfs.nameservices" = "my-ha", "dfs.ha.namenodes.my-ha" = "my_namenode1,my_namenode2", "dfs.namenode.rpc-address.my-ha.my-namenode1" = "nn1-host:rpc_port", "dfs.namenode.rpc-address.my-ha.my-namenode2" = "nn2-host:rpc_port", "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" )HDFS クラスタの構成は、hdfs-site.xml ファイルに書き込むことができます。ブローカープロセスを使用して HDFS クラスタに関する情報を読み取る場合は、クラスタのファイルパスと認証情報のみを指定する必要があります。
例
テストテーブルを作成します。次のサンプルコードを実行して、tpch データベースに lineitem という名前のテーブルを作成します。
CREATE TABLE lineitem ( l_orderkey bigint, l_partkey bigint, l_suppkey bigint, l_linenumber int, l_quantity double, l_extendedprice double, l_discount double, l_tax double, l_returnflag string, l_linestatus string, l_shipdate date, l_commitdate date, l_receiptdate date, l_shipinstruct string, l_shipmode string, l_comment string ) ENGINE=OLAP DUPLICATE KEY(l_orderkey) DISTRIBUTED BY HASH(l_orderkey) BUCKETS 96 PROPERTIES( "replication_num" = "1" );インポートジョブを作成します。
重要StarRocks のバージョンが 2.5.8 より前の場合は、次のサンプルコードを参照してインポートジョブを作成できます。StarRocks のバージョンが 2.5.8 以降の場合は、次のサンプルコードから
WITH BROKER brokerの部分を削除してください。2.5.8 より前の StarRocks バージョン
LOAD LABEL tpch.lineitem ( DATA INFILE("oss://xxx/tpc_h/sf1/lineitem.tbl") INTO TABLE `lineitem` COLUMNS TERMINATED BY '|' (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment) ) WITH BROKER broker ( "fs.oss.accessKeyId" = "xxx", "fs.oss.accessKeySecret" = "xxx", "fs.oss.endpoint" = "xxx" );2.5.8 以降の StarRocks バージョン
LOAD LABEL tpch.lineitem ( DATA INFILE("oss://xxx/tpc_h/sf1/lineitem.tbl") INTO TABLE `lineitem` COLUMNS TERMINATED BY '|' (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment) )インポートジョブの状態をクエリします。
show load where label = 'lineitem'\G; *************************** 1. row *************************** JobId: 1**** Label: lineitem State: FINISHED Progress: ETL:100%; LOAD:100% Type: BROKER EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=6001215 TaskInfo: cluster:N/A; timeout(s):14400; max_filter_ratio:0.0 ErrorMsg: NULL CreateTime: 2022-04-13 15:07:53 EtlStartTime: 2022-04-13 15:07:56 EtlFinishTime: 2022-04-13 15:07:56 LoadStartTime: 2022-04-13 15:07:56 LoadFinishTime: 2022-04-13 15:08:06 URL: NULL JobDetails: {"Unfinished backends":{"97f1acd1-6e70-4699-9199-b1722020****":[]},"ScannedRows":6001215,"TaskNumber":1,"All backends":{"97f1acd1-6e70-4699-9199-b1722020****":[10002,10003,10004,10005]},"FileNumber":1,"FileSize":753862072} 2 rows in set (0.00 sec)インポートジョブの完了後、ビジネス要件に基づいてデータをクエリします。
lineitem テーブルのデータ行数をクエリします。
select count(*) from lineitem;次の出力が返されます。
+----------+ | count(*) | +----------+ | 6001215 | +----------+ 1 row in set (0.03 sec)lineitem テーブルの最初の 2 行のデータをクエリします。
select * from lineitem limit 2;次の出力が返されます。
+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+--------------------------------------------+ | l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment | +------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+--------------------------------------------+ | 69 | 115209 | 7721 | 1 | 48 | 58761.6 | 0.01 | 0.07 | A | F | 1994-08-17 | 1994-08-11 | 1994-09-08 | NONE | TRUCK | regular epitaphs. carefully even ideas hag | | 69 | 104180 | 9201 | 2 | 32 | 37893.76 | 0.08 | 0.06 | A | F | 1994-08-24 | 1994-08-17 | 1994-08-31 | NONE | REG AIR | s sleep carefully bold, | +------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+--------------------------------------------+ 2 rows in set (0.01 sec)
インポートジョブの同時実行性
インポートジョブは、1 つ以上のタスクで構成されます。タスクは並列に実行されます。インポートジョブは、LOAD ステートメントの data_desc 文字列に基づいてタスクに分割できます。例:
インポートジョブに、異なるデータソースアドレスからのソーステーブルを指定する複数の data_desc 文字列がある場合、インポートジョブはタスクに分割され、各タスクには data_desc 文字列が含まれます。
インポートジョブに、ソーステーブルの異なるパーティションを指定する複数の data_desc 文字列がある場合、インポートジョブもタスクに分割され、各タスクには data_desc 文字列が含まれます。
各タスクには、並列実行のためにバックエンドに均等に分散される 1 つ以上のインスタンスを含めることができます。タスクは、次のフロントエンド構成に基づいてインスタンスに分割されます。
min_bytes_per_broker_scanner:各インスタンスによって処理されるデータの最小量。デフォルトでは、最小データ量は 64 MB です。
max_broker_concurrency:各タスクの並列インスタンスの最大数。デフォルト値:100。
load_parallel_instance_num:各バックエンドの並列インスタンスの数。デフォルト値:1。
インスタンスの総数は、次の値の最小値と等しくなります。インポートされたファイルの合計サイズを min_bytes_per_broker_scanner の値で割った値、max_broker_concurrency の値、load_parallel_instance_num の値にバックエンドの数を掛けた値。
ほとんどの場合、インポートジョブには data_desc 文字列が 1 つしかないため、タスクは 1 つだけ含まれます。タスクはインスタンスに分割されます。インスタンスの数はバックエンドの数と等しくなります。各インスタンスは、並列実行のために異なるバックエンドに割り当てられます。