すべてのプロダクト
Search
ドキュメントセンター

ApsaraDB for SelectDB:Stream Loadを使用したデータのインポート

最終更新日:Jun 11, 2025

ローカルファイルまたはデータストリームをApsaraDB for SelectDBインスタンスにインポートする場合、Stream Loadを使用してデータを同期的にインポートできます。 このトピックでは、Stream Loadを使用してSelectDBインスタンスにデータをインポートする方法について説明します。

背景情報

Stream Loadは同期データインポート方式です。 このメソッドを使用すると、HTTPリクエストを送信してローカルファイルまたはデータストリームをSelectDBインスタンスにインポートできます。 Stream Loadはデータを同期的にインポートし、すぐにインポート結果を返します。 結果に基づいて、Stream Loadジョブが成功したかどうかを判断できます。 ストリームロードを使用して、CSVJSONParquet、またはOptimized Row Columnar (ORC) 形式でデータをインポートできます。

重要

ストリームロードは、高スループット、低レイテンシ、および高い柔軟性と信頼性を備えています。 推奨するには、Stream Loadを使用してデータをインポートします。

準備

  1. Stream Loadリクエストを開始するターミナルがSelectDBインスタンスに接続されていることを確認します。

    1. SelectDBインスタンスのパブリックエンドポイントを申請します。 詳細については、「パブリックエンドポイントの申請またはリリース」をご参照ください。

      Stream Loadリクエストを開始する端末がSelectDBインスタンスと同じ仮想プライベートクラウド (VPC) にある場合は、この手順をスキップします。

    2. Stream Loadリクエストが開始されたターミナルの関連IPアドレスを、SelectDBインスタンスのIPアドレスホワイトリストに追加します。 詳細については、「IPアドレスホワイトリストの設定」をご参照ください。

    3. SelectDBインスタンスのCIDRブロックをデータソースクラスターのIPアドレスホワイトリストに追加します。 これは、ストリームロード要求が開始される端末でホワイトリストメカニズムがサポートされている場合に適用されます。

      • SelectDBインスタンスが属するVPC内のSelectDBインスタンスのIPアドレスを取得するには、「ApsaraDB SelectDBインスタンスが属するVPC内のIPアドレスを表示する方法」の操作を実行します。

      • SelectDBインスタンスのパブリックIPアドレスを取得するには、pingコマンドを実行してSelectDBインスタンスのパブリックエンドポイントにアクセスし、インスタンスのIPアドレスを取得します。

  2. オプション。 Stream Loadジョブの操作レコードを保持するようにバックエンド (BE) 設定を変更します。

    既定では、Stream Loadジョブの操作レコードはBEでは作成されません。

    Stream Loadジョブの操作レコードをトレースする場合は、Stream Loadジョブを作成する前に、enable_stream_load_recordをtrueに設定する必要があります。 パラメーターを変更するには、チケットを起票する必要があります。

  3. オプション。 BE設定を変更して、Stream Loadを使用してインポートできるファイルの最大サイズを調整します。

デフォルトでは、ストリームロードを使用してインポートできるファイルの最大サイズは1,024 MBです。

Stream Loadを使用してインスタンスにインポートするファイルのサイズが1,024 MBを超える場合は、streaming_load_max_mb BEパラメーターを変更する必要があります。 詳細については、「パラメーターの設定」をご参照ください。

  1. オプション。 フロントエンド (FE) 設定を変更して、インポートのタイムアウト期間を調整します。

    デフォルトでは、Stream Loadジョブのタイムアウト時間は600秒です。 指定したタイムアウト期間内にStream Loadジョブが完了しない場合、システムはStream Loadジョブをキャンセルし、ステータスをCANCELLEDに変更します。

    [Stream Load] ジョブが特定のタイムアウト期間内に完了できない場合は、[Stream Load] リクエストで新しいタイムアウト期間を指定するか、またはstream_load_default_timeout_second FEパラメーターを変更して、すべての [Stream Load] ジョブの新しいデフォルトのタイムアウト期間を指定できます。 FEパラメーターを変更するには、チケットを起票する必要があります。

使用上の注意

Stream Loadを使用して、サイズが数百MBから1 GBの範囲のデータを一度にSelectDBインスタンスに書き込むことができます。 特定のシナリオでは、SelectDBインスタンスに少量のデータを頻繁に書き込むと、インスタンスのパフォーマンスが大幅に低下し、テーブルでデッドロックが発生する可能性があります。 SelectDBインスタンスにデータを書き込む頻度を減らすことを推奨します。 次の方法を使用して、少量のデータをSelectDBインスタンスに同時に書き込むことができます。

  • アプリケーションでのバッチ操作: ビジネスデータを収集し、SelectDBインスタンスにStream Loadリクエストを送信する必要があります。

  • サーバーでのバッチ操作: SelectDBがStream Loadリクエストを受信すると、インスタンスサーバーはリクエストされたデータをバッチ処理します。 詳細については、「グループコミット機能を使用したデータのインポート」をご参照ください。

