全部產品
Search
文件中心

ApsaraMQ for Kafka:KsqlDB

更新時間:May 21, 2025

KsqlDB是一個用於Apache Kafka的流式SQL引擎,KsqlDB降低了進入流處理的門檻,提供了一個簡單的、完全互動SQL介面,用於處理Kafka的資料,可以讓我們在流資料上持續執行SQL查詢,KsqlDB支援廣泛的強大的流處理操作,包括彙總、串連、視窗、會話等。

架構圖

下圖分別為傳統的流處理應用架構和基於KsqlDB的應用架構樣本。通過對比不難看出,流處理引擎以及連接器部分均從之前的獨立角色整合到了KsqlDB。除此之外,KsqlDB還通過物化視圖提供了流處理過程中的查詢功能。更多關於ksqlDB的資訊。請參見KsqlDB官方文檔

  • 傳統的流處理應用架構圖image

  • 基於KsqlDB應用的架構圖image

使用KsqlDB

建立Topic並進行配置

  1. 建立Topic,本文將以Topicksql_test為例進行說明。

  2. 建立Schema,選擇Avro校正模式並添加以下校正規則。

    {
        "namespace": "io.confluent.examples.clients.basicavro",
        "type": "record",
        "name": "Payment",
        "fields": [
            {
                "name": "id",
                "type": "string"
            },
            {
                "name": "amount",
                "type": "double"
            }
        ]
    }
  3. 為Topicksql_test開啟Schema格式校正

授權

雲訊息佇列 Confluent 版支援對KsqlDB叢集進行RBAC授權,本文將以新建立的使用者test為例進行說明。

  1. 建立test使用者,按照下面所述為其添加叢集授權。詳情請參見使用者管理和授權

    使用者名稱

    叢集

    資源

    角色

    test

    Kafka cluster

    Cluster

    SystemAdmin

    test

    KSQL

    Cluster

    ResourceOwner

    test

    Schema Registry

    Cluster

    SystemAdmin

  2. 為KsqlDB添加Topicksql_test的唯讀存取權限(KsqlDB的預設使用者為ksql)。

    使用者名稱

    叢集

    資源

    角色

    ksql

    Kafka cluster

    Topic

    DeveloperRead

操作

  1. 登入雲訊息佇列 Confluent 版控制台,在左側導覽列,單擊实例列表

  2. 在頂部功能表列,選擇地區,然後在執行個體列表頁面,單擊目標執行個體名稱。

  3. 实例详情頁面,單擊右上方的登入控制台進行Control Center登入。

  4. 登入Control Center控制台,在Home頁面單擊controlcenter.clusterk卡片,進入到Cluster overview頁面。

    image

  5. 在首頁的左側導覽列,單擊ksqlDB,然後單擊目標KsqlDB叢集名稱。

  6. 在KsqlDB叢集詳情頁,單擊Editor頁簽,按需建立Stream、使用Ksql命令進行Select查詢等操作。詳情請參見KSQL快速使用

    • 建立Stream

      CREATE STREAM ksql_test_stream WITH (KAFKA_TOPIC='ksql_test',VALUE_FORMAT='AVRO');
    • 從Stream中查詢資料

      SELECT * FROM ksql_test_stream EMIT CHANGES;

測實驗證

  1. 開啟Stream資料查詢。

    ksqldb頁面中,單擊Editor頁簽,輸入下面查詢語句後,單擊Run query

    SELECT * FROM ksql_test_stream EMIT CHANGES;

    image

  2. 發送測試訊息。

    1. 重新開啟一個Control Center控制台視窗。

    2. 在Topicksql_test詳情頁面,單擊Messages頁簽後,再單擊Produce a new message

    3. Produce a new message面板中,填入訊息內容後,單擊Produce

      {
          "id": "Tome",
          "amount": 18
      }

      image

  3. 驗證發送的訊息。

    返回到之前開啟的Stream資料查詢時段,已經查詢到了發送的測試訊息。

    image

其他動作