すべてのプロダクト
Search
ドキュメントセンター

ApsaraDB for SelectDB:Stream Load

最終更新日:Nov 09, 2025

Stream Load を使用して、ローカルファイルまたはデータストリームを ApsaraDB for SelectDB インスタンスにインポートできます。このトピックでは、Stream Load を使用して ApsaraDB for SelectDB にデータをインポートする方法について説明します。

背景情報

Stream Load は同期データインポートメソッドです。HTTP リクエストを送信して、ローカルファイルまたはデータストリームを ApsaraDB for SelectDB にインポートできます。 Stream Load はインポート結果をすぐに返します。リクエストの戻り値を確認して、インポートが成功したかどうかを判断できます。Stream LoadCSV (テキスト)、JSONPARQUET、および ORC データ形式をサポートします。

重要

Stream Load は、高いスループット、低いレイテンシーを提供し、柔軟で信頼性があります。強く推奨 される主要なデータインポート方法として Stream Load を使用してください。

準備

  1. Stream Load リクエストを送信するために使用されるターミナルが、ネットワーク経由で SelectDB インスタンスに接続できることを確認します:

    1. ApsaraDB for SelectDB インスタンスのパブリックエンドポイントを申請します。詳細については、「パブリックエンドポイントの申請と解放」をご参照ください。

      Stream Load リクエストを送信するターミナルが ApsaraDB for SelectDB インスタンスと同じ VPC にある場合、このステップはスキップできます。

    2. Stream Load リクエストを送信するターミナルの IP アドレスを ApsaraDB for SelectDB インスタンスのホワイトリストに追加します。詳細については、「ホワイトリストの設定」をご参照ください。

    3. Stream Load リクエストを送信するターミナルにホワイトリストが設定されている場合は、SelectDB インスタンスの IP アドレス範囲をそのホワイトリストに追加します。

  2. オプション: 計算クラスター (バックエンド) の設定を変更して、Stream Load 操作レコードを有効にします。

    デフォルトでは、計算クラスターは Stream Load 操作を記録しません。

    Stream Load 操作を追跡するには、インポートタスクを作成する前に enable_stream_load_record を true に設定し、クラスターを再起動します。この機能を有効にするには、テクニカルサポートにチケットを送信する必要があります。

  3. オプション: 計算クラスターの設定を変更して、Stream Load の最大インポートサイズを調整します。

    Stream Load を使用してインポートできるファイルのデフォルトの最大サイズは 10,240 MB です。

    ソースファイルがこのサイズを超える場合は、バックエンドパラメーター streaming_load_max_mb を調整できます。パラメーターの変更方法の詳細については、「パラメーターの設定」をご参照ください。

  4. オプション: フロントエンド (FE) の設定を変更して、インポートのタイムアウトを調整します。

    Stream Load タスクのデフォルトのタイムアウトは 600 秒です。インポートタスクが指定されたタイムアウト内に完了しない場合、システムはタスクをキャンセルし、そのステータスを CANCELLED に変更します。

    ソースファイルが時間制限内にインポートできない場合は、Stream Load リクエストで特定のタイムアウトを設定するか、FE パラメーター stream_load_default_timeout_second を調整してインスタンスを再起動し、グローバルなデフォルトタイムアウトを設定できます。この調整を行うには、テクニカルサポートにチケットを送信する必要があります。

使用上の注意

1 回の Stream Load で数百 MB から 1 GB のデータを書き込むことができます。一部のビジネスシナリオでは、少量のデータを頻繁に書き込むと、インスタンスのパフォーマンスが大幅に低下し、テーブルのデッドロックを引き起こすことさえあります。書き込み頻度を減らし、データのバッチ処理を使用することを強くお勧めします。

  • アプリケーション側のバッチ処理: アプリケーション側でビジネスデータを収集し、Stream Load リクエストを SelectDB に送信します。

  • サーバー側のバッチ処理: SelectDB が Stream Load リクエストを受信した後、サーバーはデータに対してバッチ処理を実行します。詳細については、「グループコミット」をご参照ください。

インポートタスクの作成

Stream Load は HTTP プロトコルを介してデータを送信および転送します。次の例は、curl コマンドを使用してインポートタスクを送信する方法を示しています。このコマンドは、Linux または macOS ターミナル、または Windows コマンドプロンプトで実行できます。Stream Load 操作には他の HTTP クライアントを使用することもできます。

構文

