本文介绍如何从消息队列Kafka版中同步数据到云数据库ClickHouse

前提条件

数据源Kafka集群和目的ClickHouse集群必须在同一个VPC下。

操作步骤

  1. 在ClickHouse集群中新建Kafka消费表。
    CREATE TABLE default.kafka_src_table ON CLUSTER default
    (   //定义表结构的字段
        id Int32,
        age Int32,
        msg String               
    ) ENGINE = Kafka()
    SETTINGS
        kafka_broker_list = '1.1.1.1:3033,2.2.2.2:3033',
        kafka_topic_list = 'test',
        kafka_group_name = 'test',
        kafka_format = 'JSONEachRow',
        kafka_num_consumers = 1,
        kafka_max_block_size = 65536,
        kafka_skip_broken_messages = 0,
        kafka_auto_offset_reset = 'latest';
    常用参数说明如下。
    名称 是否必选 说明
    kafka_broker_list 以逗号分隔的Kafka的接入点地址列表。如何查看接入点,请参见查看接入点
    kafka_topic_list 以逗号分隔的Topic名称列表。
    kafka_group_name Kafka的消费组名称。更多信息,请参见创建Group
    kafka_format ClickHouse支持处理的消息体格式。例如JSONEachRow表示每行一条数据的JSON格式。
    说明 关于ClickHouse支持的各种格式,请参见 Formats for Input and Output Data
    kafka_row_delimiter 行分隔符,用于分割不同的数据行。默认为“\n”,您也可以根据数据写入的实际分割格式进行设置。
    kafka_num_consumers 单个表的消费者数量,默认值为1。
    说明
    • 一个消费者的吞吐量不足时,需要指定更多的消费者。
    • 消费者的总数不应超过Topic中的分区数,因为每个分区只能分配一个消费者。
    kafka_max_block_size Kafka消息的最大批次大小,单位:Byte,默认值为65536 Byte。
    kafka_skip_broken_messages kafka消息解析器对于脏数据的容忍度,默认值为0。如果kafka_skip_breaked_messages=N,则引擎将跳过N条无法解析的Kafka消息(一条消息等于一行数据)。
    kafka_commit_every_batch 执行Kafka commit的频率,取值如下。
    • 0:完全写入一整个Block数据块的数据后才执行commit。
    • 1:每写完一个Batch批次的数据就执行一次commit。
    kafka_ auto_offset_reset 从哪个offset开始读取Kafka数据。取值范围:earlist,latest。
    说明 更多参数说明,请参见 Kafka
  2. 创建ClickHouse目的表。
    1. 创建本地表。
      create table default.kafka_table_local ON CLUSTER default (
        id Int32,
        age UInt32,
        msg String
      ) ENGINE = ReplicatedMergeTree(
          '/clickhouse/tables/{database}/{table}/{shard}',
          '{replica}')
          ORDER BY (id);
      说明 Kafka数据同步不限制目的表的引擎类型。
    2. 创建分布式表。
      CREATE TABLE kafka_table_distributed ON CLUSTER default AS default.kafka_table_local
      ENGINE = Distributed(default, default, kafka_table_local, id);
  3. 创建VIEW把Kafka消费表消费到的数据同步到ClickHouse目的表。
    CREATE MATERIALIZED VIEW consumer TO kafka_table_distributed AS SELECT * FROM kafka_src_table;
    说明
    • Kafka消费表不能直接作为结果表使用。
    • Kafka消费表只是用来消费Kafka数据,没有真正的存储所有数据。