This topic provides the DDL syntax that is used to create a ClickHouse result table, describes the parameters in the WITH clause, and provides data type mappings and sample code.

What is ClickHouse?

ClickHouse is a column-oriented database management system used for online analytical processing (OLAP). For more information, see What Is ClickHouse?

Prerequisites

  • A ClickHouse table is created. For more information, see Create a table.
  • A whitelist is configured for a ClickHouse cluster.
    • If you use an Alibaba Cloud ApsaraDB for ClickHouse cluster, configure a whitelist by following the instructions provided in Configure a whitelist.
    • If you use a ClickHouse cluster that is deployed in Alibaba Cloud E-MapReduce (EMR), configure a whitelist by following the instructions provided in Security groups.
    • If you use a self-managed ClickHouse cluster that is hosted on Elastic Compute Service (ECS) instances, configure a whitelist by following the instructions provided in Overview.
    • In other cases, configure the whitelist on the machines where the ClickHouse cluster is deployed so that the machine where Flink is deployed can access the cluster.

Limits

The ClickHouse connector is supported only in Flink that uses Ververica Runtime (VVR) 3.0.2 or later.

DDL syntax

CREATE TABLE clickhouse_sink (
  id INT,
  name VARCHAR,
  age BIGINT,
  rate FLOAT
) WITH (
  'connector' = 'clickhouse',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'maxRetryTimes' = '3',
  'batchSize' = '8000',
  'flushIntervalMs' = '1000'
);

Parameters in the WITH clause

| Parameter | Description | Required | Remarks |
| --- | --- | --- | --- |
| connector | The type of the result table. | Yes | Set the value to clickhouse. |
| url | The Java Database Connectivity (JDBC) URL of ClickHouse. | Yes | The URL format is jdbc:clickhouse://<yourNetworkAddress>:<PortId>/<yourDatabaseName>. If you do not specify a database name, the database named default is used. |
| userName | The username that is used to access ClickHouse. | Yes | N/A. |
| password | The password that is used to access ClickHouse. | Yes | N/A. |
| tableName | The name of the ClickHouse table. | Yes | N/A. |
| maxRetryTimes | The maximum number of retries for writing data to the result table. | No | Default value: 3. |
| batchSize | The number of data records that are written in a single batch. | No | Default value: 100. |
| flushIntervalMs | The interval at which cached data is flushed to ClickHouse. | No | Default value: 1000. Unit: milliseconds. With the default value, if the number of cached data records does not reach the value specified by the batchSize parameter within 1 second, all cached data is written to the ClickHouse result table. |
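
As an illustration of how the URL format and the batching parameters fit together, the following sketch fills in the WITH clause with hypothetical values. The host name clickhouse.example.internal, the port 8123, the database name analytics, and the table name user_profile are assumptions made for this example and do not come from this topic. Replace them with the values of your own cluster.

```sql
-- Hypothetical values for illustration only: replace the host, port,
-- database name, and table name with those of your own ClickHouse cluster.
CREATE TEMPORARY TABLE clickhouse_sink_example (
  id INT,
  name VARCHAR,
  age BIGINT,
  rate FLOAT
) WITH (
  'connector' = 'clickhouse',
  -- Format: jdbc:clickhouse://<yourNetworkAddress>:<PortId>/<yourDatabaseName>
  'url' = 'jdbc:clickhouse://clickhouse.example.internal:8123/analytics',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = 'user_profile',
  'maxRetryTimes' = '3',       -- retry a failed write up to 3 times
  'batchSize' = '500',         -- write after 500 records are cached ...
  'flushIntervalMs' = '2000'   -- ... or after 2 seconds, whichever comes first
);
```

With these settings, a low-traffic stream would still be written every 2 seconds even if fewer than 500 records are cached.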

Data type mapping

| Data type of ClickHouse | Data type of Flink |
| --- | --- |
| UInt8 | BOOLEAN |
| Int8 | TINYINT |
| Int16 | SMALLINT |
| Int32 | INTEGER |
| Int64 | BIGINT |
| Float32 | FLOAT |
| Float64 | DOUBLE |
| FixedString | CHAR |
| String | VARCHAR |
| FixedString | BINARY |
| String | VARBINARY |
| Date | DATE |
| DateTime | TIMESTAMP(0) |
| DateTime64(x) | TIMESTAMP(x) |
| Decimal | DECIMAL |
| Array | ARRAY |
| Not supported | TIME |
| Not supported | MAP |
| Not supported | MULTISET |
| Not supported | ROW |
Notice: The DateTime type of ClickHouse is accurate to the second, and the DateTime64 type is accurate to the nanosecond. However, when the ClickHouse Java Database Connectivity (JDBC) driver reads data of the DateTime64 type, precision is lost and the data is accurate only to the second. Therefore, Flink can write data of the TIMESTAMP type only with second precision, and the data is represented as TIMESTAMP(0).
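
To make the mapping concrete, the following sketch pairs a ClickHouse table definition with a Flink result table whose columns use the corresponding Flink types. The table name user_events, the column names, and the MergeTree engine with ORDER BY id are assumptions made for this example. Use the schema and engine that fit your own workload.

```sql
-- ClickHouse side (run in ClickHouse). Hypothetical table for illustration.
CREATE TABLE default.user_events (
  id         Int32,
  name       String,
  age        Int64,
  rate       Float32,
  is_active  UInt8,
  event_date Date,
  event_time DateTime
) ENGINE = MergeTree()
ORDER BY id;

-- Flink side (run in Flink SQL). Each column uses the mapped Flink type.
CREATE TEMPORARY TABLE user_events_sink (
  id         INT,           -- Int32
  name       VARCHAR,       -- String
  age        BIGINT,        -- Int64
  rate       FLOAT,         -- Float32
  is_active  BOOLEAN,       -- UInt8
  event_date DATE,          -- Date
  event_time TIMESTAMP(0)   -- DateTime (second precision)
) WITH (
  'connector' = 'clickhouse',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = 'user_events'
);
```

The event_time column is declared as TIMESTAMP(0) because, as noted above, timestamps are written with second precision.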

Sample code

CREATE TEMPORARY TABLE clickhouse_source (
  id INT,
  name VARCHAR,
  age BIGINT,
  rate FLOAT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '50'
);

CREATE TEMPORARY TABLE clickhouse_output (
  id INT,
  name VARCHAR,
  age BIGINT,
  rate FLOAT
) WITH (
  'connector' = 'clickhouse',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>'
);

INSERT INTO clickhouse_output
SELECT 
  id,
  name,
  age,
  rate
FROM clickhouse_source;