curl --location-trusted -u <username>:<password> [-H ""] -H "expect:100-continue" -T <file_path> -XPUT http://<host>:<port>/api/<db
_name>/<table_name>/_stream_load

パラメーター

パラメーター

必須

説明

--location-trusted

はい

認証が必要な場合、これはリクエストがリダイレクトされるサーバーに usernamepassword を渡します。

-u

はい

SelectDB インスタンスのユーザー名とパスワードを指定します。

  • username: ユーザー名。

  • password: パスワード。

-H

いいえ

この Stream Load インポートリクエストのリクエストヘッダー (Header) の内容を指定します。形式は次のとおりです:

-H "key1:value1"

一般的なパラメーターは次のとおりです:

  • label: インポートタスクの一意の ID。

  • column_separator: インポートファイル内の列区切り文字を指定します。デフォルトは \t です。

    複数の文字の組み合わせを列区切り文字として使用することもできます。

    非表示文字の場合は、プレフィックスとして \x を追加し、16 進数で区切り文字を表す必要があります。

リクエストヘッダーパラメーターの詳細については、「リクエストヘッダーパラメーター」をご参照ください。

-T

はい

インポートするデータファイルのパスを指定します。

file_path: オブジェクトファイルのパス。

-XPUT

はい

HTTP リクエストのメソッドです。PUT リクエストメソッドを使用して、SelectDB のデータインポートアドレスを指定します。具体的なパラメーターは次のとおりです:

  • host: SelectDB インスタンスの VPC エンドポイントまたはパブリックエンドポイント。

    • 同じ VPC にない場合はパブリックエンドポイントを使用する: コマンドを実行するデバイスがターゲットの SelectDB インスタンスと同じ VPC にない場合は、パブリックエンドポイントを使用する必要があります。パブリックエンドポイントを申請するには、「パブリックエンドポイントの申請と解放」をご参照ください。

    • 同じ VPC にある場合は VPC エンドポイントを使用する: コマンドを実行するデバイスが Alibaba Cloud 製品であり、ターゲットの SelectDB インスタンスと同じ VPC にある場合は、VPC エンドポイントを使用することをお勧めします。

  • port: SelectDB インスタンスの HTTP ポート番号。デフォルトは 8080 です。

    SelectDB コンソールのインスタンスの製品ページで、インスタンスのエンドポイントとポート番号を表示できます。

  • db_name: データベース名。

  • table_name: テーブル名。

リクエストヘッダーパラメーター

Stream Load は HTTP プロトコルを使用します。したがって、インポートタスクに関連するパラメーターはリクエストヘッダーで設定されます。次の表に、一般的なインポートパラメーターを示します。

パラメーター

説明

label

インポートタスクの一意の ID。

Label の目的:

  • インポートコマンド内のカスタム名。

  • 対応するインポートタスクの実行ステータスを表示するために使用できます。

  • 同じデータを繰り返しインポートすることを防ぐために使用できます。

  • 対応するインポートジョブのステータスが CANCELLED の場合、再利用できます。

重要

同じバッチのデータには同じ Label を使用することをお勧めします。これにより、同じバッチのデータに対する繰り返しリクエストは一度しか受け入れられず、At-Most-Once 配信が保証されます。

format

インポートデータ形式を指定します。

  • サポートされている形式: CSVJSONPARQUETORCcsv_with_names (CSV ファイルの最初の行をスキップ)、および csv_with_names_and_types (CSV ファイルの最初の 2 行をスキップ)。

  • デフォルト形式: CSV

ファイル形式の要件と関連パラメーターの詳細については、「ファイル形式」をご参照ください。

line_delimiter

インポートファイル内の行区切り文字を指定します。

複数の文字の組み合わせを行区切り文字として使用することもできます。たとえば、Windows システムでは、行区切り文字として \r\n を使用します。

column_separator

インポートファイル内の列区切り文字を指定します。

複数の文字の組み合わせを列区切り文字として使用することもできます。たとえば、二重縦線 || を列区切り文字として使用できます。

非表示文字の場合は、プレフィックスとして \x を追加し、16 進数で区切り文字を表す必要があります。たとえば、Hive ファイルの区切り文字 \x01 の場合は、-H"column_separator:\x01" を指定する必要があります。

compress_type

ファイルの圧縮形式を指定します。圧縮は CSV および JSON ファイルでのみサポートされています。

サポートされている圧縮形式: gzlzobz2lz4lzop、および deflate

max_filter_ratio

