E-MapReduce: Import and export data between HDFS and ClickHouse

Last Updated: Sep 28, 2023

You can use Hadoop Distributed File System (HDFS) table engines or HDFS table functions to read and write data. This topic describes how to import data from HDFS to a ClickHouse cluster and how to export data from a ClickHouse cluster to HDFS.

Prerequisites

A ClickHouse cluster and a Hadoop cluster are created.

Precautions

In the sample code in this topic, port 9000 in the HDFS URI is the port of the NameNode in non-high availability (HA) mode. If the NameNode runs in HA mode, the port is 8020 in most cases.
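
For example, the following URIs illustrate the difference. They are only an illustration; the IP address is the same placeholder that is used in the examples in this topic:

    hdfs://192.168.**.**:9000/orders.csv    -- NameNode in non-HA mode (used in this topic)
    hdfs://192.168.**.**:8020/orders.csv    -- NameNode in HA mode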

Import data from a Hadoop cluster to a ClickHouse cluster

Step 1: Create a business table

  1. Log on to the ClickHouse cluster in SSH mode. For more information, see Log on to a cluster.

  2. Run the following command to start the ClickHouse client:

    clickhouse-client -h core-1-1 -m
    Note

    In the sample command, core-1-1 indicates the name of the core node that you log on to. If your cluster has multiple core nodes, you can log on to one of the core nodes.

  3. Execute the following statements to create a database named product and a business table named orders in the product database:

    CREATE DATABASE IF NOT EXISTS product ON CLUSTER cluster_emr;
    CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr
    (
        `uid` UInt32,
        `date` DateTime,
        `skuId` UInt32,
        `order_revenue` UInt32
    )
    Engine = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}')
    PARTITION BY toYYYYMMDD(date)
    ORDER BY toYYYYMMDD(date);
    CREATE TABLE IF NOT EXISTS product.orders_all ON CLUSTER cluster_emr
    (
        `uid` UInt32,
        `date` DateTime,
        `skuId` UInt32,
        `order_revenue` UInt32
    )
    Engine = Distributed(cluster_emr, product, orders, rand());
    Note

    In the sample code, {shard} and {replica} are macros that are automatically generated by Alibaba Cloud EMR for ClickHouse clusters and can be directly used.
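
    If you want to confirm the values that these macros resolve to on a node, you can query the system.macros system table. A minimal check (the output depends on your cluster):

      SELECT * FROM system.macros;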

Step 2: Import data

Use HDFS table engines to import data

The HDFS table engine of ClickHouse can read data from and write data to a file in a specific format at a specified HDFS address. Syntax:

CREATE TABLE [IF NOT EXISTS] [db.]table_name
(
  name1 [type1],
  name2 [type2],
  ...
)
Engine = HDFS(uri, format);

Parameters:

  • db: The name of the database.
  • table_name: The name of the table.
  • name1/name2: The names of the columns in the table.
  • type1/type2: The data types of the columns in the table.
  • uri: The URI of the file in HDFS.
  • format: The format of the file.

Note

The URI cannot be a directory address, and the directory to which the file belongs must exist. Otherwise, an error is reported when data is written.

  1. Create an HDFS engine table and prepare data.

    1. Download the sample file orders.csv and upload the file to a directory of the Hadoop cluster. In this example, the sample file is uploaded to the root directory of the Hadoop cluster.

    2. Execute the following statements to create a database named hdfs and an HDFS table named hdfs.orders:

      CREATE DATABASE IF NOT EXISTS hdfs ON CLUSTER cluster_emr;
      CREATE TABLE IF NOT EXISTS hdfs.orders ON CLUSTER cluster_emr
      (
          `uid` UInt32,
          `date` DateTime,
          `skuId` UInt32,
          `order_revenue` UInt32
      )
      ENGINE = HDFS('hdfs://192.168.**.**:9000/orders.csv', 'CSV');
      Note

      In this example, the sample data is uploaded to the root directory of the Hadoop cluster. In the sample code, 192.168.**.** is the private IP address of the core-1-1 node in the Hadoop cluster. You can log on to the EMR console, click EMR on ECS in the left-side navigation pane, click Nodes in the Actions column of the Hadoop cluster, and then view the private IP address on the Nodes tab.

  2. Execute the following statements to import data into the product.orders_all table:

    INSERT INTO product.orders_all
    SELECT
      uid,
      date,
      skuId,
      order_revenue
    FROM
      hdfs.orders;
  3. Execute the following statement to check data consistency. The query returns the rows of hdfs.orders that have no match in product.orders_all. If no rows are returned, the imported data is consistent with the source data:

    SELECT
      a.*
    FROM
      hdfs.orders a
    LEFT ANTI JOIN
      product.orders_all
    USING uid;
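
    As an additional quick check, you can also compare the row counts of the source table and the business table. A minimal sketch:

    SELECT
      (SELECT count() FROM hdfs.orders) AS source_rows,
      (SELECT count() FROM product.orders_all) AS imported_rows;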

Use HDFS table functions to import data

HDFS table functions of ClickHouse can read file data from a specified HDFS address and return a table with a specified schema. Syntax:

hdfs(uri, format, structure);

Parameters:

  • uri: The URI of the file in HDFS.
  • format: The format of the file.
  • structure: The names and data types of the fields in the returned table. Example: 'column1 UInt32, column2 String'.

Note

The URI cannot be a directory address, and the directory to which the file belongs must exist. Otherwise, an error is reported when data is written.
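
Because the hdfs table function returns an ordinary table expression, you can also query it directly to preview the file before you import it. The following sketch reuses the placeholder IP address and schema from this topic:

    SELECT *
    FROM hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32')
    LIMIT 5;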

  1. Download the sample file orders.csv and upload the file to a directory of the Hadoop cluster. In this example, the sample file is uploaded to the root directory of the Hadoop cluster.

  2. Execute the following statements to import data into the product.orders_all table:

    INSERT INTO product.orders_all
    SELECT
      uid,
      date,
      skuId,
      order_revenue
    FROM
      hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32');
    Note

    In this example, the sample data is uploaded to the root directory of the Hadoop cluster. In the sample code, 192.168.**.** is the private IP address of the core-1-1 node in the Hadoop cluster. You can log on to the EMR console, click EMR on ECS in the left-side navigation pane, click Nodes in the Actions column of the Hadoop cluster, and then view the private IP address on the Nodes tab.

  3. Execute the following statement to check data consistency:

    SELECT
      a.*
    FROM
      hdfs.orders a
    LEFT ANTI JOIN
      product.orders_all
    USING uid;

Export data from a ClickHouse cluster to HDFS

Step 1: Create a business table

In this example, the business table schema used for the data export is the same as that used for the data import. For more information, see Step 1: Create a business table.

Step 2: Prepare data

  1. Execute the following statement to insert data into the product.orders_all business table to prepare the data for subsequent export operations:

    INSERT INTO product.orders_all VALUES
      (60333391,'2021-08-04 11:26:01',49358700,89),
      (38826285,'2021-08-03 10:47:29',25166907,27),
      (10793515,'2021-07-31 02:10:31',95584454,68),
      (70246093,'2021-08-01 00:00:08',82355887,97),
      (70149691,'2021-08-02 12:35:45',68748652,1),
      (87307646,'2021-08-03 19:45:23',16898681,71),
      (61694574,'2021-08-04 23:23:32',79494853,35),
      (61337789,'2021-08-02 07:10:42',23792355,55),
      (66879038,'2021-08-01 16:13:19',95820038,89);
  2. Optional. Specify an export method to prevent export failures that occur if the file to which you want to write data already exists. You can specify the export method for EMR clusters of V5.8.0, V3.45.0, and their later minor versions.

    Incremental export

    After you configure the following parameter, if the file to which you want to write data already exists in HDFS, a new file is created in the related directory to store the incremental data.

    SET hdfs_create_new_file_on_insert = 1;

    Overwrite export

    After you configure the following parameter, if the file to which you want to write data already exists in HDFS, the data that you want to write will overwrite the existing data in the related file. Proceed with caution.

    SET hdfs_truncate_on_insert = 1;
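
    If you do not want to change the setting for the whole session, you can also attach it to a single export statement. This is only a sketch; it assumes that your ClickHouse version supports a SETTINGS clause in INSERT statements and uses the hdfs.orders table that is created in Step 3:

    INSERT INTO hdfs.orders
    SETTINGS hdfs_truncate_on_insert = 1
    SELECT
      uid,
      date,
      skuId,
      order_revenue
    FROM
      product.orders_all;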

Step 3: Export data

Use HDFS table engines to export data

  1. Execute the following statements to create an HDFS table:

    CREATE DATABASE IF NOT EXISTS hdfs ON CLUSTER cluster_emr;
    CREATE TABLE IF NOT EXISTS hdfs.orders ON CLUSTER cluster_emr
    (
        `uid` UInt32,
        `date` DateTime,
        `skuId` UInt32,
        `order_revenue` UInt32
    )
    ENGINE = HDFS('hdfs://192.168.**.**:9000/orders.csv', 'CSV');
    Note

    In this example, data is exported to the root directory of the Hadoop cluster. In the sample code, 192.168.**.** is the private IP address of the core-1-1 node in the Hadoop cluster. You can log on to the EMR console, click EMR on ECS in the left-side navigation pane, click Nodes in the Actions column of the Hadoop cluster, and then view the private IP address on the Nodes tab.

  2. Execute the following statements to export data. The data is stored in the related directory by using an HDFS table engine:

    INSERT INTO hdfs.orders
    SELECT
      uid,
      date,
      skuId,
      order_revenue
    FROM
     product.orders_all;
    Note

    During the data export, ClickHouse creates a file in the related path and writes data to the file. By default, if the file to which you want to write data already exists in HDFS, the data export fails. You can configure parameters for EMR clusters of V5.8.0, V3.45.0, and their later minor versions to prevent the failure.

  3. Execute the following statement to check data consistency:

    SELECT
      a.*
    FROM
      hdfs.orders
    RIGHT ANTI JOIN
      product.orders_all a
    USING uid;

Use HDFS table functions to export data

  1. Execute the following statements to export data:

    INSERT INTO FUNCTION
      hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32')
    SELECT
      uid,
      date,
      skuId,
      order_revenue
    FROM
      product.orders_all;
    Note

    During the data export, ClickHouse creates a file in the related path and writes data to the file. By default, if the file to which you want to write data already exists in HDFS, the data export fails. You can configure parameters for EMR clusters of V5.8.0, V3.45.0, and their later minor versions to prevent the failure.

  2. Execute the following statement to check data consistency:

    SELECT
      a.*
    FROM
      hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32')
    RIGHT ANTI JOIN
      product.orders_all a
    USING uid;
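
The format parameter of the hdfs table function is not limited to CSV. As a hedged sketch, the same data can be exported as a Parquet file by changing the format and the file name; the orders.parquet path below is hypothetical:

    INSERT INTO FUNCTION
      hdfs('hdfs://192.168.**.**:9000/orders.parquet', 'Parquet', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32')
    SELECT
      uid,
      date,
      skuId,
      order_revenue
    FROM
      product.orders_all;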

Configure HDFS-related parameters

You can configure HDFS-related parameters for an EMR ClickHouse cluster.

  • Configure the HDFS-related parameters that take effect globally.

    <hdfs>
      <dfs_default_replica>3</dfs_default_replica>
    </hdfs>

    For more information about HDFS-related parameters, see HDFS Configuration Reference on the official website of Apache HAWQ.

    Note

    Replace underscores (_) with periods (.) when you search for parameters. For example, if you want to query the dfs_default_replica parameter in EMR, search for the dfs.default.replica parameter in the documentation on the official website.

  • Configure the HDFS-related parameters that take effect only for the user that is specified by ${user}. If the user-specific configuration and the global configuration contain the same key with different values, the user-specific configuration overrides the global configuration.

    <hdfs_${user}>
      <dfs_default_replica>3</dfs_default_replica>
    </hdfs_${user}>