You can use Kafka table engines to import data to an E-MapReduce (EMR) ClickHouse
cluster. This topic describes how to import data from an EMR Kafka cluster to an EMR
ClickHouse cluster.
Limits
The DataFlow cluster and ClickHouse cluster must be deployed in the same virtual private
cloud (VPC).
Syntax
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'host1:port1,host2:port2',
kafka_topic_list = 'topic1,topic2,...',
kafka_group_name = 'group_name',
kafka_format = 'data_format';
The following table describes the parameters in the syntax.
Parameter |
Description |
db |
The name of the database. |
table_name |
The name of the table in the database. |
cluster |
The identifier of the cluster. |
name1/name2 |
The names of the columns in the table. |
tyep1/type2 |
The data types of the columns in the table. |
kafka_broker_list |
The IP addresses and port numbers of Kafka brokers.
You can view the internal IP addresses and port numbers of all Kafka brokers in the
EMR Kafka cluster from the Nodes on the Clusters page in the EMR console.
|
kafka_topic_list |
The names of the topics to which the Kafka consumer group subscribes. |
kafka_group_name |
The name of the Kafka consumer group. |
kafka_format |
The data format, such as CSV or JSONEachRow. For more information, see Formats for Input and Output Data.
|
Example
- Perform the following substeps in the EMR ClickHouse cluster:
- Log on to the EMR ClickHouse cluster in SSH mode. For more information, see Log on to a cluster.
- Run the following command to start the ClickHouse client:
clickhouse-client -h core-1-1 -m
Note In the sample command, core-1-1 indicates the name of the core node that you log on
to. If you have multiple core nodes, you can log on to one of the nodes.
- Execute the following statement to create a database named kafka:
CREATE DATABASE IF NOT EXISTS kafka ON CLUSTER cluster_emr;
Note You can specify a custom name for the database. In this example, the default cluster
identifier cluster_emr
is used. If you have changed the cluster identifier, enter the valid cluster identifier.
You can also search for the clickhouse_remote_servers parameter on the Configure tab of the ClickHouse page in the EMR console to query the cluster identifier.
- Execute the following statement to create a table in the kafka database:
CREATE TABLE IF NOT EXISTS kafka.consumer ON CLUSTER cluster_emr
(
`uid` UInt32,
`date` DateTime,
`skuId` UInt32,
`order_revenue` UInt32
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = '192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092',
kafka_topic_list = 'clickhouse_test',
kafka_group_name = 'clickhouse_test',
kafka_format = 'CSV';
kafka_broker_list
is the internal IP addresses and ports of all nodes in the DataFlow cluster. You
can view the internal IP addresses and ports on the Nodes page in the EMR console. For more information about other parameters, see the Syntax section in this topic.
- Execute the following statement to create a database named product:
CREATE DATABASE IF NOT EXISTS product ON CLUSTER cluster_emr;
- Execute the following statement to create a local table:
CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr
(
`uid` UInt32,
`date` DateTime,
`skuId` UInt32,
`order_revenue` UInt32
)
Engine = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}')
PARTITION BY toYYYYMMDD(date)
ORDER BY toYYYYMMDD(date);
- Execute the following statement to create a distributed table:
CREATE TABLE IF NOT EXISTS product.orders_all ON CLUSTER cluster_emr
(
`uid` UInt32,
`date` DateTime,
`skuId` UInt32,
`order_revenue` UInt32
)
Engine = Distributed(cluster_emr, product, orders, rand());
- Execute the following statement to create a materialized view to automatically import
data:
CREATE MATERIALIZED VIEW IF NOT EXISTS product.kafka_load ON CLUSTER cluster_emr TO product.orders AS
SELECT *
FROM kafka.consumer;
- Perform the following substeps in the DataFlow cluster:
- Log on to the DataFlow cluster in SSH mode. For more information, see Log on to a cluster.
- In the CLI of the DataFlow cluster, run the following command to start the Kafka producer:
/usr/lib/kafka-current/bin/kafka-console-producer.sh --broker-list 192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092 --topic clickhouse_test
- Run the following commands to generate the test data.
38826285,2021-08-03 10:47:29,25166907,27
10793515,2021-07-31 02:10:31,95584454,68
70246093,2021-08-01 00:00:08,82355887,97
70149691,2021-08-02 12:35:45,68748652,1
87307646,2021-08-03 19:45:23,16898681,71
61694574,2021-08-04 23:23:32,79494853,35
61337789,2021-08-02 07:10:42,23792355,55
66879038,2021-08-01 16:13:19,95820038,89
- In the CLI of the EMR ClickHouse cluster, execute the following statement to query
the data that is imported from the EMR Kafka cluster to the EMR ClickHouse cluster.
You can check whether the queried data is the same as the source data.
SELECT * FROM product.orders_all;