全部產品
Search
文件中心

Simple Log Service:匯入Kafka資料

更新時間:Jan 06, 2026

本文介紹如何將Kafka資料匯入到Log Service,實現資料的查詢分析、加工等操作。

前提條件

版本說明

目前,只支援Kafka 2.0.0及以上版本。

建立資料匯入配置

  1. 登入Log Service控制台

  2. 接入資料地區的資料匯入頁簽中,單擊Kafka-資料匯入

  3. 選擇目標Project和LogStore,單擊下一步

  4. 設定匯入配置。

    1. 匯入設定步驟中,配置如下參數。

      參數

      說明

      任務名稱

      SLS任務的唯一名稱。

      顯示名稱

      任務顯示名稱。

      任務描述

      匯入任務的描述。

      服務地址

      Kafka bootstrap Servers地址。多個服務地址之間使用半形逗號(,)分隔。

      • 如果是阿里雲ApsaraMQ for Kafka,需輸入存取點的IP地址或網域名稱。

      • 如果是阿里雲ECS上自建的Kafka叢集,需輸入ECS執行個體的IP地址。

      • 如果是其他的Kafka叢集,需輸入Kafka Broker的公網IP地址或網域名稱。

      Topic列表

      Kafka主題。多個主題之間使用半形逗號(,)分隔。

      消費組

      如果您使用的是阿里雲ApsaraMQ for Kafka,且未開啟自由使用Group功能,則需要選擇對應的消費組。建立消費組的具體操作,請參見建立消費組

      起始位置

      開始匯入資料的位置。

      • 最早:從現有的第一條Kafka資料開始匯入。

      • 最晚:從最新產生的Kafka資料開始匯入。

      資料格式

      待匯入資料的格式。

      • 極簡模式:如果待匯入的資料為單行格式,您可以選擇極簡模式

      • JSON字串:如果待匯入的資料為JSON,您可以選擇JSON字串。匯入任務會將資料解析為索引值對格式,只解析到第一層。

      解析數組元素

      開啟解析數組元素開關後,對於JSON數組格式的資料,系統會按其數組元素拆分為多條資料後進行匯入。

      編碼格式

      待匯入資料的編碼格式(即字元集),目前支援UTF-8和GBK。

      VPC執行個體ID

      如果Kafka叢集是VPC環境下的阿里雲ApsaraMQ for Kafka或阿里雲ECS上自建的Kafka叢集,您可以通過設定VPC執行個體ID,實現Log Service通過阿里雲內網讀取Kafka叢集的資料。

      通過阿里雲內網讀取資料,具備更好的安全性和網路穩定性。

      重要

      Kafka叢集需允許被IP網段100.104.0.0/16訪問。

      時間配置

      時間欄位

      設定為Kafka資料中代表時間的列名,用於指定資料匯入Log Service時的時間。

      提取時間正則

      如果您選擇的資料格式極簡模式,您需要設定Regex提取Kafka資料中的時間。

      例如,資料內容為message with time 2022-08-08 14:20:20,則您可以設定提取時間正則\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d

      時間欄位格式

      指定時間格式,用於解析時間欄位的值。

      • 支援Java SimpleDateFormat文法的時間格式,例如yyyy-MM-dd HH:mm:ss。時間格式的文法詳情請參見Class SimpleDateFormat。常見的時間格式請參見時間格式

      • 支援epoch格式,可選值為epoch、epochMillis、epochMacro或epochNano。

      時間欄位時區

      選擇時間欄位對應的時區。

      當時間格式是epoch時,不需要設定時區。

      預設時間來源

      當沒有提供時間提取資訊或者時間提取失敗時,使用您所設定的時間來源,包括系統目前時間和kafka訊息時間戳記。

      進階配置

      日誌上下文

      開啟日誌上下文開關後,支援Log Service的上下文查詢功能。您可以查看目標資料在原始Kafka partition中的前若干條(上文)或後若干條(下文)資料。

      通訊協定

      通過公網匯入時,建議通過加密串連、使用者認證的方式進行匯入,即在此處定義串連Kafka叢集的通訊協定資訊,配置樣本如下所示。

      protocol欄位的可選值為plaintext、ssl、sasl_plaintext或sasl_ssl。建議設定為sasl_ssl,此協議需要串連加密和使用者認證。

      設定protocol為sasl_plaintext或sasl_ssl時,需設定sasl節點。其中,mechanism欄位可以為PLAIN、SCRAM-SHA-256或SCRAM-SHA-512,表示使用者名稱/密碼身分識別驗證機制。

      {
          "protocol":"sasl_plaintext",
           "sasl":{
              "mechanism":"PLAIN",
              "username":"xxx",
              "password":"yyy"
          }
      }

      私網網域名稱解析

      部署在阿里雲ECS上的Kafka Broker之間採用內部網域名稱通訊時,您需要在此處指定每個Broker對應的ECS網域名稱和IP地址。配置樣本如下所示。

      {
      "hostname#1":"192.168.XX.XX",
      "hostname#2":"192.168.XX.XX",
      "hostname#3":"192.168.XX.XX"
      }
    2. 單擊預覽,預覽匯入結果。

    3. 確認無誤後,單擊下一步

  5. 建立索引和預覽資料,然後單擊下一步。Log Service預設開啟全文索引。您也可以根據採集到的日誌,手動建立欄位索引,或者單擊自動產生索引,Log Service將自動產生欄位索引。更多資訊,請參見建立索引

    重要

    如果需要查詢日誌中的所有欄位,建議使用全文索引。如果只需查詢部分欄位、建議使用欄位索引,減少索引流量。如果需要對欄位進行分析(SELECT語句),必須建立欄位索引。

  6. 單擊查詢日誌,進入查詢和分析頁面,確認是否成功匯入Kafka資料。

    等待1分鐘左右,如果有目標Kafka資料匯入,則說明匯入成功。