ストリームロードジョブの作成

Stream Loadは、HTTPプロトコルを介してデータを送信および転送します。 次のコードスニペットは、curlコマンドを実行してStream Loadジョブを送信する方法の例を示しています。 LinuxまたはmacOSオペレーティングシステムの端末で、またはWindowsでコマンドプロンプトを使用してコマンドを実行できます。 他のHTTPクライアントを使用してStream Loadジョブを送信することもできます。

構文

curl --location-trusted -u <username>:<password> [-H ""] -H "expect:100-continue" -T <file_path> -XPUT http://<host>:<port>/api/<db
_name>/<table_name>/_stream_load

パラメーター

パラメーター

必要

説明

-- location-trusted

必須

認証が必要な場合、リクエストがリダイレクトされるサーバーにユーザー名パスワードを渡すことを指定します。

-u

必須

SelectDBインスタンスへの接続に使用されるユーザー名とパスワード。 関連するフィールド:

  • username

  • password

-H

任意

リクエストヘッダー。 形式: -H "key1:value1"

関連するフィールド:

  • label: Stream Loadジョブの一意の識別子。

  • colume_separator: インポートするファイルの列区切り文字。 デフォルト値: \t

    複数の文字の組み合わせを列の区切り文字として使用することもできます。

    列の区切り文字として非表示文字を指定する場合は、プレフィックスとして \xを追加し、区切り文字を16進数で設定します。

詳細については、「リクエストヘッダーフィールド」をご参照ください。

-T

必須

インポートするファイルのパス。 関連フィールド:

file_path: インポートするファイルのパス。

-XPUT

必須

HTTPリクエストのメソッド。 この場合、PUTメソッドが使用されます。 データをインポートするSelectDBインスタンスのURLを指定する必要があります。 関連するフィールド:

  • host: SelectDBインスタンスのVPCエンドポイントまたはパブリックエンドポイント。

    • パブリックエンドポイント: コマンドを実行するマシンが、SelectDBインスタンスが存在するVPCとは異なるVPCに存在する場合、パブリックエンドポイントを使用する必要があります。 パブリックエンドポイントを申請する方法の詳細については、「パブリックエンドポイントの申請またはリリース」をご参照ください。

    • VPCエンドポイント: コマンドを実行するマシンがAlibaba Cloudサービスであり、SelectDBインスタンスと同じVPCにある場合、VPCエンドポイントを使用することを推奨します。

  • port: SelectDBインスタンスのHTTPポート。 デフォルト値: 8080

    SelectDBインスタンスのインスタンス詳細ページで、SelectDBインスタンスのエンドポイントとポートを表示できます。

  • db_name: データベースの名前。

  • table_name: テーブルの名前。

リクエストヘッダーフィールド

Stream LoadはHTTPプロトコルを使用します。 したがって、Stream Loadジョブに関連するパラメーターは、主にヘッダーで指定されます。 データのインポートに使用するパラメーターの共通フィールドを次の表に示します。

フィールド

説明

ラベル

Stream Loadジョブの一意の識別子。

使用法: l abel

  • Label使用して、インポートコマンドでStream Loadジョブのカスタムラベルを作成できます。

  • Yのステータスを照会できます。aに基づくストリームロードジョブラベル.

  • 一意のラベルは、同じデータの繰り返しのインポートを防ぎます。

  • labelを持つStream LoadジョブがCANCELLEDの場合、labelは再利用できます。

    重要

    インポートするデータの同じバッチに対して同じラベルを使用することを推奨します。 このようにして、同じバッチのデータをインポートするための繰り返し要求は1回だけ受信されます。 これは、最大1回セマンティクスを実装します。

形式

インポートするデータの形式。

  • 有効な値: CSVJSONPARQUETORCcsv_with_namescsv_with_names_and_types 値をcsv_with_namesに設定した場合、CSVファイルの最初の行をスキップします。 値をcsv_with_names_and_typesに設定した場合、 CSVファイルの最初の2行をスキップします

  • デフォルト値: CSV

フォーマット要件と関連するパラメーターの詳細については、「」をご参照ください。

ファイル形式

line_delimiter

インポートするファイルの行区切り文字。

行区切り文字として複数の文字の組み合わせを使用できます。 たとえば、Windowsは行区切り文字として \r\nを使用します。

