本文为您介绍ClickHouse结果表的DDL定义,以及创建结果表时使用的WITH参数、类型映射和代码示例。

什么是ClickHouse

ClickHouse是一个用于联机分析(OLAP)的列式数据库管理系统,详情请参见什么是ClickHouse

前提条件

  • 已创建ClickHouse表,详情请参见创建表
  • 已配置白名单。
    • 如果您使用的是阿里云数据库ClickHouse,配置白名单详情请参见设置白名单
    • 如果您使用的是阿里云E-MapReduce的ClickHouse,配置白名单详情请参见管理安全组
    • 如果您使用的是阿里云ECS上自建的ClickHouse,配置白名单详情请参见安全组概述
    • 如果为其他情况,请您自行配置ClickHouse所在机器的白名单让其可被Flink所在机器访问即可。

使用限制

  • 仅Flink计算引擎VVR 3.0.2及以上版本支持ClickHouse Connector。
  • 仅Flink计算引擎VVR 3.0.3,VVR 4.0.7及以上版本支持ignoreDelete选项。
  • 仅Flink计算引擎VVR 4.0.10及以上版本支持ClickHouse的Nested类型。
  • 仅Flink计算引擎VVR 4.0.11及以上版本支持直接将数据写入到ClickHouse分布式表对应的本地表。
  • 仅Flink计算引擎VVR 4.0.11及以上版本提供写EMR的ClickHouse的Exactly Once语义。

DDL定义

CREATE TABLE clickhouse_sink (
  id INT,
  name VARCHAR,
  age BIGINT,
  rate FLOAT
) WITH (
  'connector' = 'clickhouse',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'maxRetryTimes' = '3',
  'batchSize' = '8000',
  'flushIntervalMs' = '1000'
  'ignoreDelete' = 'true',
  'shardWrite' = 'false',
  'writeMode' = 'partition',
  'shardingKey' = 'id'
);

WITH参数

参数 说明 是否必选 备注
connector 结果表类型。 固定值为clickhouse
url ClickHouse的JDBC连接地址。 URL格式为jdbc:clickhouse://<yourNetworAddress>:<PortId>/<yourDatabaseName>

如果不写数据库名称,则使用默认的default数据库。

说明 如果您要将数据写入ClickHouse分布式表,则URL为该分布式表所在节点的JDBC URL。
userName ClickHouse的用户名。 无。
password ClickHouse的密码。 无。
tableName ClickHouse的表名称。 无。
maxRetryTimes 向结果表插入数据失败后的最大尝试次数。 默认值为3。
batchSize 一次批量写入的数据条数。 默认值为100。

如果缓存中的数据条数达到了batchSize参数值,或者等待时间超过flushIntervalMs后,系统将会自动将缓存中的数据写入ClickHouse表中。

flushIntervalMs 清空缓存的时间间隔。 默认值为1000,单位为毫秒。
ignoreDelete 是否忽略Delete消息。 参数取值如下:
  • true(默认值):忽略。
  • false:不忽略。

    如果为false,并且在DDL中声明了Primary Key,则会使用ClickHouse的ALTER语句来删除数据。

说明 如果设置ignoreDelete=false的话,就无法支持直接写ClickHouse分布表的本地表,所以就不能再设置writeMode和shardingKey这两个配置项。
shardWrite 对于ClickHouse分布式表,是否直接写ClickHouse的本地表。 参数取值如下:
  • true:跳过分布式表,直接将数据写到该ClickHouse分布式表对应的本地表。
    Flink将自动查询ClickHouse的集群信息,得到对应的本地表信息并进行写入。此时 tableName应该为分布式表的名字。您也可以在URL中手动指定要将数据写到哪些节点的本地表中。此时 tableName应该为本地表名字。示例如下。
    'url' = 'jdbc:clickhouse://192.168.1.1:8123,192.168.1.2:8123/default'
    'tableName' = 'local_table'

    如果需要提高写ClickHouse分布式表的吞吐量,则建议将该值设置为true。

  • false(默认值):先写ClickHouse的分布式表,再由分布式表写入对应的本地表。此时tableName应为分布式表的名称。
writeMode 对于ClickHouse分布式表,采用何种策略写ClickHouse的本地表。 参数取值如下:
  • default(默认值):表示总是写入到第一个节点的本地表。
  • partition:表示将数据按key写到同一个节点的本地表。
  • random:表示随机写到某个节点的本地表。
