ローカルファイルまたはデータストリームをApsaraDB for SelectDBインスタンスにインポートする場合、Stream Loadを使用してデータを同期的にインポートできます。 このトピックでは、Stream Loadを使用してSelectDBインスタンスにデータをインポートする方法について説明します。
背景情報
Stream Loadは同期データインポート方式です。 このメソッドを使用すると、HTTPリクエストを送信してローカルファイルまたはデータストリームをSelectDBインスタンスにインポートできます。 Stream Loadはデータを同期的にインポートし、すぐにインポート結果を返します。 結果に基づいて、Stream Loadジョブが成功したかどうかを判断できます。 ストリームロードを使用して、CSV
、JSON
、Parquet
、またはOptimized Row Columnar (ORC)
形式でデータをインポートできます。
ストリームロードは、高スループット、低レイテンシ、および高い柔軟性と信頼性を備えています。 推奨するには、Stream Loadを使用してデータをインポートします。
準備
Stream Loadリクエストを開始するターミナルがSelectDBインスタンスに接続されていることを確認します。
SelectDBインスタンスのパブリックエンドポイントを申請します。 詳細については、「パブリックエンドポイントの申請またはリリース」をご参照ください。
Stream Loadリクエストを開始する端末がSelectDBインスタンスと同じ仮想プライベートクラウド (VPC) にある場合は、この手順をスキップします。
Stream Loadリクエストが開始されたターミナルの関連IPアドレスを、SelectDBインスタンスのIPアドレスホワイトリストに追加します。 詳細については、「IPアドレスホワイトリストの設定」をご参照ください。
SelectDBインスタンスのCIDRブロックをデータソースクラスターのIPアドレスホワイトリストに追加します。 これは、ストリームロード要求が開始される端末でホワイトリストメカニズムがサポートされている場合に適用されます。
SelectDBインスタンスが属するVPC内のSelectDBインスタンスのIPアドレスを取得するには、「ApsaraDB SelectDBインスタンスが属するVPC内のIPアドレスを表示する方法」の操作を実行します。
SelectDBインスタンスのパブリックIPアドレスを取得するには、pingコマンドを実行してSelectDBインスタンスのパブリックエンドポイントにアクセスし、インスタンスのIPアドレスを取得します。
オプション。 Stream Loadジョブの操作レコードを保持するようにバックエンド (BE) 設定を変更します。
既定では、Stream Loadジョブの操作レコードはBEでは作成されません。
Stream Loadジョブの操作レコードをトレースする場合は、Stream Loadジョブを作成する前に、enable_stream_load_recordをtrueに設定する必要があります。 パラメーターを変更するには、チケットを起票する必要があります。
オプション。 BE設定を変更して、Stream Loadを使用してインポートできるファイルの最大サイズを調整します。
デフォルトでは、ストリームロードを使用してインポートできるファイルの最大サイズは1,024 MBです。
Stream Loadを使用してインスタンスにインポートするファイルのサイズが1,024 MBを超える場合は、streaming_load_max_mb BEパラメーターを変更する必要があります。 詳細については、「パラメーターの設定」をご参照ください。
オプション。 フロントエンド (FE) 設定を変更して、インポートのタイムアウト期間を調整します。
デフォルトでは、Stream Loadジョブのタイムアウト時間は600秒です。 指定したタイムアウト期間内にStream Loadジョブが完了しない場合、システムはStream Loadジョブをキャンセルし、ステータスをCANCELLEDに変更します。
[Stream Load] ジョブが特定のタイムアウト期間内に完了できない場合は、[Stream Load] リクエストで新しいタイムアウト期間を指定するか、またはstream_load_default_timeout_second FEパラメーターを変更して、すべての [Stream Load] ジョブの新しいデフォルトのタイムアウト期間を指定できます。 FEパラメーターを変更するには、チケットを起票する必要があります。
使用上の注意
Stream Loadを使用して、サイズが数百MBから1 GBの範囲のデータを一度にSelectDBインスタンスに書き込むことができます。 特定のシナリオでは、SelectDBインスタンスに少量のデータを頻繁に書き込むと、インスタンスのパフォーマンスが大幅に低下し、テーブルでデッドロックが発生する可能性があります。 SelectDBインスタンスにデータを書き込む頻度を減らすことを推奨します。 次の方法を使用して、少量のデータをSelectDBインスタンスに同時に書き込むことができます。
アプリケーションでのバッチ操作: ビジネスデータを収集し、SelectDBインスタンスにStream Loadリクエストを送信する必要があります。
サーバーでのバッチ操作: SelectDBがStream Loadリクエストを受信すると、インスタンスサーバーはリクエストされたデータをバッチ処理します。 詳細については、「グループコミット機能を使用したデータのインポート」をご参照ください。
ストリームロードジョブの作成
Stream Loadは、HTTPプロトコルを介してデータを送信および転送します。 次のコードスニペットは、curlコマンドを実行してStream Loadジョブを送信する方法の例を示しています。 LinuxまたはmacOSオペレーティングシステムの端末で、またはWindowsでコマンドプロンプトを使用してコマンドを実行できます。 他のHTTPクライアントを使用してStream Loadジョブを送信することもできます。
構文
curl --location-trusted -u <username>:<password> [-H ""] -H "expect:100-continue" -T <file_path> -XPUT http://<host>:<port>/api/<db
_name>/<table_name>/_stream_load
パラメーター
パラメーター | 必要 | 説明 |
| 必須 | 認証が必要な場合、リクエストがリダイレクトされるサーバーに |
| 必須 | SelectDBインスタンスへの接続に使用されるユーザー名とパスワード。 関連するフィールド:
|
| 任意 | リクエストヘッダー。 形式: 関連するフィールド:
詳細については、「リクエストヘッダーフィールド」をご参照ください。 |
| 必須 | インポートするファイルのパス。 関連フィールド: file_path: インポートするファイルのパス。 |
| 必須 | HTTPリクエストのメソッド。 この場合、PUTメソッドが使用されます。 データをインポートするSelectDBインスタンスのURLを指定する必要があります。 関連するフィールド:
|
リクエストヘッダーフィールド
Stream LoadはHTTPプロトコルを使用します。 したがって、Stream Loadジョブに関連するパラメーターは、主にヘッダーで指定されます。 データのインポートに使用するパラメーターの共通フィールドを次の表に示します。
フィールド | 説明 |
| Stream Loadジョブの一意の識別子。 使用法:
|
| インポートするデータの形式。
フォーマット要件と関連するパラメーターの詳細については、「」をご参照ください。 |
| インポートするファイルの行区切り文字。 行区切り文字として複数の文字の組み合わせを使用できます。 たとえば、Windowsは行区切り文字として \r\nを使用します。
|
| インポートするファイルの列区切り文字。 複数の文字の組み合わせを列の区切り文字として使用できます。 たとえば、 非表示文字を列区切り文字として指定する場合は、
|
| インポートするファイルの圧縮形式。 次の圧縮形式がサポートされています。 |
| Stream Loadジョブの最大許容レート。 インポートエラー率が最大許容値を超えた場合、インポートは失敗します。 無効なデータ行を無視する場合は、このパラメーターを0より大きい値に設定して、ジョブが成功するようにします。
|
| strictモードを有効にするかどうかを指定します。 有効値:
|
| データのインポートに使用されるクラスター。 デフォルトでは、インスタンスのデフォルトクラスターが使用されます。 インスタンスにデフォルトクラスターがない場合、システムは自動的にアクセス権限を持つクラスターを選択します。 |
| 対応するパーティションの1つのタブレットにのみデータをインポートするかどうかを指定します。 このパラメーターは、を使用するテーブルにデータをインポートする場合にのみ使用できます。複製キーモデルであり、ランダムなパーティションを含みます。 有効値:
|
| Stream Loadジョブに指定されているフィルター条件。 Stream Loadを使用すると、 |
| テーブルにデータをインポートするパーティション。 インポートするデータが指定されたパーティションに属していない場合、データはインポートされません。 インポートされないデータ行の数は、dpp.abnorm.ALLでカウントされます。
|
| インポートするデータの関数変換設定。 Stream Loadは、列の順序の変更と式の変換をサポートします。 式変換構文は、クエリ文で使用する構文と同じです。 |
| データマージのタイプ。 有効値:
重要
|
| データを削除するための条件。 このパラメーターは、 |
| このパラメーターは、UNIQUE KEYモデルを使用するテーブルでのみ使用でき、同じキー列を持つレコードの指定されたsource_sequence列に基づいて値列に対してREPLACE操作が実行されるようにします。 source_sequence列は、データソースの列またはスキーマ内の既存の列にすることができます。 |
| Stream Loadジョブに割り当てることができるメモリの最大サイズ。
|
| Stream Loadジョブのタイムアウト期間。
|
| Stream Loadジョブに使用されるタイムゾーン。 このパラメーターは、Stream Loadジョブに含まれるすべてのタイムゾーン関連関数の結果に影響します。 詳細については、「IANAデータベースのタイムゾーン」をご参照ください。 デフォルト値: |
| Stream Loadジョブの2相コミット (2PC) モードを有効にするかどうかを指定します。 false (デフォルト): モードを無効にします。 true: モードを有効にします。 2PCモードを有効にすると、データが書き込まれた直後にインポート結果が返されます。 このフェーズでは、データは不可視であり、トランザクションはPRECOMMITTED状態にある。 データは、コミット操作を手動でトリガーした後にのみ表示されます。 推奨設定: モードを有効にすると、すべてのStream Loadジョブが成功または失敗します。 追加モードでは、データのインポート中にすべてのデータが表示されないようにします。 次のシナリオでモードを有効にする。
次のシナリオでモードを無効にします。
|
jsonpaths | JSONデータのインポートに使用できるメソッド。 有効値:
|
json_root | JSONデータで子オブジェクトを指定するために使用されるパラメーター。 子オブジェクトは、データの解析とインポートのルートノードとして使用できます。 デフォルト値: "" 。 デフォルト値を使用すると、JSONコンテンツ全体がデータの解析とインポートのルートノードとして使用されます。 |
strip_outer_array | Stream Loadを使用してJSONデータをインポートするときに、JSONコンテンツ全体を保持するかどうかを指定する重要なパラメーター。
|
read_json_by_line | Stream Loadを使用してJSONデータをインポートするときに、JSONデータの複数の行を含むファイル全体を保持するかどうかを指定する重要なパラメーター。
|
例
VPCエンドポイントがselectdb-cn-h033cjs **** -fe.selectdbfe.pre.rds.aliyuncs.comであるインスタンスのtest_dbデータベースのtest_tableにdata.csvファイルをインポートできます。 次のサンプルコードは、Stream Loadを使用してデータをインポートする方法を示しています。 完全なコードの詳細については、「インポートデータの完全な例」をご参照ください。
curl --location-trusted -u admin:admin_123 -T data.csv -H "label:123" -H "expect:100-continue" http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
レスポンスの説明
Stream Loadは、データをインポートするための同期方式です。 したがって、Stream Loadジョブの結果は、HTTPリクエストへの応答として直接返されます。 次のサンプルコードは、サンプル応答を提供します。
{
"TxnId": 17,
"Label": "707717c0-271a-44c5-be0b-4e71bfeacaa5",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 5,
"NumberLoadedRows": 5,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 28,
"LoadTimeMs": 27,
"BeginTxnTimeMs": 0,
"StreamLoadPutTimeMs": 2,
"ReadDataTimeMs": 0,
"WriteDataTimeMs": 3,
"CommitAndPublishTimeMs": 18
}
次の表に、レスポンスのパラメーターを示します。
パラメーター | 説明 |
| Stream LoadジョブのトランザクションID。 |
| カスタムラベルを作成するか、自動的に生成されるラベルを使用できます。 |
| Stream Loadジョブの状態。 有効値:
|
| 既存のラベルに関連付けられているStream Loadジョブの状態 このパラメーターは、Statusパラメーターの値が このパラメーターの値に基づいて、既存のラベルに関連付けられているStream Loadジョブのステータスを取得できます。 有効値:
|
| Stream Loadジョブに対して返されるエラーメッセージ。 |
| Stream Loadジョブによって処理されるデータ行の総数。 |
| インポートされるデータ行の数。 |
| インポートできないデータ行の数。 |
| |
| Stream Loadジョブによってインポートされるバイト数。 |
| Stream Loadジョブの期間。 単位: ミリ秒。 |
| フロントエンド (FE) にトランザクションの開始を要求するために消費される時間。 単位: ミリ秒。 |
| FEがStream Loadジョブの実行プランを返すように要求するのにかかる時間。 単位: ミリ秒。 |
| データの読み取りにかかる時間。 単位: ミリ秒。 |
| データの書き込みにかかる時間。 単位: ミリ秒。 |
| トランザクションのコミットと発行をFEに要求するのにかかる時間。 単位: ミリ秒。 |
| エラーデータ行を表示できるURL。 |
Stream Loadジョブのキャンセル
ジョブの作成後、Stream Loadジョブを手動でキャンセルすることはできません。 タイムアウトエラーまたはインポートエラーが発生すると、Stream Loadジョブはシステムによって自動的にキャンセルされます。 レスポンスでErrorURLを使用して、エラー情報をダウンロードし、問題のトラブルシューティングを行うことができます。
ストリームロードジョブの表示
Stream Loadジョブを作成する前にenable_stream_load_recordをtrueに設定した場合、MySQLクライアントを使用してSelectDBインスタンスに接続し、SHOW STREAM LOAD
ステートメントを実行して、完了したStream Loadジョブを表示します。 デフォルトでは、バックエンド (BE) はStream Loadジョブに関する情報を記録しません。
インポートデータの完全な例
準備
データをインポートする前に、このトピックで説明されている準備が行われていることを確認してください。
CSVデータのインポート
スクリプトを使用したCSVデータファイルのインポート
データをインポートするテーブルを作成します。
SelectDBインスタンスに接続します。 詳細については、「DMSを使用したApsaraDB For SelectDBインスタンスへの接続」をご参照ください。
次のステートメントを実行して、データベースを作成します。
CREATE DATABASE test_db;
次のステートメントを実行してテーブルを作成します。
CREATE TABLE test_table ( id int, name varchar(50), age int, address varchar(50), url varchar(500) ) UNIQUE KEY(`id`, `name`) DISTRIBUTED BY HASH(id) BUCKETS 16 PROPERTIES("replication_num" = "1");
Stream Loadジョブを開始するターミナルにインポートするファイルを作成します。 たとえば、ファイル名は
test.csv
です。1,yang,32,shanghai,http://example.com 2,wang,22,beijing,http://example.com 3,xiao,23,shenzhen,http://example.com 4,jess,45,hangzhou,http://example.com 5,jack,14,shanghai,http://example.com 6,tomy,25,hangzhou,http://example.com 7,lucy,45,shanghai,http://example.com 8,tengyin,26,shanghai,http://example.com 9,wangli,27,shenzhen,http://example.com 10,xiaohua,37,shanghai,http://example.com
データをインポートします。
マシンでコマンドラインツールを開き、curlコマンドを実行して、データインポート用のStream Loadジョブを開始します。
データインポートジョブの作成に使用される構文とパラメーターの詳細については、「ストリームロードジョブの作成」をご参照ください。 データをインポートする方法の例を次に示します。
label
フィールドを使用してデータレコードを重複排除し、タイムアウト期間を指定します。からデータをインポートする
test.csv
ファイルをtest_table
のテーブルtest_db
ラベルを使用して、インポートされたデータレコードを重複排除し、タイムアウト期間を100秒に設定します。curl --location-trusted -u admin:admin_123 -H "label:123" -H "timeout:100" -H "expect:100-continue" -H "column_separator:," -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
label
フィールドを使用してデータレコードを重複排除し、列を使用してファイルからインポートするデータをフィルタリングします。からデータをインポートする
test.csv
ファイルをtest_table
のテーブルtest_db
ラベルを使用して、インポートされたデータレコードを重複排除し、アドレス列の値がhangzhouであるデータのみをインポートするためにファイル内の列名を指定します。curl --location-trusted -u admin:admin_123 -H "label:123" -H "columns: id,name,age,address,url" -H "where: address='hangzhou'" -H "expect:100-continue" -H "column_separator:," -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
最大公差率を20% に設定します。
からデータをインポートする
test.csv
ファイルをtest_table
のテーブルtest_db
最大公差率を20% に設定します。curl --location-trusted -u admin:admin_123 -H "label:123" -H "max_filter_ratio:0.2" -H "expect:100-continue" -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
strictモードを使用し、タイムゾーンを指定します。
インポートしたデータをstrictモードでフィルターし、タイムゾーンを
Africa/Abidjan
に設定します。curl --location-trusted -u admin:admin_123 -H "strict_mode: true" -H "timezone: Africa/Abidjan" -H "expect:100-continue" -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
[] SelectDB インスタンスのデータを削除します。
では、SelectDBインスタンスからインポートするデータと同じデータを削除します。test.csvファイルを作成します。
curl --location-trusted -u admin:admin_123 -H "merge_type: DELETE" -H "expect:100-continue" -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
関連条件に基づいてインポートする必要のないデータを削除し、残りのデータを SelectDBインスタンスにインポートします。
インポートするデータからaddress列の値がhangzhouである行を削除し、残りのdat aをSelectDBインスタンスにインポートします。
curl --location-trusted -u admin:admin_123 -H "expect:100-continue" -H "columns: id,name,age,address,url" -H "merge_type: MERGE" -H "delete: address='hangzhou'" -H "column_separator:," -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/testDb/testTbl/_stream_load
Javaコードを使用したCSVデータファイルのインポート
package com.selectdb.x2doris.connector.doris.writer;
import com.alibaba.fastjson2.JSON;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.protocol.RequestContent;
import org.apache.http.util.EntityUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.Map;
public class DorisLoadCase {
public static void main(String[] args) throws Exception {
// 1. Configure the parameters.
String loadUrl = "http://<Host:Port>/api/<DB>/<TABLE>/_stream_load?";
String userName = "admin";
String password = "****";
// 2. Construct an HTTP client. Take note that the redirection feature must be enabled for the client.
HttpClientBuilder httpClientBuilder = HttpClients.custom().setRedirectStrategy(new DefaultRedirectStrategy() {
// Enable the redirection feature.
@Override
protected boolean isRedirectable(String method) {
return true;
}
});
httpClientBuilder.addInterceptorLast(new RequestContent(true));
HttpClient httpClient = httpClientBuilder.build();
// 3. Construct an HTTP PUT request object.
HttpPut httpPut = new HttpPut(loadUrl);
// Configure request headers.
String basicAuth = Base64.getEncoder().encodeToString(String.format("%s:%s", userName, password).getBytes(StandardCharsets.UTF_8));
httpPut.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + basicAuth);
httpPut.addHeader(HttpHeaders.EXPECT, "100-continue");
httpPut.addHeader(HttpHeaders.CONTENT_TYPE, "text/plain; charset=UTF-8");
RequestConfig reqConfig = RequestConfig.custom().setConnectTimeout(30000).build();
httpPut.setConfig(reqConfig);
// 4. Specify the data to be imported. In this example, a CSV file is specified.
// For example, you want to import data from a table that contains the following fields.
// field1,field2,field3,field4
// The CSV file contains the following three records. By default, the row delimiter is \n and the column delimiter is \t for a CSV file.
String data =
"1\t2\t3\t4\n" +
"11\t22\t33\t44\n" +
"111\t222\t333\t444";
httpPut.setEntity(new StringEntity(data));
// 5. Send the request and process the returned result.
HttpResponse httpResponse = httpClient.execute(httpPut);
int httpStatus = httpResponse.getStatusLine().getStatusCode();
String respContent = EntityUtils.toString(new BufferedHttpEntity(httpResponse.getEntity()), StandardCharsets.UTF_8);
String respMsg = httpResponse.getStatusLine().getReasonPhrase();
if (httpStatus == HttpStatus.SC_OK) {
// Serialize the returned result by using an appropriate JSON serialization component.
Map<String, String> respAsMap = JSON.parseObject(respContent, Map.class);
// Obtain the status code returned by ApsaraDB for SelectDB.
String dorisStatus = respAsMap.get("Status");
// If ApsaraDB for SelectDB returns the following status data, the data is imported.
List<String> DORIS_SUCCESS_STATUS = Arrays.asList("Success", "Publish Timeout", "200");
if (!DORIS_SUCCESS_STATUS.contains(dorisStatus) || !respMsg.equals("OK")) {
throw new RuntimeException("StreamLoad failed, status: " + dorisStatus + ", Response: " + respMsg);
} else {
System.out.println("successful....");
}
} else {
throw new IOException("StreamLoad Response HTTP Status Error, httpStatus: "+ httpStatus +", url: " + loadUrl + ", error: " + respMsg);
}
}
}
JOSNデータのインポート
データをインポートするテーブルを作成します。
SelectDBインスタンスに接続します。 詳細については、「DMSを使用したApsaraDB For SelectDBインスタンスへの接続」をご参照ください。
次のステートメントを実行して、データベースを作成します。
CREATE DATABASE test_db;
次のステートメントを実行してテーブルを作成します。
CREATE TABLE test_table ( id int, name varchar(50), age int ) UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 16 PROPERTIES("replication_num" = "1");
データをインポートする。
非配列データのインポート
Stream Loadジョブを開始する端末に
json.data
ファイルを作成します。 ファイルには複数の行が含まれ、それぞれにJSONデータレコードが含まれます。 例:{"id":1,"name":"Emily","age":25} {"id":2,"name":"Benjamin","age":35} {"id":3,"name":"Olivia","age":28} {"id":4,"name":"Alexander","age":60} {"id":5,"name":"Ava","age":17}
ファイルをインポートします。
ターミナルを開き、curlコマンドを実行してStream Loadジョブを開始し、
json.data
ファイル内のデータをtest_db
データベースのtest_table
テーブルにインポートします。curl --location-trusted -u admin:admin_123 -H "Expect:100-continue" -H "format:json" -H "read_json_by_line:true" -T json.data -XPUT http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
配列データのインポート
Stream Loadジョブを開始する端末に
json_array.data
ファイルを作成します。[ {"userid":1,"username":"Emily","userage":25}, {"userid":2,"username":"Benjamin","userage":35}, {"userid":3,"username":"Olivia","userage":28}, {"userid":4,"username":"Alexander","userage":60}, {"userid":5,"username":"Ava","userage":17} ]
ファイルをインポートします。
ターミナルを開き、curlコマンドを実行してStream Loadジョブを開始し、
json_array.data
ファイルのデータをtest_db
データベースのtest_table
テーブルにインポートします。curl --location-trusted -u admin:admin_123 -H "Expect:100-continue" -H "format:json" -H "jsonpaths:[\"$.userid\", \"$.userage\", \"$.username\"]" -H "columns:id,age,name" -H "strip_outer_array:true" -T json_array.data -XPUT http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
HTTPストリームモード
Stream Loadを使用すると、テーブル値関数 (TVF) を使用して、SQLステートメントでリクエストパラメーターを指定できます。 このTVFはhttp_stream
という名前です。 TVFの使用方法の詳細については、「ファイルの分析」をご参照ください。
HTTPストリームモードのStream LoadのRESTful API URLは、通常モードのStream LoadのURLとは異なります。
通常モードでのStream LoadのURL:
http:// host:http_port/api/{db}/{table}/_stream_load
HTTPストリームモードでのストリームロードのURL:
http:// host:http_port/api/_http_Stream
構文
HTTP StreamモードでStream Loadジョブを送信するには、次のコマンドを実行します。
curl --location-trusted -u <username>:<password> [-H "sql: ${load_sql}"...] -T <file_name> -XPUT http://host:http_port/api/_http_stream
HTTPストリームモードでのStream Loadジョブのパラメーターの詳細については、このトピックの「パラメーター」をご参照ください。
例
load_sqlパラメーターを使用して、HTTPヘッダーのcolumn_separator
、line_delimiter
、where
、およびcolumns
パラメーターを置き換えることができます。 次のサンプルコードは、load_sqlパラメーターで指定されたSQL文を示しています。
INSERT INTO db.table (col, ...) SELECT stream_col, ... FROM http_stream("property1"="value1");
完全なサンプルコマンド:
curl --location-trusted -u admin:admin_123 -T test.csv -H "sql:insert into demo.example_tbl_1(user_id, age, cost) select c1, c4, c7 * 2 from http_stream(\"format\" = \"CSV\", \"column_separator\" = \",\" ) where age >= 30" http://host:http_port/api/_http_stream
FAQ
データインポート中にget table cloud commit lock timeout
エラーメッセージが表示された場合はどうすればよいですか?
SelectDBインスタンスにデータを頻繁に書き込みます。 その結果、デッドロックが発生する。 SelectDBインスタンスにデータを書き込む頻度を減らすことを推奨します。 少量のデータを一度にSelectDBインスタンスに書き込むことができます。 Stream Loadを使用して、数百MBから1 GBのサイズのデータを一度にSelectDBインスタンスに書き込むことができます。
を使用してインポートしたいCVSファイルのデータがストリームロード指定された行区切り文字と列区切り文字?
別の行区切り文字と列区切り文字を指定し、データファイルを変更して、インポートするデータに新しい行区切り文字と列区切り文字が含まれないようにする必要があります。 このようにして、データを適切に解析することができる。 例:
インポートするデータに行区切り文字を指定
インポートするデータに指定された行区切り文字が含まれている場合は、行区切り文字を変更する必要があります。 たとえば、インポートするデータにデフォルトの行区切り文字 \n
が含まれている場合、別の行区切り文字を指定する必要があります。
サンプルファイル:
ZHANG San\n, 25, Shaanxi
LI Si\n, 30, Beijing
このファイルでは、 \n
は行の区切り文字ではなく、インポートするデータを示します。 ただし、ファイルのデフォルトの行区切り文字も \n
です。 ファイルを適切に解析する場合は、line_delimiter
を使用して別の行の区切り文字を指定し、ファイルの各データ行の末尾に新しい行の区切り文字が表示されるようにする必要があります。 例:
別の行区切り文字を指定します。
たとえば、デフォルトの 行区切り文字
\n
と\r\n
を設定する必要があります-H "line_delimiter:\r\n"
のためデータをインポートします。ファイルの各データ行の末尾に新しい行区切り文字を追加します。 上記のサンプルファイルを次のファイルに変更します。
ZHANG San\n, 25, Shaanxi\r\n LI Si\n, 30, Beijing\r\n
インポートするデータに列区切り文字を指定
インポートするデータに列区切り文字が指定されている場合は、列の区切り文字を変更する必要があります。 たとえば、インポートするデータにデフォルトの列区切り文字 \t
が含まれている場合、別の列区切り文字を指定する必要があります。
サンプルファイル:
ZHANG San\t, 25 Shaanxi
LI Si\t, 30 Beijing
このファイルでは、 \t
は列区切り文字ではなくインポートするデータを示します。 ただし、ファイルのデフォルトの列区切り文字も \t
です。 ファイルを正しく解析する場合は、colume_separator
を使用して別の列区切り文字を指定し、ファイルの各データ列に新しい列の区切り文字が表示されるようにする必要があります。 例:
別の列区切り文字を指定します。
たとえば、既定の列区切り文字
\t
をコンマ (,) に置き換える場合は、-H "column_separator:,"
For data import.ファイルの各データ列に新しい列区切り文字を追加します。 上記のサンプルファイルを次のファイルに変更します。
ZHANG San\t, 25, Shaanxi LI Si\t, 30, Beijing