全部產品
Search
文件中心

Dataphin:配置Kafka輸入組件

更新時間:May 29, 2025

配置Kafka輸入組件後,可以將kafka資料來源中的資料讀取至巨量資料平台對接的儲存系統內,並進行資料整合和二次加工。本文為您介紹如何配置Kafka輸入組件。

前提條件

在開始執行操作前,請確認您已完成以下操作:

操作步驟

  1. 在Dataphin首頁頂部功能表列,選擇研發 > Data Integration

  2. 在整合頁面頂部功能表列選擇專案(Dev-Prod模式需要選擇環境)。

  3. 在左側導覽列中單擊離線整合,在離線整合列表中單擊需要開發的離線管道,開啟該離線管道的配置頁面。

  4. 單擊頁面右上方的組件庫,開啟組件庫面板。

  5. 組件庫面板左側導覽列中需選擇輸入,在右側的輸入組件列表中找到KAFKA組件,並拖動該組件至畫布。

  6. 單擊KAFKA輸入組件卡片中的image表徵圖,開啟KAFKA輸入配置對話方塊。

  7. KAFKA輸入配置對話方塊,按照下表配置參數。

    參數

    描述

    步驟名稱

    即Kafka輸入組件的名稱。Dataphin自動產生步驟名稱,您也可以根據業務情境修改。命名規則如下:

    • 只能包含中文、字母、底線(_)、數字。

    • 不能超過64個字元。

    資料來源

    在資料來源下拉式清單中,展示當前Dataphin中所有Kafka類型的資料來源,包括您已擁有同步讀許可權的資料來源和沒有同步讀許可權的資料來源。 單擊image表徵圖,可複製當前資料來源名稱。

    • 對於沒有同步讀許可權的資料來源,您可以單擊資料來源後的申請,申請資料來源的同步讀許可權。具體操作,請參見申請資料來源許可權

    • 如果您還沒有Kafka類型的資料來源,單擊建立資料來源,建立資料來源。具體操作,請參見建立Kafka資料來源

    主題

    Kafka的Topic。單擊下拉式清單,選擇需要讀取的Kafka主題名稱。

    鍵類型

    Kafka的Key的類型,決定了初始化Kafka Consumer時的key.deserializer配置。可選值包括BYTEARRAYDOUBLEFLOATINTEGERLONGSHORTSTRINGKAFKA AVRO(資料來源配置了schema.registry時可選擇

    實值型別

    Kafka的Value的類型,決定了初始化Kafka Consumer時的value.deserializer配置。可選值包括BYTEARRAYDOUBLEFLOATINTEGERLONGSHORTSTRINGKAFKA AVRO(資料來源配置了schema.registry時可選擇

    消費群組ID

    初始化Kafka Consumer時的group.id配置。

    為了確保同步時消費位點的正確性,請避免該參數與其他消費進程重複。如果不指定該參數,則每次執行同步自動產生datax_開頭的隨機字串作為group.id。

    起始時間

    讀取起始時間。僅支援以yyyyMMddHHmmss格式的時間字串指定具體時間,是時間範圍的左邊界。需配合調度參數使用,例如調度參數配置為beginDateTime=${20220101000000},則起始時間配置為${beginDateTime}。

    結束時間

    讀取結束時間。僅支援以yyyyMMddHHmmss格式的時間字串指定具體時間,是時間範圍的右邊界。需配合調度參數使用,例如調度參數配置為endDateTime=${20220101000000},則結束時間配置為${endDateTime}。

    同步結束策略

    選擇同步結束策略,有以下兩種策略:

    • 1分鐘讀取不到新資料時:如果消費者1分鐘從Kafka拉取資料返回為空白(一般是已經讀完主題中的全部資料,也可能是網路或者Kafka叢集可用性原因),則立即停止任務,否則持續重試直到再次讀到資料。

    • 到達指定結束位點:如果Data Integration任務讀取到的Kafka記錄業務時間或者位點滿足上面讀取結束位點配置時,則任務結束,否則無限重試讀取Kafka記錄。

    進階配置

    可通過進階配置進行位點重設策略、單次讀取大小、單次讀取時間、讀取逾時等配置。若topic配置了schema registry,需在進階配置中配置keySchema和valueSchema參數。預設為空白。範例格式如下:

    {
     "namespace": "example.avro",
     "type": "record",
     "name": "User",
     "fields": [
         {"name": "name", "type": "string"},
         {"name": "favorite_number",  "type": ["int", "null"]},
         {"name": "favorite_color", "type": ["string", "null"]}
     ]
    }

    輸出欄位

    預設展示__key__ __value____partition____headers____offset____timestamp__6個欄位。支援手動添加輸出欄位:

    • 單擊大量新增 ,支援以JSONTEXT格式大量設定。

      • JSON格式樣本

        [
            {
                "index": 0,
                "name": "__key__",
                "type": "STRING"
            },
            {
                "index": 1,
                "name": "__value__",
                "type": "STRING"
            },
            {
                "index": 2,
                "name": "__partition__",
                "type": "INTEGER"
            },
            {
                "index": 3,
                "name": "__headers__",
                "type": "STRING"
            },
            {
                "index": 4,
                "name": "__offset__",
                "type": "LONG"
            },
            {
                "index": 5,
                "name": "__timestamp__",
                "type": "LONG"
            }
        ]
        說明

        index表示指定對象的列編號,name表示引入後的欄位名稱,type表示引入後的欄位類型。 例如"index":3,"name":"user_id","type":"String" 表示把檔案中的第4列引入,欄位名為user_id,欄位類型為 String

      • TEXT格式樣本

        0,__key__,STRING
        1,__value__,STRING
        2,__partition__,INTEGER
        3,__headers__,STRING
        4,__offset__,LONG
        5,__timestamp__,LONG
        說明
        • 行分隔字元用於分隔每個欄位的資訊,預設為分行符號(\n),可支援分行符號(\n)、分號(;)、半形句號(.)。

        • 資料行分隔符號用於分隔欄位名與欄位類型,預設半形逗號(,)。

    • 單擊建立輸出欄位,根據頁面提示填寫來源序號欄位及選擇類型

    源頭表欄位也可配置為上述6個字串之外的字串,此時將Kafka記錄作為JSON字串進行解析,將源頭表欄位配置的字串作為JSON路徑讀取對應內容作為欄位值寫入對應的目標表欄位。例如:

    { "data": { "name": "bob", "age": 35 } }為Kafka記錄的value值,當源頭表欄位配置為data.name時,將會讀取bob作為這個欄位的值並寫入對應目標表,支援添加的欄位類型為Java類型和datax的映射類型。

    同時可以對已添加的欄位執行如下操作:

    • 單擊操作列下的agag表徵圖,編輯已有的欄位。

    • 單擊操作列下的agfag表徵圖,刪除已有的欄位。

  8. 單擊確定,完成Kafka輸入組件配置。