column_separator

インポートするファイルの列区切り文字。

複数の文字の組み合わせを列の区切り文字として使用できます。 たとえば、| | を列区切り文字として使用できます。

非表示文字を列区切り文字として指定する場合は、 \xをプレフィックスとして追加し、区切り文字を16進数で設定します。 たとえば、Hiveファイルの列区切り文字が \x01の場合、このパラメーターを次の形式で指定します。 -H "column_separator:\x01"

compress_type

インポートするファイルの圧縮形式。 CSVおよびJSONファイルは圧縮できます。

次の圧縮形式がサポートされています。 . gz,. lzo,. bz2,. lz4,. lzop、および. デフレート.

max_filter_ratio

Stream Loadジョブの最大許容レート。

インポートエラー率が最大許容値を超えた場合、インポートは失敗します。 無効なデータ行を無視する場合は、このパラメーターを0より大きい値に設定して、ジョブが成功するようにします。

  • デフォルト値: 0

  • 有効な値: [0,1] 。

strict_mode

strictモードを有効にするかどうかを指定します。 有効値:

  • false(デフォルト): モードを無効にします。

  • true: モードを有効にします。 厳密モードでは、列型変換後のデータは、インポート中に厳密にフィルタリングされます.

    • 無効なデータは除外されます。

    • 列タイプの変換後、NOT NULL値はNULL値に変換され、除外されます。

cloud_cluster

データのインポートに使用されるクラスター。

デフォルトでは、インスタンスのデフォルトクラスターが使用されます。 インスタンスにデフォルトクラスターがない場合、システムは自動的にアクセス権限を持つクラスターを選択します。

load_to_single_タブレット

対応するパーティションの1つのタブレットにのみデータをインポートするかどうかを指定します。 このパラメーターは、を使用するテーブルにデータをインポートする場合にのみ使用できます。複製キーモデルであり、ランダムなパーティションを含みます。 有効値:

  • false (デフォルト)

  • true

どこで

Stream Loadジョブに指定されているフィルター条件。

Stream Loadを使用すると、WHERE句を指定してソースデータをフィルタリングできます。 除外されたデータはインポートされず、Stream Loadジョブのエラー率の計算に使用されませんが、num_rows_unseestedパラメーターのどこ句を使用します。

パーティション

テーブルにデータをインポートするパーティション。

インポートするデータが指定されたパーティションに属していない場合、データはインポートされません。 インポートされないデータ行の数は、dpp.abnorm.ALLでカウントされます。

dpp.abnorm.ALLは、SelectDBのカウンターメトリックで、データの前処理中に除外される行の数を指定します。 インポート結果で、NumberFilteredRowsdpp.abnorm.ALLを使用して計算された異常行の数を指定します。

インポートするデータの関数変換設定。 Stream Loadは、列の順序の変更と式の変換をサポートします。 式変換構文は、クエリ文で使用する構文と同じです。

merge_type

データマージのタイプ。 有効値:

  • APPEND (デフォルト): 新しいデータを追加します。

  • MERGE: deleteパラメーターと共に使用して、Delete Flag列にラベルを付けます。

  • DELETE: 現在のStream Loadジョブで削除されたすべてのデータをインポートします。

重要

MERGEDELETEは、UNIQUE KEYモデルを使用するテーブルに適しています。

削除

データを削除するための条件。 このパラメーターは、merge_typeパラメーターがmergeに設定されている場合にのみ有効です。

function_column.sequence_col

このパラメーターは、UNIQUE KEYモデルを使用するテーブルでのみ使用でき、同じキー列を持つレコードの指定されたsource_sequence列に基づいて値列に対してREPLACE操作が実行されるようにします。 source_sequence列は、データソースの列またはスキーマ内の既存の列にすることができます。

exec_mem_limit

Stream Loadジョブに割り当てることができるメモリの最大サイズ。

  • 単位:バイト。

  • デフォルト値: 2 GBに等しい2147483648

タイムアウト

Stream Loadジョブのタイムアウト期間。

  • 単位: 秒。

  • デフォルト値: 600

  • 有効な値: [1, 259200] 。

タイムゾーン

Stream Loadジョブに使用されるタイムゾーン。 このパラメーターは、Stream Loadジョブに含まれるすべてのタイムゾーン関連関数の結果に影響します。 詳細については、「IANAデータベースのタイムゾーン」をご参照ください。

デフォルト値: Asia/Shanghai。UTC + 8を指定します。

two_phase_コミット

Stream Loadジョブの2相コミット (2PC) モードを有効にするかどうかを指定します。

