全部產品
Search
文件中心

ApsaraDB for ClickHouse:從Kafka同步資料

更新時間:May 17, 2025

本文為您介紹如何將訊息佇列Kafka的資料即時同步至雲資料庫ClickHouse

使用限制

僅支援同步雲訊息佇列Kafka和部署在ECS上的自建Kafka的資料。

前提條件

  • 雲資料庫ClickHouse

    • 已建立目的地組群,且保證訊息佇列Kafka和目的地組群在同一地區並使用相同的VPC。如何建立,請參見建立叢集

    • 目的地組群已建立登入資料庫的帳號且已具備資料庫的操作許可權。具體操作,請參見帳號管理

  • 雲訊息佇列Kafka:

注意事項

  • 雲資料庫ClickHouse的Kafka外表所訂閱的Topic不能存在其他消費者。

  • 建立kafka外表、物化視圖和本地表時,三張表的欄位類型需要一一對應。

操作步驟

本樣本為將雲訊息佇列Kafka同步至雲資料庫ClickHouse社區相容叢集的資料庫default庫中的kafka_table_distributed分布式表中。

步驟一:瞭解同步原理

雲資料庫ClickHouse同步Kafka資料主要依賴於其內建的Kafka表引擎和物化視圖(Materialized View)機制,實現即時資料消費和儲存。具體資料鏈路如下。

  • Kafka主題:需要同步的來源資料。

  • 雲資料庫ClickHouse的Kafka外表(Kafka表引擎的表):從指定Kafka主題拉取來源資料。

  • 物化視圖:通過Kafka外表讀取來源資料並執行插入操作,將資料寫到雲資料庫ClickHouse本地表。

  • 本地表:儲存同步的資料。

步驟二:登入雲資料庫ClickHouse

如何登入,請參見通過DMS串連ClickHouse

步驟三:建立Kafka外表

雲資料庫ClickHouse同步Kafka資料依賴於其內建的Kafka表引擎從指定Kafka主題拉取來源資料。此表有以下特點:

  • Kafka外表預設不能直接查詢。

  • Kafka外表只是用來消費Kafka資料,沒有真正地儲存資料,需要通過物化視圖將資料加工並插入到目標表中儲存。

建表文法如下。

重要

Kafka外表欄位格式須與Kafka資料類型一致。

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host:port1,host:port2,host:port3',
    kafka_topic_list = 'topic_name1,topic_name2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format'[,]
    [kafka_row_delimiter = 'delimiter_symbol',]
    [kafka_num_consumers = N,]
    [kafka_thread_per_consumer = 1,]
    [kafka_max_block_size = 0,]
    [kafka_skip_broken_messages = N,]
    [kafka_commit_every_batch = 0,]
    [kafka_auto_offset_reset = N]

常用參數說明如下。

名稱

是否必選

說明

kafka_broker_list

以英文逗號(,)分隔的Kafka的存取點地址清單。如何查看存取點,請參見查看存取點

  • 如果您使用阿里雲訊息佇列Kafka,Clickhouse預設支援解析對應的雲Kafka網域名稱。

  • 如果您使用了自建Kafka,雲資料庫Clickhouse目前支援通過IP或者固定格式的自訂網域名串連Kafka,支援的網域名稱規則如下:

    1. 以.com結尾的網域名稱。

    2. 以.local結尾,並包含kafka、mysql或rabbitmq任意一個關鍵詞的網域名稱。

kafka_topic_list

以英文逗號(,)分隔的Topic名稱列表。如何查看Topic名稱,請參見建立Topic

kafka_group_name

Kafka的消費組名稱。更多資訊,請參見建立Group

kafka_format

雲資料庫ClickHouse支援處理的訊息體格式。

說明

雲資料庫ClickHouse支援的訊息體格式,更多資訊請參見Formats for Input and Output Data

kafka_row_delimiter

行分隔字元,用於分隔不同的資料行。預設為“\n”,您也可以根據資料寫入的實際分隔格式進行設定。

kafka_num_consumers

單個表的消費者數量,預設值為1。

說明
  1. 一個消費者的輸送量不足時,需要指定更多的消費者。

  2. 消費者的總數不應超過Topic中的分區數,因為每個分區只能分配一個消費者。

kafka_thread_per_consumer

指定每個消費者是否啟動獨立線程進行消費,預設值為0。取值說明如下。

  1. 0:表示所有消費者共同使用1個線程消費。

  2. 1:表示每個消費者啟動獨立線程進行消費。

提升消費速度更多資訊請參見Kafka效能調優

kafka_max_block_size

Kafka訊息的最大批次大小,單位:Byte,預設值為65536。

