HDFS テーブルエンジンまたは HDFS テーブル関数を使用して、Hadoop 分散ファイルシステム (HDFS) と ClickHouse クラスター間でデータを移動します。どちらのアプローチでも、HDFS から ClickHouse へのファイルの読み取りと、ClickHouse から HDFS へのデータの書き込みが可能です。
前提条件
開始する前に、以下が準備できていることを確認してください:
-
E-MapReduce (EMR) Hadoop クラスター。詳細については、「クラスターの作成」をご参照ください。
-
EMR ClickHouse クラスター。詳細については、「ClickHouse クラスターの作成」をご参照ください。
注意事項
このトピックで示す HDFS URI のサンプルにおいて、ポート 9000 は非高可用性 (非 HA) モードの NameNode ポートです。HA モードでは、ポートは通常 8020 です。
アプローチの選択
HDFS テーブルエンジンと HDFS テーブル関数の両方が、HDFS への読み取りと書き込みをサポートしています。ワークフローに基づいて選択してください:
| アプローチ | 仕組み | 最適なケース |
|---|---|---|
| HDFS テーブルエンジン | HDFS ファイルをバックエンドとする永続テーブルを作成します。テーブルは削除されるまで存在します。 | 同じ HDFS パスへの繰り返し読み取りまたは書き込み |
| HDFS テーブル関数 | 永続テーブルを作成せずに、HDFS への読み取りまたは書き込みをインラインで行います。 | 1 回限りのインポートまたはエクスポート |
制限事項
-
uriパラメーターはディレクトリではなく、ファイルを指す必要があります。データを書き込む前に親ディレクトリが存在している必要があります。存在しない場合、書き込みは失敗します。 -
デフォルトでは、ターゲットファイルが HDFS にすでに存在する場合、エクスポート操作は失敗します。これを変更するには、「エクスポート動作の設定」をご参照ください。
Hadoop クラスターから ClickHouse クラスターへのデータインポート
ステップ 1:ビジネス テーブルの作成
-
SSH 経由で ClickHouse クラスターにログインします。詳細については、「クラスターへのログイン」をご参照ください。
-
ClickHouse クライアントを起動します:
clickhouse-client -h core-1-1 -mcore-1-1はログインしたコアノードの名前です。クラスターに複数のコアノードがある場合は、いずれか 1 つに接続してください。 -
productという名前のデータベースと、ローカルのレプリケートされたテーブルproduct.ordersおよび分散テーブルproduct.orders_allの 2 つのテーブルを作成します:CREATE DATABASE IF NOT EXISTS product ON CLUSTER cluster_emr; CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr ( `uid` UInt32, `date` DateTime, `skuId` UInt32, `order_revenue` UInt32 ) ENGINE = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}') PARTITION BY toYYYYMMDD(date) ORDER BY toYYYYMMDD(date); CREATE TABLE IF NOT EXISTS product.orders_all ON CLUSTER cluster_emr ( `uid` UInt32, `date` DateTime, `skuId` UInt32, `order_revenue` UInt32 ) ENGINE = Distributed(cluster_emr, product, orders, rand());{shard}と{replica}は、EMR が ClickHouse クラスター用に自動生成するマクロです。
ステップ 2:データのインポート
HDFS テーブルエンジンの使用
HDFS テーブルエンジンは、HDFS 内のファイルをバックエンドとする ClickHouse テーブルを作成します。構文は次のとおりです:
CREATE TABLE [IF NOT EXISTS] [db.]table_name
(
name1 [type1],
name2 [type2],
...
)
ENGINE = HDFS(uri, format);
| パラメータ | 説明 |
|---|---|
db |
データベース名 |
table_name |
テーブル名 |
name1, name2 |
カラム名 |
type1, type2 |
カラムのデータの型 |
uri |
HDFS 内のファイルの URI。ファイルへのパスである必要があり、ディレクトリへのパスであってはなりません。親ディレクトリは存在している必要があります。 |
format |
ファイルフォーマット。例: CSV |
手順
-
orders.csv サンプルファイルをダウンロードし、ご利用の Hadoop クラスターにアップロードします。この例では、ファイルをルートディレクトリにアップロードします。
-
HDFS データベースと、アップロードされたファイルを指す HDFS バックエンドのテーブルを作成します:
CREATE DATABASE IF NOT EXISTS hdfs ON CLUSTER cluster_emr; CREATE TABLE IF NOT EXISTS hdfs.orders ON CLUSTER cluster_emr ( `uid` UInt32, `date` DateTime, `skuId` UInt32, `order_revenue` UInt32 ) ENGINE = HDFS('hdfs://192.168.**.**:9000/orders.csv', 'CSV');192.168..を、ご利用の Hadoop クラスターのcore-1-1ノードのプライベート IP アドレスに置き換えてください。この IP アドレスを確認するには、EMR コンソールに移動し、左側のナビゲーションウィンドウで [EMR on ECS] をクリックし、Hadoop クラスターの行の [操作] 列にある [ノード] をクリックします。IP アドレスは [ノード] タブに表示されます。 -
データを
product.orders_allにインポートします:INSERT INTO product.orders_all SELECT uid, date, skuId, order_revenue FROM hdfs.orders; -
hdfs.ordersにあってproduct.orders_allにない行を確認することで、データ整合性を検証します:SELECT a.* FROM hdfs.orders a LEFT ANTI JOIN product.orders_all USING uid;結果セットが空であれば、インポートが成功したことを示します。
HDFS テーブル関数の使用
hdfs() テーブル関数は、永続テーブルを作成せずに HDFS からデータをインラインで読み取ります。構文は次のとおりです:
hdfs(uri, format, structure)
| パラメーター | 説明 |
|---|---|
uri |
HDFS 内のファイルの URI。ディレクトリではなく、ファイルを指す必要があります。親ディレクトリが存在している必要があります。 |
format |
ファイルフォーマット (例:CSV |
structure |
列定義。フォーマット: 'column1_name column1_type, column2_name column2_type, ...' |
手順
-
orders.csv サンプルファイルをダウンロードし、ご利用の Hadoop クラスターにアップロードします。この例では、ファイルをルートディレクトリにアップロードします。
-
データを
product.orders_allにインポートします:INSERT INTO product.orders_all SELECT uid, date, skuId, order_revenue FROM hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32');192.168..を、ご利用の Hadoop クラスターのcore-1-1ノードのプライベート IP アドレスに置き換えてください。この IP アドレスを確認するには、EMR コンソールに移動し、左側のナビゲーションウィンドウで [EMR on ECS] をクリックし、Hadoop クラスターの行の [操作] 列にある [ノード] をクリックします。IP アドレスは [ノード] タブに表示されます。 -
データ整合性を検証します:
SELECT a.* FROM hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32') LEFT ANTI JOIN product.orders_all a USING uid;結果セットが空であれば、インポートが成功したことを示します。
ClickHouse クラスターから HDFS へのデータエクスポート
ステップ 1:ビジネス テーブルの作成
インポートの例と同じテーブルスキーマを使用します。上記の「ステップ 1:ビジネス テーブルの作成」に従ってください。
ステップ 2:サンプルデータの準備
サンプル行を product.orders_all に挿入します:
INSERT INTO product.orders_all VALUES
(60333391, '2021-08-04 11:26:01', 49358700, 89),
(38826285, '2021-08-03 10:47:29', 25166907, 27),
(10793515, '2021-07-31 02:10:31', 95584454, 68),
(70246093, '2021-08-01 00:00:08', 82355887, 97),
(70149691, '2021-08-02 12:35:45', 68748652, 1),
(87307646, '2021-08-03 19:45:23', 16898681, 71),
(61694574, '2021-08-04 23:23:32', 79494853, 35),
(61337789, '2021-08-02 07:10:42', 23792355, 55),
(66879038, '2021-08-01 16:13:19', 95820038, 89);
ステップ 3:エクスポート動作の設定 (任意)
デフォルトでは、ターゲットファイルが HDFS にすでに存在する場合、エクスポートは失敗します。バージョン V5.8.0、V3.45.0、またはそれ以降のマイナーバージョンの EMR クラスターでは、エクスポート前に以下のいずれかの設定を任意で構成できます:
| 設定 | デフォルト | 動作 | コマンド |
|---|---|---|---|
hdfs_create_new_file_on_insert |
無効 | 上書きする代わりに、同じディレクトリに新しいファイルを作成します | SET hdfs_create_new_file_on_insert = 1 |
hdfs_truncate_on_insert |
無効 | 既存のファイルを上書きします。注意して使用してください。 | SET hdfs_truncate_on_insert = 1 |
ステップ 4:データのエクスポート
HDFS テーブルエンジンの使用
-
エクスポート先を指す HDFS バックエンドのテーブルを作成します:
CREATE DATABASE IF NOT EXISTS hdfs ON CLUSTER cluster_emr; CREATE TABLE IF NOT EXISTS hdfs.orders ON CLUSTER cluster_emr ( `uid` UInt32, `date` DateTime, `skuId` UInt32, `order_revenue` UInt32 ) ENGINE = HDFS('hdfs://192.168.**.**:9000/orders.csv', 'CSV');192.168..を、ご利用の Hadoop クラスターのcore-1-1ノードのプライベート IP アドレスに置き換えてください。この IP アドレスを確認するには、EMR コンソールに移動し、左側のナビゲーションウィンドウで [EMR on ECS] をクリックし、Hadoop クラスターの行の [操作] 列にある [ノード] をクリックします。IP アドレスは [ノード] タブに表示されます。 -
データをエクスポートします:
INSERT INTO hdfs.orders SELECT uid, date, skuId, order_revenue FROM product.orders_all; -
product.orders_allにあってhdfs.ordersにない行を確認することで、データ整合性を検証します:SELECT a.* FROM hdfs.orders RIGHT ANTI JOIN product.orders_all a USING uid;結果セットが空であれば、エクスポートが成功したことを示します。
HDFS テーブル関数の使用
-
データをエクスポートします:
INSERT INTO FUNCTION hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32') SELECT uid, date, skuId, order_revenue FROM product.orders_all; -
データ整合性を検証します:
SELECT a.* FROM hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32') RIGHT ANTI JOIN product.orders_all a USING uid;結果セットが空であれば、エクスポートが成功したことを示します。
HDFS パラメーターの設定
EMR ClickHouse クラスターの構成で HDFS パラメーターを直接設定します。
Apache HAWQ のウェブサイトで HDFS Configuration Reference のパラメーターを検索する際は、アンダースコア (_) をピリオド (.) に置き換えてください。例えば、dfs_default_replicaを調べるには、dfs.default.replicaを検索します。
グローバル構成 — すべてのユーザーに適用されます:
<hdfs>
<dfs_default_replica>3</dfs_default_replica>
</hdfs>
ユーザー固有の構成 — 指定されたユーザーにのみ適用されます。グローバル構成とユーザー固有の構成の両方に同じキーが異なる値で存在する場合、ユーザー固有の値が優先されます。
<hdfs_${user}>
<dfs_default_replica>3</dfs_default_replica>
</hdfs_${user}>