本文为您介绍ClickHouse结果表的DDL定义,以及创建结果表时使用的WITH参数、类型映射和代码示例。
什么是ClickHouse
ClickHouse是一个用于联机分析(OLAP)的列式数据库管理系统,详情请参见什么是ClickHouse。
前提条件
使用限制
- 仅Flink计算引擎VVR 3.0.2及以上版本支持ClickHouse Connector。
- 仅Flink计算引擎VVR 3.0.3,VVR 4.0.7及以上版本支持ignoreDelete选项。
- 仅Flink计算引擎VVR 4.0.10及以上版本支持ClickHouse的Nested类型。
- 仅Flink计算引擎VVR 4.0.11及以上版本支持直接将数据写入到ClickHouse分布式表对应的本地表。
- 仅Flink计算引擎VVR 4.0.11及以上版本提供写EMR的ClickHouse的Exactly Once语义。
DDL定义
CREATE TABLE clickhouse_sink (
id INT,
name VARCHAR,
age BIGINT,
rate FLOAT
) WITH (
'connector' = 'clickhouse',
'url' = '<yourUrl>',
'userName' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTablename>',
'maxRetryTimes' = '3',
'batchSize' = '8000',
'flushIntervalMs' = '1000'
'ignoreDelete' = 'true',
'shardWrite' = 'false',
'writeMode' = 'partition',
'shardingKey' = 'id'
);
WITH参数
参数 | 说明 | 是否必选 | 备注 |
---|---|---|---|
connector | 结果表类型。 | 是 | 固定值为clickhouse。 |
url | ClickHouse的JDBC连接地址。 | 是 | URL格式为jdbc:clickhouse://<yourNetworAddress>:<PortId>/<yourDatabaseName> 如果不写数据库名称,则使用默认的default数据库。
说明 如果您要将数据写入ClickHouse分布式表,则URL为该分布式表所在节点的JDBC URL。
|
userName | ClickHouse的用户名。 | 是 | 无。 |
password | ClickHouse的密码。 | 是 | 无。 |
tableName | ClickHouse的表名称。 | 是 | 无。 |
maxRetryTimes | 向结果表插入数据失败后的最大尝试次数。 | 否 | 默认值为3。 |
batchSize | 一次批量写入的数据条数。 | 否 | 默认值为100。 如果缓存中的数据条数达到了batchSize参数值,或者等待时间超过flushIntervalMs后,系统将会自动将缓存中的数据写入ClickHouse表中。 |
flushIntervalMs | 清空缓存的时间间隔。 | 否 | 默认值为1000,单位为毫秒。 |
ignoreDelete | 是否忽略Delete消息。 | 否 | 参数取值如下:
说明 如果设置ignoreDelete=false的话,就无法支持直接写ClickHouse分布表的本地表,所以就不能再设置writeMode和shardingKey这两个配置项。
|
shardWrite | 对于ClickHouse分布式表,是否直接写ClickHouse的本地表。 | 否 | 参数取值如下:
|
writeMode | 对于ClickHouse分布式表,采用何种策略写ClickHouse的本地表。 | 否 | 参数取值如下:
|
shardingKey | 按何种key将数据写到同一个节点的本地表。 | 否 | 当writeMode取值为partition时,shardingKey值必填,可包含多个字段,多个字段以英文逗号(,)分隔。 |
exactlyOnce | 是否开启exactlyOnce语义。
说明
|
否 | 参数取值如下:
|
类型映射
ClickHouse字段类型 | Flink字段类型 |
---|---|
UInt8 | BOOLEAN
说明 目前,Flink的BOOLEAN类型对应ClickHouse的UInt8类型,暂不支持对应ClickHouse的BOOLEAN类型。
|
Int8 | TINYINT |
Int16 | SMALLINT |
Int32 | INTEGER |
UInt32 | BIGINT |
Int64 | BIGINT |
Float32 | FLOAT |
Float64 | DOUBLE |
FixedString | CHAR |
String | VARCHAR |
FixedString | BINARY |
String | VARBINARY |
Date | DATE |
DateTime | TIMESTAMP(0) |
Datetime64(x) | TIMESTAMP(x) |
Decimal | DECIMAL |
ARRAY | ARRAY |
Nested |
说明 ClickHouse暂不支持Flink的TIME、MAP、MULTISET和ROW类型。
对于ClickHouse的Nested类型,需要将其映射成Flink的ARRAY类型,例如:
// ClickHouse
CREATE TABLE visits (
StartDate Date,
Goals Nested
(
ID UInt32,
OrderID String
)
...
);
需要映射为:
// Flink
CREATE TABLE visits (
StartDate DATE,
`Goals.ID` ARRAY<INTEGER>,
`Goals.OrderID` ARRAY<STRING>
);
重要 ClickHouse的DateTime类型可以精确到秒,Datetime64可以精确到纳秒。因为ClickHouse官方提供的JDBC写Datetime64数据类型会出现精度丢失,只能精确到秒的问题。所以目前通过Flink只能写入秒级别的TIMESTAMP,即TIMESTAMP(0)。
代码示例
- 示例1:写ClickHouse单节点表。
CREATE TEMPORARY TABLE clickhouse_source ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '50' ); CREATE TEMPORARY TABLE clickhouse_output ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'clickhouse', 'url' = '<yourUrl>', 'userName' = '<yourUsername>', 'password' = '<yourPassword>', 'tableName' = '<yourTablename>' ); INSERT INTO clickhouse_output SELECT id, name, age, rate FROM clickhouse_source;
- 示例二:写ClickHouse分布式表
假设您已经有三个本地表,表名为local_table_test,分别在192.168.1.1、192.168.1.2和192.168.1.3节点上。然后基于这三个本地表,创建了一个分布式表distributed_table_test。此时,如果您希望Flink可以直接写本地表,并且可以按照某个key将相同key的数据写到同一个节点的本地表中,则DDL代码示例如下。
CREATE TEMPORARY TABLE clickhouse_source ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '50' ); CREATE TEMPORARY TABLE clickhouse_output ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'clickhouse', 'url' = 'jdbc:clickhouse://192.168.1.1:8123,192.168.1.2:8123,192.168.1.3:8123/default', 'userName' = '<yourUsername>', 'password' = '<yourPassword>', 'tableName' = 'local_table_test', 'shardWrite' = 'true', 'writeMode' = 'partition', 'shardingKey' = 'name' ); INSERT INTO clickhouse_output SELECT id, name, age, rate FROM clickhouse_source;