インポートタスクの最大許容エラー率を指定します。

インポートエラー率がこのしきい値を超えると、インポートは失敗します。エラー行を無視するには、インポートが成功するように、このパラメーターを 0 より大きい値に設定する必要があります。

  • デフォルト値: 0、これはゼロ許容を意味します。

  • 値の範囲: [0, 1]。

strict_mode

厳格モードを有効にするかどうかを指定します。

  • false (デフォルト): 厳格モードを有効にしません。

  • true: 厳格モードを有効にします。有効にすると、インポートプロセス中に列の型変換に厳格なフィルタリングが適用されます。

    • 不正なデータはフィルターされます。

    • null でないソースデータの列の型変換が NULL 値になる場合も、フィルターされます。

cloud_cluster

インポートに使用するクラスターを指定します。

デフォルトでは、インスタンスのデフォルトクラスターが使用されます。インスタンスにデフォルトクラスターが設定されていない場合、権限を持つクラスターが自動的に選択されます。

load_to_single_tablet

対応するパーティションの 1 つのタブレットにのみデータをインポートするかどうかを指定します。このパラメーターは、ランダムバケットを持つ Duplicate Key テーブルにデータをインポートする場合にのみ設定できます。

  • false (デフォルト): ランダムバケットを持つ Duplicate Key モデルテーブルにデータをインポートする場合、データは対応するパーティションの 1 つのバケットにのみ書き込まれません。

  • true: ランダムバケットを持つ Duplicate Key モデルテーブルにデータをインポートする場合、データは対応するパーティションの 1 つのバケットにのみ書き込まれます。これにより、データインポートの同時実行性とスループットが向上します。

where

インポートタスクのフィルター条件を指定します。

where 文を指定してソースデータをフィルターできます。フィルターされたデータはインポートされず、フィルター率の計算にも含まれません。ただし、where 条件によってフィルターされた行数としてカウントされます。

num_rows_unselected

partitions

インポートするデータのパーティション情報を指定します。

インポートするデータが指定されたパーティションに属していない場合、インポートされません。これらのデータ行は dpp.abnorm.ALL でカウントされます。

dpp.abnorm.ALLSelectDB のカウンターメトリックです。データ前処理段階でフィルターされた行の総数を表します。インポート結果の NumberFilteredRows には、dpp.abnorm.ALL によってカウントされた異常な行の数が含まれます。

columns

インポートするデータの関数変換設定を指定します。

サポートされている関数変換メソッドには、列の順序変更と式変換が含まれます。式変換のメソッドは、検索文と同じです。

merge_type

データマージタイプを指定します。

  • APPEND (デフォルト): このインポートが通常の追加書き込み操作であることを示します。

  • MERGE: delete パラメーターと共に使用して、Delete Flag 列をマークする必要があります。

  • DELETE: このインポートのすべてのデータが削除操作であることを示します。

重要

MERGE および DELETE タイプは、Unique Key モデルにのみ適用されます。

delete

これは、merge_typeMERGE に設定されている場合にのみ意味があります。データを削除する条件を指定します。

function_column.sequence_col

これは Unique Key モデルにのみ適用されます。同じキー列に対して、source_sequence 列に従って値列が置き換えられることを保証します。source_sequence は、データソースの列またはテーブルスキーマの列にすることができます。

exec_mem_limit

インポートメモリ制限を指定します。

  • 単位: バイト。

  • デフォルト値: 2147483648、これは 2 GiB です。

timeout

インポートのタイムアウトを指定します。

  • 単位: 秒。

  • デフォルト値: 600

  • 範囲: [1, 259200]。

timezone

このインポートに使用するタイムゾーンを指定します。このパラメーターは、インポートに関与するすべてのタイムゾーン関連関数の結果に影響します。タイムゾーンの詳細については、「IANA タイムゾーンデータベース」をご参照ください。

デフォルト値: Asia/Shanghai、これは UTC+8 です。

two_phase_commit

