Broker Load は、ApsaraDB for SelectDB インスタンスに一度に非同期でデータをインポートするために使用されます。 Broker Load を使用すると、Hadoop 分散ファイルシステム (HDFS)、Object Storage Service (OSS)、Amazon Simple Storage Service (Amazon S3) などの分散ストレージシステムから数百 GB のデータまで効率的に読み取ることができます。 このトピックでは、Broker Load を使用して ApsaraDB for SelectDB インスタンスにデータをインポートする方法について説明します。
メリット
Broker Load には、次のメリットがあります。
大量のデータ: Broker Load を使用すると、一度に数百 GB のオフラインデータをインポートできます。
非同期の高い同時実行性: Broker Load を使用すると、データをブロックすることなく、非同期モードでデータをインポートできます。 これにより、クラスタのリソース使用率が向上します。
高い互換性: Broker Load を使用すると、HDFS や Amazon S3 などのリモートストレージシステムからデータを読み取ることができます。
使いやすさ:
MySQL プロトコルを介して Broker Load ジョブを作成し、データをインポートできます。
SHOW LOAD文を実行して、データのインポートの進捗状況と結果をリアルタイムで監視できます。
適用シナリオ
Broker Load を使用すると、HDFS、OSS、Amazon S3 などの分散ストレージシステムから大量のデータを効率的に読み取ることができます。
100 MB などの少量のデータのデータインポート適時性は、10 秒レベルです。
100 GB などの大量のデータのデータインポート適時性は、10 分レベルです。
Broker Load ジョブの作成
Broker Load ジョブは、ブローカーを使用して、HDFS や Amazon S3 などのリモートストレージシステムから ApsaraDB for SelectDB インスタンスのテーブルにデータを読み込んでインポートします。
構文
LOAD LABEL load_label
(
data_desc1[, data_desc2, ...]
)
WITH broker_type
[broker_properties]
[load_properties];パラメータ
パラメータ | 説明 |
| Broker Load ジョブの一意の識別子。 インポート文で Broker Load ジョブのラベルをカスタマイズできます。 Broker Load ジョブが送信されると、ラベルに基づいてジョブのステータスを照会できます。 一意のラベルを使用すると、同じデータが繰り返しインポートされるのを防ぐこともできます。 ラベルに関連付けられている Broker Load ジョブが CANCELLED 状態の場合、ラベルは別の Broker Load ジョブで使用できます。 形式: 説明 同じバッチのデータには同じラベルを使用することをお勧めします。 これにより、同じバッチのデータをインポートするための繰り返しのリクエストは 1 回だけ受け入れられます。 これにより、at-most-once セマンティクスが保証されます。 |
| インポートするファイルの説明。 詳細については、このトピックの「data_desc1 のパラメータ」セクションをご参照ください。 |
| 使用するブローカーのタイプ。 有効な値: HDFS および S3。 S3 ブローカーを使用する Broker Load ジョブは、Object Storage Service (OSS) Load ジョブとも呼ばれます。 詳細については、「OSS を使用したデータのインポート」をご参照ください。 |
| ブローカーが Baidu Object Storage (BOS) や HDFS などのリモートストレージシステムにアクセスするために必要なパラメータ。 構文: |
| インポートのパラメータ。 詳細については、このトピックの「load_properties のパラメータ」セクションをご参照ください。 |
data_desc1 のパラメータ
[MERGE|APPEND|DELETE]
DATA INFILE
(
"file_path1"[, file_path2, ...]
)
[NEGATIVE]
INTO TABLE `table_name`
[PARTITION (p1, p2, ...)]
[COLUMNS TERMINATED BY "column_separator"]
[FORMAT AS "file_type"]
[(column_list)]
[COLUMNS FROM PATH AS (c1, c2, ...)]
[PRECEDING FILTER predicate]
[SET (column_mapping)]
[WHERE predicate]
[DELETE ON expr]
[ORDER BY source_sequence]
[PROPERTIES ("key1"="value1", ...)]パラメータ | 説明 |
| データをマージするモード。 デフォルト値: APPEND。これは、インポートが標準の追加操作であることを指定します。 このパラメータは、Unique Key モデルを使用するテーブルの場合にのみ、MERGE または DELETE に設定できます。 このパラメータを MERGE に設定する場合は、DELETE ON 文を使用して、削除フラグ列として機能する列を指定する必要があります。 このパラメータを DELETE に設定すると、インポートに関連するすべてのデータが、インポートを実行するテーブルから削除されます。 |
| インポートするファイルへのパス。 複数のファイルパスを列挙し、ワイルドカードを使用してファイルを照合できます。 各パスがディレクトリだけでなく、実際のファイルを指していることを確認してください。 そうしないと、インポートは失敗します。 |
| インポートがネガティブインポートであることを指定します。 このパラメータは、SUM 関数を使用して集計された INTEGER タイプのデータに対してのみ有効です。 このパラメータを指定すると、SUM 関数を使用して集計された INTEGER タイプのデータに対して否定演算が実行されます。 これにより、インポートされたエラーデータを相殺できます。 |
| インポートが制限されるテーブルの特定のパーティション。 指定されたパーティション内にないデータは、インポートプロセスから除外されます。 |
| 列区切り文字。 このパラメータは、インポートするファイルが CSV ファイルの場合にのみ有効です。 1 バイトの区切り文字のみを指定できます。 |
| インポートするファイルの形式。 デフォルト値: CSV。 有効な値: CSV、PARQUET、ORC。 |
| インポートするファイルの列のシーケンス。 |
| インポートするファイルから抽出する列。 |
| 事前フィルタ条件。 データはまず、 |
| 列変換の関数。 |
| データのフィルタ条件。 |
| インポートするデータで削除フラグ列として機能する列を指定し、計算関係を定義するために使用される文。 MERGE インポートモードを使用する場合は、式が必要です。 このパラメータは、Unique Key モデルを使用するテーブルの場合にのみ有効です。 |
| インポートするデータでシーケンス列として機能する列を指定するために使用される文。 このパラメータは、インポート中に正しいデータの順序を維持するために使用されます。 このパラメータは、Unique Key モデルを使用するテーブルの場合にのみ有効です。 |
| インポートするファイルの形式関連パラメータ。 たとえば、JSON ファイルをインポートするには、json_root、jsonpaths、fuzzy_parse などのパラメータを指定できます。 |
load_properties のパラメータ
パラメータ | 説明 |
| インポートのタイムアウト期間。 単位: 秒。 デフォルト値: 14400。これは 4 時間を指定します。 |
| データ標準に準拠していないなどの理由で、インポート中にフィルタリングできるデータの最大許容比率。 デフォルト値: 0。これは、データをフィルタリングできないゼロトレランスポリシーを指定します。 有効な値: 0 ~ 1。 |
| インポートジョブに割り当てることができるメモリの最大サイズ。 単位: バイト。 デフォルト値: 2147483648。これは 2 GB を指定します。 |
| インポートジョブの厳密モードを有効にするかどうかを指定します。 デフォルト値: false。 |
| インポートジョブのタイムゾーン依存関数に使用されるタイムゾーン。 デフォルト値: |
| インポートの並列度 (DOP)。 デフォルト値: 1。 このパラメータを 1 より大きい値に設定すると、複数のインポートジョブを同時に実行するために複数の実行計画が開始されます。 これにより、インポートプロセスが高速化されます。 |
| 処理するデータをバッチで送信するための DOP。 このパラメータの値が、計算クラスタのバックエンド (BE) 構成の max_send_batch_parallelism_per_job パラメータの値よりも大きい場合、計算クラスタには max_send_batch_parallelism_per_job パラメータの値が使用されます。 |
| 対応するパーティション内の 1 つのタブレットのみにデータをインポートするかどうかを指定します。 デフォルト値: false。 このパラメータは、Duplicate Key モデルを使用し、ランダムバケットを含むテーブルにデータをインポートする場合にのみ有効です。 |
例
ApsaraDB for SelectDB インスタンスでデータをインポートするテーブルを作成します。 サンプルコード:
CREATE TABLE test_table ( id int, name varchar(50), age int, address varchar(50) ) UNIQUE KEY(`id`) DISTRIBUTED BY HASH(id) BUCKETS 4 PROPERTIES("replication_num" = "1"); CREATE TABLE test_table2 ( id int, name varchar(50), age int, address varchar(50) ) DISTRIBUTED BY HASH(id) BUCKETS 4 PROPERTIES("replication_num" = "1");データをインポートするファイルを作成します。
次の内容を含む
file1.txtという名前のファイルを作成します。1,tomori,32,shanghai 2,anon,22,beijing 3,taki,23,shenzhen 4,rana,45,hangzhou 5,soyo,14,shanghai 6,saki,25,hangzhou 7,mutsumi,45,shanghai 8,uika,26,shanghai 9,umiri,27,shenzhen 10,nyamu,37,shanghai次の内容を含む
file2.csvという名前のファイルを作成します。1,saki,25,hangzhou 2,mutsumi,45,shanghai 3,uika,26,shanghai 4,umiri,27,shenzhen 5,nyamu,37,shanghai
ファイルのデータをテーブルにインポートします。
HDFS から
file1.txtファイルをインポートします。サンプルコード:LOAD LABEL example_db.label1 ( DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt") INTO TABLE `my_table` COLUMNS TERMINATED BY "," ) WITH HDFS ( "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port" );file1.txt ファイルを test_table テーブルにインポートします。 列はコンマ (,) で区切ります。 HDFS からデータをインポートする場合は、broker_properties パラメータで
fs.defaultFSプロパティを指定して、システムが HDFS クラスタに想定どおりに接続し、対応するファイルを見つけられるようにする必要があります。HDFS から 2 つのファイルを 2 つのテーブルにインポートします。 サンプルコード:
LOAD LABEL test_db.test_02 ( DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file2.csv") INTO TABLE `test_table` COLUMNS TERMINATED BY "," (id,name,temp_age,address) SET ( age = temp_age + 1 ), DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt") INTO TABLE `test_table2` COLUMNS TERMINATED BY "," ) WITH HDFS ( "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port" );file1.txtファイルをtest_tableテーブルにインポートします。file2.csvファイルをtest_table2テーブルにインポートし、file2.csvファイルの temp_age 列の値に基づいて、age 列の値を 1 ずつ増分します。高可用性 (HA) モードでデプロイされた HDFS クラスタからバッチデータをインポートします。 サンプルコード:
LOAD LABEL test_db.test_03 ( DATA INFILE("hdfs://hdfs_host:hdfs_port/example/*") INTO TABLE `test_table` COLUMNS TERMINATED BY "\\x01" ) WITH HDFS ( "hadoop.username" = "hive", "fs.defaultFS" = "hdfs://my_ha", "dfs.nameservices" = "my_ha", "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2", "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port", "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port", "dfs.client.failover.proxy.provider.my_ha" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" );Hive のデフォルト デリミタ
\\x01を使用し、ワイルドカード(*)を使用してdataディレクトリ内のすべてのファイルを指定します。file1.txt ファイルのデータをフィルタリングして、フィルタ条件を満たすデータ行のみをインポートします。 サンプルコード:
LOAD LABEL test_db.test_04 ( DATA INFILE("hdfs://host:port/example/file1.txt") INTO TABLE `test_table2` COLUMNS TERMINATED BY "," (id,name,age,address) WHERE age < 20 ) WITH HDFS ( "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port" );age列の値が 20 未満の行のみがインポートされます。HDFS から file1.txt ファイルをインポートします。 インポートのタイムアウト期間とフィルタリング比率を指定します。
age列の値が 20 未満の行を除くすべての行をファイルからインポートします。 サンプルコード:LOAD LABEL test_db.test_05 ( MERGE DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt") INTO TABLE `test_table` COLUMNS TERMINATED BY "," (id,name,age,address) DELETE ON age < 20 ) WITH HDFS ( "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port" ) PROPERTIES ( "timeout" = "3600", "max_filter_ratio" = "0.1" );MERGE モードでデータをインポートします。
test_tableテーブルは一意キーモデルを使用する必要があります。インポートされる行のage列の値が 20 未満の場合、その行は削除対象としてマークされます。インポート ジョブのタイムアウト期間は 3,600 秒で、エラーデータ行の最大フィルタリング率は 10% です。
Broker Load ジョブのキャンセル
Broker Load ジョブが CANCELLED または FINISHED 状態ではない場合、ジョブを手動でキャンセルできます。 キャンセルするインポートジョブのラベルを指定する必要があります。 インポートジョブがキャンセルされると、ジョブに書き込まれたデータはロールバックされ、有効になりません。
構文
CANCEL LOAD
[FROM db_name]
WHERE [LABEL = "load_label" | LABEL like "label_pattern"];パラメータ
パラメータ | 説明 |
| データベースの名前。 デフォルトでは、このパラメータを指定しないと、現在のデータベースが使用されます。 |
| インポートジョブのラベル。 完全一致がサポートされています。 LABEL LIKE 文を使用すると、ラベルに label_pattern が含まれるインポートジョブが照合されます。 |
例
ラベルが
example_db_test_load_labelであるインポートジョブをexample_dbデータベースからキャンセルします。CANCEL LOAD FROM example_db WHERE LABEL = "example_db_test_load_label";ラベルに
example_が含まれるインポートジョブをexample_dbデータベースからキャンセルします。CANCEL LOAD FROM example_db WHERE LABEL like "example_";
Broker Load ジョブのステータスの照会Broker ロード ジョブ
Broker Load は非同期のインポート方式です。 インポート文が正常に実行された場合は、データのインポートが完了したのではなく、Broker Load ジョブが正常に送信されたことのみを示します。 Broker Load ジョブのステータスを照会するには、SHOW LOAD 文を実行します。
構文
SHOW LOAD
[FROM db_name]
[
WHERE
[LABEL [ = "your_label" | LIKE "label_matcher"]]
[STATE = ["PENDING"|"ETL"|"LOADING"|"FINISHED"|"CANCELLED"|]]
]
[ORDER BY ...]
[LIMIT limit][OFFSET offset];パラメータ
パラメータ | 説明 |
| データベースの名前。 デフォルトでは、このパラメータを指定しないと、現在のデータベースが使用されます。 |
| インポートジョブのラベル。 完全一致がサポートされています。 LABEL LIKE 文を使用すると、ラベルに label_matcher が含まれるインポートジョブが照合されます。 |
| インポートジョブのステータス。 指定された状態のインポートジョブのみを表示できます。 |
| 返されるデータレコードのソート順序。 |
| 表示されるデータレコード数の制限。 このパラメータを指定しないと、すべてのデータレコードが表示されます。 |
| クエリ結果の表示を開始する前にスキップする初期レコードの数。 デフォルト値: 0。 |
例
example_dbデータベースでラベルに2014_01_02が含まれるインポートジョブを照会し、最も長く保存されている 10 個のインポートジョブを表示します。SHOW LOAD FROM example_db WHERE LABEL LIKE "2014_01_02" LIMIT 10;example_dbデータベースでラベルがload_example_db_20140102であるインポートジョブを照会します。 これらのジョブをLoadStartTimeで降順にソートします。SHOW LOAD FROM example_db WHERE LABEL = "load_example_db_20140102" ORDER BY LoadStartTime DESC;example_dbデータベースでラベルがload_example_db_20140102であるインポートジョブを照会します。 インポートジョブはloading状態です。SHOW LOAD FROM example_db WHERE LABEL = "load_example_db_20140102" AND STATE = "loading";example_dbデータベースのインポートジョブを照会し、これらのジョブをLoadStartTimeで降順にソートします。 最初の 5 つのクエリ結果をスキップし、次の 10 個のクエリ結果を表示します。SHOW LOAD FROM example_db ORDER BY LoadStartTime DESC limit 5,10; SHOW LOAD FROM example_db ORDER BY LoadStartTime DESC limit 10 offset 5;
ベストプラクティス
インポートジョブのステータスの照会
Broker Load は非同期のインポート方式です。 インポート文が正常に実行された場合は、データのインポートが完了したのではなく、Broker Load ジョブが正常に送信されたことのみを示します。 インポートジョブのステータスを照会するには、
SHOW LOAD文を実行します。インポートジョブのキャンセル
送信済みだが完了していないインポートジョブをキャンセルするには、
CANCEL LOAD文を実行します。 インポートジョブがキャンセルされると、ジョブに書き込まれたデータはロールバックされ、有効になりません。ラベル、インポートトランザクション、複数テーブルの原子性
ApsaraDB for SelectDB のすべてのインポートジョブは、本質的にアトミックです。 インポートジョブで複数のテーブルにデータをインポートする場合でも、原子性は保証されます。 また、ApsaraDB for SelectDB はラベルを使用して、インポートされたデータが失われたり重複したりしないようにします。
列のマッピング、導出、フィルタリング
ApsaraDB for SelectDB は、インポート文での列変換と列フィルタリングに関するさまざまな操作をサポートしています。 ほとんどのビルトイン関数とユーザー定義関数 (UDF) がサポートされています。 詳細については、「ソースデータの変換」をご参照ください。
エラーデータ行のフィルタリング
ApsaraDB for SelectDB では、インポートジョブでフォーマットが正しくないデータ行をスキップできます。フィルタリング率は、
max_filter_ratioパラメーターで指定します。デフォルト値は 0 で、データをフィルタリングしないゼロトレランスポリシーを指定します。エラーデータ行が見つかった場合、インポートジョブは失敗します。インポート中にエラーデータ行を無視する場合は、このパラメーターを 0 ~ 1 の値に設定できます。このようにして、ApsaraDB for SelectDB はデータ形式が正しくない行を自動的にスキップします。トレランス比率の計算方法の詳細については、「ソースデータの変換」をご参照ください。
厳密モード
strict_modeプロパティは、インポート ジョブを厳密モードで実行するかどうかを指定するために使用します。インポート ジョブが厳密モードで実行される場合、列のマッピング、変換、およびフィルタリングが影響を受けます。タイムアウト期間
Broker Load ジョブのデフォルトのタイムアウト期間は 4 時間です。 タイミングは、インポートジョブが送信された時点から開始されます。 インポートジョブがタイムアウト期間内に完了しない場合、インポートジョブは失敗します。
データ量とジョブ数の制限
Broker Load ジョブでは、一度に 100 GB 未満のデータをインポートすることをお勧めします。 理論的には、インポートジョブでインポートできるデータ量に制限はありません。 ただし、インポートするデータ量が非常に大きい場合、インポートジョブの実行に長時間かかる可能性があり、インポートジョブが失敗した場合の再試行のコストが高くなります。
また、クラスタ内のノード数は限られています。 したがって、インポートされるデータの最大量には制限が設定されています。これは、ノード数に 3 GB を掛けた値です。 これにより、システムリソースが適切に使用されるようになります。 大量のデータをインポートする必要がある場合は、複数のインポートジョブを送信することをお勧めします。
ApsaraDB for SelectDB は、クラスタ内の同時インポートジョブ数を 3 ~ 10 の範囲に制限しています。 送信したインポートジョブの数が制限を超えると、超過したインポートジョブはキューで待機します。 キューの最大長は 100 です。 100 を超えるインポートジョブが待機している場合、超過したインポートジョブは直接拒否されます。
説明待機時間もインポートジョブの合計時間に含まれます。 タイムアウトエラーが発生した場合、ジョブはキャンセルされます。 インポートジョブのステータスを監視して、ジョブの送信頻度を適切に制御することをお勧めします。