本文為您介紹如何將訊息佇列Kafka的資料即時同步至雲資料庫ClickHouse。
使用限制
僅支援同步雲訊息佇列Kafka和部署在ECS上的自建Kafka的資料。
前提條件
注意事項
雲資料庫ClickHouse的Kafka外表所訂閱的Topic不能存在其他消費者。
建立kafka外表、物化視圖和本地表時,三張表的欄位類型需要一一對應。
操作步驟
本樣本為將雲訊息佇列Kafka同步至雲資料庫ClickHouse社區相容叢集的資料庫default庫中的kafka_table_distributed分布式表中。
步驟一:瞭解同步原理
雲資料庫ClickHouse同步Kafka資料主要依賴於其內建的Kafka表引擎和物化視圖(Materialized View)機制,實現即時資料消費和儲存。具體資料鏈路如下。
Kafka主題:需要同步的來源資料。
雲資料庫ClickHouse的Kafka外表(Kafka表引擎的表):從指定Kafka主題拉取來源資料。
物化視圖:通過Kafka外表讀取來源資料並執行插入操作,將資料寫到雲資料庫ClickHouse本地表。
本地表:儲存同步的資料。
步驟二:登入雲資料庫ClickHouse
如何登入,請參見通過DMS串連ClickHouse。
步驟三:建立Kafka外表
雲資料庫ClickHouse同步Kafka資料依賴於其內建的Kafka表引擎從指定Kafka主題拉取來源資料。此表有以下特點:
Kafka外表預設不能直接查詢。
Kafka外表只是用來消費Kafka資料,沒有真正地儲存資料,需要通過物化視圖將資料加工並插入到目標表中儲存。
建表文法如下。
Kafka外表欄位格式須與Kafka資料類型一致。
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'host:port1,host:port2,host:port3',
kafka_topic_list = 'topic_name1,topic_name2,...',
kafka_group_name = 'group_name',
kafka_format = 'data_format'[,]
[kafka_row_delimiter = 'delimiter_symbol',]
[kafka_num_consumers = N,]
[kafka_thread_per_consumer = 1,]
[kafka_max_block_size = 0,]
[kafka_skip_broken_messages = N,]
[kafka_commit_every_batch = 0,]
[kafka_auto_offset_reset = N]常用參數說明如下。
名稱 | 是否必選 | 說明 |
kafka_broker_list | 是 | 以英文逗號(,)分隔的Kafka的存取點地址清單。如何查看存取點,請參見查看存取點。
|
kafka_topic_list | 是 | 以英文逗號(,)分隔的Topic名稱列表。如何查看Topic名稱,請參見建立Topic。 |
kafka_group_name | 是 | Kafka的消費組名稱。更多資訊,請參見建立Group。 |
kafka_format | 是 | 雲資料庫ClickHouse支援處理的訊息體格式。 說明 雲資料庫ClickHouse支援的訊息體格式,更多資訊請參見Formats for Input and Output Data。 |
kafka_row_delimiter | 否 | 行分隔字元,用於分隔不同的資料行。預設為“\n”,您也可以根據資料寫入的實際分隔格式進行設定。 |
kafka_num_consumers | 否 | 單個表的消費者數量,預設值為1。 說明
|
kafka_thread_per_consumer | 否 | 指定每個消費者是否啟動獨立線程進行消費,預設值為0。取值說明如下。
提升消費速度更多資訊請參見Kafka效能調優。 |
kafka_max_block_size | 否 | Kafka訊息的最大批次大小,單位:Byte,預設值為65536。 |
kafka_skip_broken_messages | 否 | Kafka訊息解析器對於髒資料的容忍度,預設值為0。如果 |
kafka_commit_every_batch | 否 | 執行Kafka Commit的頻率,預設值為0,取值說明如下:
|
kafka_auto_offset_reset | 否 | 訊息的位移量,從哪個offset開始讀取Kafka資料,取值說明如下:
說明 21.8版本的雲資料庫ClickHouse叢集不支援該參數。 |
更多參數說明,請參見Kafka。
樣本語句如下。
CREATE TABLE default.kafka_src_table ON CLUSTER `default`
(-- 定義表結構的欄位
id Int32,
name String
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'alikafka-post-cn-****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-****1-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-****-3-vpc.alikafka.aliyuncs.com:9092',
kafka_topic_list = 'testforCK',
kafka_group_name = 'GroupForTestCK',
kafka_format = 'CSV';步驟四:建立目標儲存表
您需根據叢集版本選擇建表語句。
企業版叢集僅需建立本地表,社區相容版叢集則可能需要根據您的環境和需求建立分布式表。以下為樣本語句,更多建表文法,請參見CREATE TABLE。
企業版
CREATE TABLE default.kafka_table_local ON CLUSTER default (
id Int32,
name String
) ENGINE = MergeTree()
ORDER BY (id);如果您執行此語句時報ON CLUSTER is not allowed for Replicated database的錯誤提示,可嘗試通過升級版本解決此問題,如何升級版本,請參見升級核心小版本。
社區相容版
單副本和雙副本的引擎有所不同,請根據您的副本類型選擇相應的引擎。
在雙複本集群中建表時,必須使用MergeTree系列引擎中支援資料複製的Replicated系列引擎。如果您在雙複本集群中,建立了非Replicated系列引擎的表,將導致副本之間無法進行資料複製,從而導致副本資料可能不一致。
單副本
建立本地表。
CREATE TABLE default.kafka_table_local ON CLUSTER default ( id Int32, name String ) ENGINE = MergeTree() ORDER BY (id);(可選)建立分布式表。
如果您只需將檔案匯入至本地表中,可跳過此步驟。
如果您的叢集為多節點叢集,建議您建立分布式表。
CREATE TABLE kafka_table_distributed ON CLUSTER default AS default.kafka_table_local ENGINE = Distributed(default, default, kafka_table_local, id);
雙副本
建立本地表。
CREATE TABLE default.kafka_table_local ON CLUSTER default ( id Int32, name String ) ENGINE = ReplicatedMergeTree() ORDER BY (id);(可選)建立分布式表。
如果您只需將檔案匯入至本地表中,可跳過此步驟。
如果您的叢集為多節點叢集,建議您建立分布式表。
CREATE TABLE kafka_table_distributed ON CLUSTER default AS default.kafka_table_local ENGINE = Distributed(default, default, kafka_table_local, id);
步驟五:建立物化視圖
雲資料庫ClickHouse同步資料依賴於物化視圖通過Kafka外表讀取來源資料並觸發插入操作,將資料寫入到雲資料庫ClickHouse本地表。
建立物化視圖文法如下。
您需確保SELECT欄位與目標表結構一致,或通過轉換函式處理資料格式使其與目標表一致。
CREATE MATERIALIZED VIEW <view_name> ON CLUSTER default TO <dest_table> AS SELECT * FROM <src_table>;參數說明如下。
參數名稱 | 是否必填 | 描述 | 樣本 |
view_name | 是 | 視圖名稱。 | consumer |
dest_table | 是 | 用於儲存Kafka資料的目標表。
|
|
src_table | 是 | kafka外表。 | kafka_src_table |
樣本語句如下。
企業版
CREATE MATERIALIZED VIEW consumer ON CLUSTER default TO kafka_table_local AS SELECT * FROM kafka_src_table;社區相容版
此處樣本時將來源資料儲存至kafka_table_distributed分布式表中。
CREATE MATERIALIZED VIEW consumer ON CLUSTER default TO kafka_table_distributed AS SELECT * FROM kafka_src_table;步驟六:驗證同步是否生效
在雲訊息佇列Kafka版的Topic端發送訊息。
登入訊息佇列Kafka控制台。
在執行個體列表頁面,單擊目標執行個體名稱。
在Topic管理頁面,單擊目標Topic操作列的。
在快速體驗訊息收發頁面,輸入發送的訊息內容。
本文以發送訊息
1,a和2,b為例。單擊確定。
登入雲資料庫ClickHouse,查詢分布式表,確認資料是否同步成功。
如何登入雲資料庫ClickHouse,請參見通過DMS串連ClickHouse。
驗證資料的查詢語句如下。
企業版
SELECT * FROM kafka_table_local;社區相容版
以下是查詢分布式表的樣本。
如果您同步的目的表是本地表,需將查詢語句中的分布式表名更換為本地表名,再進行查詢。
如果您是社區版叢集,並且是多節點叢集,強烈建議您查詢分布式表;若未查詢分布式表,則只能擷取叢集中一個節點的資料,這可能會導致查詢結果少於您匯入的資料。
SELECT * FROM kafka_table_distributed;當您執行查詢語句並成功返回結果時,說明資料已從Kafka同步至雲資料庫ClickHouse。
查詢結果如下。
┌─id─┬─name─┐ │ 1 │ a │ │ 2 │ b │ └────┴──────┘如果查詢結果與您期望的不同,您可通過步驟七(可選):查看Kafka外表的消費狀態進一步排查問題。
步驟七(可選):查看Kafka外表的消費狀態
如果您同步的資料與Kafka的資料不一致時,可通過系統資料表協助您查看Kafka外表的消費狀態,排查訊息消費異常的問題。
核心版本大於等於23.8的社區相容版叢集及企業版叢集
通過查詢系統資料表system.kafka_consumers查看Kafka外表的消費狀態,查詢語句如下。
select * from system.kafka_consumers;system.kafka_consumers表欄位說明如下。
欄位名稱 | 說明 |
database | 使用Kafka外表所在的資料庫。 |
table | 使用Kafka外表的表名。 |
consumer_id | Kafka消費者標識符。 一個表可以有多個消費者,由建立Kafka外表時的kafka_num_consumers參數指定。 |
assignments.topic | Kafka主題。 |
assignments.partition_id | Kafka分區ID。 一個分區只能分配給一個消費者。 |
assignments.current_offset | 當前位移量。 |
exceptions.time | 最近10個異常產生的時間戳記。 |
exceptions.text | 最近10個異常的文本。 |
last_poll_time | 最近一次輪詢的時間戳記。 |
num_messages_read | 消費者讀取的訊息數量。 |
last_commit_time | 最近一次提交的時間戳記。 |
num_commits | 消費者的總提交次數。 |
last_rebalance_time | 最近一次Kafka重新平衡的時間戳記。 |
num_rebalance_revocations | 消費者被撤銷分區的次數。 |
num_rebalance_assignments | 消費者被分配到Kafka叢集的次數。 |
is_currently_used | 消費者是否正在使用中。 |
last_used | 該消費者最後一次使用的時間,以微秒為單位的Unix時間。 |
rdkafka_stat | 庫內部統計資訊。更多詳情,請參見librdkafka。 預設為3000,表示每3秒產生一次統計資訊。 說明 當雲資料庫ClickHouse設定 statistics_interval_ms=0時,可禁止Kafka外表的統計資訊收集。 |
核心版本小於23.8以下的社區相容版叢集
通過查詢系統資料表system.kafka查看Kafka外表的消費狀態,查詢語句如下。
SELECT * FROM system.kafka;system.kafka表欄位說明如下。
欄位名稱 | 說明 |
database | Kafka外表的資料庫名稱。 |
table | Kafka外表的表名。 |
topic | Kafka外表消費的topic名稱。 |
consumer_group | Kafka外表消費的group名稱。 |
last_read_message_count | 拉取到的Kafka外表的消費訊息的數量。 |
status | Kafka外表的消費Kafka訊息的狀態。取值說明:
|
exception | 異常詳情。 說明 當status取值為error時,該參數返回異常詳情。 |