2 フェーズコミットモードを有効にするかどうかを指定します。

  • false (デフォルト): 2 フェーズコミットを有効にしません。

  • true: 2 フェーズコミットを有効にします。有効にすると、データ書き込みが完了するとすぐに情報がユーザーに返されます。この時点では、データは表示されず、トランザクションステータスは PRECOMMITTED です。データは、ユーザーが手動でコミット操作をトリガーした後にのみ表示されます。

  • 有効/無効にするタイミング:

    有効にすると、データインポートはアトミックになります。完全に成功するか、完全に失敗するかのいずれかです。これにより、インポートプロセス中に一部のデータが表示される状況も防ぎます。

    これらのシナリオで有効にします:

    • 金融取引データ: データ整合性と一貫性の厳格な保証が必要です。

    • 請求システムデータ: 部分的なデータインポートは許可されません。

    • 重要なビジネスデータ: データ精度要件が非常に高いシナリオ。

    これらのシナリオで無効にします:

    • ログ分析: 一貫性要件は高くなく、インポート速度が優先されます。

    • 大規模なバッチ処理: リソースが限られており、インポートを迅速に完了する必要があります。

    • 再インポート可能なデータ: インポートが失敗した場合に再インポートできるデータ。

jsonpaths

JSON 形式でデータをインポートするには 2 つの方法があります:

  • 基本モード: jsonpaths を指定する必要はありません。JSON データのキーはテーブルの列名と 1 対 1 で対応している必要がありますが、順序は異なっていてもかまいません。たとえば、JSON データ {"k1":1, "k2":2, "k3":"hello"} では、k1、k2、k3 がテーブルの列名に対応します。

  • マッチングモード: JSON データが複雑な場合、パラメーター jsonpaths を使用してキーを対応するテーブル列にマッチングさせます。たとえば、jsonpaths:["$.status", "$.res.id", "$.res.count"] は JSON データからネストされたフィールドを抽出し、対応するテーブルに書き込むことができます。デフォルトでは、jsonpaths によって抽出されたフィールドは、順序どおりにテーブル列にマッピングされます。

json_root

json_root パラメーターを使用して、JSON データ内の子オブジェクトを解析のルートノードとして指定できます。

デフォルト値は "" で、JSON オブジェクト全体がルートノードとして選択されることを意味します。

read_json_by_line

JSON データを処理するための Stream Load の重要なパラメーターです。複数行の JSON データを含む入力ファイルをどのように解析するかを制御します。

  • false (デフォルト): 入力ファイル全体が単一の JSON 値または配列として扱われます。カーネルはファイル全体の内容を 1 つの JSON オブジェクトまたは配列として解析しようとします。

    たとえば、ファイルに次の内容が含まれている場合:

    [
     {"id":1, "name":"Alice", "age":25},
     {"id":2, "name":"Bob", "age":30},
     {"id":3, "name":"Charlie", "age":35}
    ]

    ファイル全体の内容が単一の JSON 配列として解析されます。

  • true: インポートされたデータの各行が JSON オブジェクトです。

    たとえば、ファイルに次の内容が含まれている場合:

    {"id":1, "name":"Alice", "age":25}
    {"id":2, "name":"Bob", "age":30}
    {"id":3, "name":"Charlie", "age":35}

    ファイル内の各行が JSON オブジェクトとして解析されます。

strip_outer_array

JSON データを処理するための Stream Load の重要なパラメーターです。外側の配列を含む JSON データをどのように解析するかを制御します。

  • false (デフォルト): JSON データの元の構造が保持されます。外側の配列は削除されません。JSON 配列全体が単一のレコードとしてインポートされます。

    たとえば、サンプルデータ [{"k1" : 1, "k2" : 2},{"k1" : 3, "k2" : 4}] の場合、strip_outer_array が false に設定されていると、単一の配列として解析され、テーブルにインポートされます。

  • true: インポートされたデータが JSON 配列の場合、strip_outer_array を true に設定する必要があります。

    たとえば、サンプルデータ [{"k1" : 1, "k2" : 2},{"k1" : 3, "k2" : 4}] の場合、strip_outer_array が true に設定されていると、2 つのデータレコードとして解析され、テーブルにインポートされます。

重要

JSON 形式でデータをインポートする場合、非配列形式のパフォーマンスは配列形式よりも大幅に高くなります。

この例は、CSV ファイル data.csv を test_db データベースの test_table テーブルにインポートする方法を示しています。インスタンスの VPC エンドポイントは selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com です。これは単なるサンプル curl コマンドです。完全な例については、「完全なデータインポートの例」をご参照ください。

curl --location-trusted -u admin:admin_123 -T data.csv -H "label:123" -H "expect:100-continue" http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load

戻り値

Stream Load は同期インポートメソッドです。インポート結果は作成リクエストへの応答で直接返されます。次のコードブロックは、戻り値のサンプルを示しています。

