本文介紹如何將Kafka資料匯入到Log Service,實現資料的查詢分析、加工等操作。
前提條件
已有可用的Kafka叢集。
已建立Project和LogStore。具體操作,請參見管理Project和建立基礎LogStore。
版本說明
目前,只支援Kafka 2.0.0及以上版本。
建立資料匯入配置
在接入資料地區的資料匯入頁簽中,單擊Kafka-資料匯入。
選擇目標Project和LogStore,單擊下一步。
設定匯入配置。
在匯入設定步驟中,配置如下參數。
參數
說明
任務名稱
SLS任務的唯一名稱。
顯示名稱
任務顯示名稱。
任務描述
匯入任務的描述。
服務地址
Kafka bootstrap Servers地址。多個服務地址之間使用半形逗號(,)分隔。
如果是阿里雲ApsaraMQ for Kafka,需輸入存取點的IP地址或網域名稱。
如果是阿里雲ECS上自建的Kafka叢集,需輸入ECS執行個體的IP地址。
如果是其他的Kafka叢集,需輸入Kafka Broker的公網IP地址或網域名稱。
Topic列表
Kafka主題。多個主題之間使用半形逗號(,)分隔。
消費組
如果您使用的是阿里雲ApsaraMQ for Kafka,且未開啟自由使用Group功能,則需要選擇對應的消費組。建立消費組的具體操作,請參見建立消費組。
起始位置
開始匯入資料的位置。
最早:從現有的第一條Kafka資料開始匯入。
最晚:從最新產生的Kafka資料開始匯入。
資料格式
待匯入資料的格式。
極簡模式:如果待匯入的資料為單行格式,您可以選擇極簡模式。
JSON字串:如果待匯入的資料為JSON,您可以選擇JSON字串。匯入任務會將資料解析為索引值對格式,只解析到第一層。
解析數組元素
開啟解析數組元素開關後,對於JSON數組格式的資料,系統會按其數組元素拆分為多條資料後進行匯入。
編碼格式
待匯入資料的編碼格式(即字元集),目前支援UTF-8和GBK。
VPC執行個體ID
如果Kafka叢集是VPC環境下的阿里雲ApsaraMQ for Kafka或阿里雲ECS上自建的Kafka叢集,您可以通過設定VPC執行個體ID,實現Log Service通過阿里雲內網讀取Kafka叢集的資料。
通過阿里雲內網讀取資料,具備更好的安全性和網路穩定性。
重要Kafka叢集需允許被IP網段100.104.0.0/16訪問。
時間配置
時間欄位
設定為Kafka資料中代表時間的列名,用於指定資料匯入Log Service時的時間。
提取時間正則
如果您選擇的資料格式為極簡模式,您需要設定Regex提取Kafka資料中的時間。
例如,資料內容為
message with time 2022-08-08 14:20:20,則您可以設定提取時間正則為\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d。時間欄位格式
指定時間格式,用於解析時間欄位的值。
支援Java SimpleDateFormat文法的時間格式,例如yyyy-MM-dd HH:mm:ss。時間格式的文法詳情請參見Class SimpleDateFormat。常見的時間格式請參見時間格式。
支援epoch格式,可選值為epoch、epochMillis、epochMacro或epochNano。
時間欄位時區
選擇時間欄位對應的時區。
當時間格式是epoch時,不需要設定時區。
預設時間來源
當沒有提供時間提取資訊或者時間提取失敗時,使用您所設定的時間來源,包括系統目前時間和kafka訊息時間戳記。
進階配置
日誌上下文
開啟日誌上下文開關後,支援Log Service的上下文查詢功能。您可以查看目標資料在原始Kafka partition中的前若干條(上文)或後若干條(下文)資料。
通訊協定
通過公網匯入時,建議通過加密串連、使用者認證的方式進行匯入,即在此處定義串連Kafka叢集的通訊協定資訊,配置樣本如下所示。
protocol欄位的可選值為plaintext、ssl、sasl_plaintext或sasl_ssl。建議設定為sasl_ssl,此協議需要串連加密和使用者認證。
設定protocol為sasl_plaintext或sasl_ssl時,需設定sasl節點。其中,mechanism欄位可以為PLAIN、SCRAM-SHA-256或SCRAM-SHA-512,表示使用者名稱/密碼身分識別驗證機制。
{ "protocol":"sasl_plaintext", "sasl":{ "mechanism":"PLAIN", "username":"xxx", "password":"yyy" } }私網網域名稱解析
部署在阿里雲ECS上的Kafka Broker之間採用內部網域名稱通訊時,您需要在此處指定每個Broker對應的ECS網域名稱和IP地址。配置樣本如下所示。
{ "hostname#1":"192.168.XX.XX", "hostname#2":"192.168.XX.XX", "hostname#3":"192.168.XX.XX" }單擊預覽,預覽匯入結果。
確認無誤後,單擊下一步。
建立索引和預覽資料,然後單擊下一步。Log Service預設開啟全文索引。您也可以根據採集到的日誌,手動建立欄位索引,或者單擊自動產生索引,Log Service將自動產生欄位索引。更多資訊,請參見建立索引。
重要如果需要查詢日誌中的所有欄位,建議使用全文索引。如果只需查詢部分欄位、建議使用欄位索引,減少索引流量。如果需要對欄位進行分析(SELECT語句),必須建立欄位索引。
單擊查詢日誌,進入查詢和分析頁面,確認是否成功匯入Kafka資料。
等待1分鐘左右,如果有目標Kafka資料匯入,則說明匯入成功。
查看匯入配置
建立匯入配置成功後,您可以在控制台中查看已建立的匯入配置及產生的統計報表。
單擊目標Project。
選擇目標日誌庫下的,單擊配置名稱。
在匯入配置概覽頁面,查看匯入配置的基本資料和統計報表。
相關操作
在匯入配置概覽頁面,您還可以進行如下操作。
修改配置
單擊修改配置,修改匯入配置的相關配置。具體配置,請參見建立資料匯入配置。
刪除配置
單擊刪除配置,刪除該匯入配置。
警告刪除後不可恢複,請謹慎操作。
停止任務
單擊停止,停止該匯入任務。
常見問題
問題 | 可能原因 | 解決方案 |
預覽時出現Kafka Broker串連錯誤(Broker transport failure)。 |
|
|
預覽時出現逾時錯誤(preview request timed out)。 | 待匯入的Kafka Topic中沒有資料。 | 如果待匯入的Kafka Topic中沒有資料,請在寫入資料後,再重試預覽。 |
資料存在亂碼。 | 編碼格式配置不符合預期。 | 根據Kafka真實的編碼格式更新匯入配置。 如果需要修複已有的亂碼資料,請建立新的LogStore和匯入配置。 |
Log Service中顯示的資料時間和資料本身的時間不一致。 | 設定匯入配置時,沒有指定日誌時間欄位或者設定時間格式、時區有誤。 | 設定指定的日誌時間欄位以及正確的時間格式和時區。更多資訊,請參見建立資料匯入配置。 |
匯入資料後,無法查詢和分析資料。 |
| |
匯入的資料條目數量少於預期。 | 存在大於3 MB的Kafka資料,您可以通過資料處理流量觀測儀錶盤確認。 | 縮小單條Kafka訊息的大小。 |
資料匯入時存在明顯的延遲 |
|
|
錯誤處理機制
限制項 | 說明 |
網路連接錯誤 | 匯入任務會定期重試,即網路連接恢複後,匯入任務會自動從之前中斷的Offset位置繼續消費資料。 |
Kafka Topic不存在 | 當目標Kafka Topic不存在時,匯入任務會跳過該Topic,且不影響其他正常的Topic的資料匯入。 當不存在的Topic被重建後,匯入任務會正常消費該Topic中的資料(存在約10分鐘的延遲)。 |