全部產品
Search
文件中心

ApsaraMQ for Kafka:訊息檢索

更新時間:Dec 27, 2024

雲訊息佇列 Kafka 版控制台提供的按位點查詢和按時間查詢訊息的功能無法滿足您搜尋訊息的需求時,您可以使用雲訊息佇列 Kafka 版訊息檢索功能。訊息檢索支援按Topic分區、位點範圍、時間範圍以及訊息Key和Value關鍵字檢索。本文介紹如何開通訊息檢索、添加檢索條件以及進行暫停、啟用、刪除管理操作。

前提條件

已為雲訊息佇列 Kafka 版執行個體建立資料來源Topic。更多資訊,請參見步驟一:建立Topic

背景資訊

  • 雲訊息佇列 Kafka 版訊息檢索藉助雲訊息佇列 Kafka 版的Connector功能及Table Store(Tablestore)實現,通過Connector對Topic中的訊息進行轉儲,發送到Table Store中的資料表中,由Table Store索引功能提供訊息檢索的能力。

    開通訊息檢索相當於自動化建立了一個雲訊息佇列 Kafka 版同步資料至Table Store的Connector任務,名稱格式為:ots-ms-{Topic名稱}-{6位隨機字元},該Connector任務在消息检索頁面顯示和管理,而不在Connector 任务列表頁面顯示和管理。

  • 首次開通訊息檢索後,雲訊息佇列 Kafka 版自動為您開通Table Store服務,並建立Table Store執行個體和對應的資料表。每個開通訊息檢索的Topic會在Table Store中對應建立一張資料表。自動建立的執行個體和資料表名稱格式如下:
    • 執行個體名稱:kfk-{雲訊息佇列 Kafka 版執行個體名稱後12位字元}
    • 資料表表名:{Topic名稱}:kafka_topic_{Topic名稱}_{6位隨機字元}
  • 每個開通訊息檢索的Topic會在雲訊息佇列 Kafka 版執行個體中自動建立4個Topic和2個Group,用於記錄任務配置和任務狀態。名稱格式如下:
    • 任務位點Topic:connect-offset-{任務名稱}
    • 任務配置Topic:connect-config-{任務名稱}
    • 任務狀態Topic:connect-status-{任務名稱}
    • 無效信件佇列Topic/異常資料Topic:connect-error-{任務名稱}
    • Connector消費組:connect-{任務名稱}、connect-cluster-{任務名稱}

    更多資訊,請參見建立Tablestore Sink Connector

計費說明

雲訊息佇列 Kafka 版訊息檢索處於公測階段,且獨立於雲訊息佇列 Kafka 版執行個體,因此不會在雲訊息佇列 Kafka 版側產生費用,自動建立的Table Store執行個體和資料表在公測期也不產生費用。同時,阿里雲不承諾訊息檢索的SLA,使用訊息檢索所依賴的其他產品的SLA和費用說明請以對應產品為準。

注意事項

  • 首次開通訊息檢索時,僅會自動開通同地區下的Table Store服務。當前僅支援華東1(杭州)和華南1(深圳)地區開通訊息檢索。
  • 首次開通訊息檢索時,雲訊息佇列 Kafka 版會為您自動建立服務關聯角色AliyunServiceRoleForAlikafkaConnector,以便使用Connector功能。如果已建立服務關聯角色,雲訊息佇列 Kafka 版不會重複建立。關於服務關聯角色的更多資訊,請參見服務關聯角色
  • 單個執行個體預設最多同時支援3個Topic的訊息檢索。每個Topic的訊息檢索結果最多顯示10條。
  • 檢索到的每條訊息在雲訊息佇列 Kafka 版控制台上最多顯示1 KB的內容,超過1 KB的部分將自動截斷。如需查看完整的訊息內容,請下載相應的訊息。
  • 目前Table Store不支援單個欄位大於2 MB,超過2 MB的訊息不會被同步,因此超過該大小的訊息也無法在雲訊息佇列 Kafka 版控制台的消息检索頁面檢索出來。
  • Table Store中的資料保留時間長度與雲訊息佇列 Kafka 版執行個體訊息保留時間長度具有相同的資料生命週期(TTL),當資料超過訊息保留時間長度時,將會自動清除並移除相關索引。關於雲訊息佇列 Kafka 版訊息保留時間長度相關配置和說明資訊,請參見變更訊息配置

    由於Table Store資料到期策略與雲訊息佇列 Kafka 版並不完全一致,故最終檢索到的資料請以實際擷取的為準。

開通訊息檢索