{
    "TxnId": 17,
    "Label": "707717c0-271a-44c5-be0b-4e71bfeacaa5",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 5,
    "NumberLoadedRows": 5,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 28,
    "LoadTimeMs": 27,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 2,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 3,
    "CommitAndPublishTimeMs": 18
}

次の表に、戻り値のパラメーターを示します。

パラメーター

説明

TxnId

インポートのトランザクション ID。

Label

インポート ID

カスタム ID を指定するか、システムによって生成させることができます。

Status

インポートステータス。有効な値は次のとおりです:

  • Success: インポートは成功しました。

  • Publish Timeout: インポートタスクは完了しましたが、データは遅れて表示される場合があります。リトライしないでください。

  • Label Already Exists: Label は重複しています。Label を変更する必要があります。

  • Fail: インポートは失敗しました。

ExistingJobStatus

既存の Label に対応するインポートジョブのステータス。

このフィールドは、ステータスが Label Already Exists の場合にのみ表示されます。

このステータスを使用して、既存の Label に対応するインポートタスクの状態を判断できます。

  • RUNNING: タスクはまだ実行中です。

  • FINISHED: タスクは成功しました。

Message

エラーメッセージ。

NumberTotalRows

処理された行の総数。

NumberLoadedRows

正常にインポートされた行数。

NumberFilteredRows

データ品質が不適格な行数。

NumberUnselectedRows

where 条件によってフィルターされた行数。

LoadBytes

インポートされたバイト数。

LoadTimeMs

インポートにかかった時間。

単位: ミリ秒。

BeginTxnTimeMs

FE にトランザクションの開始をリクエストするのにかかった時間。

単位: ミリ秒。

StreamLoadPutTimeMs

FE にインポートデータ実行計画の取得をリクエストするのにかかった時間。

単位: ミリ秒。

ReadDataTimeMs

データの読み取りにかかった時間。

単位: ミリ秒。

WriteDataTimeMs

データ書き込み操作の実行にかかった時間。

単位: ミリ秒。

CommitAndPublishTimeMs

FE にトランザクションのコミットと公開をリクエストするのにかかった時間。

単位: ミリ秒。

ErrorURL

データ品質に問題がある場合は、この URL にアクセスして特定のエラー行を表示できます。

インポートタスクのキャンセル

Stream Load タスクは作成後に手動でキャンセルすることはできません。システムは、タイムアウトが発生した場合またはインポートエラーが報告された場合にのみタスクを自動的にキャンセルします。戻り値の errorUrl を使用してエラーメッセージをダウンロードし、問題をトラブルシューティングできます。

Stream Load タスクの表示

Stream Load 操作レコードを有効にしている場合、MySQL クライアントを使用して ApsaraDB for SelectDB インスタンスに接続し、show stream load 文を実行して完了した Stream Load タスクを表示できます。

完全なデータインポートの例

準備

インポート操作を開始する前に、準備を完了してください。

CSV データのインポート

