ストリームロードは同期インポート方式です。この方式では、HTTP リクエストを送信することで、ローカルファイルまたはデータストリームを Doris にインポートできます。ストリームロードはデータを同期的にインポートし、インポート結果を返します。レスポンス本文に基づいて、インポートが成功したかどうかを判断できます。このトピックでは、ストリームロードの基本原則、基本操作、システム構成、およびベストプラクティスについて説明します。
シナリオ
ストリームロードは、主にローカルファイルのインポート、または手順に従ってデータストリームからデータをインポートするのに適しています。
基本原則
次の情報は、ストリームロードのフローチャートを示しています。一部のインポート詳細は提供されていません。
^ +
| |
| | 1A. ユーザーが FE にロードを送信する
| |
| +--v-----------+
| | FE |
5. ユーザーに結果を返す | +--+-----------+
| |
| | 2. BE にリダイレクトする
| |
| +--v-----------+
+---+Coordinator BE| 1B. ユーザーが BE にロードを送信する
+-+-----+----+-+
| | |
+-----+ | +-----+
| | | 3. データを分散する
| | |
+-v-+ +-v-+ +-v-+
|BE | |BE | |BE |
+---+ +---+ +---+ストリームロードモードでは、Doris はバックエンド(BE)をコーディネーター BE として選択します。コーディネーター BE は、データの受信と他の BE へのデータの分散を担当します。 HTTP リクエストを送信することで、インポートジョブを送信できます。フロントエンド(FE)にリクエストを送信すると、FE は HTTP リダイレクトを実行してリクエストをコーディネーター BE に転送します。リクエストをコーディネーター BE に直接送信することもできます。その後、コーディネーター BE はインポートジョブの結果を返します。
サポートされているデータ形式
ストリームロードは、CSV(テキスト)と JSON の 2 つのデータ形式をサポートしています。
基本操作
インポートジョブの作成
ストリームロードは、HTTP プロトコルを使用してデータを送信および転送します。この例では、curl コマンドを実行してインポートジョブを送信します。他の HTTP クライアントを使用して操作を実行することもできます。
curlコマンドcurl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load // ヘッダーで指定できるパラメーターについては、次の表で説明します。 // 形式は -H "key1:value1" です。HELP STREAM LOADステートメントを実行すると、インポートジョブを作成するための詳細な構文を表示できます。ストリームロードモードでは、インポートジョブに関連するすべてのパラメーターがヘッダーで指定されます。次の表にパラメーターを示します。パラメーター
説明
署名パラメーター
user:passwd
ストリームロードは、HTTP プロトコルを使用してインポートジョブを作成します。基本アクセス認証を使用して、インポートジョブの署名が生成されます。 Doris は、署名に基づいて ID とインポート権限を確認します。
インポートジョブパラメーター
label
インポートジョブの識別子。
各インポートジョブは、単一のデータベース内で一意のラベルを持ちます。インポートコマンドでインポートジョブのカスタムラベルを指定できます。インポートジョブが送信されると、ラベルに基づいてインポートジョブのステータスを表示できます。一意のラベルを使用すると、同じデータが繰り返しインポートされるのを防ぐこともできます。同じバッチのデータには同じラベルを使用することをお勧めします。このようにして、同じバッチのデータに対する繰り返しのリクエストは 1 回だけ受け入れられ、at-most-once セマンティクスが保証されます。ラベルに対応するインポートジョブの状態が CANCELLED の場合、ラベルは再度使用できます。
column_separator
インポートするファイルの列区切り文字。デフォルト値:\t。
印刷できない文字が列区切り文字として使用される場合、区切り文字は 16 進数形式で、\x プレフィックスで始まる必要があります。たとえば、Hive ファイルの列区切り文字が \x01 の場合、このパラメーターを
-H "column_separator:\x01"の形式で指定します。複数の文字の組み合わせを列区切り文字として使用できます。line_delimiter
インポートするファイルの行区切り文字。デフォルト値:\n。複数の文字の組み合わせを行区切り文字として使用できます。
max_filter_ratio
インポートジョブの最大許容率。有効な値:0 ~ 1。デフォルト値:0。
インポートジョブのエラー率が最大許容率を超えると、インポートジョブは失敗します。誤ったデータ行を無視する場合は、このパラメーターを 0 より大きい値に設定して、インポートジョブが成功するようにします。次の式を使用して最大許容率を計算できます。
[dpp.abnorm.ALL/(dpp.abnorm.ALL + dpp.norm.ALL)] > max_filter_ratio。dpp.abnorm.ALLは、型の不一致、列の不一致、長さの不一致など、さまざまな理由でインポートできないデータ行の数を表します。dpp.norm.ALLは、インポートできるデータ行の数を表します。SHOW LOADステートメントを実行して、インポートジョブによってインポートされたデータ行の数をクエリできます。ソースファイルのデータ行数 = dpp.abnorm.ALL + dpp.norm.ALLwhere
インポートジョブで指定されたフィルター条件。
ストリームロードでは、WHERE 句を指定してソースデータをフィルタリングできます。フィルタリングされたデータはインポートされず、インポートジョブのエラー率の計算にも使用されませんが、インポート結果の NumberUnselectedRows パラメーターの値としてカウントされます。
Partitions
インポートするテーブルのパーティション情報。インポートするデータが指定されたパーティションに属していない場合、データはインポートされません。インポートされないデータ行の数は dpp.abnorm.ALL に含まれます。
columns
インポートするデータの関数変換構成。ストリームロードは、列順の変更と式変換をサポートしています。式変換方法は、クエリステートメントで使用できる方法と同じです。
exec_mem_limit
インポートジョブに割り当てられるメモリの制限。単位:バイト。デフォルトでは、インポートジョブに割り当てられる最大メモリは 2 GB です。
strict_mode
インポートジョブの厳密モードを有効にするかどうかを指定します。デフォルトでは、このモードは無効になっています。
ストリームロードでは、ヘッダーで
strict_modeパラメーターを true に設定することで、厳密モードを有効にできます。厳密モードでは、列型変換後のデータは、次のルールに基づいてインポートプロセスで厳密にフィルタリングされます。厳密モードが有効になっている場合、列型変換後に誤ったデータが除外されます。誤ったデータとは、ソースファイルでは null ではないが、列型変換後に null 値に変換される値を指します。
厳密モードは、null 値が関数によって計算される列には適用されません。
範囲制限を含むインポートされた列型の場合、ソースデータが列型変換後に null 以外の値に変換できるが、変換された値が制限された範囲外である場合、厳密モードはこの型の列には適用されません。たとえば、インポートされた列型が DECIMAL(1,0) で、列のソース値が 10 であるとします。このソース値は列型に基づいて変換できますが、変換された値は列型で指定された範囲外です。厳密モードはこの列には適用されません。
merge_type
データマージのタイプ。デフォルト値:APPEND。有効な値:
APPEND:このバッチのデータを既存のデータに追加します。
DELETE:このバッチのデータと同じキーを持つすべてのデータ行を削除します。
MERGE:DELETE 条件と一緒に使用する必要があります。 DELETE 条件を満たすデータは DELETE セマンティクスに基づいて処理され、DELETE 条件を満たさないデータは APPEND セマンティクスに基づいて処理されます。
two_phase_commit
インポートジョブの 2 段階トランザクションコミットモードを有効にするかどうかを指定します。このモードが有効になっている場合、データが書き込まれた後にインポート結果が返されます。この段階では、データは表示されず、トランザクションは PRECOMMITTED 状態です。データは、手動でコミット操作をトリガーした後にのみ表示されます。デフォルトでは、2 段階トランザクションコミットモードは無効になっています。
2 段階トランザクションコミットモードを有効にするには、
disable_stream_load_2pcbe.conf ファイルのtwo_phase_commitパラメーターを false に設定し、ヘッダーの パラメーターを true に設定します。例:
ストリームロードのプリコミット操作を開始します。
説明列順変更の例:ソーステーブルには src_c1、src_c2、src_c3 の 3 つの列があります。 Doris テーブルにも dst_c1、dst_c2、dst_c3 の 3 つの列があります。
ソーステーブルの src_c1 列が Doris テーブルの dst_c1 列に対応し、ソーステーブルの src_c2 列が Doris テーブルの dst_c2 列に対応し、ソーステーブルの src_c3 列が Doris テーブルの dst_c3 列に対応する場合、columns パラメーターを
columns: dst_c1, dst_c2, dst_c3の形式で指定できます。ソーステーブルの src_c1 列が Doris テーブルの dst_c2 列に対応し、ソーステーブルの src_c2 列が Doris テーブルの dst_c3 列に対応し、ソーステーブルの src_c3 列が Doris テーブルの dst_c1 列に対応する場合、columns パラメーターを
columns: dst_c2, dst_c3, dst_c1の形式で指定できます。式変換の例:ソーステーブルには tmp_c1 と tmp_c2 の 2 つの列があり、Doris テーブルにも c1 と c2 の 2 つの列があります。ただし、ソーステーブルの 2 つの列は、関数によって変換されて Doris テーブルの 2 つの列に対応する必要があります。この場合、columns パラメーターを
columns: tmp_c1, tmp_c2, c1 = year(tmp_c1), c2 = month(tmp_c2)gの形式で指定できます。tmp_*はプレースホルダーであり、ソースファイルの 2 つの列を表します。
curl --location-trusted -u user:passwd -H "two_phase_commit:true" -T test.txt http://fe_host:http_port/api/{db}/{table}/_stream_load { "TxnId": 18036, "Label": "55c8ffc9-1c40-4d51-b75e-f2265b36****", "TwoPhaseCommit": "true", "Status": "Success", "Message": "OK", "NumberTotalRows": 100, "NumberLoadedRows": 100, "NumberFilteredRows": 0, "NumberUnselectedRows": 0, "LoadBytes": 1031, "LoadTimeMs": 77, "BeginTxnTimeMs": 1, "StreamLoadPutTimeMs": 1, "ReadDataTimeMs": 0, "WriteDataTimeMs": 58, "CommitAndPublishTimeMs": 0 }トランザクションのコミット操作をトリガーします。
トランザクションの abort 操作をトリガーします。
コマンド例
curl --location-trusted -u root -T date -H "label:123" http://abc.com:8030/api/test/date/_stream_load返された結果
ストリームロードは同期インポート方式です。したがって、インポートジョブの結果は、HTTP リクエストへのレスポンスとして直接返されます。例:
{ "TxnId": 1003, "Label": "b6f3bc78-0d2c-45d9-9e4c-faa0a014****", "Status": "Success", "ExistingJobStatus": "FINISHED", // オプション "Message": "OK", "NumberTotalRows": 1000000, "NumberLoadedRows": 1000000, "NumberFilteredRows": 1, "NumberUnselectedRows": 0, "LoadBytes": 40888898, "LoadTimeMs": 2144, "BeginTxnTimeMs": 1, "StreamLoadPutTimeMs": 2, "ReadDataTimeMs": 325, "WriteDataTimeMs": 1933, "CommitAndPublishTimeMs": 106, "ErrorURL": "http://192.168.**.**:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bd****" }次の表に、ストリームロードモードでのインポートジョブの結果のパラメーターを示します。
パラメーター
説明
TxnId
インポートジョブのトランザクション ID。トランザクション ID は、Alibaba Cloud によって完全に管理できます。
Label
インポートジョブのラベル。ラベルを指定することも、システムによって自動的に生成することもできます。
Status
インポートジョブの状態。有効な値:
Success:インポートジョブは成功しました。
Publish Timeout:インポートジョブは完了しましたが、データが遅延している可能性があります。再試行する必要はありません。
Label Already Exists:ラベルはすでに存在します。ラベルを変更する必要があります。
Fail:インポートジョブは失敗しました。
ExistingJobStatus
既存のラベルに対応するインポートジョブの状態。このパラメーターは、Status パラメーターの値が Label Already Exists の場合にのみ表示されます。返された値に基づいて、既存のラベルに対応するインポートジョブのステータスを知ることができます。有効な値:
RUNNING:インポートジョブは進行中です。
FINISHED:インポートジョブは成功しました。
Message
インポートジョブに対して返されたエラーメッセージ。
NumberTotalRows
インポートジョブによって処理されたデータ行の総数。
NumberLoadedRows
正常にインポートされたデータ行の数。
NumberFilteredRows
インポートできないデータ行の数。
NumberUnselectedRows
WHERE 条件によって除外されたデータ行の数。
LoadBytes
インポートジョブによってインポートされたバイト数。
LoadTimeMs
インポートジョブの期間。単位:ミリ秒。
BeginTxnTimeMs
FE にトランザクションの開始をリクエストするために消費された時間。単位:ミリ秒。
StreamLoadPutTimeMs
FE にインポートジョブの実行プランの返却をリクエストするために消費された時間。単位:ミリ秒。
ReadDataTimeMs
データの読み取りに消費された時間。単位:ミリ秒。
WriteDataTimeMs
データの書き込みに消費された時間。単位:ミリ秒。
CommitAndPublishTimeMs
FE にトランザクションのコミットと公開をリクエストするために消費された時間。単位:ミリ秒。
ErrorURL
誤ったデータ行を表示するために使用される URL。
重要ストリームロードは同期インポート方式です。したがって、インポート情報は Doris に記録されません。 show load コマンドを実行して、ストリームロードモードのインポートジョブの結果を非同期的に表示することはできません。インポート結果を表示するには、インポートジョブを送信するために送信した HTTP リクエストへのレスポンスを監視する必要があります。
インポートジョブのキャンセル
ストリームロードモードでは、インポートジョブを手動でキャンセルすることはできません。タイムアウトエラーまたはインポートエラーが発生した場合、ストリームロードモードのインポートジョブはシステムによって自動的にキャンセルされます。
ストリームロードモードでのインポートジョブの表示
SHOW STREAM LOAD ステートメントを実行すると、ストリームロードモードで完了したインポートジョブに関する情報をクエリできます。
デフォルトでは、BE はストリームロードモードのインポートジョブに関する情報を記録しません。 BE がそのような情報を記録できるようにするには、BE 構成の enable_stream_load_record パラメーターを true に設定します。詳細については、「バックエンドノードの構成項目」をご参照ください。
関連するシステム構成
FE 構成
stream_load_default_timeout_second:ストリームロードモードでのインポートジョブのタイムアウト期間。単位:秒。指定されたタイムアウト期間内にインポートジョブが完了しない場合、システムはインポートジョブをキャンセルし、インポートジョブの状態は CANCELLED に変更されます。デフォルトのタイムアウト期間は 600 秒です。ソースファイルを指定されたタイムアウト期間内にインポートできないと推定される場合は、ストリームロードモードでインポートジョブを送信するときに HTTP リクエストで個別のタイムアウト期間を設定するか、FE 構成の stream_load_default_timeout_second パラメーターを変更してグローバルデフォルトのタイムアウト期間を指定できます。
BE 構成
streaming_load_max_mb:ストリームロードモードでインポートジョブによってインポートできるデータの最大量。デフォルト値:10240。単位:MB。ソースファイルのサイズがこのしきい値を超える場合は、BE 構成の streaming_load_max_mb パラメーターを変更する必要があります。
ベストプラクティス
シナリオ
ストリームロードは、ソースファイルがメモリまたはディスクに保存されているシナリオに最適です。ストリームロードは同期インポート方式です。同期的にインポート結果を取得する場合にも、このインポート方式を使用できます。
データ量
ストリームロードモードでは、BE がデータの分散とインポートを担当します。したがって、一度に 1 GB ~ 10 GB のデータをインポートすることをお勧めします。デフォルトでは、ストリームロードモードでインポートジョブによってインポートできるデータの最大量は 10 GB です。サイズが 10 GB を超えるファイルをインポートする場合は、BE 構成の streaming_load_max_mb パラメーターを変更する必要があります。
たとえば、インポートするファイルのサイズが 15 GB の場合、BE 構成の streaming_load_max_mb パラメーターを 16000 に設定します。
ストリームロードモードでのインポートジョブのデフォルトのタイムアウト期間は 300 秒です。 Doris の最大インポート速度に基づいて、サイズが 3 GB を超えるファイルをインポートする場合は、デフォルトのタイムアウト期間を変更する必要があります。
インポートジョブのタイムアウト期間 = インポートするデータ量/10 MB/s。クラスターの実際の平均インポート速度は、実際の状況に基づいて計算する必要があります。たとえば、サイズが 10 GB のファイルをインポートし、クラスターのインポート速度が 10 MB/s の場合、インポートジョブのタイムアウト期間は 1,000 秒です。
完全な例
データ情報:ローカルディスクの /home/store-sales ディレクトリから bj-sales データベースの store-sales テーブルに約 15 GB のデータをインポートします。
クラスター情報:同時に処理できるストリームロードモードのインポートジョブの数は、クラスターサイズの影響を受けません。
手順
インポートするファイルのサイズが 10 GB を超えているため、BE 構成の be.conf ファイルを変更します。
streaming_load_max_mb = 16000インポート期間がデフォルトのタイムアウト期間を超えているかどうかを確認します。この場合、インポート期間は 1,500 秒で、次の式に基づいて計算されます。
15,000 MB/10 MB/s = 1,500s。インポート期間がデフォルトのタイムアウト期間を超える場合は、FE 構成の fe.conf ファイルで stream_load_default_timeout_second パラメーターを 1500 に設定します。次のコマンドを実行して、インポートジョブを作成します。
curl --location-trusted -u user:password -T /home/store_sales -H "label:abc" http://abc.com:8030/api/bj_sales/store_sales/_stream_load