すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:HDFS と ClickHouse 間でのデータのインポートとエクスポート

最終更新日:Mar 27, 2026

HDFS テーブルエンジンまたは HDFS テーブル関数を使用して、Hadoop 分散ファイルシステム (HDFS) と ClickHouse クラスター間でデータを移動します。どちらのアプローチでも、HDFS から ClickHouse へのファイルの読み取りと、ClickHouse から HDFS へのデータの書き込みが可能です。

前提条件

開始する前に、以下が準備できていることを確認してください:

注意事項

このトピックで示す HDFS URI のサンプルにおいて、ポート 9000 は非高可用性 (非 HA) モードの NameNode ポートです。HA モードでは、ポートは通常 8020 です。

アプローチの選択

HDFS テーブルエンジンと HDFS テーブル関数の両方が、HDFS への読み取りと書き込みをサポートしています。ワークフローに基づいて選択してください:

アプローチ 仕組み 最適なケース
HDFS テーブルエンジン HDFS ファイルをバックエンドとする永続テーブルを作成します。テーブルは削除されるまで存在します。 同じ HDFS パスへの繰り返し読み取りまたは書き込み
HDFS テーブル関数 永続テーブルを作成せずに、HDFS への読み取りまたは書き込みをインラインで行います。 1 回限りのインポートまたはエクスポート

制限事項

  • uri パラメーターはディレクトリではなく、ファイルを指す必要があります。データを書き込む前に親ディレクトリが存在している必要があります。存在しない場合、書き込みは失敗します。

  • デフォルトでは、ターゲットファイルが HDFS にすでに存在する場合、エクスポート操作は失敗します。これを変更するには、「エクスポート動作の設定」をご参照ください。

Hadoop クラスターから ClickHouse クラスターへのデータインポート

ステップ 1:ビジネス テーブルの作成

  1. SSH 経由で ClickHouse クラスターにログインします。詳細については、「クラスターへのログイン」をご参照ください。

  2. ClickHouse クライアントを起動します:

    clickhouse-client -h core-1-1 -m
    core-1-1 はログインしたコアノードの名前です。クラスターに複数のコアノードがある場合は、いずれか 1 つに接続してください。
  3. product という名前のデータベースと、ローカルのレプリケートされたテーブル product.orders および分散テーブル product.orders_all の 2 つのテーブルを作成します:

    CREATE DATABASE IF NOT EXISTS product ON CLUSTER cluster_emr;
    
    CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr
    (
        `uid`           UInt32,
        `date`          DateTime,
        `skuId`         UInt32,
        `order_revenue` UInt32
    )
    ENGINE = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}')
    PARTITION BY toYYYYMMDD(date)
    ORDER BY toYYYYMMDD(date);
    
    CREATE TABLE IF NOT EXISTS product.orders_all ON CLUSTER cluster_emr
    (
        `uid`           UInt32,
        `date`          DateTime,
        `skuId`         UInt32,
        `order_revenue` UInt32
    )
    ENGINE = Distributed(cluster_emr, product, orders, rand());
    {shard}{replica} は、EMR が ClickHouse クラスター用に自動生成するマクロです。

ステップ 2:データのインポート

HDFS テーブルエンジンの使用

HDFS テーブルエンジンは、HDFS 内のファイルをバックエンドとする ClickHouse テーブルを作成します。構文は次のとおりです:

CREATE TABLE [IF NOT EXISTS] [db.]table_name
(
    name1 [type1],
    name2 [type2],
    ...
)
ENGINE = HDFS(uri, format);
パラメータ 説明
db データベース名
table_name テーブル名
name1, name2 カラム名
type1, type2 カラムのデータの型
uri HDFS 内のファイルの URI。ファイルへのパスである必要があり、ディレクトリへのパスであってはなりません。親ディレクトリは存在している必要があります。
format ファイルフォーマット。例: CSV

手順

  1. orders.csv サンプルファイルをダウンロードし、ご利用の Hadoop クラスターにアップロードします。この例では、ファイルをルートディレクトリにアップロードします。

  2. HDFS データベースと、アップロードされたファイルを指す HDFS バックエンドのテーブルを作成します:

    CREATE DATABASE IF NOT EXISTS hdfs ON CLUSTER cluster_emr;
    
    CREATE TABLE IF NOT EXISTS hdfs.orders ON CLUSTER cluster_emr
    (
        `uid`           UInt32,
        `date`          DateTime,
        `skuId`         UInt32,
        `order_revenue` UInt32
    )
    ENGINE = HDFS('hdfs://192.168.**.**:9000/orders.csv', 'CSV');
    192.168.. を、ご利用の Hadoop クラスターの core-1-1 ノードのプライベート IP アドレスに置き換えてください。この IP アドレスを確認するには、EMR コンソールに移動し、左側のナビゲーションウィンドウで [EMR on ECS] をクリックし、Hadoop クラスターの行の [操作] 列にある [ノード] をクリックします。IP アドレスは [ノード] タブに表示されます。
  3. データを product.orders_all にインポートします:

    INSERT INTO product.orders_all
    SELECT uid, date, skuId, order_revenue
    FROM hdfs.orders;
  4. hdfs.orders にあって product.orders_all にない行を確認することで、データ整合性を検証します:

    SELECT a.*
    FROM hdfs.orders a
    LEFT ANTI JOIN product.orders_all
    USING uid;

    結果セットが空であれば、インポートが成功したことを示します。

HDFS テーブル関数の使用

hdfs() テーブル関数は、永続テーブルを作成せずに HDFS からデータをインラインで読み取ります。構文は次のとおりです:

