單表即時同步支援將Kafka作為源端輸入,即時捕獲訊息佇列中的資料寫入目標端。本文介紹Kafka輸入組件的配置方式。
適用範圍
支援阿里雲Kafka,以及>=0.10.2且<=2.2.x的自建Kafka版本。
對於<0.10.2版本Kafka,由於Kafka不支援檢索分區資料offset,且Kafka資料結構可能不支援時間戳記,因此會引發同步任務延時統計錯亂,造成無法正確重設同步位點。
kafka資料來源配置詳情請參考:配置Kafka資料來源。
操作步驟
進入資料開發頁面。
登入DataWorks控制台,切換至目標地區後,單擊左側導覽列的,在下拉框中選擇對應工作空間後單擊進入資料開發。
滑鼠移至上方至
表徵圖,單擊。 您也可以展開商務程序,按右鍵目標商務程序,選擇。
在建立節點對話方塊中,選擇同步方式為單表(Topic)到單表(Topic)ETL,輸入名稱,並選擇路徑。
單擊確認。
在即時同步節點的編輯頁面,按一下滑鼠並拖拽至編輯面板。
單擊Kafka節點,在節點配置對話方塊中,配置各項參數。

參數
描述
資料來源
選擇已經配置好的Kafka資料來源,此處僅支援Kafka資料來源。如果未配置資料來源,請單擊右側的建立資料來源,跳轉至 頁面進行建立。詳情請參見:配置Kafka資料來源。
主題
Kafka的Topic名稱,是Kafka處理資源的訊息源的不同分類。
每條發布至Kafka叢集的訊息都有一個類別,該類別被稱為Topic,一個Topic是對一組訊息的歸納。
說明一個Kafka輸入僅支援一個Topic。
鍵類型
Kafka的Key的類型,決定了初始化KafkaConsumer時的key.deserializer配置,可選值包括STRING、BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。
實值型別
Kafka的Value的類型,決定了初始化KafkaConsumer時的value.deserializer配置,可選值包括STRING、BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。
輸出模式
定義解析kafka記錄的方式
單行輸出:以無結構字串或者JSON對象解析Kafka記錄,一個Kafka記錄解析出一個輸出記錄。
多行輸出:以JSON數組解析Kafka記錄,一個JSON數組元素解析出一個輸出記錄,因此一個Kafka記錄可能解析出多個輸出記錄。
說明目前只在部分地區支援該配置項,如發現無該配置項請耐心等待功能在對應地區發布。
數組所在位置路徑
當輸出模式設定為多行輸出時,指定JSON數組在Kafka記錄value中的路徑,路徑支援以
a.a1的格式引用特定JSON對象中的欄位或者以a[0].a1的格式引用特定JSON數組中的欄位,如果該配置項為空白,則將整個Kafka記錄value作為一個JSON數組解析。注意解析的目標JSON數組必須是對象數組,例如
[{"a":"hello"},{"b":"world"}],不能是數值或字串數組,例如["a","b"]。配置參數
在建立Kafka資料消費用戶端(KafkaConsumer)時,您可以通過配置
kafkaConfig擴充參數來精細化控制其讀取資料的行為。各版本Kafka叢集支援的完整參數列表,請參考對應版本的Kafka官方文檔。常用參數樣本:
bootstrap.serversauto.commit.interval.mssession.timeout.ms
group.id預設行為
預設情況下,即時同步任務會為KafkaConsumer設定一個隨機產生的字串作為group.id。手動設定
您可以手動指定一個固定的group.id,方便在Kafka叢集中通過指定的消費者組,來監控或觀察同步任務的消費位點(Offset)。
輸出欄位
自訂Kafka資料對外輸出的欄位名:
單擊添加更多欄位,輸入欄位名,並選擇類型,即可新增自訂欄位。
取值方式支援從Kafka記錄中取得欄位值的方式,單擊右側
按鈕可以在兩類取值方式間切換。預置取值方式:提供6種可選預置從Kafka記錄中取值的方式:
value:訊息體
key:訊息鍵
partition:分區號
offset:位移量
timestamp:訊息的毫秒時間戳記
headers:訊息頭
JSON解析取值:可以通過.(擷取子欄位)和[](擷取數組元素)兩種文法,擷取複雜JSON格式的內容,同時為了相容歷史邏輯,支援在選擇JSON解析取值時使用例如__value__這樣以兩個底線開頭的字串擷取kafka記錄的特定內容作為欄位值。Kafka的資料樣本如下。
{ "a": { "a1": "hello" }, "b": "world", "c":[ "xxxxxxx", "yyyyyyy" ], "d":[ { "AA":"this", "BB":"is_data" }, { "AA":"that", "BB":"is_also_data" } ] }不同情況下,輸出欄位的取值為:
如果同步Kafka記錄value,取值方式填寫__value__。
如果同步Kafka記錄key,取值方式填寫__key__。
如果同步Kafka記錄partition,取值方式填寫__partition__。
如果同步Kafka記錄offset,取值方式填寫__offset__。
如果同步Kafka記錄timestamp,取值方式填寫__timestamp__。
如果同步Kafka記錄headers,取值方式填寫__headers__。
如果同步a1的資料"hello",取值方式填寫a.a1。
如果同步b的資料"world,取值方式填寫b。
如果同步c的資料"yyyyyyy",取值方式填寫c[1]。
如果同步AA的資料"this",取值方式填寫d[0].AA。
滑鼠移至上方至相應欄位,單擊顯示的
表徵圖,即可刪除該欄位。
情境樣本:在輸出模式選擇多行輸出情況下,將先根據數組所在位置路徑指定的JSON路徑解析出JSON數組,然後取出JSON數組中的每一個JSON對象,再根據定義的欄位名和取值方式解析組成輸出欄位,取值方式的定義與單行輸出模式一樣,可以通過.(擷取子欄位)和[](擷取數組元素)兩種文法,擷取複雜JSON格式的內容。Kafka執行個體資料如下:
{ "c": { "c0": [ { "AA": "this", "BB": "is_data" }, { "AA": "that", "BB": "is_also_data" } ] } }當數組所在位置填寫
c.c0,輸出欄位定義兩個欄位,一個欄位名為AA,取值方式為AA,一個欄位名為BB,取值方式為BB,那麼該條Kafka記錄將解析得到如下兩條記錄:
單擊工具列中的
表徵圖。說明一個Kafka輸入僅支援一個Topic。