例: スクリプトを使用したインポート

  1. データの宛先テーブルを作成します。

    1. SelectDB インスタンスに接続します。詳細については、「DMS を使用して ApsaraDB for SelectDB インスタンスに接続する」をご参照ください。

    2. 次の文を実行してデータベースを作成します。

      CREATE DATABASE test_db;
    3. 次の文を実行してテーブルを作成します。

      CREATE TABLE test_table
      (
          id int,
          name varchar(50),
          age int,
          address varchar(50),
          url varchar(500)
      )
      UNIQUE KEY(`id`, `name`)
      DISTRIBUTED BY HASH(id) BUCKETS 16
      PROPERTIES("replication_num" = "1");
  2. Stream Load ターミナルが配置されているデバイスで、インポートする test.csv という名前のファイルを作成します。

    1,yang,32,shanghai,http://example.com
    2,wang,22,beijing,http://example.com
    3,xiao,23,shenzhen,http://example.com
    4,jess,45,hangzhou,http://example.com
    5,jack,14,shanghai,http://example.com
    6,tomy,25,hangzhou,http://example.com
    7,lucy,45,shanghai,http://example.com
    8,tengyin,26,shanghai,http://example.com
    9,wangli,27,shenzhen,http://example.com
    10,xiaohua,37,shanghai,http://example.com
  3. データをインポートします。

    ターゲットデバイスでターミナルを開き、curl コマンドを実行して Stream Load タスクを開始し、データをインポートします。

    インポートタスクを作成するための構文とパラメーターの説明については、「インポートタスクの作成」をご参照ください。次の例は、一般的なインポートシナリオを示しています。

    • Label を使用して重複を削除し、タイムアウトを指定します。

      ファイル test.csv からデータベース test_db のテーブル test_table にデータをインポートします。Label を使用して重複したデータバッチのインポートを回避し、タイムアウトを 100 秒に設定します。

       curl --location-trusted -u admin:admin_123 -H "label:123" -H "timeout:100" -H "expect:100-continue" -H "column_separator:," -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
    • Label を使用して重複を削除し、列を使用してファイルからデータをフィルターします。

      ファイル test.csv からデータベース test_db のテーブル test_table にデータをインポートします。Label を使用して重複したデータバッチのインポートを回避し、ファイルから列名を指定し、「address」列が「hangzhou」であるデータのみをインポートします。

      curl --location-trusted -u admin:admin_123 -H "label:123" -H "columns: id,name,age,address,url" -H "where: address='hangzhou'" -H "expect:100-continue" -H "column_separator:," -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
    • 20% の許容エラー率を許可します。

      ファイル test.csv からデータベース test_db のテーブル test_table にデータをインポートし、20% のエラー率を許可します。

      curl --location-trusted -u admin:admin_123 -H "label:123" -H "max_filter_ratio:0.2" -H "expect:100-continue" -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
    • 厳格モードを使用し、タイムゾーンを設定します。

      厳格モードを使用してインポートされたデータをフィルターし、タイムゾーンを Africa/Abidjan に設定します。

      curl --location-trusted -u admin:admin_123 -H "strict_mode: true" -H "timezone: Africa/Abidjan" -H "expect:100-continue" -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
    • SelectDB のデータを削除します

      SelectDB 内の test.csv ファイルのデータと同一のデータを削除します。

      curl --location-trusted -u admin:admin_123 -H "merge_type: DELETE" -H "expect:100-continue" -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
    • 条件に基づいてファイルから不要なデータを削除し、残りのデータを SelectDB にインポートします

      test.csv ファイルから address 列が 'hangzhou' の行を削除し、残りの行を SelectDB にインポートします。

      curl --location-trusted -u admin:admin_123 -H "expect:100-continue" -H "columns: id,name,age,address,url" -H "merge_type: MERGE" -H "delete: address='hangzhou'" -H "column_separator:," -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/testDb/testTbl/_stream_load

例: Java コードを使用したインポート

package com.selectdb.x2doris.connector.doris.writer;

import com.alibaba.fastjson2.JSON;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.protocol.RequestContent;
import org.apache.http.util.EntityUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.Map;

public class DorisLoadCase {
    public static void main(String[] args) throws Exception {

        // 1. パラメーターを設定します。
        String loadUrl = "http://<Host:Port>/api/<DB>/<TABLE>/_stream_load?";
        String userName = "admin";
        String password = "****";

        // 2. httpclient をビルドします。リダイレクト (isRedirectable) を有効にする必要があることに注意してください。
        HttpClientBuilder httpClientBuilder = HttpClients.custom().setRedirectStrategy(new DefaultRedirectStrategy() {
            // リダイレクトを有効にします。
            @Override
            protected boolean isRedirectable(String method) {
                return true;
            }
        });
        httpClientBuilder.addInterceptorLast(new RequestContent(true));
        HttpClient httpClient = httpClientBuilder.build();

        // 3. httpPut リクエストオブジェクトをビルドします。
        HttpPut httpPut = new HttpPut(loadUrl);

        // httpHeader を設定します...
        String basicAuth = Base64.getEncoder().encodeToString(String.format("%s:%s", userName, password).getBytes(StandardCharsets.UTF_8));
        httpPut.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + basicAuth);
        httpPut.addHeader(HttpHeaders.EXPECT, "100-continue");
        httpPut.addHeader(HttpHeaders.CONTENT_TYPE, "text/plain; charset=UTF-8");

        RequestConfig reqConfig = RequestConfig.custom().setConnectTimeout(30000).build();
        httpPut.setConfig(reqConfig);

        // 4. 送信するデータを設定します。ここでは、CSV を書き込みます。
        // 次のフィールドを持つテーブルがあると仮定します:
        // field1,field2,field3,field4
        // これは 3 つの CSV レコードをシミュレートします。Doris では、CSV のデフォルトの行区切り文字は \n で、列区切り文字は \t です。
        // String data =
        //        "1\t2\t3\t4\n" +
        //        "11\t22\t33\t44\n" +
        //        "111\t222\t333\t444";
        // すべての行を読み取ります。
         List<String> lines = Files.readAllLines(Paths.get("your_file.csv"));
        // すべての行を \n で結合します。
        String data = String.join("\n", lines);
        
