Lindorm は、データを迅速かつ確実にインポートするための bulkload 機能を提供します。このトピックでは、データをバッチでインポートする方法について説明します。
機能
bulkload 機能は、バイパスモードでデータファイルをロードします。データ API の書き込みパスやインスタンスの計算リソースは使用しません。API を使用したデータインポートと比較して、bulkload 機能には次の利点があります。
データのインポート速度が 10 倍以上向上します。
オンラインサービスのリソースを使用しないため、安定したオンラインサービスを保証します。
オンラインリソースとオフラインリソースを分離することで、柔軟なリソース使用を実現します。
CSV、ORC、Parquet、MaxCompute など、さまざまなデータソースからのデータインポートをサポートします。
使いやすいです。バイパスモードでデータをバッチでロードするためにコードを記述する必要はありません。
費用対効果が高いです。Lindorm Tunnel Service (LTS) は、サーバーレス Spark のクラウドネイティブな弾力性を利用して、bulkload のための計算リソースを提供します。リソースは必要に応じてスケーリングされ、従量課金制です。長期間にわたって計算リソースを構成する必要がないため、コストを削減できます。
前提条件
LTS が有効化され、LTS コンソールにログインしていること。詳細については、「LTS の有効化とログイン」をご参照ください。
Lindorm コンピュートエンジンが有効化されていること。詳細については、「有効化とスペックアップ/スペックダウン」をご参照ください。
Spark データソースが追加されていること。詳細については、「Spark データソースの追加」をご参照ください。
サポートされているデータソース
ソースデータソース | 宛先データソース |
MaxCompute テーブル | LindormTable |
HDFS CSV または OSS CSV | |
HDFS Parquet または OSS Parquet | |
HDFS ORC または OSS ORC |
送信方法
次のいずれかの方法でジョブを送信して、データを迅速にインポートできます。
LTS コンソールを使用してジョブを送信する
LTS コンソールにログインします。詳細については、「LTS の有効化とログイン」をご参照ください。
左側のナビゲーションウィンドウで、[データソース管理] > [データソースの追加] を選択して、次のデータソースを追加します。
ODPS データソースを追加します。詳細については、「ODPS データソース」をご参照ください。
Lindorm ワイドテーブルデータソースを追加します。詳細については、「Lindorm ワイドテーブルデータソース」をご参照ください。
HDFS データソースを追加します。詳細については、「HDFS データソースの追加」をご参照ください。
左側のナビゲーションウィンドウで、 を選択します。
説明3.8.12.4.3 より前の LTS バージョンの場合は、 を選択します。
LTS バージョンを表示するには、Lindorm コンソール の [インスタンス詳細] ページの [設定情報] セクションに移動します。
[ジョブの作成] をクリックし、次のパラメーターを設定します。
設定項目
パラメーター
説明
[データソースの選択]
[ソースデータソース]
追加した ODPS または HDFS データソースを選択します。
[宛先データソース]
追加した Lindorm ワイドテーブルデータソースを選択します。
[プラグイン設定]
[リーダー設定]
ソースが ODPS データソースの場合は、次のリーダーパラメーターを設定します。
table: ODPS テーブルの名前。
column: インポートする ODPS カラムの名前。
partition: パーティション化されていないテーブルの場合は空のままにします。パーティションテーブルの場合は、パーティション情報を設定します。
numPartitions: データを読み取るための並列度。
ソースが Hadoop 分散ファイルシステム (HDFS) 内の CSV ファイルの場合は、次のリーダーパラメーターを設定します。
filePath: CSV ファイルが配置されているディレクトリ。
header: CSV ファイルにヘッダー行が含まれているかどうかを指定します。
delimite: CSV ファイルで使用される区切り文字。
column: CSV ファイル内のカラム名とそれに対応する型。
ソースが HDFS 内の Parquet ファイルの場合は、次のリーダーパラメーターを設定します。
filePath: Parquet ファイルが配置されているディレクトリ。
column: Parquet ファイル内のカラム名。
説明設定例については、「設定例」をご参照ください。
[ライター設定]
namespace: Lindorm ワイドテーブルの名前空間。
lindormTable: Lindorm ワイドテーブルの名前。
compression: 圧縮アルゴリズム。現在、zstd のみがサポートされています。圧縮を無効にするには、これを none に設定します。
columns: 宛先テーブルのタイプに基づいてこのパラメーターを設定します。
Lindorm ワイドテーブルにデータをインポートする場合は、Lindorm SQL ワイドテーブルのカラム名を指定します。カラムはリーダー設定のカラムに対応している必要があります。
HBase と互換性のある Lindorm テーブルにデータをインポートする場合は、HBase テーブルの標準カラム名を指定します。カラムはリーダー設定のカラムに対応している必要があります。
timestamp: Lindorm ワイドテーブル内のデータのタイムスタンプ。次のタイプがサポートされています。
13 桁の値を持つ Long 型。
yyyy-MM-dd HH:mm:ss または yyyy-MM-dd HH:mm:ss SSS 形式の String 型。
説明設定例については、「設定例」をご参照ください。
[ジョブ実行パラメーター設定]
[Spark ドライバー仕様]
Spark ドライバーの仕様を選択します。
[Spark エグゼキュータ仕様]
Spark エグゼキュータの仕様を選択します。
[エグゼキュータ数]
エグゼキュータの数を入力します。
[Spark 設定]
Spark 設定を入力します。このパラメーターはオプションです。
[作成] をクリックします。
[Bulkload] ページで、[ジョブ名] をクリックしてジョブの詳細を表示します。
[ジョブ名] をクリックして、ジョブの Spark UI を表示します。
[詳細] をクリックして、ジョブの実行ログを表示します。
説明宛先の Lindorm ワイドテーブルのパーティション間でデータが均等に分散されている場合、4:1 の圧縮率で 100 GB のデータをインポートするのに約 1 時間かかります。実際の時間は異なる場合があります。
設定例
リーダープラグインの設定例
ODPS データソースのリーダー設定の例。
{ "table": "test", "column": [ "id", "intcol", "doublecol", "stringcol", "string1col", "decimalcol" ], "partition": [ "pt=1" ], "numPartitions":10 }HDFS データソース内の CSV ファイルのリーダー設定の例。
{ "filePath":"csv/", "header": false, "delimiter": ",", "column": [ "id|string", "intcol|int", "doublecol|double", "stringcol|string", "string1col|string", "decimalcol|decimal" ] }HDFS データソース内の Parquet ファイルのリーダー設定の例。
{ "filePath":"parquet/", "column": [ // Parquet ファイル内のカラム名。 "id", "intcol", "doublecol", "stringcol", "string1col", "decimalcol" ] }
ライタープラグインの設定例
Lindorm SQL テーブルにデータをインポートするためのライター設定の例。
{ "namespace": "default", "lindormTable": "xxx", "compression":"zstd", "timestamp":"2022-07-01 10:00:00", "columns": [ "id", "intcol", "doublecol", "stringcol", "string1col", "decimalcol" ] }HBase と互換性のある Lindorm テーブルにデータをインポートするためのライター設定の例。
{ "namespace": "default", "lindormTable": "xxx", "compression":"zstd", "timestamp":"2022-07-01 10:00:00", "columns": [ "ROW||String", // ROW は行キーを表し、String は型を表します。 "f:intcol||Int", // フォーマット: カラムファミリー:カラム名||カラム型。 "f:doublecol||Double", "f:stringcol||String", "f:string1col||String", "f:decimalcol||Decimal" ] }
API 操作を使用してジョブを送信する
ジョブの送信
API 操作 (POST):
http://{LTSMaster}:12311/pro/proc/bulkload/create。{LTSMaster} を Lindorm インスタンスのマスターホスト名に置き換えます。ホスト名は、Lindorm インスタンスの LTS コンソールの [クラスター情報] ページの [基本情報] セクションから取得できます。
パラメーター:
パラメーター
説明
src
ソースデータソースの名前。
dst
宛先データソースの名前。
readerConfig
JSON 形式のリーダープラグイン設定。設定例については、「設定例」をご参照ください。
writerConfig
JSON 形式のライタープラグイン設定。設定例については、「設定例」をご参照ください。
driverSpec
ドライバーの仕様。有効な値: small、medium、large、xlarge。このパラメーターを large に設定することをお勧めします。
instances
エグゼキュータの数。
fileType
ソースデータソースが HDFS の場合は、このパラメーターを CSV または Parquet に設定します。
sparkAdditionalParams
拡張パラメーター。このパラメーターはオプションです。
例:
curl -d "src=hdfs&dst=ld&readerConfig={\"filePath\":\"parquet/\",\"column\":[\"id\",\"intcol\",\"doublecol\",\"stringcol\",\"string1col\",\"decimalcol\"]}&writerConfig={\"columns\":[\"ROW||String\",\"f:intcol||Int\",\"f:doublecol||Double\",\"f:stringcol||String\",\"f:string1col||String\",\"f:decimalcol||Decimal\"],\"namespace\":\"default\",\"lindormTable\":\"bulkload_test\",\"compression\":\"zstd\"}&driverSpec=large&instances=5&fileType=Parquet" -H "Content-Type: application/x-www-form-urlencoded" -X POST http://{LTSMaster}:12311/pro/proc/bulkload/create次の内容が返されます。message パラメーターの値はジョブ ID です。
{"success":"true","message":"proc-91-ff383c616e5242888b398e51359c****"}
ジョブ情報の取得
API 操作 (GET):
http://{LTSMaster}:12311/pro/proc/{procId}/info。{LTSMaster} を Lindorm インスタンスのマスターホスト名に置き換えます。ホスト名は、Lindorm インスタンスの LTS コンソールの [クラスター情報] ページの [基本情報] セクションから取得できます。パラメーター: procId はジョブ ID を示します。
例:
curl http://{LTSMaster}:12311/pro/proc/proc-91-ff383c616e5242888b398e51359c****/info次の内容が返されます。
{ "data":{ "checkJobs":Array, "procId":"proc-91-ff383c616e5242888b398e51359c****", // ジョブ ID "incrJobs":Array, "procConfig":Object, "stage":"WAIT_FOR_SUCCESS", "fullJobs":Array, "mergeJobs":Array, "srcDS":"hdfs", // ソースデータソース "sinkDS":"ld-uf6el41jkba96****", // 宛先データソース "state":"RUNNING", // ジョブステータス "schemaJob":Object, "procType":"SPARK_BULKLOAD" // ジョブタイプ }, "success":"true" }
ジョブの停止
API 操作 (GET):
http://{LTSMaster}:12311/pro/proc/{procId}/abort。{LTSMaster} を Lindorm インスタンスのマスターホスト名に置き換えます。ホスト名は、Lindorm インスタンスの LTS コンソールの [クラスター情報] ページの [基本情報] セクションから取得できます。パラメーター: procId はジョブ ID を示します。
例:
curl http://{LTSMaster}:12311/pro/proc/proc-91-ff383c616e5242888b398e51359c****/abort次の内容が返されます。
{"success":"true","message":"ok"}
ジョブのリトライ
API 操作 (GET):
http://{LTSMaster}:12311/pro/proc/{procId}/retry。{LTSMaster} を Lindorm インスタンスのマスターホスト名に置き換えます。ホスト名は、Lindorm インスタンスの LTS コンソールの [クラスター情報] ページの [基本情報] セクションから取得できます。パラメーター: procId はジョブ ID を示します。
例:
curl http://{LTSMaster}:12311/pro/proc/proc-91-ff383c616e5242888b398e51359c****/retry次の結果が返されます。
{"success":"true","message":"ok"}
ジョブの削除
API 操作 (GET):
http://{LTSMaster}:12311/pro/proc/{procId}/delete。{LTSMaster} を Lindorm インスタンスのマスターホスト名に置き換えます。ホスト名は、Lindorm インスタンスの LTS コンソールの [クラスター情報] ページの [基本情報] セクションから取得できます。パラメーター: procId はジョブ ID を示します。
例:
curl http://{LTSMaster}:12311/pro/proc/proc-91-ff383c616e5242888b398e51359c****/delete次の結果が返されます。
{"success":"true","message":"ok"}