查看匯入配置

建立匯入配置成功後,您可以在控制台中查看已建立的匯入配置及產生的統計報表。

  1. 單擊目標Project。

  2. 選擇目標日誌庫下的資料接入 > 資料匯入,單擊配置名稱。

  3. 匯入配置概覽頁面,查看匯入配置的基本資料和統計報表。

相關操作

匯入配置概覽頁面,您還可以進行如下操作。

  • 修改配置

    單擊修改配置,修改匯入配置的相關配置。具體配置,請參見建立資料匯入配置

  • 刪除配置

    單擊刪除配置,刪除該匯入配置。

    警告

    刪除後不可恢複,請謹慎操作。

  • 停止任務

    單擊停止,停止該匯入任務。

常見問題

問題

可能原因

解決方案

預覽時出現Kafka Broker串連錯誤(Broker transport failure)。

  • 設定了錯誤的Kafka服務地址。

  • 沒有在白名單中添加匯入服務對應的IP地址,導致匯入服務無法訪問Kafka叢集。

  • 匯入部署在阿里雲上的Kafka叢集資料時,沒有設定VPC執行個體ID。

  • 確保設定了正確的Kafka服務地址。

  • 在白名單中添加IP地址,允許匯入服務串連Kafka叢集。更多資訊,請參見IP地址白名單

  • 採用阿里雲內網匯入部署在阿里雲上的Kafka叢集資料時,確保設定了VPC執行個體ID。

預覽時出現逾時錯誤(preview request timed out)。

待匯入的Kafka Topic中沒有資料。

如果待匯入的Kafka Topic中沒有資料,請在寫入資料後,再重試預覽。

資料存在亂碼。

編碼格式配置不符合預期。

根據Kafka真實的編碼格式更新匯入配置。

如果需要修複已有的亂碼資料,請建立新的LogStore和匯入配置。

Log Service中顯示的資料時間和資料本身的時間不一致。

設定匯入配置時,沒有指定日誌時間欄位或者設定時間格式、時區有誤。

設定指定的日誌時間欄位以及正確的時間格式和時區。更多資訊,請參見建立資料匯入配置

匯入資料後,無法查詢和分析資料。

  • 資料不在查詢範圍內。

  • 未配置索引。

  • 索引未生效。

  • 檢查待查詢資料的時間是否在查詢時間範圍內。

    如果不在查詢範圍內,請調整查詢範圍並重新查詢。

  • 檢查是否已為LogStore設定索引。

    如果未設定,請先設定索引。具操操作,請參見建立索引重建索引

  • 如果已設定索引,且資料處理流量觀測儀錶盤中顯示的成功匯入資料量符合預期,則可能原因是索引未生效,請嘗試重建索引。具體操作,請參見重建索引

匯入的資料條目數量少於預期。

存在大於3 MB的Kafka資料,您可以通過資料處理流量觀測儀錶盤確認。

縮小單條Kafka訊息的大小。

資料匯入時存在明顯的延遲

  • Kafka叢集頻寬達到限制。

  • 通過公網匯入資料時,網路不穩定。

  • Kafka Topic的Partition數量過少。

  • LogStore Shard數量過少。

  • 更多原因,請參見效能限制

  • 檢查Kafka叢集流量是否達到頻寬節流設定(特別是部署在阿里雲上的Kafka叢集),如果達到或接近頻寬節流設定,則需要進行頻寬擴容。

  • Kafka Topic的Partition數量較少時,請嘗試增加Partition數量,並觀察延遲情況。

  • LogStore Shard數量較少時,請嘗試增加Shard的個數,並觀察延遲情況。具體操作,請參見管理Shard

錯誤處理機制

限制項

說明

網路連接錯誤

匯入任務會定期重試,即網路連接恢複後,匯入任務會自動從之前中斷的Offset位置繼續消費資料。

Kafka Topic不存在

當目標Kafka Topic不存在時,匯入任務會跳過該Topic,且不影響其他正常的Topic的資料匯入。

當不存在的Topic被重建後,匯入任務會正常消費該Topic中的資料(存在約10分鐘的延遲)。