This topic describes how to synchronize data from an ApsaraMQ for Kafka instance to an ApsaraDB for ClickHouse cluster in real time.
Limits
Only data from ApsaraMQ for Kafka instances or from self-managed Kafka clusters deployed on Elastic Compute Service (ECS) instances can be synchronized to ApsaraDB for ClickHouse clusters.
Prerequisites
Destination ApsaraDB for ClickHouse cluster:
A destination cluster is created. The source ApsaraMQ for Kafka instance and the destination cluster are deployed in the same region and use the same virtual private cloud (VPC). For more information, see Create an ApsaraDB for ClickHouse cluster.
An account is created to log on to a database in the destination cluster and has the permissions to perform operations on the database. For more information, see Account management.
Source ApsaraMQ for Kafka instance:
A topic is created. For more information, see Step 1: Create a topic.
A consumer group is created. For more information, see Step 2: Create a group.
Usage notes
If a topic is subscribed to by an ApsaraDB for ClickHouse Kafka external table, the topic cannot be consumed by other consumers.
When you create a Kafka external table, a materialized view, and a local table, the field types in the three tables must match.
Procedure
This example shows how to synchronize data from an ApsaraMQ for Kafka instance to the kafka_table_distributed distributed table in the default database of an ApsaraDB for ClickHouse cluster that runs Community-compatible Edition.
Step 1: Understand the synchronization principle
ApsaraDB for ClickHouse uses the built-in Kafka table engine together with the materialized view mechanism to synchronize data from ApsaraMQ for Kafka instances, which enables real-time data consumption and storage. The synchronization process involves the following components:
Kafka topic: specifies the source data to be synchronized.
ApsaraDB for ClickHouse Kafka external table: pulls source data from a specified Kafka topic.
Materialized view: reads source data from the Kafka external table and inserts the data into an ApsaraDB for ClickHouse local table.
Local table: stores the synchronized data.
Step 2: Log on to an ApsaraDB for ClickHouse database
For more information about how to log on to an ApsaraDB for ClickHouse database, see Use DMS to connect to an ApsaraDB for ClickHouse cluster.
Step 3: Create a Kafka external table
ApsaraDB for ClickHouse uses a built-in Kafka table engine to pull source data from the specified Kafka topic. The Kafka external table has the following features:
By default, a Kafka external table cannot be directly queried. (A debugging workaround is shown after this list.)
A Kafka external table is only used to consume Kafka data and does not store data. You must use a materialized view to process the data and insert the data into the destination table for storage.
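If you need to inspect the raw Kafka stream for debugging, recent ClickHouse kernels let you enable direct queries against a Kafka external table by using the stream_like_engine_allow_direct_select setting. The following sketch assumes that your cluster version supports this setting, and uses the kafka_src_table sample table created later in this step. Note that a direct SELECT consumes messages, so the consumed offsets are no longer available to the materialized view.
SET stream_like_engine_allow_direct_select = 1;
SELECT * FROM default.kafka_src_table LIMIT 10;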
Sample statement for creating a table:
The field types of a Kafka external table must match the data types of the Kafka message fields.
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'host:port1,host:port2,host:port3',
kafka_topic_list = 'topic_name1,topic_name2,...',
kafka_group_name = 'group_name',
kafka_format = 'data_format'[,]
[kafka_row_delimiter = 'delimiter_symbol',]
[kafka_num_consumers = N,]
[kafka_thread_per_consumer = 1,]
[kafka_max_block_size = 0,]
[kafka_skip_broken_messages = N,]
[kafka_commit_every_batch = 0,]
[kafka_auto_offset_reset = N]
The following table describes the parameters in the preceding statement.
Parameter | Required | Description |
kafka_broker_list | Yes | The endpoints that are used to connect to the ApsaraMQ for Kafka instance. Separate multiple endpoints with commas (,). For more information about how to view endpoints, see View endpoints. |
kafka_topic_list | Yes | The names of the topics in the ApsaraMQ for Kafka instance. Separate multiple topic names with commas (,). For more information about how to view the name of a topic, see Step 1: Create a topic. |
kafka_group_name | Yes | The name of the consumer group in the ApsaraMQ for Kafka instance. For more information, see Step 2: Create a group. |
kafka_format | Yes | The format of the message body. Note: For more information about the message body formats supported by ApsaraDB for ClickHouse, see Formats for Input and Output Data. |
kafka_row_delimiter | No | The delimiter that separates rows. Default value: \n. You can also set this parameter based on the actual delimited format in which the data is written. |
kafka_num_consumers | No | The number of consumers that consume data from the table. Default value: 1. Note: The number of consumers cannot exceed the number of partitions in the topic, because each partition can be assigned to only one consumer. |
kafka_thread_per_consumer | No | Specifies whether each consumer starts an independent thread for consumption. Default value: 0. Valid values: 0: consumers share threads, and rows from multiple consumers are combined into one block. 1: each consumer flushes data in an independent thread. For more information about how to improve the consumption speed, see Kafka Performance Optimization. |
kafka_max_block_size | No | The maximum number of Kafka messages that can be written to the table in each batch. Default value: 65536. Unit: messages. |
kafka_skip_broken_messages | No | The tolerance of the Kafka message parser to dirty data. Default value: 0. If you set this parameter to N, the engine skips up to N Kafka messages that cannot be parsed in each block. |
kafka_commit_every_batch | No | Specifies how often a commit operation is performed. Default value: 0. Valid values: 0: an offset is committed only after a whole block is written. 1: an offset is committed after each consumed batch. |
kafka_auto_offset_reset | No | The offset from which the data in the ApsaraMQ for Kafka instance used as the data source is consumed. Valid values: earliest: data is consumed from the earliest offset. latest: data is consumed from the latest offset. Note: This parameter is not supported by ApsaraDB for ClickHouse clusters of version 21.8. |
For more information about parameters, see Kafka.
Sample statement:
CREATE TABLE default.kafka_src_table ON CLUSTER `default`
(
    -- Fields that define the table schema
    id Int32,
    name String
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'alikafka-post-cn-****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-****-3-vpc.alikafka.aliyuncs.com:9092',
kafka_topic_list = 'testforCK',
kafka_group_name = 'GroupForTestCK',
kafka_format = 'CSV';
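To confirm that the external table was created with the expected engine, you can check its definition. This is an optional verification sketch that queries the standard system.tables table:
SHOW CREATE TABLE default.kafka_src_table;
SELECT name, engine FROM system.tables WHERE database = 'default' AND name = 'kafka_src_table';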
Step 4: Create a destination table for storage
The table creation statements vary based on the cluster editions.
For an Enterprise Edition cluster, you only need to create a local table. For a Community-compatible Edition cluster, you can also create a distributed table based on your business requirements. For more information about table creation statements, see CREATE TABLE. Sample statements:
Enterprise Edition
CREATE TABLE default.kafka_table_local ON CLUSTER default (
id Int32,
name String
) ENGINE = MergeTree()
ORDER BY (id);
If the ON CLUSTER is not allowed for Replicated database error message appears when you execute the preceding statement, update the minor engine version of the cluster to resolve the issue. For more information, see Update the minor engine version.
Community-compatible Edition
The table engines that are used by Single-replica Edition and Double-replica Edition clusters are different. Select a table engine based on the replica type you use.
When you create tables in a Double-replica Edition cluster, make sure that the tables use Replicated table engines from the MergeTree family. If the tables use non-Replicated table engines, the data on the tables is not replicated across replicas. This can lead to data inconsistency.
Single-replica Edition
Create a local table.
CREATE TABLE default.kafka_table_local ON CLUSTER default
(
    id Int32,
    name String
) ENGINE = MergeTree()
ORDER BY (id);
Optional. Create a distributed table.
If you want to import data only to the local table, skip this step.
If you use a multi-node cluster, we recommend that you create a distributed table.
-- Distributed(cluster name, database name, local table name, sharding key)
CREATE TABLE kafka_table_distributed ON CLUSTER default AS default.kafka_table_local
ENGINE = Distributed(default, default, kafka_table_local, id);
Double-replica Edition
Create a local table.
CREATE TABLE default.kafka_table_local ON CLUSTER default
(
    id Int32,
    name String
) ENGINE = ReplicatedMergeTree()
ORDER BY (id);
Optional. Create a distributed table.
If you want to import data only to the local table, skip this step.
If you use a multi-node cluster, we recommend that you create a distributed table.
-- Distributed(cluster name, database name, local table name, sharding key)
CREATE TABLE kafka_table_distributed ON CLUSTER default AS default.kafka_table_local
ENGINE = Distributed(default, default, kafka_table_local, id);
Step 5: Create a materialized view
During synchronization, ApsaraDB for ClickHouse uses a materialized view to read source data from the Kafka external table and then inserts the data to an ApsaraDB for ClickHouse local table.
Sample statement for creating a materialized view:
Make sure that the fields in the SELECT clause match the destination table schema. Alternatively, you can use a conversion function to convert data formats, ensuring schema consistency.
CREATE MATERIALIZED VIEW <view_name> ON CLUSTER default TO <dest_table> AS SELECT * FROM <src_table>;
The following table describes the parameters.
Parameter name | Required | Description | Example |
view_name | Yes | The name of the materialized view. | consumer |
dest_table | Yes | The destination table that stores the Kafka data. For an Enterprise Edition cluster, this is the local table. For a Community-compatible Edition cluster, this can be the local table or a distributed table. | kafka_table_distributed |
src_table | Yes | The name of the Kafka external table. | kafka_src_table |
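If the Kafka message fields and the destination table schema do not match exactly, you can apply conversion functions in the SELECT clause, as mentioned above. The following sketch is hypothetical: it assumes that the Kafka external table exposes an additional String column named event_time and that the destination table stores it as a DateTime column.
CREATE MATERIALIZED VIEW consumer_typed ON CLUSTER default TO kafka_table_local AS
SELECT
    id,
    name,
    parseDateTimeBestEffort(event_time) AS event_time -- hypothetical column: converts a String value to DateTime
FROM kafka_src_table;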
Sample statements:
Enterprise Edition
CREATE MATERIALIZED VIEW consumer ON CLUSTER default TO kafka_table_local AS SELECT * FROM kafka_src_table;
Community-compatible Edition
You can execute the following sample statement to store the source data in the kafka_table_distributed distributed table.
CREATE MATERIALIZED VIEW consumer ON CLUSTER default TO kafka_table_distributed AS SELECT * FROM kafka_src_table;
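Consumption starts as soon as the materialized view is created. If you need to pause consumption, for example to modify the destination table, a common pattern is to detach the materialized view and attach it again later:
-- Pause consumption from the Kafka external table
DETACH TABLE consumer ON CLUSTER default;
-- Resume consumption
ATTACH TABLE consumer ON CLUSTER default;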
Step 6: Check whether data is synchronized
Send messages to a topic of the ApsaraMQ for Kafka instance.
Log on to the ApsaraMQ for Kafka console.
On the Instances page, click the name of the instance that you want to manage.
On the Topics page, find the topic that you want to manage and choose More > Send Message in the Actions column.
In the Start to Send and Consume Message panel, specify the message content that you want to send. In this example, the messages 1,a and 2,b are sent.
Click OK.
Log on to the ApsaraDB for ClickHouse database and query the distributed table to check whether the data is synchronized.
For more information about how to log on to an ApsaraDB for ClickHouse database, see Use DMS to connect to an ApsaraDB for ClickHouse cluster.
Sample statements for verifying data:
Enterprise Edition
SELECT * FROM kafka_table_local;
Community-compatible Edition
The following statement shows how to query a distributed table.
If the destination table is a local table, you must replace the distributed table name with the local table name in the statement.
If you use a Community-compatible Edition cluster with multiple nodes, we recommend that you query data from the distributed table. Otherwise, data is queried only from one node in the cluster, leading to incomplete results.
SELECT * FROM kafka_table_distributed;
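To quickly compare the number of synchronized rows with the number of messages that you sent, you can also run a count query. If the destination table is a local table, replace the distributed table name accordingly:
SELECT count() FROM kafka_table_distributed;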
If the query returns the messages that you sent, the data has been synchronized from the ApsaraMQ for Kafka instance to the ApsaraDB for ClickHouse cluster.
Sample result:
┌─id─┬─name─┐
│  1 │ a    │
│  2 │ b    │
└────┴──────┘
If the query result is different from what you expect, you can further troubleshoot the issue by following the instructions described in (Optional) Step 7: View the consumption status of the Kafka external table.
(Optional) Step 7: View the consumption status of the Kafka external table
If the synchronized data is inconsistent with the data in the ApsaraMQ for Kafka instance, you can use the system table to check the consumption status of the Kafka external table and troubleshoot message consumption issues.
Enterprise Edition and Community-compatible Edition V23.8 or later
Query the system.kafka_consumers system table to view the consumption status of the Kafka external table. Sample statement:
SELECT * FROM system.kafka_consumers;
The following table describes the fields in the system.kafka_consumers table.
Field name | Description |
database | The database where the Kafka external table resides. |
table | The name of the Kafka external table. |
consumer_id | The Kafka consumer ID. A table can be consumed by multiple consumers. The number of consumers is specified by the kafka_num_consumers parameter when the Kafka external table is created. |
assignments.topic | The Kafka topic. |
assignments.partition_id | The Kafka partition ID. A partition can be assigned to only one consumer. |
assignments.current_offset | The current offset. |
exceptions.time | The timestamps of the 10 most recent exceptions. |
exceptions.text | The messages of the 10 most recent exceptions. |
last_poll_time | The timestamp of the most recent poll. |
num_messages_read | The number of messages read by the consumer. |
last_commit_time | The timestamp of the most recent commit. |
num_commits | The total number of commits for the consumer. |
last_rebalance_time | The timestamp of the most recent Kafka rebalance. |
num_rebalance_revocations | The number of times that partitions were revoked from the consumer. |
num_rebalance_assignments | The number of times that the consumer was assigned to the Kafka cluster. |
is_currently_used | Indicates whether the consumer is in use. |
last_used | The time when the consumer was last used. Unit: microseconds. |
rdkafka_stat | The internal statistics of the librdkafka library. For more information, see librdkafka. The statistics interval defaults to 3000, which indicates that statistics are generated every 3 seconds. Note: To disable statistics collection for Kafka external tables, set statistics_interval_ms to 0. |
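For troubleshooting, you usually only need the offset and exception fields of a specific external table. The following sketch filters system.kafka_consumers by table name and uses the Vertical format to print one field per line:
SELECT
    database,
    `table`,
    consumer_id,
    assignments.topic,
    assignments.partition_id,
    assignments.current_offset,
    num_messages_read,
    exceptions.time,
    exceptions.text
FROM system.kafka_consumers
WHERE `table` = 'kafka_src_table'
FORMAT Vertical;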
Community-compatible Edition earlier than V23.8
Query the system.kafka system table to view the consumption status of the Kafka external table. Sample statement:
SELECT * FROM system.kafka;
The following table describes the fields in the system.kafka table.
Field name | Description |
database | The name of the database where the Kafka external table resides. |
table | The name of the Kafka external table. |
topic | The name of the topic consumed by the Kafka external table. |
consumer_group | The name of the consumer group used by the Kafka external table. |
last_read_message_count | The number of messages pulled by the Kafka external table. |
status | The consumption status of the Kafka external table. A value of error indicates that an exception occurred during consumption. |
exception | The error details. Note: Error details are returned only when status is error. |
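To surface only external tables that failed to consume messages, you can filter on the status field. This sketch assumes the error status value described in the preceding table:
SELECT database, `table`, topic, consumer_group, status, exception
FROM system.kafka
WHERE status = 'error';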