在將Kafka資料匯入到Table Store的過程中可能產生錯誤,如果您不希望導致Sink Task立即失敗,您可以配置錯誤處理策略。本文介紹Kafka Connect Error和Tablestore Sink Task Error兩種錯誤類型的處理方法。
Kafka Connect Error
此類錯誤發生在Sink Task執行資料匯入前,例如使用Converter進行還原序列化或者使用Kafka Transformations對訊息記錄進行輕量級修改時產生錯誤,您可以配置由Kafka提供的錯誤處理選項。
如果您想要跳過此類錯誤,請在Kafka Connect設定檔中配置屬性errors.tolerance=all。
Tablestore Sink Task Error
此類錯誤發生在Sink Task執行資料匯入時,例如解析訊息記錄或者寫入Tablestore時產生錯誤,您可以配置由Tablestore Sink Connector提供的錯誤處理選項。
如果您想要跳過此類錯誤,請在connector設定檔中配置屬性errors.tolerance=all。更多資訊,請參見配置說明。同時,您還可以選擇錯誤報表的方式,如果需要將產生錯誤的訊息記錄儲存到Tablestore中獨立的一張資料表中,請進行如下配置:
runtime.error.tolerance=all
runtime.error.mode=tablestore
runtime.error.table.name=error在distributed模式下,您也可以通過REST API來管理connector和task。如果發生錯誤導致connector或者task停止,您可以選擇手動重啟connector或者task。
檢查connector和task狀態。
查看connector狀態。
curl http://localhost:8083/connectors/{name}/status查看task狀態。
curl http://localhost:8083/connectors/{name}/tasks/{taskid}/status
其中
http://localhost:8083/connectors為Kafka REST服務的地址,name必須與設定檔中的name(連接器名稱)相同,taskid包含在connector狀態資訊中。您還可以通過執行以下命令擷取taskid。
curl http://localhost:8083/connectors/{name}/tasks手動重啟connector或者task。
重啟connector。
curl -X POST http://localhost:8083/connectors/{name}/restart重啟task。
curl -X POST http://localhost:8083/connectors/{name}/tasks/{taskId}/restart