hdfs(uri, format, structure)
パラメーター 説明
uri HDFS 内のファイルの URI。ディレクトリではなく、ファイルを指す必要があります。親ディレクトリが存在している必要があります。
format ファイルフォーマット (例:CSV
structure 列定義。フォーマット: 'column1_name column1_type, column2_name column2_type, ...'

手順

  1. orders.csv サンプルファイルをダウンロードし、ご利用の Hadoop クラスターにアップロードします。この例では、ファイルをルートディレクトリにアップロードします。

  2. データを product.orders_all にインポートします:

    INSERT INTO product.orders_all
    SELECT uid, date, skuId, order_revenue
    FROM hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV',
              'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32');
    192.168.. を、ご利用の Hadoop クラスターの core-1-1 ノードのプライベート IP アドレスに置き換えてください。この IP アドレスを確認するには、EMR コンソールに移動し、左側のナビゲーションウィンドウで [EMR on ECS] をクリックし、Hadoop クラスターの行の [操作] 列にある [ノード] をクリックします。IP アドレスは [ノード] タブに表示されます。
  3. データ整合性を検証します:

    SELECT a.*
    FROM hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV',
              'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32')
    LEFT ANTI JOIN product.orders_all a
    USING uid;

    結果セットが空であれば、インポートが成功したことを示します。

ClickHouse クラスターから HDFS へのデータエクスポート

ステップ 1:ビジネス テーブルの作成

インポートの例と同じテーブルスキーマを使用します。上記の「ステップ 1:ビジネス テーブルの作成」に従ってください。

ステップ 2:サンプルデータの準備

サンプル行を product.orders_all に挿入します:

INSERT INTO product.orders_all VALUES
    (60333391, '2021-08-04 11:26:01', 49358700, 89),
    (38826285, '2021-08-03 10:47:29', 25166907, 27),
    (10793515, '2021-07-31 02:10:31', 95584454, 68),
    (70246093, '2021-08-01 00:00:08', 82355887, 97),
    (70149691, '2021-08-02 12:35:45', 68748652,  1),
    (87307646, '2021-08-03 19:45:23', 16898681, 71),
    (61694574, '2021-08-04 23:23:32', 79494853, 35),
    (61337789, '2021-08-02 07:10:42', 23792355, 55),
    (66879038, '2021-08-01 16:13:19', 95820038, 89);

ステップ 3:エクスポート動作の設定 (任意)

デフォルトでは、ターゲットファイルが HDFS にすでに存在する場合、エクスポートは失敗します。バージョン V5.8.0、V3.45.0、またはそれ以降のマイナーバージョンの EMR クラスターでは、エクスポート前に以下のいずれかの設定を任意で構成できます:

設定 デフォルト 動作 コマンド
hdfs_create_new_file_on_insert 無効 上書きする代わりに、同じディレクトリに新しいファイルを作成します SET hdfs_create_new_file_on_insert = 1
hdfs_truncate_on_insert 無効 既存のファイルを上書きします。注意して使用してください。 SET hdfs_truncate_on_insert = 1

ステップ 4:データのエクスポート

HDFS テーブルエンジンの使用

  1. エクスポート先を指す HDFS バックエンドのテーブルを作成します:

    CREATE DATABASE IF NOT EXISTS hdfs ON CLUSTER cluster_emr;
    
    CREATE TABLE IF NOT EXISTS hdfs.orders ON CLUSTER cluster_emr
    (
        `uid`           UInt32,
        `date`          DateTime,
        `skuId`         UInt32,
        `order_revenue` UInt32
    )
    ENGINE = HDFS('hdfs://192.168.**.**:9000/orders.csv', 'CSV');
    192.168.. を、ご利用の Hadoop クラスターの core-1-1 ノードのプライベート IP アドレスに置き換えてください。この IP アドレスを確認するには、EMR コンソールに移動し、左側のナビゲーションウィンドウで [EMR on ECS] をクリックし、Hadoop クラスターの行の [操作] 列にある [ノード] をクリックします。IP アドレスは [ノード] タブに表示されます。
  2. データをエクスポートします:

    INSERT INTO hdfs.orders
    SELECT uid, date, skuId, order_revenue
    FROM product.orders_all;
  3. product.orders_all にあって hdfs.orders にない行を確認することで、データ整合性を検証します:

    SELECT a.*
    FROM hdfs.orders
    RIGHT ANTI JOIN product.orders_all a
    USING uid;

    結果セットが空であれば、エクスポートが成功したことを示します。

HDFS テーブル関数の使用

  1. データをエクスポートします:

    INSERT INTO FUNCTION
        hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV',
             'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32')
    SELECT uid, date, skuId, order_revenue
    FROM product.orders_all;
  2. データ整合性を検証します:

    SELECT a.*
    FROM hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV',
              'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32')
    RIGHT ANTI JOIN product.orders_all a
    USING uid;

    結果セットが空であれば、エクスポートが成功したことを示します。

HDFS パラメーターの設定

EMR ClickHouse クラスターの構成で HDFS パラメーターを直接設定します。

Apache HAWQ のウェブサイトで HDFS Configuration Reference のパラメーターを検索する際は、アンダースコア (_) をピリオド (.) に置き換えてください。例えば、dfs_default_replica を調べるには、dfs.default.replica を検索します。

グローバル構成 — すべてのユーザーに適用されます:

<hdfs>
    <dfs_default_replica>3</dfs_default_replica>
</hdfs>

ユーザー固有の構成 — 指定されたユーザーにのみ適用されます。グローバル構成とユーザー固有の構成の両方に同じキーが異なる値で存在する場合、ユーザー固有の値が優先されます。

<hdfs_${user}>
    <dfs_default_replica>3</dfs_default_replica>
</hdfs_${user}>