This topic provides the DDL syntax that is used to create a ClickHouse result table, describes the parameters in the WITH clause, and provides data type mappings and examples.

What is ClickHouse?

ClickHouse is a column-oriented database management system that is used for Online Analytical Processing (OLAP). For more information, see What is ClickHouse?

Prerequisites

  • A ClickHouse table is created. For more information, see Create a table.
  • A whitelist is configured for a ClickHouse cluster.
    • If you use an Alibaba Cloud ApsaraDB for ClickHouse cluster, configure a whitelist by following the instructions provided in Configure a whitelist.
    • If you use a ClickHouse cluster that is deployed in Alibaba Cloud E-MapReduce (EMR), configure a whitelist by following the instructions provided in Manage security groups.
    • If you use a self-managed ClickHouse cluster that is hosted on Elastic Compute Service (ECS) instances, configure a whitelist by following the instructions provided in Overview.
    • In other cases, configure the whitelist of the machines where the ClickHouse cluster is deployed to ensure that the ClickHouse cluster can be accessed by the machine where Flink is deployed.

Limits

  • Only Flink that uses Ververica Runtime (VVR) 3.0.2 or later supports ClickHouse connectors.
  • Only Flink that uses VVR 3.0.3 or VVR 4.0.7, or their later minor versions, supports the ignoreDelete parameter in the WITH clause.
  • Only Flink that uses VVR 4.0.10 or later supports the NESTED data type of ClickHouse.
  • Only Flink that uses VVR 4.0.11 or later allows you to write data to a ClickHouse local table that corresponds to a ClickHouse distributed table.
  • Only Flink that uses VVR 4.0.11 or later provides the exactly-once semantics in a ClickHouse cluster that is deployed in Alibaba Cloud EMR.

DDL syntax

CREATE TABLE clickhouse_sink (
  id INT,
  name VARCHAR,
  age BIGINT,
  rate FLOAT
) WITH (
  'connector' = 'clickhouse',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'maxRetryTimes' = '3',
  'batchSize' = '8000',
  'flushIntervalMs' = '1000',
  'ignoreDelete' = 'true',
  'shardWrite' = 'false',
  'writeMode' = 'partition',
  'shardingKey' = 'id'
);

Parameters in the WITH clause

Parameter Description Required Remarks
connector The type of the result table. Yes Set the value to clickhouse.
url The Java Database Connectivity (JDBC) URL of ClickHouse. Yes The URL format is jdbc:clickhouse://<yourNetworkAddress>:<PortId>/<yourDatabaseName>.

If you do not specify the database name, the database named default is used.

Note If you want to write data to a ClickHouse distributed table, the URL is the JDBC URL of the node to which the distributed table belongs.
userName The username that is used to access ClickHouse. Yes N/A.
password The password that is used to access ClickHouse. Yes N/A.
tableName The name of the ClickHouse table. Yes N/A.
maxRetryTimes The maximum number of retries for writing data to the result table. No Default value: 3.
batchSize The number of data records that can be written at a time. No Default value: 100.

If the number of cached data records reaches the value of the batchSize parameter, or if the time since the cache was last flushed exceeds the value of the flushIntervalMs parameter, the system automatically writes the cached data to the ClickHouse table.

flushIntervalMs The interval at which the cache is cleared. No Default value: 1000. Unit: milliseconds.
ignoreDelete Specifies whether to ignore the delete messages. No Valid values:
  • true: The delete messages are ignored. This is the default value.
  • false: The delete messages are not ignored.

    If you set this parameter to false and define a primary key in the DDL statement, the system executes the ALTER statement to delete data from the ClickHouse table.

shardWrite Specifies whether to directly write data to a ClickHouse local table if the current table is a ClickHouse distributed table. No Valid values:
  • true: The system skips the ClickHouse distributed table and directly writes data to a ClickHouse local table that corresponds to the ClickHouse distributed table.
    Flink automatically queries the ClickHouse cluster for the information about the ClickHouse local tables that correspond to the distributed table and writes data directly to those local tables. In this case, the value of tableName is the name of the ClickHouse distributed table. You can also use the url parameter to manually specify the nodes whose ClickHouse local tables receive the data. In this case, the value of tableName is the name of the ClickHouse local tables. Example:
    'url' = 'jdbc:clickhouse://192.168.1.1:8123,192.168.1.2:8123/default'
    'tableName' = 'local_table'

    If you want to increase the throughput for writing data to the ClickHouse distributed table, we recommend that you set this parameter to true.

  • false: The system writes data to the ClickHouse distributed table and then to a ClickHouse local table that corresponds to the ClickHouse distributed table. This is the default value. In this case, the value of tableName is the name of the ClickHouse distributed table.
writeMode The policy based on which data is written to a ClickHouse local table. No Valid values:
  • default: Data is written to the ClickHouse local table on the first node of the ClickHouse cluster. This is the default value.
  • partition: Data with the same key is written to the same ClickHouse local table on a specific node.
  • random: Data is randomly written to the ClickHouse local table on a node.
shardingKey The key based on which data is written to the same ClickHouse local table on a specific node. No If you set writeMode to partition, you must configure the shardingKey parameter. The value of shardingKey can contain multiple fields. Separate multiple fields with commas (,).
exactlyOnce Specifies whether to use the exactly-once semantics. No Valid values:
  • true: The exactly-once semantics is used.
  • false: The exactly-once semantics is not used. This is the default value.

Note
  • The exactly-once semantics can be used to write data only to a ClickHouse cluster that is deployed in Alibaba Cloud EMR. Therefore, you can set this parameter to true only if you want to write data to a ClickHouse cluster that is deployed in Alibaba Cloud EMR.
  • The exactly-once semantics cannot be used if you set writeMode to partition and data is written to a ClickHouse local table. Therefore, if you set the exactlyOnce parameter to true, you cannot set the writeMode parameter to partition.
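
The following DDL is a minimal sketch of a result table that enables the exactly-once semantics. It assumes a ClickHouse cluster that is deployed in Alibaba Cloud EMR and VVR 4.0.11 or later. The URL, username, password, and table name are placeholders that you must replace with your own values.
CREATE TEMPORARY TABLE clickhouse_sink_exactly_once (
  id INT,
  name VARCHAR
) WITH (
  'connector' = 'clickhouse',
  'url' = '<yourUrl>',              -- JDBC URL of the EMR ClickHouse cluster
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'exactlyOnce' = 'true'            -- cannot be combined with 'writeMode' = 'partition'
);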

Data type mappings

Data type of ClickHouse Data type of Flink
UInt8 BOOLEAN
Int8 TINYINT
Int16 SMALLINT
Int32 INTEGER
Int64 BIGINT
Float32 FLOAT
Float64 DOUBLE
FixedString CHAR
String VARCHAR
FixedString BINARY
String VARBINARY
Date DATE
DateTime TIMESTAMP(0)
DateTime64(x) TIMESTAMP(x)
Decimal DECIMAL
ARRAY ARRAY
Nested ARRAY
Note ClickHouse does not support the following data types of Flink: TIME, MAP, MULTISET, and ROW.
To use the NESTED data type of ClickHouse, you must map this data type to ARRAY. Sample code:
// ClickHouse
CREATE TABLE visits (
  StartDate Date,
  Goals Nested
  (
    ID UInt32,
    OrderID String
  )
  ...
);
Map the NESTED data type of ClickHouse to the ARRAY data type of Flink.
// Flink
CREATE TABLE visits (
  StartDate DATE,
  `Goals.ID` ARRAY<INTEGER>,
  `Goals.OrderID` ARRAY<STRING>
);
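As a hedged illustration of how the mapped array columns are populated, the following INSERT statement assumes that the Flink table visits above is declared with the ClickHouse connector and that a source table named nested_source provides array columns of matching types. In ClickHouse, all arrays that belong to the same Nested group must have the same length within a row.
// Flink
INSERT INTO visits
SELECT
  StartDate,
  `Goals.ID`,       -- ARRAY<INTEGER>, same length per row as Goals.OrderID
  `Goals.OrderID`   -- ARRAY<STRING>
FROM nested_source;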
Notice The DateTime type of ClickHouse can be accurate to the second, and the DateTime64 type can be accurate to the nanosecond. When the ClickHouse JDBC driver reads data of the DateTime64 type, a precision loss occurs and the data can be accurate only to the second. Therefore, Flink can write data of the TIMESTAMP type only in seconds, which is represented as TIMESTAMP(0).

Examples

  • Example 1: Data is written to a ClickHouse local table on a node.
    CREATE TEMPORARY TABLE clickhouse_source (
      id INT,
      name VARCHAR,
      age BIGINT,
      rate FLOAT
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '50'
    );
    
    CREATE TEMPORARY TABLE clickhouse_output (
      id INT,
      name VARCHAR,
      age BIGINT,
      rate FLOAT
    ) WITH (
      'connector' = 'clickhouse',
      'url' = '<yourUrl>',
      'userName' = '<yourUsername>',
      'password' = '<yourPassword>',
      'tableName' = '<yourTablename>'
    );
    
    INSERT INTO clickhouse_output
    SELECT
      id,
      name,
      age,
      rate
    FROM clickhouse_source;
  • Example 2: Data is written to the ClickHouse distributed table.
    Three ClickHouse local tables named local_table_test exist on the 192.168.1.1, 192.168.1.2, and 192.168.1.3 nodes. A ClickHouse distributed table named distributed_table_test is created based on the ClickHouse local tables. In this case, if you want to directly write data with the same key to the same ClickHouse local table on a specific node, execute the following statements:
    CREATE TEMPORARY TABLE clickhouse_source (
      id INT,
      name VARCHAR,
      age BIGINT,
      rate FLOAT
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '50'
    );
    
    CREATE TEMPORARY TABLE clickhouse_output (
      id INT,
      name VARCHAR,
      age BIGINT,
      rate FLOAT
    ) WITH (
      'connector' = 'clickhouse',
      'url' = 'jdbc:clickhouse://192.168.1.1:8123,192.168.1.2:8123,192.168.1.3:8123/default',
      'userName' = '<yourUsername>',
      'password' = '<yourPassword>',
      'tableName' = 'local_table_test',
      'shardWrite' = 'true',
      'writeMode' = 'partition',
      'shardingKey' = 'name'
    );
    
    INSERT INTO clickhouse_output
    SELECT
      id,
      name,
      age,
      rate
    FROM clickhouse_source;

FAQ

Can I retract the updated data from an ApsaraDB for ClickHouse result table?