全部產品
Search
文件中心

E-MapReduce:同步資料

更新時間:Jun 17, 2025

本教程以MySQL資料來源中的使用者基本資料ods_user_info_d表和HttpFile中的網站訪問日誌資料user_log.txt檔案為例,通過Data Integration離線同步任務分別同步至私人OSS中,再通過Spark SQL建立外部表格來訪問私人OSS資料存放區。本章節旨在完成資料同步操作。

章節目標

  1. 本章節通過Data Integration將平台提供的MySQL資料來源內的使用者基本資料資料HttpFile資料來源內的使用者網站訪問日誌資料同步至私人OSSObject Storage Service建立的資料來源中。

    源端資料來源類型

    源端待同步資料

    源端表結構

    目標端資料來源類型

    MySQL

    表:ods_user_info_d

    使用者基本資料資料

    • uid 使用者名稱

    • gender 性別

    • age_range 年齡分段

    • zodiac 星座

    OSS

    HttpFile

    檔案:user_log.txt

    使用者網站訪問日誌資料

    一行為一條使用者訪問記錄。

    $remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent"$http_referer" "$http_user_agent" [unknown_content];

    OSS

  2. 完成同步任務後,通過EMR Spark SQL建立外部表格來訪問私人OSS資料存放區。

操作步驟

步驟一:設計商務程序

步驟二:搭建同步鏈路

步驟三:驗證同步資料

步驟一:設計商務程序

本步驟內,將Data Integration節點以及EMR Spark SQL 節點相結合,形成使用者Portrait analysis任務流程中擷取資料部分的流程。主要是通過ods_raw_log_d_2oss_spark節點從HttpFile資料來源擷取日誌資料至私人OSS資料來源中,再通過ods_raw_log_d_spark節點產生簡單的日誌外部表格,從私人OSS資料存放區中擷取使用者日誌資料。以及通過ods_user_info_d_2oss_spark從MySQL資料來源同步處理的使用者基本資料至私人OSS資料來源中,再通過ods_user_info_d_spark實現外部表格的建立,從私人OSS資料存放區中擷取使用者基本資料資料。

登入DataWorks控制台,切換至目標地區後,單擊左側導覽列的資料開發與營運 > 資料開發,在下拉框中選擇對應工作空間後單擊進入資料開發

設計商務程序

  1. 建立商務程序。

    資料開發需基於商務程序使用對應的開發組件進行具體開發操作。在建立節點之前,您需要先建立商務程序。具體操作方法可參見建立商務程序

    該商務程序的命名為:使用者Portrait analysis_Spark

    image

  2. 設計商務程序。

    商務程序建立完成後,將自動延伸該商務程序畫布。請根據商務程序設計,在商務程序畫布中單擊建立節點,通過將節點群組件拖拽至商務程序畫布,並通過拉線設定節點上下遊依賴的方式,設計資料同步階段的商務程序。

    image

  3. 在本教程中,由於虛擬節點和同步節點之間並無血緣關係,因此我們通過商務程序拉線的方式來設定節點的依賴關係。有關更多依賴關係設定方式的詳細資料,詳情請參見調度依賴配置指引。以下為各個節點的類型、命名以及作用的介紹。

    節點分類

    節點類型

    節點命名

    節點作用

    通用

    image虛擬節點

    workshop_start_spark

    用於統籌管理整個使用者Portrait analysis商務程序,例如商務程序起調時間。當商務程序較複雜時,可使資料流轉路徑更清晰。該節點為空白跑任務,無須編輯代碼。

    Data Integration

    image離線同步

    ods_raw_log_d_2oss_spark

    用於將HttpFile資料來源儲存的使用者網站訪問記錄,通過離線同步的方式同步至私人OSS資料來源中,以供後續Spark SQL擷取。

    Data Integration

    image離線同步

    ods_user_info_d_2oss_spark

    用於將MySQL資料來源儲存的使用者基本資料資料,通過離線同步的方式同步至私人OSS資料來源中,以供後續Spark SQL擷取。

    EMR

    imageEMR Spark SQL

    ods_raw_log_d_spark

    在EMR Spark SQL節點中,建立表ods_raw_log_d_spark,並通過該外部表格訪問私人OSS中的使用者網站訪問記錄資料。

    EMR

    imageEMR Spark SQL

    ods_user_info_d_spark

    在EMR Spark SQL節點中,建立表ods_user_info_d_spark,並通過該外部表格訪問私人OSS中的使用者基本資料資料。