        httpPut.setEntity(new StringEntity(data));

        // 5. リクエストを送信し、結果を処理します。
        HttpResponse httpResponse = httpClient.execute(httpPut);
        int httpStatus = httpResponse.getStatusLine().getStatusCode();
        String respContent = EntityUtils.toString(new BufferedHttpEntity(httpResponse.getEntity()), StandardCharsets.UTF_8);
        String respMsg = httpResponse.getStatusLine().getReasonPhrase();

        if (httpStatus == HttpStatus.SC_OK) {
            // 戻り値をシリアル化するために、適切な JSON シリアル化コンポーネントを選択します。
            Map<String, String> respAsMap = JSON.parseObject(respContent, Map.class);
            // SelectDB から返されたステータスコードを取得します...
            String dorisStatus = respAsMap.get("Status");
            // SelectDB が次のいずれかのステータスを返した場合、データが正常に書き込まれたことを意味します。
            List<String> DORIS_SUCCESS_STATUS = Arrays.asList("Success", "Publish Timeout", "200");
            if (!DORIS_SUCCESS_STATUS.contains(dorisStatus) || !respMsg.equals("OK")) {
                throw new RuntimeException("StreamLoad failed, status: " + dorisStatus + ", Response: " + respMsg);
            } else {
                System.out.println("successful....");
            }
        } else {
            throw new IOException("StreamLoad Response HTTP Status Error, httpStatus: "+ httpStatus +",  url: " + loadUrl + ", error: " + respMsg);
        }
    }
}

JSON データのインポート

  1. データの宛先テーブルを作成します。

    1. SelectDB インスタンスに接続します。詳細については、「DMS を使用して ApsaraDB for SelectDB インスタンスに接続する」をご参照ください。

    2. 次の文を実行してデータベースを作成します。

      CREATE DATABASE test_db;
    3. 次の文を実行してテーブルを作成します。

      CREATE TABLE test_table
      (
          id int,
          name varchar(50),
          age int
      )
      UNIQUE KEY(`id`)
      DISTRIBUTED BY HASH(`id`) BUCKETS 16
      PROPERTIES("replication_num" = "1");

  2. データをインポートします。

    重要

    JSON 形式でデータをインポートする場合、非配列形式のパフォーマンスは配列形式よりも大幅に高くなります。

    非配列形式のデータのインポート

    1. Stream Load ターミナルで、json.data という名前のファイルを作成します。ファイルには複数行が含まれ、各行に 1 つの JSON レコードが含まれている必要があります。以下はサンプルコンテンツです:

      {"id":1,"name":"Emily","age":25}
      {"id":2,"name":"Benjamin","age":35}
      {"id":3,"name":"Olivia","age":28}
      {"id":4,"name":"Alexander","age":60}
      {"id":5,"name":"Ava","age":17}
    2. データをインポートします。

      ターミナルを開き、curl コマンドを実行して Stream Load タスクを開始します。このタスクは、json.data ファイルからデータベース test_db のテーブル test_table にデータをインポートします。

      curl --location-trusted -u admin:admin_123 -H "Expect:100-continue" -H "format:json" -H "read_json_by_line:true" -T json.data -XPUT http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load

    配列形式のデータのインポート

    1. Stream Load ターミナルで、json_array.data という名前の JSON 配列形式のデータファイルを作成します。

      [
      {"userid":1,"username":"Emily","userage":25},
      {"userid":2,"username":"Benjamin","userage":35},
      {"userid":3,"username":"Olivia","userage":28},
      {"userid":4,"username":"Alexander","userage":60},
      {"userid":5,"username":"Ava","userage":17}
      ]
    2. データをインポートします。

      ターミナルを開き、curl コマンドを実行して Stream Load タスクを開始します。このタスクは、ローカルファイル json_array.data からデータベース test_db のテーブル test_table にデータをインポートします。

      curl --location-trusted -u admin:admin_123 -H "Expect:100-continue" -H "format:json" -H "jsonpaths:[\"$.userid\", \"$.userage\", \"$.username\"]" -H "columns:id,age,name" -H "strip_outer_array:true" -T json_array.data -XPUT http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load

HTTP Stream モード

