本文为您介绍如何使用ClickHouse连接器。

背景信息

ClickHouse是一个用于联机分析(OLAP)的列式数据库管理系统,详情请参见What Is ClickHouse?

ClickHouse连接器支持的信息如下.
类别详情
支持类型仅支持结果表
运行模式批模式和流模式
数据格式暂不适用
特有监控指标
  • numRecordsOut
  • numRecordsOutPerSecond
  • currentSendTime
说明 指标的含义及如何查看监控指标,请参见查看监控指标
API种类SQL
是否支持更新或删除结果表数据

特色功能

  • 对于ClickHouse的分布式表,支持直接写对应的本地表。
  • 对于EMR的ClickHouse,提供Exactly Once的语义。

前提条件

  • 已创建ClickHouse表,详情请参见创建表
  • 已配置白名单。
    • 如果您使用的是阿里云数据库ClickHouse,配置白名单详情请参见设置白名单
    • 如果您使用的是阿里云E-MapReduce的ClickHouse,配置白名单详情请参见管理安全组
    • 如果您使用的是阿里云ECS上自建的ClickHouse,配置白名单详情请参见安全组概述
    • 如果为其他情况,请您自行配置ClickHouse所在机器的白名单让其可被Flink所在机器访问即可。
    说明 如何查看Flink全托管交换机的网段,请参见如何设置白名单?

使用限制

  • ClickHouse结果表保证At-Least-Once语义。
  • 仅Flink计算引擎VVR 3.0.2及以上版本支持ClickHouse连接器。
  • 仅Flink计算引擎VVR 3.0.3,VVR 4.0.7及以上版本支持ignoreDelete选项。
  • 仅Flink计算引擎VVR 4.0.10及以上版本支持ClickHouse的Nested类型。
  • 仅Flink计算引擎VVR 4.0.11及以上版本支持直接将数据写入到ClickHouse分布式表对应的本地表。
  • 仅Flink计算引擎VVR 4.0.11及以上版本提供写EMR的ClickHouse的Exactly Once语义。

语法结构

CREATE TABLE clickhouse_sink (
  id INT,
  name VARCHAR,
  age BIGINT,
  rate FLOAT
) WITH (
  'connector' = 'clickhouse',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'maxRetryTimes' = '3',
  'batchSize' = '8000',
  'flushIntervalMs' = '1000'
  'ignoreDelete' = 'true',
  'shardWrite' = 'false',
  'writeMode' = 'partition',
  'shardingKey' = 'id'
);

WITH参数

参数说明数据类型是否必填默认值备注
connector结果表类型。String固定值为clickhouse
urlClickHouse的JDBC连接地址。StringURL格式为jdbc:clickhouse://<yourNetworAddress>:<PortId>/<yourDatabaseName>

如果不写数据库名称,则使用默认的default数据库。

说明 如果您要将数据写入ClickHouse分布式表,则URL为该分布式表所在节点的JDBC URL。
userNameClickHouse的用户名。String无。
passwordClickHouse的密码。String无。
tableNameClickHouse的表名称。String无。
maxRetryTimes向结果表插入数据失败后的最大尝试次数。Int3无。
batchSize一次批量写入的数据条数。Int100

如果缓存中的数据条数达到了batchSize参数值,或者等待时间超过flushIntervalMs后,系统将会自动将缓存中的数据写入ClickHouse表中。

flushIntervalMs清空缓存的时间间隔。Long1000单位为毫秒。
ignoreDelete是否忽略Delete消息。Booleantrue参数取值如下:
  • true(默认值):忽略。
  • false:不忽略。

    如果为false,并且在DDL中声明了Primary Key,则会使用ClickHouse的ALTER语句来删除数据。

说明 如果设置ignoreDelete=false,则无法支持以partition的方式写ClickHouse分布表的本地表,所以就不能再设置writeMode为partition。
shardWrite对于ClickHouse分布式表,是否直接写ClickHouse的本地表。Booleanfalse参数取值如下:
  • true:跳过分布式表,直接将数据写到该ClickHouse分布式表对应的本地表。
    Flink将自动查询ClickHouse的集群信息,得到对应的本地表信息并进行写入。此时tableName应该为分布式表的名字。您也可以在URL中手动指定要将数据写到哪些节点的本地表中。此时tableName应该为本地表名字。示例如下。
    'url' = 'jdbc:clickhouse://192.XX.XX.1:8123,192.XX.XX.2:8123/default'
    'tableName' = 'local_table'

    如果需要提高写ClickHouse分布式表的吞吐量,则建议将该值设置为true。

  • false(默认值):先写ClickHouse的分布式表,再由分布式表写入对应的本地表。此时tableName应为分布式表的名称。