false (デフォルト): モードを無効にします。

true: モードを有効にします。 2PCモードを有効にすると、データが書き込まれた直後にインポート結果が返されます。 このフェーズでは、データは不可視であり、トランザクションはPRECOMMITTED状態にある。 データは、コミット操作を手動でトリガーした後にのみ表示されます。

推奨設定:

モードを有効にすると、すべてのStream Loadジョブが成功または失敗します。 追加モードでは、データのインポート中にすべてのデータが表示されないようにします。

次のシナリオでモードを有効にする

  • 金融取引: データの整合性と一貫性を確保する必要があります。

  • 課金システム: すべてのデータをインポートする必要があります。

  • 重要なビジネス: データの精度要件は非常に高いです。

次のシナリオでモードを無効にします。

  • ログ分析: データの整合性要件は比較的低いですが、データを高速でインポートする必要があります。

  • 大量データのバッチ処理: リソースが限られているため、データを迅速にインポートする必要があります。

  • 繰り返しデータインポート: データのインポートに失敗した場合、データを再インポートする必要があります。

jsonpaths

JSONデータのインポートに使用できるメソッド。 有効値:

  • シンプルモード: jsonpathsを指定する必要はありません。 JSONオブジェクトの各キーはテーブルの列に対応し、キーと列は異なる場合があります。 たとえば、JSONオブジェクトのk1k2、およびk3 {"k1":1、"k2":2、"k3":"hello"} は、テーブルの正確な列に対応する必要があります。

  • マッチングモード: JSONデータは比較的複雑です。 複雑なJSONデータを扱う場合、jsonpathsパラメーターを使用して特定のキーを抽出し、テーブル内の対応する列にマップできます。 たとえば、jsonpaths: ["$.status", "$.res.id", "$.res.count"] は、JSONデータからネストされたフィールドを抽出し、それぞれのテーブル列に書き込むことができます。 デフォルトでは、jsonpathsによって抽出されたフィールドは、指定された順序でテーブル列にマップされます。

json_root

JSONデータで子オブジェクトを指定するために使用されるパラメーター。 子オブジェクトは、データの解析とインポートのルートノードとして使用できます。

デフォルト値: "" 。 デフォルト値を使用すると、JSONコンテンツ全体がデータの解析とインポートのルートノードとして使用されます。

strip_outer_array

Stream Loadを使用してJSONデータをインポートするときに、JSONコンテンツ全体を保持するかどうかを指定する重要なパラメーター。

  • false (デフォルト): JSONコンテンツ全体を保持します。 JSON配列全体が単一の値として必要なテーブルにインポートされます。

    たとえば、[{"k1" : 1, "k2" : 2 },{ "k1" : 3, "k2" : 4}] をテーブルにインポートすると、JSONコンテンツ全体がJSON配列として解析され、テーブルにインポートされます。

  • true: JSONコンテンツ全体を保持しません。

    たとえば、[{"k1" : 1, "k2" : 2 },{ "k1" : 3, "k2" : 4}] をテーブルにインポートすると、JSONコンテンツ全体が解析され、テーブルに2つの値としてインポートされます。

read_json_by_line

Stream Loadを使用してJSONデータをインポートするときに、JSONデータの複数の行を含むファイル全体を保持するかどうかを指定する重要なパラメーター。

  • false (デフォルト): ファイル全体を保持し、ファイルを単一のJSON値または配列と見なします。 ファイル全体が単一のJSON値または配列として必要なテーブルにインポートされます。

    例:

    [
     {"id":1, "name":"Alice", "age":25},
     {"id":2, "name":"Bob", "age":30},
     {"id":3, "name":"Charlie", "age":35}
    ]

    デフォルト値を使用すると、ファイル全体が単一のJSON配列として解析されます。

  • true: ファイル全体を保持しません。 ファイルの各行はJSONオブジェクトとして解析され、必要なテーブルにインポートされます。

    例:

    {"id":1, "name":"Alice", "age":25}
    {"id":2, "name":"Bob", "age":30}
    {"id":3, "name":"Charlie", "age":35}

    ファイルの各行はJSONオブジェクトとして解析されます。

VPCエンドポイントがselectdb-cn-h033cjs **** -fe.selectdbfe.pre.rds.aliyuncs.comであるインスタンスのtest_dbデータベースのtest_tableにdata.csvファイルをインポートできます。 次のサンプルコードは、Stream Loadを使用してデータをインポートする方法を示しています。 完全なコードの詳細については、「インポートデータの完全な例」をご参照ください。

curl --location-trusted -u admin:admin_123 -T data.csv -H "label:123" -H "expect:100-continue" http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load

