You can use Hadoop Distributed File System (HDFS) table engines or HDFS table functions to read and write data. This topic describes how to import data from HDFS to a ClickHouse cluster and how to export data from a ClickHouse cluster to HDFS.
Prerequisites
An E-MapReduce (EMR) Hadoop cluster is created. For more information, see Create a cluster.
An EMR ClickHouse cluster is created. For more information, see Create a ClickHouse cluster.
Precautions
In the sample code in this topic, 9000 in the HDFS URL is the port number of the NameNode in non-high availability (HA) mode. If your NameNode runs in HA mode, the port number is 8020 in most cases.
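For example, assuming the same placeholder address that is used throughout this topic, an HDFS engine table definition (introduced later in this topic) for an HA-mode NameNode differs only in the port:

-- A sketch of an HDFS engine table for a NameNode in HA mode.
-- Only the port differs from the non-HA examples in this topic (8020 instead of 9000).
CREATE TABLE IF NOT EXISTS hdfs.orders ON CLUSTER cluster_emr
(
    `uid` UInt32,
    `date` DateTime,
    `skuId` UInt32,
    `order_revenue` UInt32
)
ENGINE = HDFS('hdfs://192.168.**.**:8020/orders.csv', 'CSV');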
Import data from a Hadoop cluster to a ClickHouse cluster
Step 1: Create a business table
Log on to the ClickHouse cluster in SSH mode. For more information, see Log on to a cluster.
Run the following command to start the ClickHouse client:
clickhouse-client -h core-1-1 -m

Note: In the sample command, core-1-1 indicates the name of the core node that you logged on to. If your cluster has multiple core nodes, you can log on to any one of them.
Execute the following statements to create a database named product and a business table named orders in the product database:
CREATE DATABASE IF NOT EXISTS product ON CLUSTER cluster_emr;

CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr
(
    `uid` UInt32,
    `date` DateTime,
    `skuId` UInt32,
    `order_revenue` UInt32
)
Engine = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}')
PARTITION BY toYYYYMMDD(date)
ORDER BY toYYYYMMDD(date);

CREATE TABLE IF NOT EXISTS product.orders_all ON CLUSTER cluster_emr
(
    `uid` UInt32,
    `date` DateTime,
    `skuId` UInt32,
    `order_revenue` UInt32
)
Engine = Distributed(cluster_emr, product, orders, rand());

Note: In the sample code, {shard} and {replica} are macros that are automatically generated by Alibaba Cloud EMR for ClickHouse clusters and can be used directly.
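If you want to see the actual values that the {shard} and {replica} macros resolve to on a node, you can query the built-in system.macros table from the ClickHouse client:

-- View the macro substitutions defined for the current node.
SELECT * FROM system.macros;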
Step 2: Import data
Use HDFS table engines to import data
HDFS table engines of ClickHouse can read file data in a specific format from a specified HDFS address. Syntax:
CREATE TABLE [IF NOT EXISTS] [db.]table_name
(
name1 [type1],
name2 [type2],
...
)
Engine = HDFS(uri, format);

| Parameter | Description |
| --- | --- |
| db | The name of the database. |
| table_name | The name of the table. |
| name1, name2 | The names of the columns in the table. |
| type1, type2 | The data types of the columns in the table. |
| uri | The URI of the file in HDFS. |
| format | The type of the file. |
The URI cannot be a directory address, and the directory to which the file belongs must exist. Otherwise, an error is reported when data is written.
Create an HDFS engine table and prepare data.
Download the sample file orders.csv and upload the file to a directory of the Hadoop cluster. In this example, the sample file is uploaded to the root directory of the Hadoop cluster.
Execute the following statements to create a database named hdfs and an HDFS table named hdfs.orders:
CREATE DATABASE IF NOT EXISTS hdfs ON CLUSTER cluster_emr;

CREATE TABLE IF NOT EXISTS hdfs.orders ON CLUSTER cluster_emr
(
    `uid` UInt32,
    `date` DateTime,
    `skuId` UInt32,
    `order_revenue` UInt32
)
ENGINE = HDFS('hdfs://192.168.**.**:9000/orders.csv', 'CSV');

Note: In this example, the sample data is uploaded to the root directory of the Hadoop cluster. In the sample code, 192.168.**.** is the private IP address of the core-1-1 node in the Hadoop cluster. To view the private IP address, log on to the EMR console, click EMR on ECS in the left-side navigation pane, click Nodes in the Actions column of the Hadoop cluster, and then check the Nodes tab.
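Before you import the data, you can optionally run a quick sanity check against the HDFS engine table. The following query previews the first rows that ClickHouse reads from the file:

-- Preview the first five rows of orders.csv through the HDFS engine table.
SELECT * FROM hdfs.orders LIMIT 5;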
Execute the following statement to import data into the product.orders_all table:
INSERT INTO product.orders_all
SELECT uid, date, skuId, order_revenue
FROM hdfs.orders;

Execute the following statement to check data consistency:

SELECT a.* FROM hdfs.orders a LEFT ANTI JOIN product.orders_all USING uid;

If the query returns an empty result set, all rows in hdfs.orders exist in product.orders_all, and the import is consistent.
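As an additional lightweight check, you can compare row counts between the source file and the destination table. This is only a sketch: it detects missing rows but not mismatched values.

-- Compare row counts between the HDFS source and the distributed table.
SELECT
    (SELECT count() FROM hdfs.orders) AS source_rows,
    (SELECT count() FROM product.orders_all) AS destination_rows;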
Use HDFS table functions to import data
HDFS table functions of ClickHouse can read file data from a specified HDFS address and return a table with a specified schema. Syntax:
hdfs(uri, format, structure);

| Parameter | Description |
| --- | --- |
| uri | The URI of the file in HDFS. |
| format | The type of the file. |
| structure | The schema of the table, specified as column names and data types. Example: 'column1 UInt32, column2 String'. |
The URI cannot be a directory address, and the directory to which the file belongs must exist. Otherwise, an error is reported when data is written.
Download the sample file orders.csv and upload the file to a directory of the Hadoop cluster. In this example, the sample file is uploaded to the root directory of the Hadoop cluster.
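Because the hdfs table function returns an ad hoc table, you can optionally preview the uploaded file before importing it. The following query is a sketch that uses the same placeholder IP address as the other examples in this topic:

-- Read the first five rows of orders.csv directly from HDFS, without creating a table.
SELECT *
FROM hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32')
LIMIT 5;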
Execute the following statement to import data into the product.orders_all table:
INSERT INTO product.orders_all
SELECT uid, date, skuId, order_revenue
FROM hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32');

Note: In this example, the sample data is uploaded to the root directory of the Hadoop cluster. In the sample code, 192.168.**.** is the private IP address of the core-1-1 node in the Hadoop cluster. To view the private IP address, log on to the EMR console, click EMR on ECS in the left-side navigation pane, click Nodes in the Actions column of the Hadoop cluster, and then check the Nodes tab.

Execute the following statement to check data consistency:

SELECT a.* FROM hdfs.orders a LEFT ANTI JOIN product.orders_all USING uid;
Export data from a ClickHouse cluster to HDFS
Step 1: Create a business table
In this example, the business table schema used for the data export is the same as that used for the data import. For more information, see Step 1: Create a business table.
Step 2: Prepare data
Execute the following statement to insert data into the product.orders_all business table to prepare the data for subsequent export operations:
INSERT INTO product.orders_all VALUES
(60333391,'2021-08-04 11:26:01',49358700,89),
(38826285,'2021-08-03 10:47:29',25166907,27),
(10793515,'2021-07-31 02:10:31',95584454,68),
(70246093,'2021-08-01 00:00:08',82355887,97),
(70149691,'2021-08-02 12:35:45',68748652,1),
(87307646,'2021-08-03 19:45:23',16898681,71),
(61694574,'2021-08-04 23:23:32',79494853,35),
(61337789,'2021-08-02 07:10:42',23792355,55),
(66879038,'2021-08-01 16:13:19',95820038,89);

Optional. Specify an export method to prevent export failures that occur if the file to which you want to write data already exists in HDFS. You can specify the export method for EMR clusters of V5.8.0, V3.45.0, and their later minor versions.
Incremental export
After you configure the following parameter, if the file to which you want to write data already exists in HDFS, a new file is created in the related directory to store the incremental data.
SET hdfs_create_new_file_on_insert = 1;

Overwrite export
After you configure the following parameter, if the file to which you want to write data already exists in HDFS, the data that you want to write will overwrite the existing data in the related file. Proceed with caution.
SET hdfs_truncate_on_insert = 1;
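To confirm which export method is active in your current session, you can query the system.settings table:

-- Check the current values of the two export-related settings.
SELECT name, value
FROM system.settings
WHERE name IN ('hdfs_create_new_file_on_insert', 'hdfs_truncate_on_insert');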
Step 3: Export data
Use HDFS table engines to export data
Execute the following statements to create an HDFS table:
CREATE DATABASE IF NOT EXISTS hdfs ON CLUSTER cluster_emr;

CREATE TABLE IF NOT EXISTS hdfs.orders ON CLUSTER cluster_emr
(
    `uid` UInt32,
    `date` DateTime,
    `skuId` UInt32,
    `order_revenue` UInt32
)
ENGINE = HDFS('hdfs://192.168.**.**:9000/orders.csv', 'CSV');

Note: In this example, data is exported to the root directory of the Hadoop cluster. In the sample code, 192.168.**.** is the private IP address of the core-1-1 node in the Hadoop cluster. To view the private IP address, log on to the EMR console, click EMR on ECS in the left-side navigation pane, click Nodes in the Actions column of the Hadoop cluster, and then check the Nodes tab.

Execute the following statement to export data. The data is stored in the related directory by using an HDFS table engine:
INSERT INTO hdfs.orders
SELECT uid, date, skuId, order_revenue
FROM product.orders_all;

Note: During the data export, ClickHouse creates a file in the related path and writes data to the file. By default, if the file to which you want to write data already exists in HDFS, the data export fails. You can configure parameters for EMR clusters of V5.8.0, V3.45.0, and their later minor versions to prevent the failure. For more information, see Step 2: Prepare data.
Execute the following statement to check data consistency:
SELECT a.* FROM hdfs.orders RIGHT ANTI JOIN product.orders_all a USING uid;
Use HDFS table functions to export data
Execute the following statement to export data:
INSERT INTO FUNCTION hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32')
SELECT uid, date, skuId, order_revenue
FROM product.orders_all;

Note: During the data export, ClickHouse creates a file in the related path and writes data to the file. By default, if the file to which you want to write data already exists in HDFS, the data export fails. You can configure parameters for EMR clusters of V5.8.0, V3.45.0, and their later minor versions to prevent the failure. For more information, see Step 2: Prepare data.
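If orders.csv already exists from a previous export and you want to replace it, you can combine the overwrite setting from Step 2 with the export in the same session. This is a sketch; proceed with caution, because the existing file is truncated:

-- Overwrite the existing orders.csv, then re-run the export.
SET hdfs_truncate_on_insert = 1;

INSERT INTO FUNCTION hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32')
SELECT uid, date, skuId, order_revenue
FROM product.orders_all;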
Execute the following statement to check data consistency:
SELECT a.*
FROM hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32')
RIGHT ANTI JOIN product.orders_all a
USING uid;

Configure HDFS-related parameters
You can configure HDFS-related parameters for an EMR ClickHouse cluster.
Configure the HDFS-related parameters that take effect globally.
<hdfs>
    <dfs_default_replica>3</dfs_default_replica>
</hdfs>

For more information about HDFS-related parameters, see HDFS Configuration Reference on the official website of Apache HAWQ.
Note: Replace underscores (_) with periods (.) when you search for parameters. For example, if you want to query the dfs_default_replica parameter in EMR, search for the dfs.default.replica parameter in the documentation on the official website.

Configure the HDFS-related parameters that take effect only for the user that is specified by ${user}. If the user-specific configuration and the global configuration use the same key but different values, the user-specific configuration overwrites the global configuration.
<hdfs_${user}>
    <dfs_default_replica>3</dfs_default_replica>
</hdfs_${user}>