Stream Load では、Table Value Function (TVF) 機能を使用して、SQL 式でインポートパラメーターを指定できます。この Stream Load 機能は http_stream と呼ばれます。TVF の使用方法の詳細については、「TVF」をご参照ください。

http_stream インポートの REST API URL は、通常の Stream Load インポートの URL とは異なります。

  • 通常の Stream Load URL: http://host:http_port/api/{db}/{table}/_stream_load

  • TVF http_stream を使用する URL: http://host:http_port/api/_http_stream

構文

以下は、Stream Load の HTTP Stream モードの構文です。

curl --location-trusted -u <username>:<password> [-H "sql: ${load_sql}"...] -T <file_name> -XPUT http://host:http_port/api/_http_stream

HTTP Stream パラメーターの説明については、「パラメーター」をご参照ください。

SQL パラメーター load_sql を HTTP ヘッダーに追加して、column_separatorline_delimiterwherecolumns などのパラメーターを置き換えることができます。次のコードブロックは、SQL パラメーター load_sql の例を示しています。

INSERT INTO db.table (col, ...) SELECT stream_col, ... FROM http_stream("property1"="value1");

完全な例:

curl  --location-trusted -u admin:admin_123 -T test.csv  -H "sql:insert into demo.example_tbl_1(user_id, age, cost) select c1, c4, c7 * 2 from http_stream(\"format\" = \"CSV\", \"column_separator\" = \",\" ) where age >= 30"  http://host:http_port/api/_http_stream

よくある質問

Q1: インポート中に「get table cloud commit lock timeout」エラーが報告された場合はどうすればよいですか?

このエラーは、データの書き込みが頻繁すぎることによりテーブルのデッドロックが発生したことを示します。書き込み頻度を減らし、データのバッチ処理を使用することを強くお勧めします。1 回の Stream Load で数百 MB から 1 GB のデータを書き込むことができます。

Q2: CSV ファイルをインポートするとき、データに列または行の区切り文字が含まれている場合はどうすればよいですか?

新しい列区切り文字と行区切り文字を指定し、データが区切り文字と競合しないようにインポートデータを変更する必要があります。これにより、データが正しく解析されるようになります。以下のセクションで例を示します:

データに行区切り文字が含まれている場合

インポートされたデータに指定された行区切り文字、たとえばデフォルトの行区切り文字 \n が含まれている場合は、新しい行区切り文字を指定する必要があります。

たとえば、データファイルに次の内容が含まれているとします:

Zhang San\n,25,Shaanxi
Li Si\n,30,Beijing

このシナリオでは、ファイル内の \n はデータであり、行区切り文字ではありません。ただし、デフォルトの行区切り文字も \n です。ファイルが正しく解析されるようにするには、line_delimiter パラメーターを使用して新しい行区切り文字を指定し、ファイル内の各データ行の末尾に新しい区切り文字を明示的に追加する必要があります。以下に例を示します:

  1. インポートの行区切り文字を設定します。

    たとえば、デフォルトの行区切り文字 \n\r\n に置き換えるには、データをインポートするときに -H "line_delimiter:\r\n" を設定する必要があります。

  2. 指定された行区切り文字を各データ行の末尾に追加します。サンプルテキストは次のように変更する必要があります:

    Zhang San\n,25,Shaanxi\r\n
    Li Si\n,30,Beijing\r\n

データに列区切り文字が含まれている場合

インポートされたデータに指定された列区切り文字、たとえばデフォルトの列区切り文字 \t が含まれている場合は、新しい列区切り文字を指定する必要があります。

たとえば、データファイルに次の内容が含まれているとします:

Zhang San\t  25  Shaanxi
Li Si\t  30  Beijing

このシナリオでは、ファイル内の \t はデータであり、列区切り文字ではありません。ただし、デフォルトの列区切り文字も \t (タブ文字) です。ファイルが正しく解析されるようにするには、column_separator パラメーターを使用して新しい列区切り文字を指定し、ファイル内の列間に新しい区切り文字を明示的に追加する必要があります。以下に例を示します:

  1. インポートの列区切り文字を設定します。

    たとえば、デフォルトの列区切り文字 \t をコンマ (,) に置き換えるには、データをインポートするときに -H "column_separator:," を設定する必要があります。

  2. 指定された列区切り文字をデータ列の間に追加します。サンプルテキストは次のように変更する必要があります:

    Zhang San\t,25,Shaanxi
    Li Si\t,30,Beijing