開通某個執行個體下Topic的訊息檢索功能,以便於您根據需要對其Topic中訊息進行檢索。

  1. 登入雲訊息佇列 Kafka 版控制台

  2. 概览頁面的资源分布地區,選擇地區。

  3. 在左側導覽列,單擊消息检索
  4. 消息检索頁面,從选择实例的下拉式清單選擇需檢索Topic訊息所屬的執行個體,然後單擊开通消息检索
  5. 开通消息检索面板,填寫開通參數,然後單擊確定
    說明 如果您是首次在當前執行個體下開通訊息檢索功能,單擊开通消息检索後,需在您尚未开通当前实例的 Connector 功能提示對話方塊,單擊確認,然後再在开通消息检索面板填寫參數。
    表 1. 開通參數說明
    參數描述樣本值
    数据源 Topic需要開通訊息檢索的Topic。test
    消费初始位置開始消費的位置。預設取值為最近位点。取值說明如下:
    • 最早位点:從最初位點開始消費。
    • 最近位点:從最新位點開始消費。
    最近位点
    记录时间記錄訊息的時間,即檢索訊息時的時間範圍。預設取值為即时时间。取值說明如下:
    • 原生时间雲訊息佇列 Kafka 版中記錄的訊息建立時間,即發送訊息時,用戶端內建的或是您指定的ProducerRecord中的訊息建立時間。
    • 即时时间:資料同步至Table Store的時間。

      該值僅表示訊息被同步至Table Store的時間,並不是訊息的生產時間,因此搜尋結果中的訊息時間與訊息的生產時間可能會不一致。

    原生时间
    消息检索頁面您可以查看到剛開通搜尋功能的Topic。

測試發送訊息

開通訊息檢索後,您可以向雲訊息佇列 Kafka 版的資料來源Topic發送訊息,調度任務和測試訊息檢索是否建立成功。

  1. 登入雲訊息佇列 Kafka 版控制台

  2. 概览頁面的资源分布地區,選擇地區。

  3. 在左側導覽列,單擊消息检索
  4. 消息检索頁面,找到需要測試的目標Topic,根據任務狀態在對應位置操作。
    • 如果任務處於運行中已暫停狀態以外的其他狀態,則在其操作列,單擊测试
    • 如果任務處於運行中已暫停狀態,則在其操作列,選擇更多 > 测试
  5. 快速体验消息收发面板,發送測試訊息。
    1. 消息 Key文字框中輸入訊息的Key值,例如demo。
    2. 消息内容文字框輸入測試的訊息內容,例如 {"key": "test"}。
    3. 設定发送到指定分区,選擇是否指定分區。
      • 單擊,在分区 ID文字框中輸入分區的ID,例如0。如果您需查詢分區的ID,請參見查看分區狀態
      • 單擊,不指定分區。

搜尋訊息

  1. 登入雲訊息佇列 Kafka 版控制台

  2. 概览頁面的资源分布地區,選擇地區。

  3. 在左側導覽列,單擊消息检索
  4. 消息检索頁面,找到目標Topic,在其操作列,單擊搜索
  5. 搜索面板,設定搜尋條件,在搜尋項下拉式清單中選擇需要添加的搜尋項,單擊添加搜尋項,添加搜尋項並在列設定搜尋資訊,然後單擊確定
    搜尋項說明
    分区本次搜尋Topic訊息內容所在的分區。
    位点范围本次搜尋Topic訊息內容的位點範圍。
    Key本次搜尋Topic訊息內容的訊息Key或訊息內容,即要匹配的Key或Value。

    檢索關鍵詞說明如下:

    • 搜尋方式為短語匹配搜尋。例如,訊息Key是“訊息佇列Kafka版是阿里雲提供的分布式、高吞吐、可擴充的訊息佇列服務。”,可以設定搜尋關鍵詞為“分布式”,或者“阿里雲”和“分布式”組合。
    • 如果搜尋索引鍵中包含星號(*)或問號(?),則採用萬用字元檢索。星號(*)代表任一字元序列,問號(?)代表任意單個字元。英文字母不區分大小寫。例如,訊息內容是“AliKafkaTest001qaz”,可以設定關鍵詞為“AliKafkaTe*”。

      不建議使用萬用字元作為起始關鍵字時,檢索效率較低,且帶有萬用字元的字串長度不能超過20個字元。

    關於更多分詞、短語匹配檢索和萬用字元檢索的使用和限制等詳細內容,請參見分詞短語匹配查詢萬用字元查詢

    Value
    时间范围搜尋Topic訊息的時間範圍。您可以設定近三天內的某個時間範圍和自訂時間。 此處設定的搜尋時間最小粒度為分鐘。
    操作單擊刪除:移除該搜尋條件。

    如果需要移除全部條件,您可以單擊搜尋條件列表上方的移除所有条件

    說明 設定多個搜尋條件時,取交集搜尋。
    Topic搜尋頁面,展示檢索的訊息。
    表 2. 檢索結果參數解釋
    參數描述
    分区訊息的Topic分區。
    位点訊息的所在的位點。
    Key訊息的鍵(已強制轉化為String類型)。
    Value訊息的值(已強制轉化為String類型),即訊息的具體內容。
    消息创建时间訊息建立時間或訊息同步至Table Store的時間。

    訊息建立時間指發送訊息時,用戶端內建的或是您指定的ProducerRecord中的訊息建立時間。

    說明
    • 如果配置了該欄位,則按配置值顯示。
    • 如果未配置該欄位,則預設取訊息發送時的系統時間。
    • 如果顯示值為1970/x/x x:x:x,則說明發送時間配置為0或其他有誤的值。
    操作
    • 單擊下载 Key:下載訊息的索引值。
    • 單擊下载 Value:下載訊息的具體內容。
    重要
    • 查詢到的每條訊息在控制台上最多顯示1 KB的內容,超過1 KB的部分將自動截斷。如需查看完整的訊息內容,請下載相應的訊息。
    • 僅專業版支援下載訊息。
    • 下載的訊息最大為10 MB。如果訊息超過10 MB,則只下載10 MB的內容。

