ApsaraDB for ClickHouse: Synchronize data from an ApsaraMQ for Kafka instance

Last Updated: Mar 25, 2024

This topic describes how to synchronize data from an ApsaraMQ for Kafka instance to an ApsaraDB for ClickHouse cluster to meet your real-time data processing requirements.

Note

ApsaraDB for ClickHouse clusters can synchronize data only from a self-managed Kafka cluster deployed on an Elastic Compute Service (ECS) instance or from an ApsaraMQ for Kafka instance. The following example describes how to synchronize data from an ApsaraMQ for Kafka instance to an ApsaraDB for ClickHouse cluster.

Prerequisites

  • A destination ApsaraDB for ClickHouse cluster is created. The source ApsaraMQ for Kafka instance and the destination ApsaraDB for ClickHouse cluster are deployed in the same region and use the same virtual private cloud (VPC). For more information, see Create an ApsaraDB for ClickHouse cluster.

  • A database account is created in the destination ApsaraDB for ClickHouse cluster. The account is used to log on to the database and has the permissions required to perform operations on the database. For more information, see Create a database account and Grant permissions.

Procedure

  1. Log on to the ApsaraDB for ClickHouse console.

  2. In the top navigation bar, select the region where the cluster is deployed.

  3. On the Clusters page, click the Default Instances tab, find the cluster that you want to manage, and then click the ID of the cluster.

  4. In the upper-right corner of the Cluster Information page, click Log On to Database.

  5. In the Log on to Database Instance dialog box of the Data Management (DMS) console, set the Database Account and Database Password parameters and click Login.

  6. Create a table that uses the Kafka engine.

    Note
    • The table that uses the Kafka engine only consumes data from ApsaraMQ for Kafka. It does not store data.

    • The table that uses the Kafka engine cannot be used directly as a result table. To persist the consumed data, create a destination table and a materialized view that writes the consumed data to the destination table, as described in the following steps.

    Use the following syntax to create a table:

    CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
    (
        name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
        name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
        ...
    ) ENGINE = Kafka()
    SETTINGS
        kafka_broker_list = 'host:port1,host:port2,host:port3',
        kafka_topic_list = 'topic_name1,topic_name2,...',
        kafka_group_name = 'group_name',
        kafka_format = 'data_format'[,]
        [kafka_row_delimiter = 'delimiter_symbol',]
        [kafka_num_consumers = N,]
        [kafka_thread_per_consumer = 1,]
        [kafka_max_block_size = 0,]
        [kafka_skip_broken_messages = N,]
        [kafka_commit_every_batch = 0,]
        [kafka_auto_offset_reset = 'earliest|latest']

    The following table describes the parameters in the preceding statement.

    Parameter: kafka_broker_list
    Required: Yes
    Description: The endpoints that are used to connect to the source ApsaraMQ for Kafka instance. Separate multiple endpoints with commas (,). For more information about how to view endpoints, see View endpoints.

    Parameter: kafka_topic_list
    Required: Yes
    Description: The names of the topics in the source ApsaraMQ for Kafka instance. Separate multiple topic names with commas (,). For more information about how to view the name of a topic, see Step 1: Create a topic.

    Parameter: kafka_group_name
    Required: Yes
    Description: The name of the consumer group in the source ApsaraMQ for Kafka instance. For more information about how to create a consumer group, see Step 2: Create a group.

    Parameter: kafka_format
    Required: Yes
    Description: The format of the message body. The format must be supported by ApsaraDB for ClickHouse.

    Note

    For more information about the formats of message bodies supported by ApsaraDB for ClickHouse, see Formats for Input and Output Data.

    Parameter: kafka_row_delimiter
    Required: No
    Description: The delimiter that separates rows. Default value: \n. Set this parameter to match the delimiter that is actually used when the data is written.

    Parameter: kafka_num_consumers
    Required: No
    Description: The number of consumers that consume data for the table. Default value: 1.

    Note
    • If the throughput of one consumer is insufficient, specify a larger number of consumers.

    • The number of consumers cannot exceed the number of partitions in a topic because only one consumer can be assigned per partition.

    Parameter: kafka_thread_per_consumer
    Required: No
    Description: Specifies whether each consumer starts an independent thread for consumption. Default value: 0. Valid values:

    • 0: All consumers use one thread for consumption.

    • 1: Each consumer starts an independent thread for consumption.

    For more information about how to improve the consumption speed, see Clusters and Performance.

    Parameter: kafka_max_block_size
    Required: No
    Description: The maximum number of Kafka messages that can be written to the table in each batch. Default value: 65536. Unit: messages.

    Parameter: kafka_skip_broken_messages
    Required: No
    Description: The tolerance of the Kafka message parser to dirty data. Default value: 0. If you set kafka_skip_broken_messages to N, the Kafka engine skips up to N Kafka messages that cannot be parsed in each block. One message is equivalent to one row of data.

    Parameter: kafka_commit_every_batch
    Required: No
    Description: Specifies how often a commit operation is performed. Default value: 0. Valid values:

    • 0: A commit operation is performed only after a whole block is written.

    • 1: A commit operation is performed after each batch of data is written.

    Parameter: kafka_auto_offset_reset
    Required: No
    Description: The offset from which data in the source ApsaraMQ for Kafka instance is consumed. Valid values:

    • earliest (default value): Data in the ApsaraMQ for Kafka instance is consumed from the earliest offset.

    • latest: Data in the ApsaraMQ for Kafka instance is consumed from the latest offset.

    Note

    ApsaraDB for ClickHouse clusters of version 21.8 do not support this parameter.

    Note

    For more information about other parameters, see Kafka.

    You can execute the following sample statement:

    CREATE TABLE default.kafka_src_table ON CLUSTER default
    ( -- Fields that define the schema of the table
        id Int32,
        name String
    ) ENGINE = Kafka()
    SETTINGS
        kafka_broker_list = 'alikafka-post-cn-tl32i5sc****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-tl32i5sc****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-tl32i5sc****-3-vpc.alikafka.aliyuncs.com:9092',
        kafka_topic_list = 'test',
        kafka_group_name = 'test',
        kafka_format = 'CSV';
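
    The optional parameters described above can be combined in the same SETTINGS clause. The following statement is a minimal sketch and is not part of the original procedure. The table name kafka_src_table_json, the consumer group test_json, and all setting values are hypothetical. The sketch assumes that the topic contains JSON messages such as {"id": 1, "name": "a"}, that the topic has at least two partitions, and that the consumer group has been created in ApsaraMQ for Kafka.

    ```sql
    -- Sketch only: the table name, consumer group, and setting values are assumptions.
    CREATE TABLE default.kafka_src_table_json ON CLUSTER default
    (
        id Int32,
        name String
    ) ENGINE = Kafka()
    SETTINGS
        kafka_broker_list = 'alikafka-post-cn-tl32i5sc****-1-vpc.alikafka.aliyuncs.com:9092',
        kafka_topic_list = 'test',
        kafka_group_name = 'test_json',    -- hypothetical consumer group
        kafka_format = 'JSONEachRow',      -- one JSON object per message
        kafka_num_consumers = 2,           -- must not exceed the number of partitions in the topic
        kafka_thread_per_consumer = 1,     -- each consumer starts an independent thread
        kafka_skip_broken_messages = 10;   -- skip up to 10 messages that cannot be parsed
    ```

    Adjust the values to your workload. For example, keep kafka_skip_broken_messages at 0 if unparsable messages should surface as errors instead of being skipped.
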
  7. Create a table in ApsaraDB for ClickHouse.

    Note

    For more information about the purpose of creating a local table or a distributed table, see Terms.

    1. Create a local table.

      CREATE TABLE default.kafka_table_local ON CLUSTER default (
        id Int32,
        name String
      ) ENGINE = MergeTree()
      ORDER BY (id);
    2. Create a distributed table.

      Note

      If you want to synchronize data only to a local table, skip this step.

      CREATE TABLE kafka_table_distributed ON CLUSTER default AS default.kafka_table_local
      ENGINE = Distributed(default, default, kafka_table_local, id);
  8. Create a materialized view to synchronize the data consumed by the table that uses the Kafka engine to the distributed table in ApsaraDB for ClickHouse.

    Note

    If you synchronize data to a local table, replace the name of the distributed table with the name of the local table in the statement.

    You can use the following syntax to create a materialized view:

    CREATE MATERIALIZED VIEW [db.]view_name ON CLUSTER default TO [db.]dest_table AS SELECT * FROM [db.]src_table;

    You can execute the following sample statement:

    CREATE MATERIALIZED VIEW consumer ON CLUSTER default TO kafka_table_distributed AS SELECT * FROM kafka_src_table;
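
    If you synchronize data only to a local table, the statement might look like the following sketch. The view name consumer_local is hypothetical. The column list is written out explicitly, which is a design choice and not a requirement: it keeps the view stable if columns are later added to the source table.

    ```sql
    -- Sketch only: consumer_local is an illustrative view name.
    -- Use this view instead of, not in addition to, the sample view above.
    -- Otherwise, each consumed message is written to both destination tables.
    CREATE MATERIALIZED VIEW consumer_local ON CLUSTER default TO default.kafka_table_local AS
    SELECT
        id,
        name
    FROM default.kafka_src_table;
    ```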

Verify the data synchronization result

You can use either of the following methods to verify the data synchronization result.

Query the distributed table in ApsaraDB for ClickHouse

  1. Send messages to the topic in ApsaraMQ for Kafka.

    1. Log on to the ApsaraMQ for Kafka console.

    2. On the Instances page, click the name of the instance that contains the topic to which you want to send messages.

    3. On the Topics page, find the topic, and choose More > Send Message in the Actions column.

    4. In the Start to Send and Consume Message panel, specify the Message Content parameter.

      In this example, two messages are sent to the topic: 1,a and 2,b. Each message is one CSV row that matches the (id, name) schema of the table that uses the Kafka engine.

    5. Click OK.

  2. Query the distributed table in ApsaraDB for ClickHouse to check whether the data is synchronized:

    SELECT * FROM kafka_table_distributed;

    Note

    If you synchronize data to a local table, replace the name of the distributed table with the name of the local table in the statement.

    The following query result is returned:

    ┌─id─┬─name─┐
    │  1 │  a   │
    │  2 │  b   │
    └────┴──────┘

    Note

    If the query returns the messages that you sent, the data has been synchronized from the ApsaraMQ for Kafka instance to the ApsaraDB for ClickHouse cluster.
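
    When the topic carries more than a few messages, narrower checks can be easier to read than SELECT *. The following queries are a minimal sketch that assumes the sample tables and the two sample messages used in this topic. The alias synced_rows is illustrative.

    ```sql
    -- Count the rows that have arrived in the destination table so far.
    SELECT count() AS synced_rows FROM kafka_table_distributed;

    -- Look up only the sample messages sent in this example.
    SELECT id, name
    FROM kafka_table_distributed
    WHERE id IN (1, 2)
    ORDER BY id;
    ```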

Query the system table

Query the system table system.kafka to view the data consumption status of the table that uses the Kafka engine.

SELECT * FROM system.kafka;

The following query result is returned:

┌─database─┬──────────table──────────────┬─topic─┬─consumer_group─┬─last_read_message_count─┬───────status──────┬─exception─┐
│  default │  kafka_table_distributed    │ test  │   test         │          2              │   attach_view     │           │  
└──────────┴─────────────────────────────┴───────┴────────────────┴─────────────────────────┴───────────────────┴───────────┘

The following table describes the fields that are returned in the query result.

Field: database
Description: The name of the database to which the table that uses the Kafka engine belongs.

Field: table
Description: The name of the table that uses the Kafka engine.

Field: topic
Description: The name of the topic that is consumed by the table that uses the Kafka engine.

Field: consumer_group
Description: The name of the consumer group that is used by the table that uses the Kafka engine.

Field: last_read_message_count
Description: The number of messages read by the table that uses the Kafka engine.

Field: status
Description: The consumption status of the table that uses the Kafka engine. Valid values:

  • no_view: No view is created for the table that uses the Kafka engine.

  • attach_view: A view is created for the table that uses the Kafka engine.

  • normal: The table that uses the Kafka engine is in the normal state.

    Note

    If the table that uses the Kafka engine is consuming data, the consumption status of the table is normal.

  • skip_parse: Messages that cannot be parsed are skipped by the Kafka engine.

  • error: A consumption error occurs.

Field: exception
Description: The details about the consumption error.

Note

If the value of the status field is error, the error details are returned in this field.
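
To list only the tables that need attention, you can filter on the status field. The following query is a minimal sketch. It assumes that the status column stores the status names listed above as strings, which this topic does not state explicitly.

```sql
-- Sketch only: assumes that status is stored as a string such as 'error' or 'skip_parse'.
SELECT
    database,
    table,
    topic,
    consumer_group,
    status,
    exception
FROM system.kafka
WHERE status IN ('error', 'skip_parse');
```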

FAQ

For answers to frequently asked questions about synchronizing data from an ApsaraMQ for Kafka instance to an ApsaraDB for ClickHouse cluster, see FAQ.