ApsaraDB for ClickHouse: Synchronize data from an ApsaraMQ for Kafka instance

Last Updated: Jun 03, 2025

This topic describes how to synchronize data from an ApsaraMQ for Kafka instance to an ApsaraDB for ClickHouse cluster in real time.

Limits

You can synchronize data to an ApsaraDB for ClickHouse cluster only from an ApsaraMQ for Kafka instance or from a self-managed Kafka cluster deployed on Elastic Compute Service (ECS) instances.

Prerequisites

  • Destination ApsaraDB for ClickHouse cluster:

    • A destination cluster is created. The source ApsaraMQ for Kafka instance and the destination cluster are deployed in the same region and use the same virtual private cloud (VPC). For more information, see Create an ApsaraDB for ClickHouse cluster.

    • An account is created to log on to a database in the destination cluster and has the permissions to perform operations on the database. For more information, see Account management.

  • Source ApsaraMQ for Kafka instance:

Usage notes

  • If an ApsaraDB for ClickHouse Kafka external table subscribes to a topic, other consumers cannot consume that topic.

  • When you create a Kafka external table, a materialized view, and a local table, the field types in the three tables must match.
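
One way to check the type-matching requirement is to compare column types through the system.columns table. The following query is a minimal sketch. It assumes the sample names used later in this topic (kafka_src_table, consumer, and kafka_table_local in the default database); replace them with your own names.

```sql
-- Assumption: the names below are the sample names from this topic.
-- Lists every column whose type differs across the Kafka external table,
-- the materialized view, and the local table.
SELECT
    name AS column_name,
    groupUniqArray(type) AS types
FROM system.columns
WHERE database = 'default'
  AND table IN ('kafka_src_table', 'consumer', 'kafka_table_local')
GROUP BY name
HAVING length(types) > 1;
```

An empty result suggests that the column types agree on the node where the query runs. Any returned row names a column that has more than one type.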

Procedure

This example shows how to synchronize data from an ApsaraMQ for Kafka instance to the kafka_table_distributed distributed table in the default database of an ApsaraDB for ClickHouse cluster that runs Community-compatible Edition.

Step 1: Understand the synchronization principle

ApsaraDB for ClickHouse uses a built-in Kafka table engine and the materialized view mechanism to synchronize data from ApsaraMQ for Kafka instances. This achieves real-time data consumption and storage. The following figure shows the synchronization process.

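[Figure: synchronization process, from the Kafka topic through the Kafka external table and the materialized view to the local table]
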
  • Kafka topic: specifies the source data to be synchronized.

  • ApsaraDB for ClickHouse Kafka external table: pulls source data from a specified Kafka topic.

  • Materialized view: reads source data from a Kafka external table and inserts the data into an ApsaraDB for ClickHouse local table.

  • Local table: stores the synchronized data.

Step 2: Log on to an ApsaraDB for ClickHouse database

For more information about how to log on to an ApsaraDB for ClickHouse database, see Use DMS to connect to an ApsaraDB for ClickHouse cluster.

Step 3: Create a Kafka external table

ApsaraDB for ClickHouse uses a built-in Kafka table engine to pull source data from the specified Kafka topic. The Kafka external table has the following features:

  • By default, a Kafka external table cannot be directly queried.

  • A Kafka external table is only used to consume Kafka data and does not store data. You must use a materialized view to process the data and insert the data into the destination table for storage.
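
For ad hoc inspection, open-source ClickHouse provides the stream_like_engine_allow_direct_select setting, which permits a direct SELECT on a Kafka engine table. The following statement is a sketch only. It assumes that your cluster version supports this setting and that no materialized view is attached to the table yet, because open-source ClickHouse rejects the direct SELECT once a view is attached. The table name is the sample name used later in this topic.

```sql
-- Assumption: the cluster supports stream_like_engine_allow_direct_select
-- and no materialized view is attached to default.kafka_src_table.
SELECT *
FROM default.kafka_src_table
LIMIT 5
SETTINGS stream_like_engine_allow_direct_select = 1;
```

A direct read consumes messages on behalf of the consumer group, so the rows it returns might not be delivered again to a materialized view that you attach afterward. Use it for testing rather than on a production topic.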

Sample statement for creating a table:

Important

The column types of the Kafka external table must match the data types of the fields in the Kafka messages.

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host:port1,host:port2,host:port3',
    kafka_topic_list = 'topic_name1,topic_name2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format'[,]
    [kafka_row_delimiter = 'delimiter_symbol',]
    [kafka_num_consumers = N,]
    [kafka_thread_per_consumer = 1,]
    [kafka_max_block_size = 0,]
    [kafka_skip_broken_messages = N,]
    [kafka_commit_every_batch = 0,]
    [kafka_auto_offset_reset = N]

The following table describes the parameters in the preceding statement.

| Parameter | Required | Description |
| --- | --- | --- |
| kafka_broker_list | Yes | The endpoints that are used to connect to the ApsaraMQ for Kafka instance. Separate the endpoints with commas (,). For more information about how to view endpoints, see View endpoints. If you use an ApsaraMQ for Kafka instance, ApsaraDB for ClickHouse resolves the domain name of the instance by default. If you use a self-managed Kafka cluster, ApsaraDB for ClickHouse connects to the cluster by using the IP address or a custom domain name in a fixed format. Supported domain names: (1) domain names that end with .com; (2) domain names that end with .local and contain one of the following keywords: kafka, mysql, and rabbitmq. |
| kafka_topic_list | Yes | The names of topics in the ApsaraMQ for Kafka instance. Separate the topic names with commas (,). For more information about how to view the name of a topic, see Step 1: Create a topic. |
| kafka_group_name | Yes | The name of the consumer group in the ApsaraMQ for Kafka instance. For more information, see Step 2: Create a group. |
| kafka_format | Yes | The format of the message body. The format must be supported by ApsaraDB for ClickHouse. Note: For more information about the supported formats, see Formats for Input and Output Data. |
| kafka_row_delimiter | No | The delimiter that is used to separate rows. The default value is \n. You can also set this parameter based on the delimiter that is actually used when data is written. |
| kafka_num_consumers | No | The number of consumers that consume the data in the table. Default value: 1. Note: If the throughput of one consumer is insufficient, specify a larger number of consumers. The number of consumers cannot exceed the number of partitions in a topic because only one consumer can be assigned per partition. |
| kafka_thread_per_consumer | No | Specifies whether each consumer starts an independent thread for consumption. Default value: 0. Valid values: 0 (all consumers use one thread for consumption); 1 (each consumer starts an independent thread for consumption). For more information about how to improve the consumption speed, see Kafka Performance Optimization. |
| kafka_max_block_size | No | The maximum size of Kafka messages that can be written to the table in each batch. Default value: 65536. Unit: bytes. |
| kafka_skip_broken_messages | No | The tolerance of the Kafka message parser to dirty data. Default value: 0. If you set kafka_skip_broken_messages to N, the Kafka engine skips N Kafka messages that cannot be parsed. One message is equivalent to one row of data. |
| kafka_commit_every_batch | No | Specifies how often a commit operation is performed. Default value: 0. Valid values: 0 (a commit operation is performed only after a whole block is written); 1 (a commit operation is performed after each batch of data is written). |
| kafka_auto_offset_reset | No | The offset from which data in the source ApsaraMQ for Kafka instance is consumed. Valid values: earliest (default; data is consumed from the earliest offset); latest (data is consumed from the latest offset). Note: This parameter is not supported by ApsaraDB for ClickHouse clusters of version 21.8. |

For more information about parameters, see Kafka.

Sample statement:

CREATE TABLE default.kafka_src_table ON CLUSTER `default`
(
    -- Columns that define the table schema
    id Int32,
    name String
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'alikafka-post-cn-****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-****1-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-****-3-vpc.alikafka.aliyuncs.com:9092',
    kafka_topic_list = 'testforCK',
    kafka_group_name = 'GroupForTestCK',
    kafka_format = 'CSV';
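
The optional parameters can be combined with a different message format. The following statement is a sketch under stated assumptions: the table name kafka_src_table_json, the group name GroupForTestCKJson, and the broker placeholders host1 to host3 are hypothetical, the topic is assumed to have at least two partitions, and each message is assumed to be one JSON object such as {"id": 1, "name": "a"}.

```sql
-- Hypothetical names: kafka_src_table_json, GroupForTestCKJson, host1..host3.
-- Assumption: the topic has at least two partitions, so two consumers can be assigned.
CREATE TABLE default.kafka_src_table_json ON CLUSTER `default`
(
    id Int32,
    name String
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host1:9092,host2:9092,host3:9092',
    kafka_topic_list = 'testforCK',
    kafka_group_name = 'GroupForTestCKJson',
    kafka_format = 'JSONEachRow',      -- one JSON object per message
    kafka_num_consumers = 2,           -- must not exceed the partition count
    kafka_skip_broken_messages = 10;   -- tolerate a limited number of unparsable messages
```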

Step 4: Create a destination table for storage

The table creation statements vary based on the cluster editions.

For an Enterprise Edition cluster, you only need to create a local table. For a Community-compatible Edition cluster, you can also create a distributed table based on your business requirements. For more information about table creation statements, see CREATE TABLE. Sample statements:

Enterprise Edition

CREATE TABLE default.kafka_table_local ON CLUSTER default (
  id Int32,
  name String
) ENGINE = MergeTree()
ORDER BY (id);

If the error message "ON CLUSTER is not allowed for Replicated database" appears when you execute the preceding statement, update the minor engine version of the cluster to resolve the issue. For more information, see Update the minor engine version.

Community-compatible Edition

The table engines that are used by Single-replica Edition and Double-replica Edition clusters are different. Select a table engine based on the replica type you use.

Important

When you create tables in a Double-replica Edition cluster, make sure that the tables use Replicated table engines from the MergeTree family. If the tables use non-Replicated table engines, the data in the tables is not replicated across replicas, which can lead to data inconsistency.

Single-replica Edition

  1. Create a local table.

    CREATE TABLE default.kafka_table_local ON CLUSTER default (
      id Int32,
      name String
    ) ENGINE = MergeTree()
    ORDER BY (id);

  2. Optional. Create a distributed table.

    If you want to import data only to the local table, skip this step.

    If you use a multi-node cluster, we recommend that you create a distributed table.

    CREATE TABLE kafka_table_distributed ON CLUSTER default AS default.kafka_table_local
    ENGINE = Distributed(default, default, kafka_table_local, id);

Double-replica Edition

  1. Create a local table.

    CREATE TABLE default.kafka_table_local ON CLUSTER default (
      id Int32,
      name String
    ) ENGINE = ReplicatedMergeTree()
    ORDER BY (id);

  2. Optional. Create a distributed table.

    If you want to import data only to the local table, skip this step.

    If you use a multi-node cluster, we recommend that you create a distributed table.

    CREATE TABLE kafka_table_distributed ON CLUSTER default AS default.kafka_table_local
    ENGINE = Distributed(default, default, kafka_table_local, id);

Step 5: Create a materialized view

During synchronization, ApsaraDB for ClickHouse uses a materialized view to read source data from the Kafka external table and then inserts the data into an ApsaraDB for ClickHouse local table.

Sample statement for creating a materialized view:

Important

Make sure that the fields in the SELECT clause match the schema of the destination table. If they do not match, use conversion functions in the SELECT clause to convert the data types so that the schemas are consistent.

CREATE MATERIALIZED VIEW <view_name> ON CLUSTER default TO <dest_table> AS SELECT * FROM <src_table>;

The following table describes the parameters.

| Parameter name | Required | Description | Example |
| --- | --- | --- | --- |
| view_name | Yes | The name of the view. | consumer |
| dest_table | Yes | The destination table used to store Kafka data. Community-compatible Edition cluster: If you use a multi-node cluster, we recommend that you import data to a distributed table. If the destination table is a local table, data is stored in the local table. Enterprise Edition cluster: Enterprise Edition clusters do not support distributed tables. Data is stored in a local table. | Community-compatible Edition: kafka_table_distributed. Enterprise Edition: kafka_table_local. |
| src_table | Yes | The Kafka external table. | kafka_src_table |

Sample statements:

Enterprise Edition

CREATE MATERIALIZED VIEW consumer ON CLUSTER default TO kafka_table_local AS SELECT * FROM kafka_src_table;

Community-compatible Edition

You can execute the following sample statement to store the source data in the kafka_table_distributed distributed table.

CREATE MATERIALIZED VIEW consumer ON CLUSTER default TO kafka_table_distributed AS SELECT * FROM kafka_src_table;
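
If the column types of the Kafka external table differ from those of the destination table, the SELECT clause can list the columns explicitly and convert them. The following statement is a minimal sketch. It assumes the sample tables kafka_src_table and kafka_table_local from this topic, and consumer_typed is a hypothetical view name.

```sql
-- Assumption: kafka_src_table and kafka_table_local are the sample tables from this topic.
-- consumer_typed is a hypothetical view name.
CREATE MATERIALIZED VIEW consumer_typed ON CLUSTER default TO kafka_table_local AS
SELECT
    toInt32(id)    AS id,    -- convert to the destination column type Int32
    toString(name) AS name   -- convert to the destination column type String
FROM kafka_src_table;
```

Create this view instead of the consumer view, not in addition to it. Two views that read the same Kafka external table and write to the same destination table would store each message twice.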

Step 6: Check whether data is synchronized

  1. Send messages to a topic of the ApsaraMQ for Kafka instance.

    1. Log on to the ApsaraMQ for Kafka console.

    2. On the Instances page, click the name of the instance that you want to manage.

    3. On the Topics page, find the required topic and choose More > Send Message in the Actions column.

    4. In the Send to Produce and Consume Message panel, specify the message content that you want to send.

      In this example, the messages 1,a and 2,b are sent.

    5. Click OK.

  2. Log on to the ApsaraDB for ClickHouse database and query the distributed table to check whether the data is synchronized.

    For more information about how to log on to an ApsaraDB for ClickHouse database, see Connect to ClickHouse by using DMS.

    Sample statements for verifying data:

    Enterprise Edition

    SELECT * FROM kafka_table_local; 

    Community-compatible Edition

    The following statement shows how to query a distributed table.

    • If the destination table is a local table, you must replace the distributed table name with the local table name in the statement.

    • If you use a Community-compatible Edition cluster with multiple nodes, we recommend that you query data from the distributed table. Otherwise, data is queried only from one node in the cluster, leading to incomplete results.

    SELECT * FROM kafka_table_distributed; 

    If the query returns the messages that you sent, the data is synchronized from the ApsaraMQ for Kafka instance to the ApsaraDB for ClickHouse cluster.

    Sample result:

    ┌─id─┬─name─┐
    │  1 │  a   │
    │  2 │  b   │
    └────┴──────┘

    If the query result is different from what you expect, you can further troubleshoot the issue by following the instructions described in (Optional) Step 7: View the consumption status of the Kafka external table.

(Optional) Step 7: View the consumption status of the Kafka external table

If the synchronized data is inconsistent with the data in the ApsaraMQ for Kafka instance, you can use the system table to check the consumption status of the Kafka external table and troubleshoot message consumption issues.

Community-compatible Edition and Enterprise Edition V23.8 and later

View the consumption status of the Kafka external table by querying the system.kafka_consumers system table. Sample statement:

SELECT * FROM system.kafka_consumers;

The following table describes the fields in the system.kafka_consumers table.

| Field name | Description |
| --- | --- |
| database | The database where the Kafka external table is located. |
| table | The name of the Kafka external table. |
| consumer_id | The Kafka consumer ID. A table can be consumed by multiple consumers. The number of consumers is specified by the kafka_num_consumers parameter when the Kafka external table is created. |
| assignments.topic | The Kafka topic. |
| assignments.partition_id | The Kafka partition ID. A partition can be assigned to only one consumer. |
| assignments.current_offset | The current offset. |
| exceptions.time | The timestamps of the 10 most recent exceptions. |
| exceptions.text | The text of the 10 most recent exceptions. |
| last_poll_time | The timestamp of the most recent poll. |
| num_messages_read | The number of messages read by the consumer. |
| last_commit_time | The timestamp of the most recent commit. |
| num_commits | The total number of commits for the consumer. |
| last_rebalance_time | The timestamp of the most recent Kafka rebalance. |
| num_rebalance_revocations | The number of times that partitions were revoked from the consumer. |
| num_rebalance_assignments | The number of times the consumer was assigned to the Kafka cluster. |
| is_currently_used | Indicates whether the consumer is in use. |
| last_used | The time when the consumer was last used. Unit: microseconds. |
| rdkafka_stat | The internal statistics of the librdkafka library. For more information, see librdkafka. The default value of statistics_interval_ms is 3000, which indicates that statistics are generated every three seconds. Note: To disable statistics collection for Kafka external tables, set statistics_interval_ms to 0. |
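
When you troubleshoot a single table, it can help to select only the offset and exception fields. The following query is a sketch. It assumes the sample table name kafka_src_table in the default database and uses only the fields described above.

```sql
-- Assumption: default.kafka_src_table is the sample Kafka external table from this topic.
SELECT
    database,
    table,
    consumer_id,
    assignments.topic,
    assignments.partition_id,
    assignments.current_offset,
    exceptions.time,
    exceptions.text,
    num_messages_read,
    last_commit_time
FROM system.kafka_consumers
WHERE database = 'default' AND table = 'kafka_src_table'
FORMAT Vertical;
```

If exceptions.text is not empty, or if assignments.current_offset stops advancing while messages are still being produced, the consumer is a likely place to start the investigation.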

Community-compatible Edition earlier than V23.8

View the consumption status of the Kafka external table by querying the system.kafka system table. Sample statement:

SELECT * FROM system.kafka;

The following table describes the fields in the system.kafka table.

| Field name | Description |
| --- | --- |
| database | The name of the database where the Kafka external table is located. |
| table | The name of the Kafka external table. |
| topic | The name of the topic consumed by the Kafka external table. |
| consumer_group | The name of the consumer group used by the Kafka external table. |
| last_read_message_count | The number of messages pulled by the Kafka external table. |
| status | The consumption status of the Kafka external table. Valid values: no_view (no view is created for the Kafka external table); attach_view (a view is created for the Kafka external table); normal (the Kafka external table is consuming data); skip_parse (parsing errors are skipped); error (consumption exceptions occur). |
| exception | The error details. Note: The error details are returned only when status is error. |
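
To list only the tables that need attention, you can filter on the status field. The following query is a sketch. It uses the field names described above and assumes that status is stored as a string.

```sql
-- Assumption: status holds the string values listed in the field description above.
SELECT
    database,
    table,
    topic,
    consumer_group,
    last_read_message_count,
    status,
    exception
FROM system.kafka
WHERE status IN ('no_view', 'skip_parse', 'error');
```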

FAQ