This topic describes how to import data from a Message Queue for Apache Kafka instance to an ApsaraDB for ClickHouse cluster.

Prerequisites

The Message Queue for Apache Kafka instance and the ApsaraDB for ClickHouse cluster must be deployed in the same VPC. Otherwise, the data cannot be imported.

Procedure

  1. Create a Kafka consumption table in the ApsaraDB for ClickHouse cluster.
    CREATE TABLE default.kafka_src_table ON CLUSTER default
    (
        -- Define the table schema fields.
        id Int32,
        age Int32,
        msg String
    ) ENGINE = Kafka()
    SETTINGS
        kafka_broker_list = '*',
        kafka_topic_list = 'test',
        kafka_group_name = 'test',
        kafka_format = 'JSONEachRow';
    Parameters in the statement:
    • kafka_broker_list: the endpoint of the Message Queue for Apache Kafka instance. To obtain the endpoint, log on to the Message Queue for Apache Kafka console, find the instance, and view its details. On the Instance Details tab, copy the Default Endpoint from the Basic Information section.
    • kafka_topic_list: the topic of the Message Queue for Apache Kafka instance.
    • kafka_group_name: the consumer group for the topic. You must first create a consumer group in the Message Queue for Apache Kafka console. For more information, see Step 2: Create a consumer group.
    • kafka_format: the format of the messages to be consumed. It must be a format that ApsaraDB for ClickHouse can process.
      • JSONEachRow indicates that each row is one data entry in the JSON format. Use JSONEachRow for general, flat JSON data, as shown in the sample below.
      • For the nested JSON format, specify input_format_import_nested_json=1.
      For more information about the formats supported in ClickHouse, see Formats for Input and Output Data.
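      For example, with the schema defined above, each Kafka message is expected to be a single JSON object on one line. The following sample message uses hypothetical values:
        {"id": 1, "age": 25, "msg": "hello"}
      If the messages contain nested JSON objects, one way to enable the setting is at the session or profile level, for example:
        SET input_format_import_nested_json = 1;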

    For more information about attribute settings, see Kafka.
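    For example, a variant of the statement above adds optional Kafka engine settings such as kafka_num_consumers and kafka_max_block_size. This is a minimal sketch that keeps the same placeholder endpoint, topic, and consumer group:
    CREATE TABLE default.kafka_src_table ON CLUSTER default
    (
        id Int32,
        age Int32,
        msg String
    ) ENGINE = Kafka()
    SETTINGS
        kafka_broker_list = '*',
        kafka_topic_list = 'test',
        kafka_group_name = 'test',
        kafka_format = 'JSONEachRow',
        -- Optional: number of consumers for this table. Defaults to 1.
        kafka_num_consumers = 1,
        -- Optional: maximum number of messages to batch in one poll.
        kafka_max_block_size = 65536;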

  2. Create a destination ApsaraDB for ClickHouse table.
    1. Create a local table.
      CREATE TABLE default.kafka_table_local ON CLUSTER default
      (
        id Int32,
        age Int32,
        msg String
      ) ENGINE = ReplicatedMergeTree(
          '/clickhouse/tables/{database}/{table}/{shard}',
          '{replica}')
      ORDER BY (id);
      The {database}, {table}, {shard}, and {replica} placeholders are macros that ClickHouse replaces with the values configured on each node.
    2. Create a distributed table.
      CREATE TABLE kafka_table_distributed ON CLUSTER default AS default.kafka_table_local
      ENGINE = Distributed(default, default, kafka_table_local, id);
      The Distributed engine parameters are, in order, the cluster name, the database name, the local table name, and the sharding key.
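    To confirm that the destination tables work before you create the view in the next step, you can insert a test row through the distributed table and read it back. This is an optional sketch with hypothetical values:
      INSERT INTO kafka_table_distributed VALUES (1, 25, 'test message');
      SELECT * FROM kafka_table_distributed WHERE id = 1;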
  3. Create a view to import the data consumed by the Kafka consumption table to the destination ApsaraDB for ClickHouse table.
    CREATE MATERIALIZED VIEW consumer ON CLUSTER default TO kafka_table_distributed AS SELECT * FROM kafka_src_table;
    Note The Kafka consumption table cannot be used directly as a result table. It only consumes data from the Message Queue for Apache Kafka instance and does not store the data itself.
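    To verify the import, produce a test message to the topic, such as the hypothetical {"id": 1, "age": 25, "msg": "hello"}, and then query the destination table:
      SELECT * FROM kafka_table_distributed;
    The materialized view continuously moves consumed rows into the destination table, so the new row should appear within a short time.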