レスポンスの説明

Stream Loadは、データをインポートするための同期方式です。 したがって、Stream Loadジョブの結果は、HTTPリクエストへの応答として直接返されます。 次のサンプルコードは、サンプル応答を提供します。

{
    "TxnId": 17,
    "Label": "707717c0-271a-44c5-be0b-4e71bfeacaa5",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 5,
    "NumberLoadedRows": 5,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 28,
    "LoadTimeMs": 27,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 2,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 3,
    "CommitAndPublishTimeMs": 18
}

次の表に、レスポンスのパラメーターを示します。

パラメーター

説明

TxnId

Stream LoadジョブのトランザクションID。

ラベル

ラベル [Stream Load] ジョブ

カスタムラベルを作成するか、自動的に生成されるラベルを使用できます。

ステータス

Stream Loadジョブの状態。 有効値:

  • Success: ジョブは成功です。

  • Publish Timeout: ジョブは完了しましたが、遅延後にデータが表示される場合があります。 ジョブを再試行する必要はありません。

  • Label Already Exists: ラベルは既に存在します。 ラベルを変更する必要があります。

  • Fail: ジョブは失敗します。

ExistingJobStatus

既存のラベルに関連付けられているStream Loadジョブの状態ラベル

このパラメーターは、Statusパラメーターの値がLabel Already Existsの場合にのみ表示されます。

このパラメーターの値に基づいて、既存のラベルに関連付けられているStream Loadジョブのステータスを取得できます。 有効値:

  • 実行中: ジョブは進行中です。

  • 終了: ジョブは成功しました。

メッセージ

Stream Loadジョブに対して返されるエラーメッセージ。

NumberTotalRows

Stream Loadジョブによって処理されるデータ行の総数。

NumberLoadedRows

インポートされるデータ行の数。

NumberFilteredRows

インポートできないデータ行の数。

NumberUnselectedRows

WHERE句によって除外されるデータ行の数。

LoadBytes

Stream Loadジョブによってインポートされるバイト数。

LoadTimeMs

Stream Loadジョブの期間。

単位: ミリ秒。

BeginTxnTimeMs

フロントエンド (FE) にトランザクションの開始を要求するために消費される時間。 単位: ミリ秒。

StreamLoadPutTimeMs

FEがStream Loadジョブの実行プランを返すように要求するのにかかる時間。

単位: ミリ秒。

ReadDataTimeMs

データの読み取りにかかる時間。 単位: ミリ秒。

WriteDataTimeMs

データの書き込みにかかる時間。 単位: ミリ秒。

CommitAndPublishTimeMs

トランザクションのコミットと発行をFEに要求するのにかかる時間。

単位: ミリ秒。

エラーURL

エラーデータ行を表示できるURL。

Stream Loadジョブのキャンセル

ジョブの作成後、Stream Loadジョブを手動でキャンセルすることはできません。 タイムアウトエラーまたはインポートエラーが発生すると、Stream Loadジョブはシステムによって自動的にキャンセルされます。 レスポンスでErrorURLを使用して、エラー情報をダウンロードし、問題のトラブルシューティングを行うことができます。

ストリームロードジョブの表示

Stream Loadジョブを作成する前にenable_stream_load_recordをtrueに設定した場合、MySQLクライアントを使用してSelectDBインスタンスに接続し、SHOW STREAM LOADステートメントを実行して、完了したStream Loadジョブを表示します。 デフォルトでは、バックエンド (BE) はStream Loadジョブに関する情報を記録しません。

インポートデータの完全な例

準備

データをインポートする前に、このトピックで説明されている準備が行われていることを確認してください。

CSVデータのインポート

