You can use the Kafka table engine of ClickHouse to import data to an E-MapReduce (EMR) ClickHouse cluster. This topic describes how to import data from an EMR Kafka cluster to an EMR ClickHouse cluster.

Prerequisites

A DataFlow cluster in which the Kafka service is deployed and a ClickHouse cluster are created in the EMR console.

Limits

The DataFlow cluster and ClickHouse cluster must be deployed in the same virtual private cloud (VPC).

Syntax

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host1:port1,host2:port2',
    kafka_topic_list = 'topic1,topic2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format';
The following table describes the parameters in the syntax.

Parameter           Description
db                  The name of the database.
table_name          The name of the table in the database.
cluster             The identifier of the cluster.
name1/name2         The names of the columns in the table.
type1/type2         The data types of the columns in the table.
kafka_broker_list   The IP addresses and port numbers of the Kafka brokers. You can view the internal IP addresses and port numbers of all Kafka brokers in the EMR Kafka cluster on the Nodes tab of the Clusters page in the EMR console.
kafka_topic_list    The names of the topics to which the Kafka consumer group subscribes.
kafka_group_name    The name of the Kafka consumer group.
kafka_format        The format of the data, such as CSV or JSONEachRow. For more information, see Formats for Input and Output Data.
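
The Kafka engine also accepts optional settings beyond the four required ones. For example, kafka_num_consumers sets the number of consumers per table, and kafka_skip_broken_messages tolerates a given number of unparseable records per block. The following sketch uses illustrative database, broker, topic, and group names that are not part of this topic's clusters:

CREATE TABLE IF NOT EXISTS demo.events
(
    `id` UInt32,
    `payload` String
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'broker-1:9092',    -- illustrative broker address
    kafka_topic_list = 'events',            -- illustrative topic
    kafka_group_name = 'events_group',      -- illustrative consumer group
    kafka_format = 'JSONEachRow',
    kafka_num_consumers = 1,                -- optional: number of consumers for this table
    kafka_skip_broken_messages = 10;        -- optional: skip up to 10 malformed records per block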

Example

  1. Perform the following substeps in the EMR ClickHouse cluster:
    1. Log on to the EMR ClickHouse cluster in SSH mode. For more information, see Log on to a cluster.
    2. Run the following command to start the ClickHouse client:
      clickhouse-client -h core-1-1 -m
      Note In the sample command, core-1-1 indicates the name of the core node that you log on to. If you have multiple core nodes, you can log on to one of the nodes.
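      After the client starts, you can optionally confirm which node you are connected to and which server version is running. The built-in hostName() and version() functions provide this information:
      SELECT hostName(), version();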
    3. Execute the following statement to create a database named kafka:
      CREATE DATABASE IF NOT EXISTS kafka ON CLUSTER cluster_emr;
      Note You can specify a custom name for the database. In this example, the default cluster identifier cluster_emr is used. If you have changed the cluster identifier, enter the valid cluster identifier. You can also search for the clickhouse_remote_servers parameter on the Configure tab of the ClickHouse page in the EMR console to query the cluster identifier.
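      If you are unsure of the identifier, you can also query the system.clusters system table from the ClickHouse client. The following query lists each configured cluster together with its shards and replicas:
      SELECT cluster, shard_num, replica_num, host_name
      FROM system.clusters;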
    4. Execute the following statement to create a table in the kafka database:
      CREATE TABLE IF NOT EXISTS kafka.consumer ON CLUSTER cluster_emr
      (
          `uid` UInt32,
          `date` DateTime,
          `skuId` UInt32,
          `order_revenue` UInt32
      )
      ENGINE = Kafka()
      SETTINGS
        kafka_broker_list = '192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092',
        kafka_topic_list = 'clickhouse_test',
        kafka_group_name = 'clickhouse_test',
        kafka_format = 'CSV';

      The kafka_broker_list setting specifies the internal IP addresses and port numbers of all brokers in the DataFlow cluster. You can view them on the Nodes page in the EMR console. For more information about the other parameters, see the Syntax section of this topic.
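
      A Kafka engine table is a streaming consumer rather than ordinary storage: reading from it advances the consumer group offsets, and recent ClickHouse versions may reject direct SELECTs against it. To verify the table without consuming messages, you can inspect its stored definition:
      SHOW CREATE TABLE kafka.consumer;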

    5. Execute the following statement to create a database named product:
      CREATE DATABASE IF NOT EXISTS product ON CLUSTER cluster_emr;
    6. Execute the following statement to create a local table:
      CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr
      (
          `uid` UInt32,
          `date` DateTime,
          `skuId` UInt32,
          `order_revenue` UInt32
      )
      ENGINE = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}')
      PARTITION BY toYYYYMMDD(date)
      ORDER BY toYYYYMMDD(date);
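      The {shard} and {replica} placeholders are macros that EMR expands on each node to build unique ZooKeeper paths and replica names. If replication fails with errors that mention these macros, you can inspect their per-node values by using the standard system.macros table:
      SELECT * FROM system.macros;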
    7. Execute the following statement to create a distributed table:
      CREATE TABLE IF NOT EXISTS product.orders_all ON CLUSTER cluster_emr
      (
          `uid` UInt32,
          `date` DateTime,
          `skuId` UInt32,
          `order_revenue` UInt32
      )
      ENGINE = Distributed(cluster_emr, product, orders, rand());
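      The four arguments of the Distributed engine are the cluster identifier, the database, the local table, and the sharding key. rand() spreads inserted rows evenly across the shards. As an optional sanity check before any data arrives, you can confirm that the distributed table is readable on all shards:
      SELECT count() FROM product.orders_all;   -- returns 0 before any data is ingested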
    8. Execute the following statement to create a materialized view to automatically import data:
      CREATE MATERIALIZED VIEW IF NOT EXISTS product.kafka_load ON CLUSTER cluster_emr TO product.orders AS
      SELECT *
      FROM kafka.consumer;
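      The materialized view acts as a continuous pipeline: it reads new messages from kafka.consumer in the background and writes them to product.orders. If you need to pause ingestion, for example to change the target schema, one common approach (a sketch; consumption stops while the view is detached) is to detach and later re-attach the view:
      DETACH TABLE product.kafka_load ON CLUSTER cluster_emr;   -- pause consumption
      ATTACH TABLE product.kafka_load ON CLUSTER cluster_emr;   -- resume consumption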
  2. Perform the following substeps in the DataFlow cluster:
    1. Log on to the DataFlow cluster in SSH mode. For more information, see Log on to a cluster.
    2. In the CLI of the DataFlow cluster, run the following command to start the Kafka producer:
      /usr/lib/kafka-current/bin/kafka-console-producer.sh --broker-list 192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092 --topic clickhouse_test
    3. Enter the following test data in the producer console. Each line is a CSV record whose fields correspond to the uid, date, skuId, and order_revenue columns in order:
      38826285,2021-08-03 10:47:29,25166907,27
      10793515,2021-07-31 02:10:31,95584454,68
      70246093,2021-08-01 00:00:08,82355887,97
      70149691,2021-08-02 12:35:45,68748652,1
      87307646,2021-08-03 19:45:23,16898681,71
      61694574,2021-08-04 23:23:32,79494853,35
      61337789,2021-08-02 07:10:42,23792355,55
      66879038,2021-08-01 16:13:19,95820038,89
  3. In the CLI of the EMR ClickHouse cluster, execute the following statement to query the data that is imported from the EMR Kafka cluster to the EMR ClickHouse cluster.

    You can check whether the queried data is the same as the source data.

    SELECT * FROM product.orders_all;
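    If all eight sample records were consumed, a count query offers a quick check before you compare individual rows. Rows may appear after a short delay because the materialized view flushes data in batches:
    SELECT count() FROM product.orders_all;   -- expected: 8 for the sample data above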