配置調度邏輯

本案例通過虛擬節點workshop_start_Spark控制整個商務程序每天00:30調度執行,以下為虛擬節點關鍵調度配置,其他節點調度無須變更,實現邏輯詳情請參見:情境:如何配置商務程序定時時間。其他調度配置相關說明,請參見:任務調度屬性配置概述

調度配置

圖片樣本

說明

調度時間配置

image

虛擬節點配置調度時間為00:30,該虛擬節點會在每日00:30調起當前商務程序並執行。

調度依賴配置

image

由於虛擬節點workshop_start_spark無上遊依賴,此時可以直接依賴工作空間根節點,由空間根節點觸發workshop_start_spark節點執行。

說明

DataWorks中的所有節點都需要依賴於上遊節點,資料同步階段的所有任務都以虛擬節點workshop_start_spark為依賴,通過workshop_start_spark節點來觸發資料同步商務程序的執行。

步驟二:搭建同步鏈路

配置完成商務程序後,分別雙擊ods_user_info_d_2oss_spark以及ods_raw_log_d_2oss_sparkData Integration節點,配置使用者資料同步至私人OSS和配置使用者日誌同步至私人OSS,並且通過ods_raw_log_d_sparkods_user_info_d_spark採用 Spark SQL代碼,實現通過Spark SQL建立的外表來訪問儲存於私人OSS的資料。

使用者資料與日誌同步至OSS資料來源

使用Data Integration將平台提供的使用者資料與使用者日誌同步至私人OSSObject Storage Service的Bucket目錄下。

配置使用者日誌同步至OSS

通過離線Data Integration任務,實現從平台的HttpFile資料來源內的擷取使用者日誌資訊,同步至私人OSS資料來源中。

同步HttpFile資料來源的日誌資訊至自建的OSS。

  1. 資料開發頁面,雙擊ods_raw_log_d_2oss_spark節點,進入節點配置頁面。

  2. 配置同步網路連結。

    完成以下網路與資源配置後,單擊下一步,並根據介面提示完成連通性測試。

    參數

    描述

    資料來源

    • 資料來源:HttpFile。

    • 資料來源名稱:user_behavior_analysis_httpfile。

    我的資源群組

    選擇已購買的Serverless資源群組

    資料去向

    • 資料去向:OSS。

    • 資料來源名稱:選擇前文建立的私人OSS資料來源,此處樣本為test_g。

  3. 配置同步任務。

    參數

    描述

    資料來源

    • 檔案路徑/user_log.txt

    • 文本類型:選擇text類型。

    • 資料行分隔符號:輸入資料行分隔符號為|

    • 壓縮格式:包括None、Gzip、Bzip2和Zip四種類型,此處選擇None

    • 是否跳過表頭:選擇No。

    資料去向

    • 文本類型:選擇text類型。

    • 檔案名稱(含路徑):根據您自建OSS的目錄進行輸入,樣本為ods_raw_log_d/log_${bizdate}/log_${bizdate}.txt,其中ods_raw_log_d為您自建的目錄名,$bizdate表示擷取前一天的日期。

    • 資料行分隔符號:輸入資料行分隔符號為|

  4. 調度設定。

    配置頁面單擊右側調度配置,可進入調度配置面板配置調度與節點資訊。以下為配置的內容。

    說明

    DataWorks提供調度參數,可實現在調度情境下,將每日資料寫入不同的OSS路徑及檔案下,並以業務日期對路徑目錄與檔案進行命名。

    在實際情境下,您可以在資料去向的檔案名稱(含目錄)配置中通過${變數名}格式自訂路徑中的變數,並通過在調度配置頁面為變數賦值調度參數的方式,實現調度情境下動態產生資料去向目錄與檔案名稱。

    配置項

    配置內容

    圖示

    調度參數

    調度參數項中單擊新增參數,添加:

    • 參數名:bizdate

    • 參數值:$[yyyymmdd-1]

    詳情可參見:配置調度參數

    image

    調度依賴

    調度依賴確認產出表已作為本節點輸出。

    格式為workspacename.節點名

    詳情可參見:配置調度依賴

    image

  5. 配置完成後,單擊工具列中的儲存表徵圖,進行儲存。

配置使用者資料同步至OSS