スクリプトを使用したCSVデータファイルのインポート

  • データをインポートするテーブルを作成します。

    1. SelectDBインスタンスに接続します。 詳細については、「DMSを使用したApsaraDB For SelectDBインスタンスへの接続」をご参照ください。

    2. 次のステートメントを実行して、データベースを作成します。

      CREATE DATABASE test_db;
    3. 次のステートメントを実行してテーブルを作成します。

      CREATE TABLE test_table
      (
          id int,
          name varchar(50),
          age int,
          address varchar(50),
          url varchar(500)
      )
      UNIQUE KEY(`id`, `name`)
      DISTRIBUTED BY HASH(id) BUCKETS 16
      PROPERTIES("replication_num" = "1");
  • Stream Loadジョブを開始するターミナルにインポートするファイルを作成します。 たとえば、ファイル名はtest.csvです。

    1,yang,32,shanghai,http://example.com
    2,wang,22,beijing,http://example.com
    3,xiao,23,shenzhen,http://example.com
    4,jess,45,hangzhou,http://example.com
    5,jack,14,shanghai,http://example.com
    6,tomy,25,hangzhou,http://example.com
    7,lucy,45,shanghai,http://example.com
    8,tengyin,26,shanghai,http://example.com
    9,wangli,27,shenzhen,http://example.com
    10,xiaohua,37,shanghai,http://example.com
  • データをインポートします。

    マシンでコマンドラインツールを開き、curlコマンドを実行して、データインポート用のStream Loadジョブを開始します。

    データインポートジョブの作成に使用される構文とパラメーターの詳細については、「ストリームロードジョブの作成」をご参照ください。 データをインポートする方法の例を次に示します。

    • labelフィールドを使用してデータレコードを重複排除し、タイムアウト期間を指定します。

      からデータをインポートするtest.csvファイルをtest_tableのテーブルtest_db ラベルを使用して、インポートされたデータレコードを重複排除し、タイムアウト期間を100秒に設定します。

       curl --location-trusted -u admin:admin_123 -H "label:123" -H "timeout:100" -H "expect:100-continue" -H "column_separator:," -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
    • labelフィールドを使用してデータレコードを重複排除し、列を使用してファイルからインポートするデータをフィルタリングします。

      からデータをインポートするtest.csvファイルをtest_tableのテーブルtest_db ラベルを使用して、インポートされたデータレコードを重複排除し、アドレス列の値がhangzhouであるデータのみをインポートするためにファイル内の列名を指定します。

      curl --location-trusted -u admin:admin_123 -H "label:123" -H "columns: id,name,age,address,url" -H "where: address='hangzhou'" -H "expect:100-continue" -H "column_separator:," -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
  • 最大公差率を20% に設定します。

    からデータをインポートするtest.csvファイルをtest_tableのテーブルtest_db 最大公差率を20% に設定します。

    curl --location-trusted -u admin:admin_123 -H "label:123" -H "max_filter_ratio:0.2" -H "expect:100-continue" -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
  • strictモードを使用し、タイムゾーンを指定します。

    インポートしたデータをstrictモードでフィルターし、タイムゾーンをAfrica/Abidjanに設定します。

    curl --location-trusted -u admin:admin_123 -H "strict_mode: true" -H "timezone: Africa/Abidjan" -H "expect:100-continue" -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
  • [] SelectDB インスタンスのデータを削除します。

    では、SelectDBインスタンスからインポートするデータと同じデータを削除します。test.csvファイルを作成します。

    curl --location-trusted -u admin:admin_123 -H "merge_type: DELETE" -H "expect:100-continue" -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
  • 関連条件に基づいてインポートする必要のないデータを削除し、残りのデータを SelectDBインスタンスにインポートします。

    インポートするデータからaddress列の値がhangzhouである行を削除し、残りのdat aSelectDBインスタンスにインポートします。

    curl --location-trusted -u admin:admin_123 -H "expect:100-continue" -H "columns: id,name,age,address,url" -H "merge_type: MERGE" -H "delete: address='hangzhou'" -H "column_separator:," -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/testDb/testTbl/_stream_load

Javaコードを使用したCSVデータファイルのインポート

package com.selectdb.x2doris.connector.doris.writer;

import com.alibaba.fastjson2.JSON;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.protocol.RequestContent;
import org.apache.http.util.EntityUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.Map;

public class DorisLoadCase {
    public static void main(String[] args) throws Exception {

        // 1. Configure the parameters.
        String loadUrl = "http://<Host:Port>/api/<DB>/<TABLE>/_stream_load?";
        String userName = "admin";
        String password = "****";

        // 2. Construct an HTTP client. Take note that the redirection feature must be enabled for the client.
        HttpClientBuilder httpClientBuilder = HttpClients.custom().setRedirectStrategy(new DefaultRedirectStrategy() {
            // Enable the redirection feature.
            @Override
            protected boolean isRedirectable(String method) {
                return true;
            }
        });
        httpClientBuilder.addInterceptorLast(new RequestContent(true));
        HttpClient httpClient = httpClientBuilder.build();

        // 3. Construct an HTTP PUT request object.
        HttpPut httpPut = new HttpPut(loadUrl);

        // Configure request headers.
        String basicAuth = Base64.getEncoder().encodeToString(String.format("%s:%s", userName, password).getBytes(StandardCharsets.UTF_8));
        httpPut.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + basicAuth);
        httpPut.addHeader(HttpHeaders.EXPECT, "100-continue");
        httpPut.addHeader(HttpHeaders.CONTENT_TYPE, "text/plain; charset=UTF-8");