查看訊息檢索任務詳情

開通訊息檢索後,即可查看自動建立的Topic、Group、Table Store執行個體名稱、Table Store資料表表名等詳細資料,您也可以在詳情中直接進入Table Store資料表。

  1. 登入雲訊息佇列 Kafka 版控制台

  2. 概览頁面的资源分布地區,選擇地區。

  3. 在左側導覽列,單擊消息检索
  4. 消息检索頁面,找到目標Topic,在其操作列,單擊详情
    在任務詳情頁面您可以查看到目標Topic相關訊息檢索的詳細資料。您也可以在基础信息地區目标服务後單擊Table Store,即可進入資料表詳情頁面查看。

查看消費詳情

您可以查看訂閱當前Topic的線上Group在Topic各個分區的消費進度,瞭解訊息的消費和堆積情況。

  1. 登入雲訊息佇列 Kafka 版控制台

  2. 概览頁面的资源分布地區,選擇地區。

  3. 在左側導覽列,單擊消息检索
  4. 消息检索頁面,找到需要查看消費進度的目標Topic,在其操作列,單擊消费进度
    在消費詳情頁面,您可以查看Topic各分區的消費情況。

暫停訊息檢索任務

如果您需要暫時中止某個運行中的Topic訊息檢索,您可以暫停該Topic檢索任務。

  1. 登入雲訊息佇列 Kafka 版控制台

  2. 概览頁面的资源分布地區,選擇地區。

  3. 在左側導覽列,單擊消息检索
  4. 消息检索頁面,找到目標Topic,在其操作列,選擇更多 > 暂停
  5. 在彈出的對話方塊單擊確認

啟用訊息檢索任務

您可以根據需要重新啟用某個已經暫停訊息檢索任務。

  1. 登入雲訊息佇列 Kafka 版控制台

  2. 概览頁面的资源分布地區,選擇地區。

  3. 在左側導覽列,單擊消息检索
  4. 消息检索頁面,找到目標Topic,在其操作列,選擇更多 > 启用
  5. 在彈出的對話方塊單擊確認

刪除訊息檢索任務

刪除Topic的訊息檢索任務後,Table Store的資料表與多元索引會同步刪除,該Topic不再提供訊息檢索功能。如果需要繼續使用該Topic訊息檢索,請重新建立並等待資料同步。

  1. 登入雲訊息佇列 Kafka 版控制台

  2. 概览頁面的资源分布地區,選擇地區。

  3. 在左側導覽列,單擊消息检索
  4. 消息检索頁面,找到需要刪除的目標Topic,根據任務狀態在對應位置操作。
    • 如果任務處於運行中已暫停狀態以外的其他狀態,則在其操作列,單擊删除
    • 如果任務處於運行中已暫停狀態,則在其操作列,選擇更多 > 删除
  5. 在彈出的對話方塊單擊確認
    消息检索頁面,您已看不到剛才已刪除的Topic。