通過離線Data Integration任務,實現從平台的MySQL資料來源內的擷取使用者資料資訊,同步至私人OSS資料來源中。

  1. 資料開發頁面,雙擊ods_user_info_d_2oss_spark節點,進入節點配置頁面。

  2. 配置同步網路連結。

    完成以下網路與資源配置後,單擊下一步,並根據介面提示完成連通性測試。

    參數

    描述

    資料來源

    • 資料來源:MySQL。

    • 資料來源名稱:user_behavior_analysis_mysql。

    我的資源群組

    選擇已購買的Serverless資源群組

    資料去向

    • 資料去向:OSS。

    • 資料來源名稱:選擇前文建立的私人OSS資料來源,此處樣本為test_g。

  3. 配置同步任務。

    參數

    描述

    資料來源

    • :選擇資料來源中的ods_user_info_d

    • 切分鍵:建議使用主鍵或有索引的列作為切分鍵,僅支援類型為整型的欄位。此處設定切分鍵為uid

    資料去向

    • 文本類型:選擇text類型。

    • 檔案名稱(含路徑):根據您自建OSS的目錄進行輸入,樣本為ods_user_info_d/user_${bizdate}/user_${bizdate}.txt。其中ods_user_info_d為您自建的目錄名,$bizdate表示擷取前一天的日期。

    • 資料行分隔符號:輸入資料行分隔符號為|

  4. 調度設定

    配置頁面單擊右側調度配置,可進入調度配置面板配置調度與節點資訊。以下為配置的內容。

    說明

    DataWorks提供調度參數,可實現在調度情境下,將每日資料寫入不同的OSS路徑及檔案下,並以業務日期對路徑目錄與檔案進行命名。

    在實際情境下,您可以在資料去向的檔案名稱(含目錄)配置中通過${變數名}格式自訂路徑中的變數,並通過在調度配置頁面通過為變數賦值調度參數的方式,實現調度情境下動態產生資料去向目錄與檔案名稱。

    配置項

    配置內容

    圖示

    調度參數

    調度參數項中單擊新增參數,添加:

    • 參數名:bizdate

    • 參數值:$[yyyymmdd-1]

    詳情可參見:配置調度參數

    image

    調度依賴

    調度依賴確認產出表已作為本節點輸出。

    格式為workspacename.節點名

    詳情可參見:配置調度依賴

    image

  5. 配置完成後,單擊工具列中的儲存表徵圖。

建立Spark外部表格載入OSS資料

資料通過離線整合任務同步至私人OSS資料來源後,基於產生的OSS檔案,通過Spark SQL的create文法建立ods_raw_log_d_sparkods_user_info_d_spark表,並通過LOCATION來擷取OSS中的使用者資訊檔、使用者日誌資訊以供後續資料加工使用。

配置ods_raw_log_d_spark節點

基於通過EMR Spark SQL建立的外部表格ods_raw_log_d_spark,用LOCATION來訪問離線Data Integration任務寫入私人OSSObject Storage ServiceBucket的日誌資訊。

  1. 代碼配置。

    -- 情境:以下SQL為Spark SQL,通過EMR Spark SQL建立的外部表格ods_raw_log_d_spark,用LOCATION來擷取離線Data Integration任務寫入私人OSSObject Storage ServiceBucket的日誌資訊,並添加對應的dt分區。
    -- 補充:
    --      DataWorks提供調度參數,可實現調度情境下,將每日增量資料寫入目標表對應的業務分區內。
    --      在實際開發情境下,您可通過${變數名}格式定義代碼變數,並在調度配置頁面通過變數賦值調度參數的方式,實現調度情境下代碼動態入參。 
    CREATE EXTERNAL TABLE IF NOT EXISTS ods_raw_log_d_spark
    (
      `col` STRING
    ) 
    PARTITIONED BY (
      dt STRING
    )
    LOCATION 'oss://dw-emr-demo/ods_raw_log_d/log_${bizdate}/';
    
    ALTER TABLE ods_raw_log_d_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}') 
    LOCATION 'oss://dw-emr-demo/ods_raw_log_d/log_${bizdate}/'
    ;
    說明

    上述代碼中的location為樣本路徑,與之前配置離線同步任務時的資料去向相同,需要輸入您建立的檔案路徑名稱,其中dw-emr-demo是您準備環境時建立的OSS Bucket網域名稱。

  2. 配置調度配置。

    ods_raw_log_d_spark節點配置任務調度,通過配置的調度參數來擷取對應業務日期的私人OSS記錄檔,並寫入同樣業務日期分區的Spark表內。

    配置項

    配置內容

    圖示

    調度參數

    調度參數項中單擊新增參數,添加:

    參數名:bizdate

    參數值:$[yyyymmdd-1],詳情可參見:配置調度參數

    image

    調度依賴

    調度依賴確認產出表已作為本節點輸出。

    格式為workspacename.節點名

    詳情可參見:配置調度依賴

    image

    說明

    本章節在SQL中配置了調度參數${bizdate},並將其賦值為T-1。在離線計算情境下bizdate為業務交易發生的日期,也常被稱為業務日期。例如,今天統計前一天的營業額,此處的前一天指的是交易發生的日期,也就是業務日期。

  3. 完成配置後,單擊image儲存節點。