shardingKey 按何种key将数据写到同一个节点的本地表。 writeMode取值为partition时,shardingKey值必填,可包含多个字段,多个字段以英文逗号(,)分隔。
exactlyOnce 是否开启exactlyOnce语义。
说明
  • 目前仅支持写EMR的ClickHouse的Exactly Once语义。所以只有当您写EMR的ClickHouse时,才能将exactlyOnce设置为true。
  • 不支持以partition策略写ClickHouse的本地表的Exactly Once语义。所以如果exactlyOnce设置为true,则writeMode不能设置为partition。
参数取值如下:
  • true:开启。
  • false(默认值):不开启。

类型映射

ClickHouse字段类型 Flink字段类型
UInt8 BOOLEAN
说明 目前,Flink的BOOLEAN类型对应ClickHouse的UInt8类型,暂不支持对应ClickHouse的BOOLEAN类型。
Int8 TINYINT
Int16 SMALLINT
Int32 INTEGER
UInt32 BIGINT
Int64 BIGINT
Float32 FLOAT
Float64 DOUBLE
FixedString CHAR
String VARCHAR
FixedString BINARY
String VARBINARY
Date DATE
DateTime TIMESTAMP(0)
Datetime64(x) TIMESTAMP(x)
Decimal DECIMAL
ARRAY ARRAY
Nested
说明 ClickHouse暂不支持Flink的TIME、MAP、MULTISET和ROW类型。
对于ClickHouse的Nested类型,需要将其映射成Flink的ARRAY类型,例如:
// ClickHouse
CREATE TABLE visits (
  StartDate Date,
  Goals Nested
  (
    ID UInt32,
    OrderID String
  )
  ...
);
需要映射为:
// Flink
CREATE TABLE visits (
  StartDate DATE,
  `Goals.ID` ARRAY<INTEGER>,
  `Goals.OrderID` ARRAY<STRING>
);
重要 ClickHouse的DateTime类型可以精确到秒,Datetime64可以精确到纳秒。因为ClickHouse官方提供的JDBC写Datetime64数据类型会出现精度丢失,只能精确到秒的问题。所以目前通过Flink只能写入秒级别的TIMESTAMP,即TIMESTAMP(0)。

代码示例

  • 示例1:写ClickHouse单节点表。
    CREATE TEMPORARY TABLE clickhouse_source (
      id INT,
      name VARCHAR,
      age BIGINT,
      rate FLOAT
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '50'
    );
    
    CREATE TEMPORARY TABLE clickhouse_output (
      id INT,
      name VARCHAR,
      age BIGINT,
      rate FLOAT
    ) WITH (
      'connector' = 'clickhouse',
      'url' = '<yourUrl>',
      'userName' = '<yourUsername>',
      'password' = '<yourPassword>',
      'tableName' = '<yourTablename>'
    );
    
    INSERT INTO clickhouse_output
    SELECT
      id,
      name,
      age,
      rate
    FROM clickhouse_source;
  • 示例二:写ClickHouse分布式表
    假设您已经有三个本地表,表名为local_table_test,分别在192.168.1.1、192.168.1.2和192.168.1.3节点上。然后基于这三个本地表,创建了一个分布式表distributed_table_test。此时,如果您希望Flink可以直接写本地表,并且可以按照某个key将相同key的数据写到同一个节点的本地表中,则DDL代码示例如下。
    CREATE TEMPORARY TABLE clickhouse_source (
      id INT,
      name VARCHAR,
      age BIGINT,
      rate FLOAT
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '50'
    );
    
    CREATE TEMPORARY TABLE clickhouse_output (
      id INT,
      name VARCHAR,
      age BIGINT,
      rate FLOAT
    ) WITH (
      'connector' = 'clickhouse',
      'url' = 'jdbc:clickhouse://192.168.1.1:8123,192.168.1.2:8123,192.168.1.3:8123/default',
      'userName' = '<yourUsername>',
      'password' = '<yourPassword>',
      'tableName' = 'local_table_test',
      'shardWrite' = 'true',
      'writeMode' = 'partition',
      'shardingKey' = 'name'
    );
    
    INSERT INTO clickhouse_output
    SELECT
      id,
      name,
      age,
      rate
    FROM clickhouse_source;

常见问题

ClickHouse结果表是否支持回撤更新数据?