        RequestConfig reqConfig = RequestConfig.custom().setConnectTimeout(30000).build();
        httpPut.setConfig(reqConfig);

        // 4. Specify the data to be imported. In this example, a CSV file is specified.
        // For example, you want to import data from a table that contains the following fields.
        // field1,field2,field3,field4
        // The CSV file contains the following three records. By default, the row delimiter is \n and the column delimiter is \t for a CSV file.
        String data =
                "1\t2\t3\t4\n" +
                "11\t22\t33\t44\n" +
                "111\t222\t333\t444";

        httpPut.setEntity(new StringEntity(data));

        // 5. Send the request and process the returned result.
        HttpResponse httpResponse = httpClient.execute(httpPut);
        int httpStatus = httpResponse.getStatusLine().getStatusCode();
        String respContent = EntityUtils.toString(new BufferedHttpEntity(httpResponse.getEntity()), StandardCharsets.UTF_8);
        String respMsg = httpResponse.getStatusLine().getReasonPhrase();

        if (httpStatus == HttpStatus.SC_OK) {
            // Serialize the returned result by using an appropriate JSON serialization component.
            Map<String, String> respAsMap = JSON.parseObject(respContent, Map.class);
            // Obtain the status code returned by ApsaraDB for SelectDB.
            String dorisStatus = respAsMap.get("Status");
            // If ApsaraDB for SelectDB returns the following status data, the data is imported.
            List<String> DORIS_SUCCESS_STATUS = Arrays.asList("Success", "Publish Timeout", "200");
            if (!DORIS_SUCCESS_STATUS.contains(dorisStatus) || !respMsg.equals("OK")) {
                throw new RuntimeException("StreamLoad failed, status: " + dorisStatus + ", Response: " + respMsg);
            } else {
                System.out.println("successful....");
            }
        } else {
            throw new IOException("StreamLoad Response HTTP Status Error, httpStatus: "+ httpStatus +",  url: " + loadUrl + ", error: " + respMsg);
        }
    }
}

JOSNデータのインポート

  1. データをインポートするテーブルを作成します。

    1. SelectDBインスタンスに接続します。 詳細については、「DMSを使用したApsaraDB For SelectDBインスタンスへの接続」をご参照ください。

    2. 次のステートメントを実行して、データベースを作成します。

      CREATE DATABASE test_db;
    3. 次のステートメントを実行してテーブルを作成します。

      CREATE TABLE test_table
      (
          id int,
          name varchar(50),
          age int
      )
      UNIQUE KEY(`id`)
      DISTRIBUTED BY HASH(`id`) BUCKETS 16
      PROPERTIES("replication_num" = "1");

  2. データをインポートする。

    非配列データのインポート

    1. Stream Loadジョブを開始する端末にjson.dataファイルを作成します。 ファイルには複数の行が含まれ、それぞれにJSONデータレコードが含まれます。 例:

      {"id":1,"name":"Emily","age":25}
      {"id":2,"name":"Benjamin","age":35}
      {"id":3,"name":"Olivia","age":28}
      {"id":4,"name":"Alexander","age":60}
      {"id":5,"name":"Ava","age":17}
    2. ファイルをインポートします。

      ターミナルを開き、curlコマンドを実行してStream Loadジョブを開始し、json.dataファイル内のデータをtest_dbデータベースのtest_tableテーブルにインポートします。

      curl --location-trusted -u admin:admin_123 -H "Expect:100-continue" -H "format:json" -H "read_json_by_line:true" -T json.data -XPUT http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load

    配列データのインポート

    1. Stream Loadジョブを開始する端末にjson_array.dataファイルを作成します。

      [
      {"userid":1,"username":"Emily","userage":25},
      {"userid":2,"username":"Benjamin","userage":35},
      {"userid":3,"username":"Olivia","userage":28},
      {"userid":4,"username":"Alexander","userage":60},
      {"userid":5,"username":"Ava","userage":17}
      ]
    2. ファイルをインポートします。

      ターミナルを開き、curlコマンドを実行してStream Loadジョブを開始し、json_array.dataファイルのデータをtest_dbデータベースのtest_tableテーブルにインポートします。

      curl --location-trusted -u admin:admin_123 -H "Expect:100-continue" -H "format:json" -H "jsonpaths:[\"$.userid\", \"$.userage\", \"$.username\"]" -H "columns:id,age,name" -H "strip_outer_array:true" -T json_array.data -XPUT http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load

HTTPストリームモード

Stream Loadを使用すると、テーブル値関数 (TVF) を使用して、SQLステートメントでリクエストパラメーターを指定できます。 このTVFはhttp_streamという名前です。 TVFの使用方法の詳細については、「ファイルの分析」をご参照ください。