kafka_skip_broken_messages

Kafka訊息解析器對於髒資料的容忍度,預設值為0。如果kafka_skip_broken_messages=N,則引擎將跳過N條無法解析的Kafka訊息(一條訊息等於一行資料)。

kafka_commit_every_batch

執行Kafka Commit的頻率,預設值為0,取值說明如下:

  1. 0:完全寫入一整個Block資料區塊的資料後才執行Commit。

  2. 1:每寫完一個Batch批次的資料就執行一次Commit。

kafka_auto_offset_reset

訊息的位移量,從哪個offset開始讀取Kafka資料,取值說明如下:

  1. earliest(預設值):從最早的offset開始讀取Kafka資料。

  2. latest:從最晚的offset開始讀取Kafka資料。

說明

21.8版本的雲資料庫ClickHouse叢集不支援該參數。

更多參數說明,請參見Kafka

樣本語句如下。

CREATE TABLE default.kafka_src_table ON CLUSTER `default`
(-- 定義表結構的欄位
    id Int32,
    name String               
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'alikafka-post-cn-****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-****1-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-****-3-vpc.alikafka.aliyuncs.com:9092',
    kafka_topic_list = 'testforCK',
    kafka_group_name = 'GroupForTestCK',
    kafka_format = 'CSV';

步驟四:建立目標儲存表

您需根據叢集版本選擇建表語句。

企業版叢集僅需建立本地表,社區相容版叢集則可能需要根據您的環境和需求建立分布式表。以下為樣本語句,更多建表文法,請參見CREATE TABLE

企業版

CREATE TABLE default.kafka_table_local ON CLUSTER default (
  id Int32,
  name String
) ENGINE = MergeTree()
ORDER BY (id);

如果您執行此語句時報ON CLUSTER is not allowed for Replicated database的錯誤提示,可嘗試通過升級版本解決此問題,如何升級版本,請參見升級核心小版本

社區相容版

單副本和雙副本的引擎有所不同,請根據您的副本類型選擇相應的引擎。

重要

在雙複本集群中建表時,必須使用MergeTree系列引擎中支援資料複製的Replicated系列引擎。如果您在雙複本集群中,建立了非Replicated系列引擎的表,將導致副本之間無法進行資料複製,從而導致副本資料可能不一致。

單副本

  1. 建立本地表。

    CREATE TABLE default.kafka_table_local ON CLUSTER default (
      id Int32,
      name String
    ) ENGINE = MergeTree()
    ORDER BY (id);
  2. (可選)建立分布式表。

    如果您只需將檔案匯入至本地表中,可跳過此步驟。

    如果您的叢集為多節點叢集,建議您建立分布式表。

    CREATE TABLE kafka_table_distributed ON CLUSTER default AS default.kafka_table_local
    ENGINE = Distributed(default, default, kafka_table_local, id);

雙副本

  1. 建立本地表。

    CREATE TABLE default.kafka_table_local ON CLUSTER default (
      id Int32,
      name String
    ) ENGINE = ReplicatedMergeTree()
    ORDER BY (id);
  2. (可選)建立分布式表。

    如果您只需將檔案匯入至本地表中,可跳過此步驟。

    如果您的叢集為多節點叢集,建議您建立分布式表。

    CREATE TABLE kafka_table_distributed ON CLUSTER default AS default.kafka_table_local
    ENGINE = Distributed(default, default, kafka_table_local, id);

步驟五:建立物化視圖

雲資料庫ClickHouse同步資料依賴於物化視圖通過Kafka外表讀取來源資料並觸發插入操作,將資料寫入到雲資料庫ClickHouse本地表。

建立物化視圖文法如下。

重要

您需確保SELECT欄位與目標表結構一致,或通過轉換函式處理資料格式使其與目標表一致。

CREATE MATERIALIZED VIEW <view_name> ON CLUSTER default TO <dest_table> AS SELECT * FROM <src_table>;

參數說明如下。

參數名稱

是否必填

描述

樣本

view_name

視圖名稱。

consumer

dest_table

用於儲存Kafka資料的目標表。

  • 社區相容版叢集:

    • 多節點叢集,建議將資料匯入至分布式表中。

    • 如果您同步的目標表是本地表,此處表為本地表。

  • 企業版叢集:企業版沒有分布式表,此處為本地表。

  • 社區相容版樣本:kafka_table_distributed

  • 企業版樣本:kafka_table_local

src_table

kafka外表。

kafka_src_table

樣本語句如下。

企業版

CREATE MATERIALIZED VIEW consumer ON CLUSTER default TO kafka_table_local AS SELECT * FROM kafka_src_table;

社區相容版

此處樣本時將來源資料儲存至kafka_table_distributed分布式表中。

CREATE MATERIALIZED VIEW consumer ON CLUSTER default TO kafka_table_distributed AS SELECT * FROM kafka_src_table;

步驟六:驗證同步是否生效

  1. 在雲訊息佇列Kafka版的Topic端發送訊息。

    1. 登入訊息佇列Kafka控制台

    2. 執行個體列表頁面,單擊目標執行個體名稱。

    3. Topic管理頁面,單擊目標Topic操作列的更多 > 體驗發送訊息

    4. 快速體驗訊息收發頁面,輸入發送的訊息內容

      本文以發送訊息1,a2,b為例。

    5. 單擊確定

  2. 登入雲資料庫ClickHouse,查詢分布式表,確認資料是否同步成功。

    如何登入雲資料庫ClickHouse,請參見通過DMS串連ClickHouse

    驗證資料的查詢語句如下。

    企業版

    SELECT * FROM kafka_table_local; 

    社區相容版

    以下是查詢分布式表的樣本。

    • 如果您同步的目的表是本地表,需將查詢語句中的分布式表名更換為本地表名,再進行查詢。

    • 如果您是社區版叢集,並且是多節點叢集,強烈建議您查詢分布式表;若未查詢分布式表,則只能擷取叢集中一個節點的資料,這可能會導致查詢結果少於您匯入的資料。

    SELECT * FROM kafka_table_distributed; 

    當您執行查詢語句並成功返回結果時,說明資料已從Kafka同步至雲資料庫ClickHouse

    查詢結果如下。

    ┌─id─┬─name─┐
    │  1 │  a   │
    │  2 │  b   │
    └────┴──────┘

    如果查詢結果與您期望的不同,您可通過步驟七(可選):查看Kafka外表的消費狀態進一步排查問題。

步驟七(可選):查看Kafka外表的消費狀態

如果您同步的資料與Kafka的資料不一致時,可通過系統資料表協助您查看Kafka外表的消費狀態,排查訊息消費異常的問題。

核心版本大於等於23.8的社區相容版叢集及企業版叢集

通過查詢系統資料表system.kafka_consumers查看Kafka外表的消費狀態,查詢語句如下。

select * from system.kafka_consumers;

system.kafka_consumers表欄位說明如下。

欄位名稱

說明

database

使用Kafka外表所在的資料庫。

table

使用Kafka外表的表名。

consumer_id

Kafka消費者標識符。

一個表可以有多個消費者,由建立Kafka外表時的kafka_num_consumers參數指定。

assignments.topic

Kafka主題。

assignments.partition_id

Kafka分區ID。

一個分區只能分配給一個消費者。

assignments.current_offset

當前位移量。

exceptions.time

最近10個異常產生的時間戳記。

exceptions.text

最近10個異常的文本。

last_poll_time

最近一次輪詢的時間戳記。

num_messages_read

消費者讀取的訊息數量。

last_commit_time

最近一次提交的時間戳記。

num_commits

消費者的總提交次數。

last_rebalance_time

最近一次Kafka重新平衡的時間戳記。

num_rebalance_revocations

消費者被撤銷分區的次數。

num_rebalance_assignments

消費者被分配到Kafka叢集的次數。

is_currently_used

消費者是否正在使用中。

last_used

該消費者最後一次使用的時間,以微秒為單位的Unix時間。

rdkafka_stat

庫內部統計資訊。更多詳情,請參見librdkafka

預設為3000,表示每3秒產生一次統計資訊。

說明

雲資料庫ClickHouse設定 statistics_interval_ms=0時,可禁止Kafka外表的統計資訊收集。

核心版本小於23.8以下的社區相容版叢集

通過查詢系統資料表system.kafka查看Kafka外表的消費狀態,查詢語句如下。

SELECT * FROM system.kafka;

system.kafka表欄位說明如下。

欄位名稱

說明

database

Kafka外表的資料庫名稱。

table

Kafka外表的表名。

topic

Kafka外表消費的topic名稱。

consumer_group

Kafka外表消費的group名稱。

last_read_message_count

拉取到的Kafka外表的消費訊息的數量。

status

Kafka外表的消費Kafka訊息的狀態。取值說明:

  • no_view:Kafka外表沒有建立視圖。

  • attach_view:Kafka外表建立了視圖。

  • normal:正常狀態。

    當kafka外表有消費資料時,Kafka外表的消費狀態為normal

  • skip_parse:跳過錯誤解析。

  • error:消費異常。

exception

異常詳情。

說明

status取值為error時,該參數返回異常詳情。

常見問題