KsqlDB是一個用於Apache Kafka的流式SQL引擎,KsqlDB降低了進入流處理的門檻,提供了一個簡單的、完全互動SQL介面,用於處理Kafka的資料,可以讓我們在流資料上持續執行SQL查詢,KsqlDB支援廣泛的強大的流處理操作,包括彙總、串連、視窗、會話等。
架構圖
下圖分別為傳統的流處理應用架構和基於KsqlDB的應用架構樣本。通過對比不難看出,流處理引擎以及連接器部分均從之前的獨立角色整合到了KsqlDB。除此之外,KsqlDB還通過物化視圖提供了流處理過程中的查詢功能。更多關於ksqlDB的資訊。請參見KsqlDB官方文檔。
傳統的流處理應用架構圖

基於KsqlDB應用的架構圖

使用KsqlDB
建立Topic並進行配置
建立Topic,本文將以Topic
ksql_test為例進行說明。建立Schema,選擇Avro校正模式並添加以下校正規則。
{ "namespace": "io.confluent.examples.clients.basicavro", "type": "record", "name": "Payment", "fields": [ { "name": "id", "type": "string" }, { "name": "amount", "type": "double" } ] }為Topic
ksql_test開啟Schema格式校正。
授權
雲訊息佇列 Confluent 版支援對KsqlDB叢集進行RBAC授權,本文將以新建立的使用者test為例進行說明。
建立
test使用者,按照下面所述為其添加叢集授權。詳情請參見使用者管理和授權。使用者名稱
叢集
資源
角色
test
Kafka cluster
Cluster
SystemAdmin
test
KSQL
Cluster
ResourceOwner
test
Schema Registry
Cluster
SystemAdmin
為KsqlDB添加Topic
ksql_test的唯讀存取權限(KsqlDB的預設使用者為ksql)。使用者名稱
叢集
資源
角色
ksql
Kafka cluster
Topic
DeveloperRead
操作
登入雲訊息佇列 Confluent 版控制台,在左側導覽列,單擊实例列表。
在頂部功能表列,選擇地區,然後在執行個體列表頁面,單擊目標執行個體名稱。
在实例详情頁面,單擊右上方的登入控制台進行Control Center登入。
登入Control Center控制台,在Home頁面單擊controlcenter.clusterk卡片,進入到Cluster overview頁面。

在首頁的左側導覽列,單擊ksqlDB,然後單擊目標KsqlDB叢集名稱。
在KsqlDB叢集詳情頁,單擊Editor頁簽,按需建立Stream、使用Ksql命令進行Select查詢等操作。詳情請參見KSQL快速使用。
建立Stream
CREATE STREAM ksql_test_stream WITH (KAFKA_TOPIC='ksql_test',VALUE_FORMAT='AVRO');從Stream中查詢資料
SELECT * FROM ksql_test_stream EMIT CHANGES;
測實驗證
開啟Stream資料查詢。
在ksqldb頁面中,單擊Editor頁簽,輸入下面查詢語句後,單擊Run query。
SELECT * FROM ksql_test_stream EMIT CHANGES;
發送測試訊息。
重新開啟一個Control Center控制台視窗。
在Topic
ksql_test詳情頁面,單擊Messages頁簽後,再單擊Produce a new message。在Produce a new message面板中,填入訊息內容後,單擊Produce。
{ "id": "Tome", "amount": 18 }
驗證發送的訊息。
返回到之前開啟的Stream資料查詢時段,已經查詢到了發送的測試訊息。
