The ClickHouse connector lets you write Flink SQL data to ClickHouse result tables in both batch and streaming modes. It can write to standard tables, to distributed tables, and directly to the local tables behind a distributed table.
Prerequisites
Before you begin, ensure that you have:
- A ClickHouse table. For more information, see Create a new table.
- A whitelist configured for your ClickHouse cluster:
  - ApsaraDB for ClickHouse cluster: Configure the whitelist
  - Alibaba Cloud E-MapReduce (EMR) ClickHouse cluster: Manage security groups
  - Self-managed ClickHouse on Elastic Compute Service (ECS): Overview
  - Other deployments: Add the CIDR block of the vSwitch used by Realtime Compute for Apache Flink to the allowlist of the machines running your ClickHouse cluster. To find the CIDR block, see FAQ about workspace and namespace management and operations.
Connector capabilities
| Item | Description |
|---|---|
| Table type | Result table |
| Running mode | Batch mode and streaming mode |
| Data format | N/A |
| Metrics | numRecordsOut, numRecordsOutPerSecond, currentSendTime. For details, see Metrics. |
| API type | SQL API |
| Data updates and deletes | Supported when a primary key is defined in the DDL statement and ignoreDelete is set to false. Enabling this significantly reduces write throughput. |
Limitations
- The `sink.parallelism` parameter is not supported.
- Result tables provide at-least-once semantics by default.
- The connector requires Ververica Runtime (VVR) 3.0.2 or later.
- The `ignoreDelete` parameter requires VVR 3.0.3, VVR 4.0.7, or a later minor version of either.
- The NESTED data type requires VVR 4.0.10 or later.
- Writing directly to a ClickHouse local table that corresponds to a distributed table requires VVR 4.0.11 or later. Only ApsaraDB for ClickHouse Community-compatible Edition clusters support this.
- Exactly-once semantics for EMR ClickHouse clusters requires VVR 4.0.11 or later. It is no longer available for EMR V3.45.1 clusters or minor versions later than EMR V5.11.1, due to a capability change in EMR ClickHouse.
- The `balance` write mode requires VVR 8.0.7 or later.
Syntax
CREATE TABLE clickhouse_sink (
id INT,
name VARCHAR,
age BIGINT,
rate FLOAT
) WITH (
'connector' = 'clickhouse',
'url' = '<yourUrl>',
'userName' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTablename>',
'maxRetryTimes' = '3',
'batchSize' = '8000',
'flushIntervalMs' = '1000',
'ignoreDelete' = 'true',
'shardWrite' = 'false',
'writeMode' = 'partition',
'shardingKey' = 'id'
);
Parameters in the WITH clause
Required parameters
| Parameter | Data type | Description |
|---|---|---|
| `connector` | STRING | The connector type. Set to `clickhouse`. |
| `url` | STRING | The JDBC URL of ClickHouse, in the format `jdbc:clickhouse://<yourNetworkAddress>:<PortId>/<yourDatabaseName>`. If you omit the database name, the default database is used. To write to a distributed table, set this to the JDBC URL of the node hosting the distributed table. To write directly to local tables with manually specified nodes, list multiple node addresses separated by commas (see the examples below). |
| `userName` | STRING | The ClickHouse username. |
| `password` | STRING | The ClickHouse password. |
| `tableName` | STRING | The name of the ClickHouse table. When writing directly to local tables (`shardWrite=true`), set this to the local table name unless `inferLocalTable=true`, in which case set it to the distributed table name. |
Optional parameters
| Parameter | Data type | Default | Description |
|---|---|---|---|
| `maxRetryTimes` | INT | 3 | Maximum number of retries when a write fails. |
| `batchSize` | INT | 100 | Number of records buffered before a flush. A flush triggers when the buffer reaches `batchSize` records or when `flushIntervalMs` elapses, whichever comes first. |
| `flushIntervalMs` | LONG | 1000 | Maximum interval between flushes, in milliseconds. |
| `ignoreDelete` | BOOLEAN | true | Whether to drop DELETE messages. When set to false and a primary key is defined, the connector runs an ALTER statement to delete matching rows. Setting this to false significantly reduces throughput and is incompatible with `writeMode=partition`. |
| `shardWrite` | BOOLEAN | false | Whether to bypass the distributed table and write directly to the underlying local tables. Set to true to increase write throughput for distributed tables. When false, set `tableName` to the distributed table name. |
| `inferLocalTable` | BOOLEAN | false | When `shardWrite=true`, whether to let Flink automatically discover the local table nodes from the distributed table metadata. Requires `shardWrite=true`, `tableName` set to the distributed table name, and `url` pointing to a node hosting the distributed table. Not needed for non-distributed tables. |
| `writeMode` | ENUM | default | The write distribution strategy for local tables. See Choose a write mode for guidance. Valid values: default, partition, random, balance. |
| `shardingKey` | STRING | None | The field (or comma-separated fields) used to route records when `writeMode=partition`. Records with the same key value go to the same local table node. |
| `exactlyOnce` | BOOLEAN | false | Whether to use exactly-once semantics. Only supported for ClickHouse clusters deployed in Alibaba Cloud EMR. Incompatible with `writeMode=partition`. |
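Because `exactlyOnce` is off by default and is not shown in the syntax example above, here is a minimal sketch of enabling it. The connection values are placeholders, and the sketch assumes an EMR ClickHouse cluster on a supported VVR version (4.0.11 or later); it cannot be combined with `writeMode=partition`.

```sql
CREATE TEMPORARY TABLE clickhouse_eos_sink (
  id INT,
  name VARCHAR
) WITH (
  'connector' = 'clickhouse',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'exactlyOnce' = 'true'  -- EMR ClickHouse only; incompatible with writeMode=partition
);
```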
Choose a write mode
The writeMode parameter controls how records are distributed across local table nodes when writing to a ClickHouse distributed table with shardWrite=true.
| Write mode | Distribution strategy | Use when |
|---|---|---|
| `default` | All writes go to the first node | Writing to a single-node cluster or testing |
| `partition` | Records with the same `shardingKey` always go to the same node | You need co-location of related records on one node |
| `random` | Each record is written to a randomly selected node | You want simple load spreading without key affinity |
| `balance` | Records are distributed across nodes in round-robin order | You want even distribution without key affinity (requires VVR 8.0.7 or later) |
`writeMode=partition` requires `ignoreDelete=true` and is incompatible with `exactlyOnce=true`.
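For even distribution without a sharding key, the balance mode needs only `shardWrite` and `writeMode`. A minimal sketch, reusing placeholder node addresses in the style of the examples below and assuming VVR 8.0.7 or later:

```sql
CREATE TEMPORARY TABLE clickhouse_balanced_sink (
  id INT,
  name VARCHAR
) WITH (
  'connector' = 'clickhouse',
  'url' = 'jdbc:clickhouse://192.XX.XX.1:3002,192.XX.XX.2:3002/default',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = 'local_table_test',
  'shardWrite' = 'true',
  'writeMode' = 'balance'  -- round-robin across the listed nodes; no shardingKey needed
);
```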
Data type mappings
| Flink type | ClickHouse type | Supported | Notes |
|---|---|---|---|
| BOOLEAN | UInt8 / Boolean | Yes | ClickHouse V21.12 and later support the native BOOLEAN type. Earlier versions map to UInt8. |
| TINYINT | Int8 | Yes | |
| SMALLINT | Int16 | Yes | |
| INTEGER | Int32 | Yes | |
| BIGINT | Int64 | Yes | |
| BIGINT | UInt32 | Yes | |
| FLOAT | Float32 | Yes | |
| DOUBLE | Float64 | Yes | |
| CHAR | FixedString | Yes | |
| VARCHAR | String | Yes | |
| BINARY | FixedString | Yes | |
| VARBINARY | String | Yes | |
| DATE | Date | Yes | |
| TIMESTAMP(0) | DateTime | Yes | Accurate to the second. |
| TIMESTAMP(x) | DateTime64(x) | Yes | For VVR earlier than 6.0.6, precision is truncated to seconds due to a JDBC driver limitation. VVR 6.0.6 and later write DateTime64 at full precision. |
| DECIMAL | DECIMAL | Yes | |
| ARRAY | ARRAY | Yes | |
| ARRAY (per field) | Nested | Yes | Map each field of a ClickHouse NESTED column to a separate ARRAY column in Flink. See the example below. |
| TIME | — | No | Not supported. |
| MAP | — | No | Not supported. |
| MULTISET | — | No | Not supported. |
| ROW | — | No | Not supported. |
Working with NESTED columns
To write to a ClickHouse NESTED column, map each sub-field to a separate ARRAY column in Flink, using the `ColumnName.FieldName` naming convention.
ClickHouse table definition:
CREATE TABLE visits (
StartDate Date,
Goals Nested
(
ID UInt32,
OrderID String
)
);
Corresponding Flink DDL:
CREATE TABLE visits (
StartDate DATE,
`Goals.ID` ARRAY<BIGINT>,
`Goals.OrderID` ARRAY<STRING>
);
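The DDL above shows only the column mapping; a usable sink definition also needs the connector options. A sketch with placeholder connection values (note that UInt32 maps to BIGINT on the Flink side, per the mapping table above):

```sql
CREATE TEMPORARY TABLE visits (
  StartDate DATE,
  `Goals.ID` ARRAY<BIGINT>,      -- maps to Nested field Goals.ID (UInt32)
  `Goals.OrderID` ARRAY<STRING>  -- maps to Nested field Goals.OrderID (String)
) WITH (
  'connector' = 'clickhouse',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = 'visits'
);
```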
Examples
Write to a single ClickHouse table
This example uses the built-in datagen connector to generate records and writes them to a ClickHouse table.
CREATE TEMPORARY TABLE clickhouse_source (
id INT,
name VARCHAR,
age BIGINT,
rate FLOAT
) WITH (
'connector' = 'datagen',
'rows-per-second' = '50'
);
CREATE TEMPORARY TABLE clickhouse_output (
id INT,
name VARCHAR,
age BIGINT,
rate FLOAT
) WITH (
'connector' = 'clickhouse',
'url' = '<yourUrl>',
'userName' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTablename>'
);
INSERT INTO clickhouse_output
SELECT id, name, age, rate
FROM clickhouse_source;
Write directly to local tables of a distributed table
Assume three local tables named local_table_test exist on nodes 192.XX.XX.1, 192.XX.XX.2, and 192.XX.XX.3, and a distributed table named distributed_table_test is built on top of them.
Specify nodes manually
List all local table node addresses in url and set tableName to the local table name. Records with the same shardingKey value are routed to the same node.
CREATE TEMPORARY TABLE clickhouse_source (
id INT,
name VARCHAR,
age BIGINT,
rate FLOAT
) WITH (
'connector' = 'datagen',
'rows-per-second' = '50'
);
CREATE TEMPORARY TABLE clickhouse_output (
id INT,
name VARCHAR,
age BIGINT,
rate FLOAT
) WITH (
'connector' = 'clickhouse',
'url' = 'jdbc:clickhouse://192.XX.XX.1:3002,192.XX.XX.2:3002,192.XX.XX.3:3002/default',
'userName' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = 'local_table_test',
'shardWrite' = 'true',
'writeMode' = 'partition',
'shardingKey' = 'name'
);
INSERT INTO clickhouse_output
SELECT id, name, age, rate
FROM clickhouse_source;
Let Flink discover local table nodes automatically
Set inferLocalTable=true and point url to any node of the distributed table. Flink queries system.clusters to discover the local table nodes automatically.
CREATE TEMPORARY TABLE clickhouse_source (
id INT,
name VARCHAR,
age BIGINT,
rate FLOAT
) WITH (
'connector' = 'datagen',
'rows-per-second' = '50'
);
CREATE TEMPORARY TABLE clickhouse_output (
id INT,
name VARCHAR,
age BIGINT,
rate FLOAT
) WITH (
'connector' = 'clickhouse',
'url' = 'jdbc:clickhouse://192.XX.XX.1:3002/default', -- URL of any node hosting the distributed table
'userName' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = 'distributed_table_test', -- Name of the distributed table
'shardWrite' = 'true',
'inferLocalTable' = 'true', -- Auto-discover local table nodes
'writeMode' = 'partition',
'shardingKey' = 'name'
);
INSERT INTO clickhouse_output
SELECT id, name, age, rate
FROM clickhouse_source;
FAQ
Can I retract (delete) updated data from an ApsaraDB for ClickHouse result table?
Set ignoreDelete=false and define a primary key in the DDL statement. The connector then issues ALTER DELETE statements when it receives retract messages. Note that this significantly reduces write throughput, and it cannot be used together with writeMode=partition or exactlyOnce=true. For more details, see Can I retract the updated data from an ApsaraDB for ClickHouse result table?
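As a sketch, the required DDL shape looks like the following (connection values are placeholders):

```sql
CREATE TEMPORARY TABLE clickhouse_retractable_sink (
  id INT,
  name VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED  -- a primary key is required for retraction
) WITH (
  'connector' = 'clickhouse',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'ignoreDelete' = 'false'  -- issues ALTER ... DELETE for retract messages; lowers throughput
);
```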
When does written data become visible in ClickHouse?
ClickHouse uses asynchronous part merging, so data may not be immediately visible after a write completes. The delay depends on the merge interval configured in your ClickHouse cluster. For more details, see When can I view the data that is written to a ClickHouse sink table in the ClickHouse console?