
Realtime Compute for Apache Flink:ClickHouse

Last Updated: Mar 26, 2026

The ClickHouse connector lets you write data from Flink SQL to ClickHouse result tables in both batch and streaming modes. It supports writing to standard tables and distributed tables, and it can also write directly to the local tables that back a distributed table.

Prerequisites

Before you begin, make sure the required prerequisites for your deployment are met.

Connector capabilities

| Item | Description |
| --- | --- |
| Table type | Result table |
| Running mode | Batch mode and streaming mode |
| Data format | N/A |
| Metrics | numRecordsOut, numRecordsOutPerSecond, and currentSendTime. For details, see Metrics. |
| API type | SQL API |
| Data updates and deletes | Supported when a primary key is defined in the DDL statement and ignoreDelete is set to false. Enabling this significantly reduces write throughput. |

Limitations

  • The sink.parallelism parameter is not supported.

  • Result tables support at-least-once semantics by default.

  • Requires Ververica Runtime (VVR) 3.0.2 or later.

  • The ignoreDelete parameter requires VVR 3.0.3, VVR 4.0.7, or a later minor version of either.

  • The NESTED data type requires VVR 4.0.10 or later.

  • Writing directly to a ClickHouse local table that corresponds to a distributed table requires VVR 4.0.11 or later. Only ApsaraDB for ClickHouse Community-compatible Edition clusters support this.

  • Exactly-once semantics for EMR ClickHouse clusters requires VVR 4.0.11 or later. This is no longer available for EMR V3.45.1 clusters or minor versions later than EMR V5.11.1, due to a capability change in EMR ClickHouse.

  • The balance write mode requires VVR 8.0.7 or later.

Syntax

CREATE TABLE clickhouse_sink (
  id INT,
  name VARCHAR,
  age BIGINT,
  rate FLOAT
) WITH (
  'connector' = 'clickhouse',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'maxRetryTimes' = '3',
  'batchSize' = '8000',
  'flushIntervalMs' = '1000',
  'ignoreDelete' = 'true',
  'shardWrite' = 'false',
  'writeMode' = 'partition',
  'shardingKey' = 'id'
);

Parameters in the WITH clause

Required parameters

| Parameter | Data type | Description |
| --- | --- | --- |
| connector | STRING | The connector type. Set to clickhouse. |
| url | STRING | The JDBC URL of ClickHouse, in the format jdbc:clickhouse://<yourNetworkAddress>:<PortId>/<yourDatabaseName>. If you omit the database name, the default database is used. To write to a distributed table, set this to the JDBC URL of the node that hosts the distributed table. To write directly to local tables with manually specified nodes, list multiple node addresses separated by commas (see the examples below). |
| userName | STRING | The ClickHouse username. |
| password | STRING | The ClickHouse password. |
| tableName | STRING | The name of the ClickHouse table. When writing directly to local tables (shardWrite=true), set this to the local table name, unless inferLocalTable=true, in which case set it to the distributed table name. |
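As an illustration, the url value differs depending on whether you write through a distributed table or directly to manually listed local-table nodes. The addresses, port, and database below are placeholders:

-- Write through a distributed table: point url at the node hosting it.
'url' = 'jdbc:clickhouse://192.XX.XX.1:3002/default'

-- Write directly to local tables (shardWrite = 'true'): list every node, comma-separated.
'url' = 'jdbc:clickhouse://192.XX.XX.1:3002,192.XX.XX.2:3002,192.XX.XX.3:3002/default'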

Optional parameters

| Parameter | Data type | Default | Description |
| --- | --- | --- | --- |
| maxRetryTimes | INT | 3 | The maximum number of retries when a write fails. |
| batchSize | INT | 100 | The number of records buffered before a flush. A flush is triggered when the buffer reaches batchSize records or when flushIntervalMs elapses, whichever comes first. |
| flushIntervalMs | LONG | 1000 | The maximum interval between flushes, in milliseconds. |
| ignoreDelete | BOOLEAN | true | Whether to drop DELETE messages. When this is set to false and a primary key is defined, the connector runs an ALTER statement to delete matching rows. Setting this to false significantly reduces throughput and is incompatible with writeMode=partition. |
| shardWrite | BOOLEAN | false | Whether to bypass the distributed table and write directly to the underlying local tables. Set this to true to increase write throughput for distributed tables. When this is false, set tableName to the distributed table name. |
| inferLocalTable | BOOLEAN | false | When shardWrite=true, whether Flink automatically discovers the local table nodes from the distributed table metadata. Requires shardWrite=true, tableName set to the distributed table name, and url pointing to a node that hosts the distributed table. Not needed for non-distributed tables. |
| writeMode | ENUM | default | The write distribution strategy for local tables. Valid values: default, partition, random, and balance. See Choose a write mode for guidance. |
| shardingKey | STRING | None | The field (or comma-separated list of fields) used to route records when writeMode=partition. Records with the same key value are written to the same local table node. |
| exactlyOnce | BOOLEAN | false | Whether to use exactly-once semantics. Supported only for ClickHouse clusters deployed in Alibaba Cloud EMR. Incompatible with writeMode=partition. |

Choose a write mode

The writeMode parameter controls how records are distributed across local table nodes when writing to a ClickHouse distributed table with shardWrite=true.

| Write mode | Distribution strategy | Use when |
| --- | --- | --- |
| default | All writes go to the first node. | Writing to a single-node cluster or testing. |
| partition | Records with the same shardingKey always go to the same node. | You need co-location of related records on one node. |
| random | Each record is written to a randomly selected node. | You want simple load spreading without key affinity. |
| balance | Records are distributed across nodes in round-robin order. | You want even distribution without key affinity (requires VVR 8.0.7 or later). |

Note: writeMode=partition requires ignoreDelete=true and is incompatible with exactlyOnce=true.
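For instance, a sketch of a WITH clause that uses the balance mode to spread writes round-robin across two local-table nodes (VVR 8.0.7 or later; addresses, port, and credentials are placeholders):

'connector'  = 'clickhouse',
'url'        = 'jdbc:clickhouse://192.XX.XX.1:3002,192.XX.XX.2:3002/default',
'userName'   = '<yourUsername>',
'password'   = '<yourPassword>',
'tableName'  = 'local_table_test',
'shardWrite' = 'true',
'writeMode'  = 'balance'   -- round-robin across the listed nodes; no shardingKey needed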

Data type mappings

| Flink type | ClickHouse type | Supported | Notes |
| --- | --- | --- | --- |
| BOOLEAN | UInt8 / Boolean | Yes | ClickHouse V21.12 and later support the native Boolean type. Earlier versions map BOOLEAN to UInt8. |
| TINYINT | Int8 | Yes | |
| SMALLINT | Int16 | Yes | |
| INTEGER | Int32 | Yes | |
| BIGINT | Int64 | Yes | |
| BIGINT | UInt32 | Yes | |
| FLOAT | Float32 | Yes | |
| DOUBLE | Float64 | Yes | |
| CHAR | FixedString | Yes | |
| VARCHAR | String | Yes | |
| BINARY | FixedString | Yes | |
| VARBINARY | String | Yes | |
| DATE | Date | Yes | |
| TIMESTAMP(0) | DateTime | Yes | Accurate to the second. |
| TIMESTAMP(x) | DateTime64(x) | Yes | For VVR earlier than 6.0.6, precision is truncated to seconds due to a JDBC driver limitation. VVR 6.0.6 and later write DateTime64 at full precision. |
| DECIMAL | Decimal | Yes | |
| ARRAY | Array | Yes | |
| ARRAY (one per field) | Nested | Yes | Map each field of a ClickHouse Nested column to a separate ARRAY column in Flink. See the example below. |
| TIME | None | No | Not supported. |
| MAP | None | No | Not supported. |
| MULTISET | None | No | Not supported. |
| ROW | None | No | Not supported. |
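To illustrate the mappings above, here is a hypothetical ClickHouse table and a matching Flink sink DDL. The table and column names are illustrative only:

CREATE TABLE orders (
  order_id   Int64,
  amount     Decimal(10, 2),
  created_at DateTime64(3),
  tags       Array(String)
) ENGINE = MergeTree()
ORDER BY order_id;

CREATE TEMPORARY TABLE orders_sink (
  order_id   BIGINT,          -- Int64
  amount     DECIMAL(10, 2),  -- Decimal(10, 2)
  created_at TIMESTAMP(3),    -- DateTime64(3); full precision requires VVR 6.0.6 or later
  tags       ARRAY<STRING>    -- Array(String)
) WITH (
  'connector' = 'clickhouse',
  'url'       = '<yourUrl>',
  'userName'  = '<yourUsername>',
  'password'  = '<yourPassword>',
  'tableName' = 'orders'
);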

Working with NESTED columns

To write to a ClickHouse NESTED column, map each sub-field to a separate ARRAY column in Flink, using the ColumnName.FieldName naming convention.

ClickHouse table definition:

CREATE TABLE visits (
  StartDate Date,
  Goals Nested
  (
    ID UInt32,
    OrderID String
  )
) ENGINE = MergeTree()  -- ClickHouse requires an engine; MergeTree is shown as an example
ORDER BY StartDate;

Corresponding Flink DDL:

CREATE TABLE visits (
  StartDate DATE,
  `Goals.ID`      ARRAY<BIGINT>,
  `Goals.OrderID` ARRAY<STRING>
);

Examples

Write to a single ClickHouse table

This example uses the built-in datagen connector to generate records and writes them to a ClickHouse table.

CREATE TEMPORARY TABLE clickhouse_source (
  id   INT,
  name VARCHAR,
  age  BIGINT,
  rate FLOAT
) WITH (
  'connector'      = 'datagen',
  'rows-per-second' = '50'
);

CREATE TEMPORARY TABLE clickhouse_output (
  id   INT,
  name VARCHAR,
  age  BIGINT,
  rate FLOAT
) WITH (
  'connector' = 'clickhouse',
  'url'       = '<yourUrl>',
  'userName'  = '<yourUsername>',
  'password'  = '<yourPassword>',
  'tableName' = '<yourTablename>'
);

INSERT INTO clickhouse_output
SELECT id, name, age, rate
FROM clickhouse_source;

Write directly to local tables of a distributed table

Assume three local tables named local_table_test exist on nodes 192.XX.XX.1, 192.XX.XX.2, and 192.XX.XX.3, and a distributed table named distributed_table_test is built on top of them.

Specify nodes manually

List all local table node addresses in url and set tableName to the local table name. Records with the same shardingKey value are routed to the same node.

CREATE TEMPORARY TABLE clickhouse_source (
  id   INT,
  name VARCHAR,
  age  BIGINT,
  rate FLOAT
) WITH (
  'connector'       = 'datagen',
  'rows-per-second' = '50'
);

CREATE TEMPORARY TABLE clickhouse_output (
  id   INT,
  name VARCHAR,
  age  BIGINT,
  rate FLOAT
) WITH (
  'connector'  = 'clickhouse',
  'url'        = 'jdbc:clickhouse://192.XX.XX.1:3002,192.XX.XX.2:3002,192.XX.XX.3:3002/default',
  'userName'   = '<yourUsername>',
  'password'   = '<yourPassword>',
  'tableName'  = 'local_table_test',
  'shardWrite' = 'true',
  'writeMode'  = 'partition',
  'shardingKey' = 'name'
);

INSERT INTO clickhouse_output
SELECT id, name, age, rate
FROM clickhouse_source;

Let Flink discover local table nodes automatically

Set inferLocalTable=true and point url to any node of the distributed table. Flink queries system.clusters to discover the local table nodes automatically.

CREATE TEMPORARY TABLE clickhouse_source (
  id   INT,
  name VARCHAR,
  age  BIGINT,
  rate FLOAT
) WITH (
  'connector'       = 'datagen',
  'rows-per-second' = '50'
);

CREATE TEMPORARY TABLE clickhouse_output (
  id   INT,
  name VARCHAR,
  age  BIGINT,
  rate FLOAT
) WITH (
  'connector'      = 'clickhouse',
  'url'            = 'jdbc:clickhouse://192.XX.XX.1:3002/default', -- URL of any node hosting the distributed table
  'userName'       = '<yourUsername>',
  'password'       = '<yourPassword>',
  'tableName'      = 'distributed_table_test',                    -- Name of the distributed table
  'shardWrite'     = 'true',
  'inferLocalTable' = 'true',                                     -- Auto-discover local table nodes
  'writeMode'      = 'partition',
  'shardingKey'    = 'name'
);

INSERT INTO clickhouse_output
SELECT id, name, age, rate
FROM clickhouse_source;

FAQ

Can I retract (delete) updated data from an ApsaraDB for ClickHouse result table?

Set ignoreDelete=false and define a primary key in the DDL statement. The connector then issues ALTER DELETE statements when it receives retract messages. Note that this significantly reduces write throughput, and it cannot be used together with writeMode=partition or exactlyOnce=true. For more details, see Can I retract the updated data from an ApsaraDB for ClickHouse result table?
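A sketch of a sink DDL that enables retraction; the table and column names are illustrative:

CREATE TEMPORARY TABLE clickhouse_output (
  id   INT,
  name VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED    -- a primary key is required for deletes
) WITH (
  'connector'    = 'clickhouse',
  'url'          = '<yourUrl>',
  'userName'     = '<yourUsername>',
  'password'     = '<yourPassword>',
  'tableName'    = '<yourTablename>',
  'ignoreDelete' = 'false'         -- DELETE messages become ALTER ... DELETE; lowers throughput
);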

When does written data become visible in ClickHouse?

ClickHouse uses asynchronous part merging, so data may not be immediately visible after a write completes. The delay depends on the merge interval configured in your ClickHouse cluster. For more details, see When can I view the data that is written to a ClickHouse sink table in the ClickHouse console?
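If you need to inspect results while testing, you can query the table directly, or force pending parts to merge on MergeTree-family tables with OPTIMIZE TABLE ... FINAL. Forced merges are resource-intensive and should be avoided in production; the database and table names below are placeholders:

-- Check the row count after the Flink job flushes.
SELECT count() FROM default.clickhouse_sink;

-- Force pending parts to merge (MergeTree family only; resource-intensive).
OPTIMIZE TABLE default.clickhouse_sink FINAL;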