HTTPストリームモードのStream LoadのRESTful API URLは、通常モードのStream LoadのURLとは異なります。

  • 通常モードでのStream LoadのURL: http:// host:http_port/api/{db}/{table}/_stream_load

  • HTTPストリームモードでのストリームロードのURL: http:// host:http_port/api/_http_Stream

構文

HTTP StreamモードでStream Loadジョブを送信するには、次のコマンドを実行します。

curl --location-trusted -u <username>:<password> [-H "sql: ${load_sql}"...] -T <file_name> -XPUT http://host:http_port/api/_http_stream

HTTPストリームモードでのStream Loadジョブのパラメーターの詳細については、このトピックの「パラメーター」をご参照ください。

load_sqlパラメーターを使用して、HTTPヘッダーのcolumn_separatorline_delimiterwhere、およびcolumnsパラメーターを置き換えることができます。 次のサンプルコードは、load_sqlパラメーターで指定されたSQL文を示しています。

INSERT INTO db.table (col, ...) SELECT stream_col, ... FROM http_stream("property1"="value1");

完全なサンプルコマンド:

curl  --location-trusted -u admin:admin_123 -T test.csv  -H "sql:insert into demo.example_tbl_1(user_id, age, cost) select c1, c4, c7 * 2 from http_stream(\"format\" = \"CSV\", \"column_separator\" = \",\" ) where age >= 30"  http://host:http_port/api/_http_stream

FAQ

データインポート中にget table cloud commit lock timeoutエラーメッセージが表示された場合はどうすればよいですか?

SelectDBインスタンスにデータを頻繁に書き込みます。 その結果、デッドロックが発生する。 SelectDBインスタンスにデータを書き込む頻度を減らすことを推奨します。 少量のデータを一度にSelectDBインスタンスに書き込むことができます。 Stream Loadを使用して、数百MBから1 GBのサイズのデータを一度にSelectDBインスタンスに書き込むことができます。

を使用してインポートしたいCVSファイルのデータがストリームロード指定された行区切り文字と列区切り文字?

別の行区切り文字と列区切り文字を指定し、データファイルを変更して、インポートするデータに新しい行区切り文字と列区切り文字が含まれないようにする必要があります。 このようにして、データを適切に解析することができる。 例:

インポートするデータに行区切り文字を指定

インポートするデータに指定された行区切り文字が含まれている場合は、行区切り文字を変更する必要があります。 たとえば、インポートするデータにデフォルトの行区切り文字 \nが含まれている場合、別の行区切り文字を指定する必要があります。

サンプルファイル:

ZHANG San\n, 25, Shaanxi
LI Si\n, 30, Beijing

このファイルでは、 \nは行の区切り文字ではなく、インポートするデータを示します。 ただし、ファイルのデフォルトの行区切り文字も \nです。 ファイルを適切に解析する場合は、line_delimiterを使用して別の行の区切り文字を指定し、ファイルの各データ行の末尾に新しい行の区切り文字が表示されるようにする必要があります。 例:

  1. 別の行区切り文字を指定します。

    たとえば、デフォルトの 行区切り文字\n \r\nを設定する必要があります-H "line_delimiter:\r\n" のためデータをインポートします。

  2. ファイルの各データ行の末尾に新しい行区切り文字を追加します。 上記のサンプルファイルを次のファイルに変更します。

    ZHANG San\n, 25, Shaanxi\r\n
    LI Si\n, 30, Beijing\r\n

インポートするデータに列区切り文字を指定

インポートするデータに列区切り文字が指定されている場合は、列の区切り文字を変更する必要があります。 たとえば、インポートするデータにデフォルトの列区切り文字 \tが含まれている場合、別の列区切り文字を指定する必要があります。

サンプルファイル:

ZHANG San\t, 25 Shaanxi
LI Si\t, 30 Beijing

このファイルでは、 \t列区切り文字ではなくインポートするデータを示します。 ただし、ファイルのデフォルトの列区切り文字も \tです。 ファイルを正しく解析する場合は、colume_separatorを使用して別の列区切り文字を指定し、ファイルの各データ列に新しい列の区切り文字が表示されるようにする必要があります。 例:

  1. 別の列区切り文字を指定します。

    たとえば、既定の列区切り文字 \tをコンマ (,) に置き換える場合は -H "column_separator:," For data import.

  2. ファイルの各データ列に新しい列区切り文字を追加します。 上記のサンプルファイルを次のファイルに変更します。

    ZHANG San\t, 25, Shaanxi
    LI Si\t, 30, Beijing