Stream Load を使用して、ローカルファイルまたはデータストリームを ApsaraDB for SelectDB インスタンスにインポートできます。このトピックでは、Stream Load を使用して ApsaraDB for SelectDB にデータをインポートする方法について説明します。
背景情報
Stream Load は同期データインポートメソッドです。HTTP リクエストを送信して、ローカルファイルまたはデータストリームを ApsaraDB for SelectDB にインポートできます。 Stream Load はインポート結果をすぐに返します。リクエストの戻り値を確認して、インポートが成功したかどうかを判断できます。Stream Load は CSV (テキスト)、JSON、PARQUET、および ORC データ形式をサポートします。
Stream Load は、高いスループット、低いレイテンシーを提供し、柔軟で信頼性があります。強く推奨 される主要なデータインポート方法として Stream Load を使用してください。
準備
Stream Load リクエストを送信するために使用されるターミナルが、ネットワーク経由で SelectDB インスタンスに接続できることを確認します:
ApsaraDB for SelectDB インスタンスのパブリックエンドポイントを申請します。詳細については、「パブリックエンドポイントの申請と解放」をご参照ください。
Stream Load リクエストを送信するターミナルが ApsaraDB for SelectDB インスタンスと同じ VPC にある場合、このステップはスキップできます。
Stream Load リクエストを送信するターミナルの IP アドレスを ApsaraDB for SelectDB インスタンスのホワイトリストに追加します。詳細については、「ホワイトリストの設定」をご参照ください。
Stream Load リクエストを送信するターミナルにホワイトリストが設定されている場合は、SelectDB インスタンスの IP アドレス範囲をそのホワイトリストに追加します。
SelectDB インスタンスが属する VPC 内の SelectDB インスタンスの IP アドレスを取得するには、「ApsaraDB SelectDB インスタンスが属する VPC の IP アドレスを表示するにはどうすればよいですか?」で提供されている操作を実行できます。
SelectDB インスタンスのパブリック IP アドレスを取得するには、ping コマンドを実行して SelectDB インスタンスのパブリックエンドポイントにアクセスし、インスタンスの IP アドレスを取得できます。
オプション: 計算クラスター (バックエンド) の設定を変更して、Stream Load 操作レコードを有効にします。
デフォルトでは、計算クラスターは Stream Load 操作を記録しません。
Stream Load 操作を追跡するには、インポートタスクを作成する前に enable_stream_load_record を true に設定し、クラスターを再起動します。この機能を有効にするには、テクニカルサポートにチケットを送信する必要があります。
オプション: 計算クラスターの設定を変更して、Stream Load の最大インポートサイズを調整します。
Stream Load を使用してインポートできるファイルのデフォルトの最大サイズは 10,240 MB です。
ソースファイルがこのサイズを超える場合は、バックエンドパラメーター streaming_load_max_mb を調整できます。パラメーターの変更方法の詳細については、「パラメーターの設定」をご参照ください。
オプション: フロントエンド (FE) の設定を変更して、インポートのタイムアウトを調整します。
Stream Load タスクのデフォルトのタイムアウトは 600 秒です。インポートタスクが指定されたタイムアウト内に完了しない場合、システムはタスクをキャンセルし、そのステータスを CANCELLED に変更します。
ソースファイルが時間制限内にインポートできない場合は、Stream Load リクエストで特定のタイムアウトを設定するか、FE パラメーター stream_load_default_timeout_second を調整してインスタンスを再起動し、グローバルなデフォルトタイムアウトを設定できます。この調整を行うには、テクニカルサポートにチケットを送信する必要があります。
使用上の注意
1 回の Stream Load で数百 MB から 1 GB のデータを書き込むことができます。一部のビジネスシナリオでは、少量のデータを頻繁に書き込むと、インスタンスのパフォーマンスが大幅に低下し、テーブルのデッドロックを引き起こすことさえあります。書き込み頻度を減らし、データのバッチ処理を使用することを強くお勧めします。
アプリケーション側のバッチ処理: アプリケーション側でビジネスデータを収集し、Stream Load リクエストを SelectDB に送信します。
サーバー側のバッチ処理: SelectDB が Stream Load リクエストを受信した後、サーバーはデータに対してバッチ処理を実行します。詳細については、「グループコミット」をご参照ください。
インポートタスクの作成
Stream Load は HTTP プロトコルを介してデータを送信および転送します。次の例は、curl コマンドを使用してインポートタスクを送信する方法を示しています。このコマンドは、Linux または macOS ターミナル、または Windows コマンドプロンプトで実行できます。Stream Load 操作には他の HTTP クライアントを使用することもできます。
構文
curl --location-trusted -u <username>:<password> [-H ""] -H "expect:100-continue" -T <file_path> -XPUT http://<host>:<port>/api/<db
_name>/<table_name>/_stream_loadパラメーター
パラメーター | 必須 | 説明 |
| はい | 認証が必要な場合、これはリクエストがリダイレクトされるサーバーに |
| はい | SelectDB インスタンスのユーザー名とパスワードを指定します。
|
| いいえ | この Stream Load インポートリクエストのリクエストヘッダー (Header) の内容を指定します。形式は次のとおりです:
一般的なパラメーターは次のとおりです:
リクエストヘッダーパラメーターの詳細については、「リクエストヘッダーパラメーター」をご参照ください。 |
| はい | インポートするデータファイルのパスを指定します。 file_path: オブジェクトファイルのパス。 |
| はい | HTTP リクエストのメソッドです。PUT リクエストメソッドを使用して、SelectDB のデータインポートアドレスを指定します。具体的なパラメーターは次のとおりです:
|
リクエストヘッダーパラメーター
Stream Load は HTTP プロトコルを使用します。したがって、インポートタスクに関連するパラメーターはリクエストヘッダーで設定されます。次の表に、一般的なインポートパラメーターを示します。
パラメーター | 説明 |
| インポートタスクの一意の ID。
重要 同じバッチのデータには同じ |
| インポートデータ形式を指定します。
ファイル形式の要件と関連パラメーターの詳細については、「ファイル形式」をご参照ください。 |
| インポートファイル内の行区切り文字を指定します。 複数の文字の組み合わせを行区切り文字として使用することもできます。たとえば、Windows システムでは、行区切り文字として \r\n を使用します。
|
| インポートファイル内の列区切り文字を指定します。 複数の文字の組み合わせを列区切り文字として使用することもできます。たとえば、二重縦線 非表示文字の場合は、プレフィックスとして
|
| ファイルの圧縮形式を指定します。圧縮は サポートされている圧縮形式: |
| インポートタスクの最大許容エラー率を指定します。 インポートエラー率がこのしきい値を超えると、インポートは失敗します。エラー行を無視するには、インポートが成功するように、このパラメーターを 0 より大きい値に設定する必要があります。
|
| 厳格モードを有効にするかどうかを指定します。
|
| インポートに使用するクラスターを指定します。 デフォルトでは、インスタンスのデフォルトクラスターが使用されます。インスタンスにデフォルトクラスターが設定されていない場合、権限を持つクラスターが自動的に選択されます。 |
| 対応するパーティションの 1 つのタブレットにのみデータをインポートするかどうかを指定します。このパラメーターは、ランダムバケットを持つ Duplicate Key テーブルにデータをインポートする場合にのみ設定できます。
|
| インポートタスクのフィルター条件を指定します。
|
| インポートするデータのパーティション情報を指定します。 インポートするデータが指定されたパーティションに属していない場合、インポートされません。これらのデータ行は dpp.abnorm.ALL でカウントされます。
|
| インポートするデータの関数変換設定を指定します。 サポートされている関数変換メソッドには、列の順序変更と式変換が含まれます。式変換のメソッドは、検索文と同じです。 |
| データマージタイプを指定します。
重要
|
| これは、 |
| これは Unique Key モデルにのみ適用されます。同じキー列に対して、source_sequence 列に従って値列が置き換えられることを保証します。source_sequence は、データソースの列またはテーブルスキーマの列にすることができます。 |
| インポートメモリ制限を指定します。
|
| インポートのタイムアウトを指定します。
|
| このインポートに使用するタイムゾーンを指定します。このパラメーターは、インポートに関与するすべてのタイムゾーン関連関数の結果に影響します。タイムゾーンの詳細については、「IANA タイムゾーンデータベース」をご参照ください。 デフォルト値: |
| 2 フェーズコミットモードを有効にするかどうかを指定します。
|
jsonpaths | JSON 形式でデータをインポートするには 2 つの方法があります:
|
json_root |
デフォルト値は "" で、JSON オブジェクト全体がルートノードとして選択されることを意味します。 |
read_json_by_line | JSON データを処理するための Stream Load の重要なパラメーターです。複数行の JSON データを含む入力ファイルをどのように解析するかを制御します。
|
strip_outer_array | JSON データを処理するための Stream Load の重要なパラメーターです。外側の配列を含む JSON データをどのように解析するかを制御します。
重要 JSON 形式でデータをインポートする場合、非配列形式のパフォーマンスは配列形式よりも大幅に高くなります。 |
例
この例は、CSV ファイル data.csv を test_db データベースの test_table テーブルにインポートする方法を示しています。インスタンスの VPC エンドポイントは selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com です。これは単なるサンプル curl コマンドです。完全な例については、「完全なデータインポートの例」をご参照ください。
curl --location-trusted -u admin:admin_123 -T data.csv -H "label:123" -H "expect:100-continue" http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load戻り値
Stream Load は同期インポートメソッドです。インポート結果は作成リクエストへの応答で直接返されます。次のコードブロックは、戻り値のサンプルを示しています。
{
"TxnId": 17,
"Label": "707717c0-271a-44c5-be0b-4e71bfeacaa5",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 5,
"NumberLoadedRows": 5,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 28,
"LoadTimeMs": 27,
"BeginTxnTimeMs": 0,
"StreamLoadPutTimeMs": 2,
"ReadDataTimeMs": 0,
"WriteDataTimeMs": 3,
"CommitAndPublishTimeMs": 18
}次の表に、戻り値のパラメーターを示します。
パラメーター | 説明 |
| インポートのトランザクション ID。 |
| インポート ID。 カスタム ID を指定するか、システムによって生成させることができます。 |
| インポートステータス。有効な値は次のとおりです:
|
| 既存の このフィールドは、ステータスが このステータスを使用して、既存の Label に対応するインポートタスクの状態を判断できます。
|
| エラーメッセージ。 |
| 処理された行の総数。 |
| 正常にインポートされた行数。 |
| データ品質が不適格な行数。 |
|
|
| インポートされたバイト数。 |
| インポートにかかった時間。 単位: ミリ秒。 |
| FE にトランザクションの開始をリクエストするのにかかった時間。 単位: ミリ秒。 |
| FE にインポートデータ実行計画の取得をリクエストするのにかかった時間。 単位: ミリ秒。 |
| データの読み取りにかかった時間。 単位: ミリ秒。 |
| データ書き込み操作の実行にかかった時間。 単位: ミリ秒。 |
| FE にトランザクションのコミットと公開をリクエストするのにかかった時間。 単位: ミリ秒。 |
| データ品質に問題がある場合は、この URL にアクセスして特定のエラー行を表示できます。 |
インポートタスクのキャンセル
Stream Load タスクは作成後に手動でキャンセルすることはできません。システムは、タイムアウトが発生した場合またはインポートエラーが報告された場合にのみタスクを自動的にキャンセルします。戻り値の errorUrl を使用してエラーメッセージをダウンロードし、問題をトラブルシューティングできます。
Stream Load タスクの表示
Stream Load 操作レコードを有効にしている場合、MySQL クライアントを使用して ApsaraDB for SelectDB インスタンスに接続し、show stream load 文を実行して完了した Stream Load タスクを表示できます。
完全なデータインポートの例
準備
インポート操作を開始する前に、準備を完了してください。
CSV データのインポート
例: スクリプトを使用したインポート
データの宛先テーブルを作成します。
SelectDB インスタンスに接続します。詳細については、「DMS を使用して ApsaraDB for SelectDB インスタンスに接続する」をご参照ください。
次の文を実行してデータベースを作成します。
CREATE DATABASE test_db;次の文を実行してテーブルを作成します。
CREATE TABLE test_table ( id int, name varchar(50), age int, address varchar(50), url varchar(500) ) UNIQUE KEY(`id`, `name`) DISTRIBUTED BY HASH(id) BUCKETS 16 PROPERTIES("replication_num" = "1");
Stream Load ターミナルが配置されているデバイスで、インポートする
test.csvという名前のファイルを作成します。1,yang,32,shanghai,http://example.com 2,wang,22,beijing,http://example.com 3,xiao,23,shenzhen,http://example.com 4,jess,45,hangzhou,http://example.com 5,jack,14,shanghai,http://example.com 6,tomy,25,hangzhou,http://example.com 7,lucy,45,shanghai,http://example.com 8,tengyin,26,shanghai,http://example.com 9,wangli,27,shenzhen,http://example.com 10,xiaohua,37,shanghai,http://example.comデータをインポートします。
ターゲットデバイスでターミナルを開き、curl コマンドを実行して Stream Load タスクを開始し、データをインポートします。
インポートタスクを作成するための構文とパラメーターの説明については、「インポートタスクの作成」をご参照ください。次の例は、一般的なインポートシナリオを示しています。
Labelを使用して重複を削除し、タイムアウトを指定します。ファイル
test.csvからデータベースtest_dbのテーブルtest_tableにデータをインポートします。Label を使用して重複したデータバッチのインポートを回避し、タイムアウトを 100 秒に設定します。curl --location-trusted -u admin:admin_123 -H "label:123" -H "timeout:100" -H "expect:100-continue" -H "column_separator:," -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_loadLabelを使用して重複を削除し、列を使用してファイルからデータをフィルターします。ファイル
test.csvからデータベースtest_dbのテーブルtest_tableにデータをインポートします。Label を使用して重複したデータバッチのインポートを回避し、ファイルから列名を指定し、「address」列が「hangzhou」であるデータのみをインポートします。curl --location-trusted -u admin:admin_123 -H "label:123" -H "columns: id,name,age,address,url" -H "where: address='hangzhou'" -H "expect:100-continue" -H "column_separator:," -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load20% の許容エラー率を許可します。
ファイル
test.csvからデータベースtest_dbのテーブルtest_tableにデータをインポートし、20% のエラー率を許可します。curl --location-trusted -u admin:admin_123 -H "label:123" -H "max_filter_ratio:0.2" -H "expect:100-continue" -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load厳格モードを使用し、タイムゾーンを設定します。
厳格モードを使用してインポートされたデータをフィルターし、タイムゾーンを
Africa/Abidjanに設定します。curl --location-trusted -u admin:admin_123 -H "strict_mode: true" -H "timezone: Africa/Abidjan" -H "expect:100-continue" -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_loadSelectDB のデータを削除します。
SelectDB 内の test.csv ファイルのデータと同一のデータを削除します。
curl --location-trusted -u admin:admin_123 -H "merge_type: DELETE" -H "expect:100-continue" -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load条件に基づいてファイルから不要なデータを削除し、残りのデータを SelectDB にインポートします。
test.csv ファイルから address 列が 'hangzhou' の行を削除し、残りの行を SelectDB にインポートします。
curl --location-trusted -u admin:admin_123 -H "expect:100-continue" -H "columns: id,name,age,address,url" -H "merge_type: MERGE" -H "delete: address='hangzhou'" -H "column_separator:," -T test.csv http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/testDb/testTbl/_stream_load
例: Java コードを使用したインポート
package com.selectdb.x2doris.connector.doris.writer;
import com.alibaba.fastjson2.JSON;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.protocol.RequestContent;
import org.apache.http.util.EntityUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.Map;
public class DorisLoadCase {
public static void main(String[] args) throws Exception {
// 1. パラメーターを設定します。
String loadUrl = "http://<Host:Port>/api/<DB>/<TABLE>/_stream_load?";
String userName = "admin";
String password = "****";
// 2. httpclient をビルドします。リダイレクト (isRedirectable) を有効にする必要があることに注意してください。
HttpClientBuilder httpClientBuilder = HttpClients.custom().setRedirectStrategy(new DefaultRedirectStrategy() {
// リダイレクトを有効にします。
@Override
protected boolean isRedirectable(String method) {
return true;
}
});
httpClientBuilder.addInterceptorLast(new RequestContent(true));
HttpClient httpClient = httpClientBuilder.build();
// 3. httpPut リクエストオブジェクトをビルドします。
HttpPut httpPut = new HttpPut(loadUrl);
// httpHeader を設定します...
String basicAuth = Base64.getEncoder().encodeToString(String.format("%s:%s", userName, password).getBytes(StandardCharsets.UTF_8));
httpPut.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + basicAuth);
httpPut.addHeader(HttpHeaders.EXPECT, "100-continue");
httpPut.addHeader(HttpHeaders.CONTENT_TYPE, "text/plain; charset=UTF-8");
RequestConfig reqConfig = RequestConfig.custom().setConnectTimeout(30000).build();
httpPut.setConfig(reqConfig);
// 4. 送信するデータを設定します。ここでは、CSV を書き込みます。
// 次のフィールドを持つテーブルがあると仮定します:
// field1,field2,field3,field4
// これは 3 つの CSV レコードをシミュレートします。Doris では、CSV のデフォルトの行区切り文字は \n で、列区切り文字は \t です。
// String data =
// "1\t2\t3\t4\n" +
// "11\t22\t33\t44\n" +
// "111\t222\t333\t444";
// すべての行を読み取ります。
List<String> lines = Files.readAllLines(Paths.get("your_file.csv"));
// すべての行を \n で結合します。
String data = String.join("\n", lines);
httpPut.setEntity(new StringEntity(data));
// 5. リクエストを送信し、結果を処理します。
HttpResponse httpResponse = httpClient.execute(httpPut);
int httpStatus = httpResponse.getStatusLine().getStatusCode();
String respContent = EntityUtils.toString(new BufferedHttpEntity(httpResponse.getEntity()), StandardCharsets.UTF_8);
String respMsg = httpResponse.getStatusLine().getReasonPhrase();
if (httpStatus == HttpStatus.SC_OK) {
// 戻り値をシリアル化するために、適切な JSON シリアル化コンポーネントを選択します。
Map<String, String> respAsMap = JSON.parseObject(respContent, Map.class);
// SelectDB から返されたステータスコードを取得します...
String dorisStatus = respAsMap.get("Status");
// SelectDB が次のいずれかのステータスを返した場合、データが正常に書き込まれたことを意味します。
List<String> DORIS_SUCCESS_STATUS = Arrays.asList("Success", "Publish Timeout", "200");
if (!DORIS_SUCCESS_STATUS.contains(dorisStatus) || !respMsg.equals("OK")) {
throw new RuntimeException("StreamLoad failed, status: " + dorisStatus + ", Response: " + respMsg);
} else {
System.out.println("successful....");
}
} else {
throw new IOException("StreamLoad Response HTTP Status Error, httpStatus: "+ httpStatus +", url: " + loadUrl + ", error: " + respMsg);
}
}
}JSON データのインポート
データの宛先テーブルを作成します。
SelectDB インスタンスに接続します。詳細については、「DMS を使用して ApsaraDB for SelectDB インスタンスに接続する」をご参照ください。
次の文を実行してデータベースを作成します。
CREATE DATABASE test_db;次の文を実行してテーブルを作成します。
CREATE TABLE test_table ( id int, name varchar(50), age int ) UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 16 PROPERTIES("replication_num" = "1");
データをインポートします。
重要JSON 形式でデータをインポートする場合、非配列形式のパフォーマンスは配列形式よりも大幅に高くなります。
非配列形式のデータのインポート
Stream Load ターミナルで、
json.dataという名前のファイルを作成します。ファイルには複数行が含まれ、各行に 1 つの JSON レコードが含まれている必要があります。以下はサンプルコンテンツです:{"id":1,"name":"Emily","age":25} {"id":2,"name":"Benjamin","age":35} {"id":3,"name":"Olivia","age":28} {"id":4,"name":"Alexander","age":60} {"id":5,"name":"Ava","age":17}データをインポートします。
ターミナルを開き、curl コマンドを実行して Stream Load タスクを開始します。このタスクは、
json.dataファイルからデータベースtest_dbのテーブルtest_tableにデータをインポートします。curl --location-trusted -u admin:admin_123 -H "Expect:100-continue" -H "format:json" -H "read_json_by_line:true" -T json.data -XPUT http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
配列形式のデータのインポート
Stream Load ターミナルで、
json_array.dataという名前の JSON 配列形式のデータファイルを作成します。[ {"userid":1,"username":"Emily","userage":25}, {"userid":2,"username":"Benjamin","userage":35}, {"userid":3,"username":"Olivia","userage":28}, {"userid":4,"username":"Alexander","userage":60}, {"userid":5,"username":"Ava","userage":17} ]データをインポートします。
ターミナルを開き、curl コマンドを実行して Stream Load タスクを開始します。このタスクは、ローカルファイル
json_array.dataからデータベースtest_dbのテーブルtest_tableにデータをインポートします。curl --location-trusted -u admin:admin_123 -H "Expect:100-continue" -H "format:json" -H "jsonpaths:[\"$.userid\", \"$.userage\", \"$.username\"]" -H "columns:id,age,name" -H "strip_outer_array:true" -T json_array.data -XPUT http://selectdb-cn-h033cjs****-fe.selectdbfe.pre.rds.aliyuncs.com:8080/api/test_db/test_table/_stream_load
HTTP Stream モード
Stream Load では、Table Value Function (TVF) 機能を使用して、SQL 式でインポートパラメーターを指定できます。この Stream Load 機能は http_stream と呼ばれます。TVF の使用方法の詳細については、「TVF」をご参照ください。
http_stream インポートの REST API URL は、通常の Stream Load インポートの URL とは異なります。
通常の Stream Load URL:
http://host:http_port/api/{db}/{table}/_stream_load。TVF http_stream を使用する URL:
http://host:http_port/api/_http_stream。
構文
以下は、Stream Load の HTTP Stream モードの構文です。
curl --location-trusted -u <username>:<password> [-H "sql: ${load_sql}"...] -T <file_name> -XPUT http://host:http_port/api/_http_streamHTTP Stream パラメーターの説明については、「パラメーター」をご参照ください。
例
SQL パラメーター load_sql を HTTP ヘッダーに追加して、column_separator、line_delimiter、where、columns などのパラメーターを置き換えることができます。次のコードブロックは、SQL パラメーター load_sql の例を示しています。
INSERT INTO db.table (col, ...) SELECT stream_col, ... FROM http_stream("property1"="value1");完全な例:
curl --location-trusted -u admin:admin_123 -T test.csv -H "sql:insert into demo.example_tbl_1(user_id, age, cost) select c1, c4, c7 * 2 from http_stream(\"format\" = \"CSV\", \"column_separator\" = \",\" ) where age >= 30" http://host:http_port/api/_http_streamよくある質問
Q1: インポート中に「get table cloud commit lock timeout」エラーが報告された場合はどうすればよいですか?
このエラーは、データの書き込みが頻繁すぎることによりテーブルのデッドロックが発生したことを示します。書き込み頻度を減らし、データのバッチ処理を使用することを強くお勧めします。1 回の Stream Load で数百 MB から 1 GB のデータを書き込むことができます。
Q2: CSV ファイルをインポートするとき、データに列または行の区切り文字が含まれている場合はどうすればよいですか?
新しい列区切り文字と行区切り文字を指定し、データが区切り文字と競合しないようにインポートデータを変更する必要があります。これにより、データが正しく解析されるようになります。以下のセクションで例を示します:
データに行区切り文字が含まれている場合
インポートされたデータに指定された行区切り文字、たとえばデフォルトの行区切り文字 \n が含まれている場合は、新しい行区切り文字を指定する必要があります。
たとえば、データファイルに次の内容が含まれているとします:
Zhang San\n,25,Shaanxi
Li Si\n,30,Beijingこのシナリオでは、ファイル内の \n はデータであり、行区切り文字ではありません。ただし、デフォルトの行区切り文字も \n です。ファイルが正しく解析されるようにするには、line_delimiter パラメーターを使用して新しい行区切り文字を指定し、ファイル内の各データ行の末尾に新しい区切り文字を明示的に追加する必要があります。以下に例を示します:
インポートの行区切り文字を設定します。
たとえば、デフォルトの行区切り文字
\nを\r\nに置き換えるには、データをインポートするときに-H "line_delimiter:\r\n"を設定する必要があります。指定された行区切り文字を各データ行の末尾に追加します。サンプルテキストは次のように変更する必要があります:
Zhang San\n,25,Shaanxi\r\n Li Si\n,30,Beijing\r\n
データに列区切り文字が含まれている場合
インポートされたデータに指定された列区切り文字、たとえばデフォルトの列区切り文字 \t が含まれている場合は、新しい列区切り文字を指定する必要があります。
たとえば、データファイルに次の内容が含まれているとします:
Zhang San\t 25 Shaanxi
Li Si\t 30 Beijingこのシナリオでは、ファイル内の \t はデータであり、列区切り文字ではありません。ただし、デフォルトの列区切り文字も \t (タブ文字) です。ファイルが正しく解析されるようにするには、column_separator パラメーターを使用して新しい列区切り文字を指定し、ファイル内の列間に新しい区切り文字を明示的に追加する必要があります。以下に例を示します:
インポートの列区切り文字を設定します。
たとえば、デフォルトの列区切り文字
\tをコンマ(,)に置き換えるには、データをインポートするときに-H "column_separator:,"を設定する必要があります。指定された列区切り文字をデータ列の間に追加します。サンプルテキストは次のように変更する必要があります:
Zhang San\t,25,Shaanxi Li Si\t,30,Beijing