ApsaraDB for ClickHouse uses a built-in Kafka table engine and a materialized view to ingest real-time data from ApsaraMQ for Kafka. This guide walks you through building the three-layer pipeline — a Kafka external table, a materialized view, and a storage table — and verifying end-to-end data flow.
Limitations
Only the following Kafka sources can sync to ApsaraDB for ClickHouse:
ApsaraMQ for Kafka instances
Self-managed Kafka clusters deployed on Elastic Compute Service (ECS) instances
Prerequisites
Before you begin, ensure that you have:
An ApsaraDB for ClickHouse cluster in the same region and virtual private cloud (VPC) as the source ApsaraMQ for Kafka instance. See Create a cluster.
A database account with permissions to perform operations on the destination database. See Account management.
A topic created in the ApsaraMQ for Kafka instance. See Step 1: Create a topic.
A consumer group created in the ApsaraMQ for Kafka instance. See Step 2: Create a group.
Usage notes
A topic subscribed by a ClickHouse Kafka external table cannot be consumed by any other consumer simultaneously.
The field types in the Kafka external table, the materialized view, and the local table must all match.
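For example, if producers send JSON messages such as {"id":1,"name":"a"}, the external table columns must line up with those keys and types. A minimal sketch (JSONEachRow is a standard ClickHouse input format; the broker endpoint, topic, and group names here are placeholders):

```sql
-- Columns id and name mirror the keys in each JSON message;
-- a type mismatch (for example, id sent as a string) makes parsing fail.
CREATE TABLE default.kafka_json_src ON CLUSTER `default`
(
    id   Int32,
    name String
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host:9092',
    kafka_topic_list  = 'json_topic',
    kafka_group_name  = 'json_group',
    kafka_format      = 'JSONEachRow';
```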
How it works
ApsaraDB for ClickHouse syncs data from Kafka through a three-layer pipeline:
Kafka external table — pulls messages from a specified Kafka topic using the built-in Kafka table engine. Direct SELECT queries against this table are not supported by default.
Materialized view — triggered automatically on each batch from the Kafka external table. It reads the batch and inserts the rows into the local table.
Local table — the final storage destination. Query this table (or a distributed table built on top of it) for your data.
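Put together, the pipeline is three DDL statements. A compact sketch with illustrative names and a placeholder broker; the full syntax and edition-specific table engines are covered in the steps below:

```sql
-- Layer 1: Kafka external table — consumes the topic, stores nothing
CREATE TABLE kafka_src_table (id Int32, name String)
ENGINE = Kafka()
SETTINGS kafka_broker_list = 'host:9092',
         kafka_topic_list  = 'testforCK',
         kafka_group_name  = 'GroupForTestCK',
         kafka_format      = 'CSV';

-- Layer 2: local storage table — the final destination
CREATE TABLE kafka_table_local (id Int32, name String)
ENGINE = MergeTree() ORDER BY (id);

-- Layer 3: materialized view — moves each consumed batch into storage
CREATE MATERIALIZED VIEW consumer TO kafka_table_local
AS SELECT * FROM kafka_src_table;
```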
Step 1: Log on to an ApsaraDB for ClickHouse database
Step 2: Create a Kafka external table
The Kafka external table connects ClickHouse to a Kafka topic and consumes messages. It does not store data.
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'host:port1,host:port2,host:port3',
kafka_topic_list = 'topic_name1,topic_name2,...',
kafka_group_name = 'group_name',
kafka_format = 'data_format'[,]
[kafka_row_delimiter = 'delimiter_symbol',]
[kafka_num_consumers = N,]
[kafka_thread_per_consumer = 1,]
[kafka_max_block_size = 0,]
[kafka_skip_broken_messages = N,]
[kafka_commit_every_batch = 0,]
[kafka_auto_offset_reset = N]

The field types in the Kafka external table must match the Kafka message format.
Parameters
| Parameter | Required | Default | Description |
|---|---|---|---|
| kafka_broker_list | Yes | — | Comma-separated broker endpoints for the ApsaraMQ for Kafka instance. See View endpoints. For ApsaraMQ for Kafka instances, ClickHouse resolves domain names automatically. For self-managed Kafka clusters on ECS, connect using an IP address or a custom domain name ending in .com, or ending in .local and containing kafka, mysql, or rabbitmq. |
| kafka_topic_list | Yes | — | Comma-separated topic names. See Step 1: Create a topic. |
| kafka_group_name | Yes | — | Consumer group name. See Step 2: Create a group. |
| kafka_format | Yes | — | Message body format. For supported formats, see Formats for Input and Output Data. |
| kafka_row_delimiter | No | \n | Row delimiter. |
| kafka_num_consumers | No | 1 | Number of consumers per table. Increase this value if one consumer cannot keep up with throughput. Cannot exceed the number of partitions in the topic (one consumer per partition maximum). |
| kafka_thread_per_consumer | No | 0 | 0: all consumers share one thread. 1: each consumer runs in its own thread. For tuning advice, see Kafka performance optimization. |
| kafka_max_block_size | No | 65536 | Maximum batch size written per poll, in messages. |
| kafka_skip_broken_messages | No | 0 | Number of malformed messages to skip. If set to N, the Kafka engine skips N messages that cannot be parsed. One message is equivalent to one row of data. |
| kafka_commit_every_batch | No | 0 | 0: commit after a full block is written. 1: commit after each batch. |
| kafka_auto_offset_reset | No | earliest | Starting offset: earliest (consume from the beginning) or latest (consume only new messages). Not supported on version 21.8 clusters. |
For the full parameter reference, see Kafka table engine.
Example
CREATE TABLE default.kafka_src_table ON CLUSTER `default`
(
id Int32,
name String
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'alikafka-post-cn-****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-****1-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-****-3-vpc.alikafka.aliyuncs.com:9092',
kafka_topic_list = 'testforCK',
kafka_group_name = 'GroupForTestCK',
kafka_format = 'CSV';

Step 3: Create a destination table
The destination table stores the synchronized data. The table engine depends on your cluster edition.
For more information on CREATE TABLE syntax, see CREATE TABLE.
Enterprise Edition
Create a local table using MergeTree:
CREATE TABLE default.kafka_table_local ON CLUSTER default (
id Int32,
name String
) ENGINE = MergeTree()
ORDER BY (id);

If the statement returns ON CLUSTER is not allowed for Replicated database, update the minor engine version. See Upgrade the minor engine version.
Enterprise Edition clusters do not support distributed tables. All data is stored in the local table.
Community-compatible Edition
Single-replica Edition
Create a local table:
CREATE TABLE default.kafka_table_local ON CLUSTER default
(
    id Int32,
    name String
) ENGINE = MergeTree()
ORDER BY (id);

(Optional) For multi-node clusters, create a distributed table on top of the local table. The Distributed engine arguments are the cluster name, the database, the local table, and the sharding key:
CREATE TABLE kafka_table_distributed ON CLUSTER default
AS default.kafka_table_local
ENGINE = Distributed(default, default, kafka_table_local, id);

Skip this step if you plan to write directly to the local table.
Double-replica Edition
Double-replica Edition clusters require Replicated table engines from the MergeTree family. Using non-replicated engines causes data inconsistency between replicas.
Create a local table using ReplicatedMergeTree:
CREATE TABLE default.kafka_table_local ON CLUSTER default
(
    id Int32,
    name String
) ENGINE = ReplicatedMergeTree()
ORDER BY (id);

(Optional) For multi-node clusters, create a distributed table:
CREATE TABLE kafka_table_distributed ON CLUSTER default AS default.kafka_table_local ENGINE = Distributed(default, default, kafka_table_local, id);
Step 4: Create a materialized view
The materialized view reads each batch from the Kafka external table and inserts the rows into the destination table.
CREATE MATERIALIZED VIEW <view_name> ON CLUSTER default
TO <dest_table>
AS SELECT * FROM <src_table>;

The columns in the SELECT clause must match the destination table schema. Use conversion functions if the types differ.
| Parameter | Required | Description | Example |
|---|---|---|---|
view_name | Yes | Name of the materialized view. | consumer |
dest_table | Yes | Destination table for Kafka data. For Community-compatible Edition multi-node clusters, use the distributed table. For Enterprise Edition clusters, use the local table. | Community-compatible Edition: kafka_table_distributed; Enterprise Edition: kafka_table_local |
src_table | Yes | The Kafka external table created in Step 2. | kafka_src_table |
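For instance, if the Kafka messages deliver the id as a string while the destination column is Int32, convert it in the SELECT clause. A hypothetical sketch (toInt32 is a standard ClickHouse conversion function; the table names match the earlier examples, but the String-typed source column is an assumption for illustration):

```sql
-- Source column id is String here; cast it to match the Int32
-- column in the destination table.
CREATE MATERIALIZED VIEW consumer ON CLUSTER default
TO kafka_table_local
AS SELECT toInt32(id) AS id, name FROM kafka_src_table;
```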
Enterprise Edition
CREATE MATERIALIZED VIEW consumer ON CLUSTER default
TO kafka_table_local
AS SELECT * FROM kafka_src_table;

Community-compatible Edition
CREATE MATERIALIZED VIEW consumer ON CLUSTER default
TO kafka_table_distributed
AS SELECT * FROM kafka_src_table;

Step 5: Verify data synchronization
Send test messages to the Kafka topic:
Log on to the ApsaraMQ for Kafka console.
On the Instances page, click the instance name.
On the Topics page, find the topic and choose More > Send Message in the Actions column.
In the Send to Produce and Consume Message panel, enter the message content. This example sends 1,a and 2,b.
Click OK.
Log on to the ApsaraDB for ClickHouse database and query the destination table. See Connect to a ClickHouse cluster using DMS.
Enterprise Edition
SELECT * FROM kafka_table_local;

Community-compatible Edition
On multi-node Community-compatible Edition clusters, always query the distributed table. Querying the local table directly returns data from only one node and produces incomplete results.
SELECT * FROM kafka_table_distributed;

Expected output:

┌─id─┬─name─┐
│  1 │ a    │
│  2 │ b    │
└────┴──────┘

If the result matches, data synchronization is working. If not, proceed to Step 6 to check the consumer status.
Step 6 (Optional): Check consumer status
If synchronized data does not match what was sent, query the system tables to inspect the Kafka external table's consumer state.
Community-compatible Edition and Enterprise Edition V23.8 and later
SELECT * FROM system.kafka_consumers;

| Field | Description |
|---|---|
| database | Database where the Kafka external table is located. |
| table | Name of the Kafka external table. |
| consumer_id | Consumer ID. A single table can have multiple consumers, set by kafka_num_consumers. |
| assignments.topic | Kafka topic assigned to the consumer. |
| assignments.partition_id | Partition assigned to the consumer. Each partition is assigned to exactly one consumer. |
| assignments.current_offset | Current consumer offset. |
| exceptions.time | Timestamps of the 10 most recent exceptions. |
| exceptions.text | Text of the 10 most recent exceptions. |
| last_poll_time | Timestamp of the most recent poll. |
| num_messages_read | Total messages read by the consumer. |
| last_commit_time | Timestamp of the most recent commit. |
| num_commits | Total number of commits. |
| last_rebalance_time | Timestamp of the most recent Kafka rebalancing. |
| num_rebalance_revocations | Number of times partitions were revoked from the consumer. |
| num_rebalance_assignments | Number of times the consumer was assigned to the Kafka cluster. |
| is_currently_used | Whether the consumer is currently active. |
| last_used | Time the consumer was last used, in microseconds. |
| rdkafka_stat | Internal librdkafka statistics. See librdkafka statistics. The collection interval is controlled by statistics_interval_ms, which defaults to 3000, so statistics are generated every 3 seconds. Set statistics_interval_ms = 0 to disable statistics collection. |
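For example, to inspect offsets and recent errors for one external table, query only the relevant fields (a sketch using the field names documented above and the table name from the earlier examples):

```sql
-- Per-consumer partition assignments, offsets, and last exceptions
SELECT consumer_id,
       assignments.topic,
       assignments.partition_id,
       assignments.current_offset,
       exceptions.text
FROM system.kafka_consumers
WHERE database = 'default' AND table = 'kafka_src_table';
```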
Earlier than Community-compatible Edition V23.8
SELECT * FROM system.kafka;

| Field | Description |
|---|---|
| database | Database where the Kafka external table is located. |
| table | Name of the Kafka external table. |
| topic | Topic consumed by the Kafka external table. |
| consumer_group | Consumer group name. |
| last_read_message_count | Number of messages pulled in the last read. |
| status | Consumption status: no_view (no materialized view attached), attach_view (view attached), normal (consuming normally), skip_parse (skipping parse errors), error (consumption error). |
| exception | Error details. Populated only when status is error. |
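On these older versions, a quick way to surface problem tables is to filter on the status field (a sketch using the field names documented above):

```sql
-- List external tables that are not consuming normally,
-- including the error text when status is 'error'
SELECT table, consumer_group, status, exception
FROM system.kafka
WHERE status != 'normal';
```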