配置ods_user_info_d_spark節點

基於通過EMR Spark SQL節點建立的外部表格ods_user_info_d_spark,用LOCATION來訪問離線Data Integration任務寫入私人OSSObject Storage ServiceBucket的使用者資訊。

  1. 代碼配置。

    -- 情境:以下SQL為Spark SQL,通過EMR Spark SQL節點建立的外部表格ods_user_info_d_spark,用LOCATION來擷取離線Data Integration任務寫入私人OSSObject Storage ServiceBucket的使用者資訊,並寫入對應的dt分區。
    -- 補充:
    --      DataWorks提供調度參數,可實現調度情境下,將每日增量資料寫入目標表對應的業務分區內。
    --      在實際開發情境下,您可通過${變數名}格式定義代碼變數,並在調度配置頁面通過變數賦值調度參數的方式,實現調度情境下代碼動態入參。
    CREATE EXTERNAL TABLE IF NOT EXISTS ods_user_info_d_spark
    (
        `uid`        STRING COMMENT '使用者ID'
        ,`gender`    STRING COMMENT '性別'
        ,`age_range` STRING COMMENT '年齡段'
        ,`zodiac`    STRING COMMENT '星座'
    )
    PARTITIONED BY 
    (
        dt           STRING
    )
    ROW FORMAT DELIMITED 
    FIELDS
    TERMINATED
    BY'|'
    STORED AS TEXTFILE
    LOCATION 'oss://dw-emr-demo/ods_user_info_d/user_${bizdate}/'
    ;
    
    ALTER TABLE ods_user_info_d_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}') 
    LOCATION'oss://dw-emr-demo/ods_user_info_d/user_${bizdate}/'
    ;
    說明

    上述代碼中的location為樣本路徑,與之前配置離線同步任務時的資料去向相同,需要輸入您建立的檔案路徑名稱,其中dw-emr-demo是您準備環境時建立的OSS Bucket網域名稱。

  2. 配置調度配置。

    ods_user_info_d_spark節點配置任務調度,通過配置的調度參數來擷取對應業務日期的私人OSS使用者資訊檔,並寫入同樣業務日期分區的Spark表內。

    配置項

    配置內容

    圖示

    調度參數

    調度參數項中單擊新增參數,添加:

    參數名:bizdate

    參數值:$[yyyymmdd-1],詳情可參見:配置調度參數

    image

    調度依賴

    調度依賴確認產出表已作為本節點輸出。

    格式為worksspacename.節點名

    詳情可參見:配置調度依賴

    image

  3. 完成配置後,單擊image儲存節點。

步驟三:驗證同步資料

在確保該章節內的所有節點運行成功的情況下,在左側導覽列的臨時查詢中建立EMR Spark SQL臨時查詢,編寫SQL查看EMR Spark SQL節點建立的外部表格是否正常產出。

-- 您需要將分區過濾條件更新為您當前操作的實際業務日期。例如,任務啟動並執行日期為20240808,則業務日期為20240807,即任務運行日期的前一天。
SELECT  * FROM  ods_raw_log_d_spark  WHERE dt ='業務日期';--查詢ods_raw_log_d_spark表
SELECT  * FROM  ods_user_info_d_spark   WHERE dt ='業務日期';--查詢ods_user_info_d_spark表
說明

在驗證同步資料的SQL中,可將WHERE條件替換為"dt = ${bizdate}",在臨時查詢任務中單擊image帶參運行,為SQL預留位置${bizdate}賦值後運行即可。

後續步驟

現在,您已經完成了同步資料,您可以繼續下一個教程。在下一個教程中,您將學習將使用者基本資料資料、使用者網站訪問日誌資料在Spark中進行加工處理。詳情請參見加工資料