writeMode对于ClickHouse分布式表,采用何种策略写ClickHouse的本地表。Enumdefault参数取值如下:
  • default(默认值):表示总是写入到第一个节点的本地表。
  • partition:表示将数据按key写到同一个节点的本地表。
  • random:表示随机写到某个节点的本地表。
说明 如果设置了writeMode=partition,请确保配置项ignoreDelete为true。
shardingKey按何种key将数据写到同一个节点的本地表。defaultwriteMode取值为partition时,shardingKey值必填,可包含多个字段,多个字段以英文逗号(,)分隔。
exactlyOnce是否开启exactlyOnce语义。
说明
  • 目前仅支持写EMR的ClickHouse的Exactly Once语义。所以只有当您写EMR的ClickHouse时,才能将exactlyOnce设置为true。
  • 不支持以partition策略写ClickHouse的本地表的Exactly Once语义。所以如果exactlyOnce设置为true,则writeMode不能设置为partition。
Booleanfalse参数取值如下:
  • true:开启。
  • false(默认值):不开启。

类型映射

Flink字段类型ClickHouse字段类型
BOOLEANUInt8 / Boolean
说明 ClickHouse v21.12及以上版本支持Boolean类型。如果您使用的ClickHouse是v21.12以下版本,Flink的Boolean类型则对应ClickHouse的UInt8类型。
TINYINTInt8
SMALLINTInt16
INTEGERInt32
BIGINTInt64
BIGINTUInt32
FLOATFloat32
DOUBLEFloat64
CHARFixedString
VARCHARString
BINARYFixedString
VARBINARYString
DATEDate
TIMESTAMP(0)DateTime
TIMESTAMP(x)Datetime64(x)
DECIMALDECIMAL
ARRAYARRAY
Nested
说明 ClickHouse暂不支持Flink的TIME、MAP、MULTISET和ROW类型。
对于ClickHouse的Nested类型,需要将其映射成Flink的ARRAY类型,例如:
// ClickHouse
CREATE TABLE visits (
  StartDate Date,
  Goals Nested
  (
    ID UInt32,
    OrderID String
  )
  ...
);
需要映射为:
// Flink
CREATE TABLE visits (
  StartDate DATE,
  `Goals.ID` ARRAY<LONG>,
  `Goals.OrderID` ARRAY<STRING>
);
说明 ClickHouse的DateTime类型可以精确到秒,Datetime64可以精确到纳秒。因为ClickHouse官方提供的JDBC写Datetime64数据类型会出现精度丢失,只能精确到秒的问题,所以目前通过Flink只能写入秒级别的TIMESTAMP,即TIMESTAMP(0)。

使用示例

  • 示例1:写ClickHouse单节点表。
    CREATE TEMPORARY TABLE clickhouse_source (
      id INT,
      name VARCHAR,
      age BIGINT,
      rate FLOAT
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '50'
    );
    
    CREATE TEMPORARY TABLE clickhouse_output (
      id INT,
      name VARCHAR,
      age BIGINT,
      rate FLOAT
    ) WITH (
      'connector' = 'clickhouse',
      'url' = '<yourUrl>',
      'userName' = '<yourUsername>',
      'password' = '<yourPassword>',
      'tableName' = '<yourTablename>'
    );
    
    INSERT INTO clickhouse_output
    SELECT
      id,
      name,
      age,
      rate
    FROM clickhouse_source;
  • 示例2:写ClickHouse分布式表。
    假设您已经有三个本地表,表名为local_table_test,分别在192.XX.XX.1、192.XX.XX.2和192.XX.XX.3节点上。然后基于这三个本地表,创建了一个分布式表distributed_table_test。此时,如果您希望Flink可以直接写本地表,并且可以按照某个key将相同key的数据写到同一个节点的本地表中,则DDL代码示例如下。
    CREATE TEMPORARY TABLE clickhouse_source (
      id INT,
      name VARCHAR,
      age BIGINT,
      rate FLOAT
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '50'
    );
    
    CREATE TEMPORARY TABLE clickhouse_output (
      id INT,
      name VARCHAR,
      age BIGINT,
      rate FLOAT
    ) WITH (
      'connector' = 'clickhouse',
      'url' = 'jdbc:clickhouse://192.XX.XX.1:8123,192.XX.XX.2:8123,192.XX.XX.3:8123/default',
      'userName' = '<yourUsername>',
      'password' = '<yourPassword>',
      'tableName' = 'local_table_test',
      'shardWrite' = 'true',
      'writeMode' = 'partition',
      'shardingKey' = 'name'
    );
    
    INSERT INTO clickhouse_output
    SELECT
      id,
      name,
      age,
      rate
    FROM clickhouse_source;

常见问题

ClickHouse结果表是否支持回撤更新数据?