全部產品
Search
文件中心

DataWorks:最佳實務:資料開發工作流程中配置資料推送節點

更新時間:Mar 06, 2025

DataWorks資料開發支援將資料推送作為節點,結合資料開發已有的工作流程,提供了簡單推送、合并推送、指令碼推送、條件推送以及MaxCompute資料推送方式,本實踐將為您帶來這五種推送方式的實踐。

背景資訊

在DataWorks商務程序開發中,可以新增資料推送節點,該節點可根據商務程序處理完成資料後,通過簡單的查詢來擷取所需要的資料,根據任務調度及時、快速地推送資料至DingTalk群、飛書群、企業微信群、Teams以及郵件中。

實踐思路

  1. 新增一個節點,用來準備測試資料,以便為後續不同推送流程提供資料支援。

  2. 新增資料查詢節點、賦值節點以及其他資料處理節點,實現對測試資料處理與查詢。

    說明

    準備測試資料節點與資料處理查詢節點,在實際業務中可以根據實際情況進行調整,本案例以MySQL節點為例進行資料的推送。

  3. 新增資料推送節點,接收資料查詢節點的輸出參數,將上下文參數中的輸入參數攜帶的資料推送至DingTalk群、飛書群、企業微信群、Teams以及郵件中。

實踐方式介紹

本實踐共有五種資料推送節點與工作流程的結合方式,分別為簡單推送合并推送指令碼推送條件推送以及MaxCompute資料推送方式。

  • 簡單推送:上遊工作流程完成SQL查詢後,通過節點上下文參數將查詢結果輸出至資料推送節點進行推送。

  • 合并推送:上遊有多個工作流程,分別完成SQL查詢後,通過節點上下文參數將查詢結果輸出至一個資料推送節點進行推送。

  • 指令碼推送:上遊節點為賦值節點,通過賦值節點代碼對資料進行處理並通過節點上下文參數將資料輸出至推送節點進行推送。賦值節點詳情,請參見賦值節點

  • 條件推送:上遊流程通過分支節點對資料按照配置的條件進行判斷,查詢輸出符合判斷條件的資料,通過節點上下文參數將查詢結果輸出至資料推送節點進行推送。分支節點詳情,請參見分支節點

  • MaxCompute資料推送:推送MaxCompute資料來源上的資料,可通過賦值節點對MaxCompute進行查詢後進行推送。

前提條件

在開啟本實踐前,您需要首先準備好以下內容:

  • 請確保已開通DataWorks服務,詳情請參見開通DataWorks服務

  • 請確保已建立DataWorks空間,詳情請參見建立工作空間

  • 請確保已通過RDS建立MySQL執行個體,並已在空間下建立MySQL資料來源。詳情請參見相關文檔建立並管理資料來源

  • 請確保在工作空間下已建立MaxCompute資料來源。詳情請參見建立MaxCompute資料來源

    說明

    本文以MySQL資料來源與MaxCompute資料來源為例,您可以根據自身需求新增不同類型的資料來源。

  • 請確保已擁有Serverless資源群組,僅支援Serverless資源群組執行資料推送節點任務,如需購買使用Serverless資源群組,詳情請參見新增和使用Serverless資源群組

使用限制

  • 資料推送功能推送至不同對象時的資料大小限制:

    • 推送目標為DingTalk,推送資料大小不超過20KB

    • 推送目標為飛書,推送資料大小不超過20KB,圖片小於10MB

    • 推送目標為企業微信,每個機器人發送的訊息不能超過20條/分鐘

    • 推送目標為Teams,推送大小不大於28KB

    • 推送目標為郵件,每個資料推送任務僅支援添加一個郵件內文,若已添加郵件內文,則無法再次添加,更多限制可參考所使用郵件服務的SMTP限制。

  • 僅以下地區的DataWorks工作空間可使用資料推送功能:華東1(杭州)、華東2(上海)、華北2(北京)、華南1(深圳)、西南1(成都)、中國香港、新加坡、馬來西亞(吉隆坡)、美國(矽谷)、美國(維吉尼亞)、德國(法蘭克福)

準備流程

建立商務程序

  1. 登入DataWorks控制台,切換至目標地區後,單擊左側導覽列的資料開發與營運 > 資料開發,在下拉框中選擇對應工作空間後單擊進入資料開發

  2. 資料開發面板,按右鍵商務程序,選擇建立商務程序。並根據業務需要定義商務程序名稱,本案例商務程序命名為資料推送Demo樣本

建立流程節點

建立完成資料推送Demo流程商務程序後,雙擊流程名開啟商務程序圖頁面,根據下表裡您需要的推送方式,單擊image 建立對應的節點。為方便完成該實踐中的推送方式,請以表格內的節點名對建立的節點進行命名。

推送方式

節點名

節點類型

節點說明

條件推送

查詢上月銷售總額

MySQL節點

查詢測試資料中上月銷售總額,將查詢結果以上下文參數的形式輸出給分支節點。

達標條件判斷

分支節點

接收MySQL節點傳遞的輸出參數,按條件進行分支判斷,將符合條件與不合格資料以上下文參數的形式輸出給不同MySQL節點。

上月銷售達標資料

MySQL節點

接收分支節點判斷達標的輸出參數,查詢出達標資料,以上下文參數的形式輸出給資料推送節點。

上月銷售不達標資料

接收分支節點判斷不達標的輸出參數,查詢出不達標資料,以上下文參數的形式輸出給資料推送節點。

推送銷售達標前三名類別

資料推送

接收查詢的達標輸出參數,將上下文參數中的輸入參數攜帶的資料推送至推送目標。

推送銷售不達標倒數三名類別

接收查詢的不達標輸出參數,將上下文參數中的輸入參數攜帶的資料推送至推送目標。

指令碼推送

查詢上一周銷售資料

MySQL節點

查詢測試資料中上周銷售前三類總額,將查詢結果以上下文參數的形式輸出給賦值節點。

組織銷售前三名列表

賦值節點

樣本賦值語言為Python,接收MySQL節點的輸出參數,並將其列表化後,再將結果以上下文參數的形式輸出給資料推送節點。

推送上周前三名類別

資料推送

接收賦值節點列表的輸出參數,將上下文參數中的輸入參數攜帶的資料推送至推送目標。

合并推送

查詢昨日銷售總額

MySQL節點

  • 查詢測試資料中昨日銷售總額與昨日銷售增長額,將查詢結果以上下文參數的形式輸出給資料推送節點。

  • 查詢昨日銷售總額節點可與簡單推送中的昨日銷售總額共用同一節點。

查詢昨日銷售增長額

推送昨日銷售總額與增長額

資料推送

同時接收兩個MySQL節點傳遞的輸出參數,將上下文參數中的輸入參數攜帶的資料推送至推送目標。

簡單推送

查詢昨日銷售總額

MySQL節點

查詢測試資料中的昨日銷售總額,將查詢結果以上下文參數的形式輸出給資料推送節點。

推送昨日銷售總額

資料推送

接收MySQL節點傳遞的輸出參數,將上下文參數中的輸入參數攜帶的資料推送至推送目標。

MaxCompute資料推送

MySQL同步資料至ODPS

離線同步

將MySQL內準備的資料,通過Data Integration的方式同步至ODPS資料來源之中。

ODPS資料查詢

賦值節點

對ODPS資料來源中的資料進行查詢,再將查詢結果以上下文參數的形式輸出給資料推送節點。

ODPS資料推送

資料推送

接收賦值節點ODPS資料查詢的輸出參數,將上下文參數中的輸入參數攜帶的資料推送至推送目標。

準備資料

在日常業務中存在多種多樣的資料情境,本實踐以簡單的訂單表為案例示範資料推送節點的使用,以下為建立並寫入測試資料的步驟,若您無需測試資料,也可跳過該步驟。

建立測試表

  1. 登入DataWorks控制台,切換至目標地區後,單擊左側導覽列的資料開發與營運 > 資料開發,在下拉框中選擇對應工作空間後單擊進入資料開發

  2. 在左側導覽列,單擊image進入臨時查詢頁面後,滑鼠懸浮在image按鈕,建立 > MySQL新增臨時查詢。

    • 節點類型:MySQL。

    • 路徑:臨時查詢。

    • 名稱:建立測試資料表。

  3. 建立測試表orders,表格SQL如下:

CREATE TABLE orders (
     order_id INT NOT NULL AUTO_INCREMENT,
     category VARCHAR(100) NOT NULL, -- 類別
     sales DOUBLE NOT NULL, -- 訂單銷售額
     datetime DATETIME NOT NULL, -- 訂單發生時間
     PRIMARY KEY (order_id),
     INDEX (category)
);

建立預存程序

以下預存程序將隨機產生過去兩個月的訂單表資料。

說明

預存程序需要在MySQL用戶端中執行建立。

DELIMITER $$

CREATE PROCEDURE InsertOrders(IN num_orders INT)
BEGIN
  DECLARE v_category VARCHAR(100);
  DECLARE v_sales DOUBLE;
  DECLARE v_datetime DATETIME;
  DECLARE v_category_list VARCHAR(255);
  DECLARE v_index INT;
  DECLARE i INT DEFAULT 0;
  
  -- 定義類別列表的字串,以逗號分隔
  SET v_category_list = 'Electronics,Books,Home & Kitchen,Fashion,Toys,Baby,Computers,Electronics,Games,Garden,Clothing,Grocery,Health,Jewelry,Kids';
  -- 獲得類別的總數
  SET v_index = ROUND((RAND() * (CHAR_LENGTH(v_category_list) - CHAR_LENGTH(REPLACE(v_category_list, ',', '')) + 1)));
  
  WHILE i < num_orders DO
    -- 產生隨機索引以選擇類別
    SET v_index = FLOOR(1 + (RAND() * (CHAR_LENGTH(v_category_list) - CHAR_LENGTH(REPLACE(v_category_list, ',', '')) + 1)));
    -- 從類別列表中提取隨機類別
    SET v_category = SUBSTRING_INDEX(SUBSTRING_INDEX(v_category_list, ',', v_index), ',', -1);
    
    -- 產生1000至30000之間的隨機銷售額
    SET v_sales = 1000 + FLOOR(RAND() * 29000);
    
    -- 在過去兩個月內產生隨機日期時間
    SET v_datetime = NOW() - INTERVAL FLOOR(RAND() * 61) DAY;
    
    -- 將新的隨機訂單插入到訂單表中
    INSERT INTO orders (category, sales, datetime) VALUES (v_category, v_sales, v_datetime);
    
    SET i = i + 1;
  END WHILE;
END$$

DELIMITER ;

寫入測試資料

預存程序建立成功後,可通過CALL語句調用預存程序,向orders表記憶體儲隨機產生的資料。

-- 調用預存程序以插入特定數量的隨機訂單
CALL InsertOrders(1000); -- 這將插入1000條隨機訂單

配置推送流程

在DataWorks推送流程根據不同的推送邏輯可以進行條件推送指令碼推送合并推送簡單推送以及MaxCompute資料推送,本實踐以MySQL為例,示範五種其他資料來源推送方式以及MaxCompute資料來源資料推送樣本。

條件推送

步驟一:搭建推送流程

雙擊開啟資料推送Demo流程商務程序圖頁面,依次將查詢上月銷售總額通過達標條件判斷分成達標上月銷售達標資料推送銷售達標前三名類別和不達標上月銷售不達標資料推送銷售不達標倒數三名類別,關聯起來,產生一條如下圖所示的條件推送的工作流程。

image

步驟二:配置SQL查詢節點

通過配置SQL查詢節點對測試資料進行查詢,通過節點上下文參數產生SQL查詢節點的輸出參數outputs,從而實現將SQL的查詢結果傳遞至分支節點。

  1. 雙擊開啟查詢上月銷售總額節點,編寫以下查詢SQL代碼。

    -- 統計上個月銷售額
    SELECT SUM(sales) AS sales_amount
    FROM orders
    WHERE datetime BETWEEN DATE_FORMAT(CURRENT_DATE - INTERVAL 1 MONTH, '%Y-%m-01 00:00:00') AND DATE_FORMAT(LAST_DAY(CURRENT_DATE - INTERVAL 1 MONTH), '%Y-%m-%d 23:59:59');
  2. 編輯完成SQL代碼後,單擊右側調度配置,在彈出的調度配置面板中配置如下內容。

    • 定時調度時間08:00

    • 調度資源群組:選擇已建立好的Serverless資源群組。

    • 依賴上遊節點:勾選使用工作空間根節點

    • 本節點輸出參數:在節點上下文參數下單擊本節點輸出參數後的添加賦值參數,添加輸出參數作為下遊節點輸入參數的取值。image

步驟三:配置分支節點

在分支節點內,通過節點上下文參數本節點輸入參數擷取上遊SQL查詢節點的輸出參數outputs,然後在分支節點進行條件判斷後通過本節點輸出參數輸出合格資料。

  1. 雙擊開啟達標條件判斷節點,單擊添加分支,新增兩個分支,分別為達標和不達標,詳細配置資訊如下

    配置項

    達標分支

    不達標分支

    分支條件

    ${inputs[0][0]}>=500000

    ${inputs[0][0]}<500000

    關聯到節點輸出

    達標。

    不達標。

    分支描述

    上月銷售總額達標。

    上月銷售總額不達標。

    說明

    [0][0]為二維數組,用來定位擷取到的參數中用於條件判斷的資料。

    • 上遊為SQL查詢節點情況下,使用二維數組,定位參數中需要作為條件的資料。

    • 上遊為Python節點情況下,使用一維數組,定位參數中需要作為條件的資料。

  2. 雙擊開啟達標條件判斷節點,單擊右側調度配置,在彈出的調度配置中配置如下內容。

    參數類型

    參數內容與說明

    時間屬性

    調度周期

    定時調度時間

    08:00

    重跑屬性

    運行成功或失敗後皆可重跑。

    資源屬性

    調度資源群組

    選擇已建立好的調度資源群組。

    說明

    初次使用資料推送節點,需先提工單升級調度資源群組。

    本節點輸出名稱

    當配置分支後,輸出名會自動解析並添加到此處。輸出名與分支定義的關聯到節點輸出一致。

    節點上下文參數

    本節點輸入參數

    參數名inputs

    取值來源:選擇上遊節點查詢上月銷售總額的輸出參數outputs

    本節點輸出參數

    系統預設添加為outputs

  3. 完成配置後,單擊image儲存達標條件判斷節點。

步驟四:配置分支SQL查詢節點

分支SQL查詢節點在本實踐中分為上月銷售達標資料上月銷售不達標資料兩個節點,需要將分支節點上下文參數輸出至上月銷售達標資料節點和上月銷售不達標資料節點,然後將分支節點SQL查詢結果用節點上下文參數本節點輸出參數分別輸出至對應的達標不達標資料推送節點。

  1. 分別雙擊上月銷售達標資料節點和上月銷售不達標資料節點,進入編輯頁面,編寫以下SQL代碼。

    上月銷售達標資料

    SET @all_cat_sales_volume_month := 0.0;
    SELECT SUM(sales) INTO @all_cat_sales_volume_month FROM orders WHERE datetime BETWEEN DATE_FORMAT(CURRENT_DATE - INTERVAL 1 MONTH, '%Y-%m-01 00:00:00') AND DATE_FORMAT(LAST_DAY(CURRENT_DATE - INTERVAL 1 MONTH), '%Y-%m-%d 23:59:59');
    
    -- 建立暫存資料表
    CREATE TEMPORARY TABLE IF NOT EXISTS temp_array (
      category VARCHAR(255),
      sales DOUBLE,
      all_cat_sales_volume_month DOUBLE
    );
    --查詢並寫入暫存資料表
    INSERT INTO temp_array (category, sales, all_cat_sales_volume_month) SELECT category, SUM(sales) AS amount, @all_cat_sales_volume_month FROM orders WHERE datetime BETWEEN DATE_FORMAT(CURRENT_DATE - INTERVAL 1 MONTH, '%Y-%m-01 00:00:00') AND DATE_FORMAT(LAST_DAY(CURRENT_DATE - INTERVAL 1 MONTH), '%Y-%m-%d 23:59:59') GROUP BY category ORDER BY amount DESC limit 3;
    --查詢達標前三名資料
    SELECT category, sales, all_cat_sales_volume_month FROM temp_array;

    上月銷售不達標資料

    SET @all_cat_sales_volume_month := 0.0;
    SELECT SUM(sales) INTO @all_cat_sales_volume_month FROM orders WHERE datetime BETWEEN DATE_FORMAT(CURRENT_DATE - INTERVAL 1 MONTH, '%Y-%m-01 00:00:00') AND DATE_FORMAT(LAST_DAY(CURRENT_DATE - INTERVAL 1 MONTH), '%Y-%m-%d 23:59:59');
    
    --建立暫存資料表
    CREATE TEMPORARY TABLE IF NOT EXISTS temp_array (
      category VARCHAR(255),
      sales DOUBLE,
      all_cat_sales_volume_month DOUBLE
    );
    
    --查詢並寫入暫存資料表
    INSERT INTO temp_array (category, sales, all_cat_sales_volume_month) SELECT category, SUM(sales) AS amount, @all_cat_sales_volume_month FROM orders WHERE datetime BETWEEN DATE_FORMAT(CURRENT_DATE - INTERVAL 1 MONTH, '%Y-%m-01 00:00:00') AND DATE_FORMAT(LAST_DAY(CURRENT_DATE - INTERVAL 1 MONTH), '%Y-%m-%d 23:59:59') GROUP BY category ORDER BY amount ASC limit 3;
    
    --查詢不達標的後三名資料
    SELECT category, sales, all_cat_sales_volume_month FROM temp_array;
  2. 編輯完成SQL代碼後,單擊右側調度配置,在彈出的調度配置面板中配置如下內容。

    • 定時調度時間08:00

    • 調度資源群組:選擇已建立好的Serverless資源群組。

    • 依賴的上遊節點:添加上遊的分支節點,因在步驟一:搭建推送流程已建立好流程,需查看上遊依賴是否正確。

      • 上月銷售達標資料節點:上遊節點輸出名為達標

      • 上月銷售不達標資料節點:上遊節點輸出名為不達標

    • 本節點輸出參數:在節點上下文參數下單擊本節點輸出參數後的添加賦值參數,添加輸出參數作為下遊節點輸入參數的取值。image

  3. 單擊image儲存上月銷售達標資料節點和上月銷售不達標資料節點。

步驟五:配置分支資料推送節點

需要建立兩個資料推送節點,分別配置節點上下文參數本節點輸入參數,來擷取上月銷售達標資料上月銷售不達標資料的分支SQL查詢節點輸出的outputs參數,在本文中使用這些參數並將其推送至各自的目標。

  1. 雙擊推送銷售達標前三名類別節點和推送銷售不達標倒數三名類別節點,進入節點後,單擊調度配置,在調度配置面板中進行如下配置。

    參數類型

    參數內容

    圖示

    調度參數

    參數名

    curdate

    image

    參數值

    $[yyyymmddhh:mi:ss]

    時間屬性

    調度周期

    image

    定時調度時間

    08:00

    說明

    本實踐以8:00為例,確保資料能在08:00時推送至目標渠道。您可以根據實際需要配置其他時間點。

    重跑屬性

    運行成功或失敗後皆可重跑。

    資源屬性

    調度資源群組

    選擇已建立好的調度資源群組。

    說明

    初次使用資料推送節點,需先提工單升級調度資源群組。

    image

    節點上下文參數

    本節點輸入參數

    單擊添加新增本節點輸入參數:

    • 參數名:inputs

    • 取值來源:

      • 推送銷售達標前三名類別節點選擇上月銷售達標資料節點的輸出參數outputs

      • 推送銷售不達標倒數三名類別節點選擇上月銷售不達標資料節點的輸出參數outputs

    • 推送銷售達標前三名類別節點image

    • 推送銷售不達標倒數三名類別節點image

  2. 調度配置完成後,即可配置資料推送內容,詳情如下。

    • 資料推送目標:下拉資料推送目標選擇所需的資料推送目標,若不存在,可單擊下拉框右下角的建立資料推送目標,建立推送目標。

    • 參數

      說明

      類型

      支援圈選DingTalk、飛書、企業微信、Teams以及郵件。

      對象名稱

      可按業務需求進行自訂。

      WebHook

      DingTalk、飛書、企業微信機器人或Teams的Webhook,以及郵件的SMTP需要在相應的目標平台上擷取。

      說明
    • 標題推送銷售達標前三名類別推送銷售不達標倒數三名類別

    • 本文:按需求進行配置,詳情可參見配置推送內容

      說明

      本文中可直接使用上遊SQL查詢節點查詢的欄位名作為預留位置,以擷取上遊輸入的參數。

      • 推送銷售達標前三名類別樣本image

      • 推送銷售不達標倒數三名類別樣本image

  3. 配置完成後,單擊image儲存推送銷售達標前三名類別節點和推送銷售不達標倒數三名類別節點。

步驟六:測試條件推送流程

在完成條件推送流程配置後,需要對條件測試流程進行測試,以便後續提交發布。

  1. 雙擊開啟資料推送Demo流程商務程序頁面。

  2. 選中查詢上月銷售總額節點,右鍵選擇運行節點及下遊,等待運行完成即可。

    說明

    如果出現任務運行失敗,選中需要查看的節點,單擊右鍵選擇查看日誌即可查看日誌。

    image

指令碼推送

步驟一:搭建推送流程

雙擊開啟資料推送Demo流程商務程序圖頁面,在商務程序圖頁面,可以看到已建立的多個節點,依次將查詢上一周銷售資料組織銷售前三名列表推送上周前三名類別進行關聯,產生一條如下圖所示的指令碼推送的工作流程。

image

步驟二:配置SQL查詢節點

通過配置SQL查詢節點對測試資料進行查詢,可以通過節點上下文參數產生SQL查詢節點的輸出參數outputs,從而實現將SQL的查詢結果傳遞至賦值節點。

  1. 雙擊查詢上一周銷售資料節點,進入編輯頁面,編寫以下SQL代碼。

    -- 統計上一周銷售額
    SELECT category, SUM(sales) AS amount
    FROM orders
    WHERE datetime BETWEEN DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 WEEK), '%Y-%m-%d 00:00:00') AND DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 DAY), '%Y-%m-%d 23:59:59')
    GROUP BY category ORDER BY amount DESC limit 3;
  2. 編輯完成SQL代碼後,單擊右側調度配置,在彈出的調度配置面板中配置如下內容。

    • 定時調度時間08:00

    • 調度資源群組:選擇已建立好的Serverless資源群組。

    • 依賴上遊節點:勾選使用工作空間根節點

    • 本節點輸出參數:在節點上下文參數下單擊本節點輸出參數後的添加賦值參數,添加輸出參數作為下遊節點輸入參數的取值。image

  3. 單擊image儲存查詢上一周銷售資料節點。

步驟三:配置賦值節點

賦值節點通過節點上下文參數下的本節點輸入參數可以擷取到上遊SQL節點的輸出參數outputs,將SQL節點的輸出參數進行重新賦值調整,產生新的本節點輸出參數,將新資料輸出至資料推送節點。

  1. 雙擊組織銷售前三名列表的賦值節點,進入節點編輯頁面,編寫以下Python代碼。

    def main():
    
        From datetime import date
        today = date.today()
        formatted_date = today.strftime('%Y-%m-%d')
        
        msg = 'Stat date: ' + formatted_date + ' \\n\\n ' \
        '- 1: ${inputs[0][0]}, sales: ${inputs[0][1]} \\n\\n ' \
        '- 2: ${inputs[1][0]}, sales: ${inputs[1][1]} \\n\\n ' \
        '- 3: ${inputs[2][0]}, sales: ${inputs[2][1]} \\n\\n '
        
        print(msg)
    
    
    if __name__ == "__main__":
        import sys
        main()
  2. 編輯完成賦值代碼後,單擊右側調度配置,在彈出的調度配置面板中配置如下內容。

    • 定時調度時間08:00

    • 調度資源群組:選擇已建立好的調度資源群組。

    • 節點上下文參數

      • 本節點輸入參數

        • 參數名inputs

        • 取值來源:上遊查詢上一周銷售資料節點的輸出參數outputs。

      • 本節點輸出參數:由系統預設產生。

  3. 單擊image儲存組織銷售前三名列表賦值節點。

步驟四:配置資料推送節點

通過配置節點上下文參數本節點輸入參數,來擷取上遊賦值節點輸出的outputs參數,在本文中使用這些參數並將其推送至目標。

  1. 雙擊名為推送上周前三名類別的資料推送節點,進入節點後,單擊調度配置,在調度配置面板中進行如下配置。

    參數類型

    參數內容

    圖示

    調度參數

    參數名

    curdate

    image

    參數值

    $[yyyymmddhh:mi:ss]

    時間屬性

    調度周期

    image

    定時調度時間

    08:00

    說明

    本實踐以8:00為例,確保資料能在08:00時推送至目標渠道。您可以根據實際需要配置其他時間點。

    重跑屬性

    運行成功或失敗後皆可重跑。

    資源屬性

    調度資源群組

    選擇已建立好的調度資源群組。

    說明

    初次使用資料推送節點,需先提工單升級調度資源群組。

    image

    節點上下文參數

    本節點輸入參數

    單擊添加新增本節點輸入參數:

    參數名:inputs

    取值來源:選擇上遊節點上周銷售前三類的輸出參數outputs

    image

  2. 調度配置完成後,即可配置資料推送內容,詳情如下。

    • 資料推送目標:下拉資料推送目標選擇所需的資料推送目標,若不存在,可單擊下拉框右下角的建立資料推送目標,建立推送目標。

    • 參數

      說明

      類型

      支援圈選DingTalk、飛書、企業微信、Teams以及郵件。

      對象名稱

      可按業務需求進行自訂。

      WebHook

      DingTalk、飛書、企業微信機器人或Teams的Webhook,以及郵件的SMTP需要在相應的目標平台上擷取。

      說明
    • 標題推送上周前三名類別

    • 本文:按需求進行配置本文即可,詳情可參見配置推送內容

      說明

      本文中可直接使用上遊SQL查詢節點查詢的欄位名作為預留位置,以擷取上遊輸入的參數。

      image

  3. 配置完成後,單擊image儲存推送上周前三名類別推送節點。

步驟五:測試指令碼推送流程

在完成指令碼推送流程配置後,需要對指令碼測試流程進行測試,以便後續提交發布。

  1. 雙擊資料推送Demo流程開啟商務程序圖頁面。

  2. 選中查詢上一周銷售資料節點,右鍵選擇運行節點及下遊,等待運行完成即可。

    說明

    如果出現任務運行失敗,選中需要查看的節點,單擊右鍵選擇查看日誌即可查看日誌。

    image

簡單推送

步驟一:搭建推送流程

雙擊資料推送Demo流程後,在商務程序圖頁面,可以看到已建立的多個節點,需拖動MySQL查詢節點查詢昨日銷售總額作為上遊節點,並將其與下遊的資料推送節點推送昨日銷售總額關聯起來,產生一條如下圖所示的簡單推送的工作流程。

image

步驟二:配置SQL查詢節點

通過配置SQL查詢節點對測試資料進行查詢,可以通過節點上下文參數產生SQL查詢節點的輸出參數outputs,從而實現將SQL的查詢結果傳遞至資料推送節點。

  1. 雙擊查詢昨日銷售總額節點,編寫以下查詢SQL代碼。

    -- 建立暫存資料表temp_array
    CREATE TEMPORARY TABLE IF NOT EXISTS temp_array (
      total_amount DOUBLE
    );
    
    -- 將查詢到的昨日銷售總額寫入暫存資料表temp_array中
    INSERT INTO temp_array (total_amount) 
    SELECT SUM(sales)
    FROM orders
    WHERE datetime BETWEEN DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 DAY), '%Y-%m-%d 00:00:00') AND DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 DAY), '%Y-%m-%d 23:59:59');
    
    -- 查詢暫存資料表temp_array
    select total_amount FROM temp_array;
  2. 編輯完成SQL代碼後,單擊右側調度配置,在彈出的調度配置面板中配置如下內容。

    • 定時調度時間08:00

    • 調度資源群組:選擇已建立好的Serverless資源群組。

    • 依賴上遊節點:勾選使用工作空間根節點

    • 本節點輸出參數:在節點上下文參數下單擊本節點輸出參數後的添加賦值參數,添加輸出參數作為下遊節點輸入參數的取值。image

  3. 單擊image儲存查詢昨日銷售總額節點。

步驟三:配置資料推送節點

通過配置節點上下文參數本節點輸入參數,來擷取上遊SQL查詢節點輸出的outputs參數,在本文中使用這些參數並將其推送至目標。

  1. 雙擊名為推送昨日銷售總額的資料推送節點,進入節點後,單擊調度配置,在調度配置面板中進行如下配置。

    參數類型

    參數內容

    圖示

    調度參數

    參數名

    curdate

    image

    參數值

    $[yyyymmddhh:mi:ss]

    時間屬性

    調度周期

    image

    定時調度時間

    08:00

    說明

    本實踐以8:00為例,確保資料能在08:00時推送至目標渠道。您可以根據實際需要配置其他時間點。

    重跑屬性

    運行成功或失敗後皆可重跑。

    資源屬性

    調度資源群組

    選擇已建立好的調度資源群組。

    說明

    初次使用資料推送節點,需先提工單升級調度資源群組。

    image

    節點上下文參數

    本節點輸入參數

    單擊添加新增本節點輸入參數:

    參數名 : inputs

    取值來源:選擇上遊節點查詢昨日銷售總額的輸出參數outputs

    image

  2. 調度配置完成後,即可配置資料推送內容,詳情如下。

    • 資料推送目標:下拉資料推送目標選擇所需的資料推送目標,若不存在,可單擊下拉框右下角的建立資料推送目標,建立推送目標。

      參數

      說明

      類型

      支援圈選DingTalk、飛書、企業微信、Teams以及郵件。

      對象名稱

      可按業務需求進行自訂。

      WebHook

      DingTalk、飛書、企業微信機器人或Teams的Webhook,以及郵件的SMTP需要在相應的目標平台上擷取。

      說明
    • 標題推送昨日銷售總額

    • 本文:按需求進行配置本文即可,詳情可參見配置推送內容

      說明

      本文中可直接使用上遊SQL查詢節點查詢的欄位名作為預留位置,以擷取上遊輸入的參數。

      image

  3. 配置完成後,單擊image儲存推送昨日銷售總額推送節點。

步驟四:測試簡單推送流程

在完成簡單推送流程配置後,需要對簡單測試流程進行測試,以便後續提交發布。

  1. 雙擊開啟資料推送Demo流程商務程序圖頁面。

  2. 選中查詢昨日銷售總額節點,右鍵選擇運行節點及下遊,等待運行完成即可。

    說明

    如果出現任務運行失敗,選中需要查看的節點,單擊右鍵選擇查看日誌即可查看日誌。

    image

合并推送

步驟一:搭建推送流程

雙擊開啟資料推送Demo流程商務程序圖頁面,在商務程序圖頁面,可以看到已建立的多個節點,拖拽上遊節點查詢昨日銷售總額查詢昨日銷售增長額與下遊節點推送昨日銷售總額與增長額關聯起來,產生一條如下圖所示的合并推送的工作流程。

說明

查詢昨日銷售總額節點可與簡單推送中的昨日銷售總額共用同一節點

image

步驟二:配置SQL查詢節點

通過配置SQL查詢節點對測試資料進行查詢,可以通過節點上下文參數產生SQL查詢節點的輸出參數outputs,將SQL的查詢結果傳遞至資料推送節點。

  1. 單擊查詢昨日銷售增長額節點,編寫以下查詢SQL代碼。

    -- 建立統計前日資料表temp_array1
    CREATE TEMPORARY TABLE IF NOT EXISTS temp_array1 (
      category VARCHAR(255),
      sales DOUBLE
    );
    -- 寫入前日資料至temp_array1
    INSERT INTO temp_array1 (category, sales) SELECT category, SUM(sales)
    FROM orders
    WHERE datetime BETWEEN DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 2 DAY), '%Y-%m-%d 00:00:00') AND DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 2 DAY), '%Y-%m-%d 23:59:59')
    GROUP BY category;
    
    -- 建立統計昨日資料表temp_array2
    CREATE TEMPORARY TABLE IF NOT EXISTS temp_array2 (
      category VARCHAR(255),
      sales DOUBLE
    );
    -- 寫入昨日資料至temp_array2
    INSERT INTO temp_array2 (category, sales) SELECT category, SUM(sales)
    FROM orders
    WHERE datetime BETWEEN DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 DAY), '%Y-%m-%d 00:00:00') AND DATE_FORMAT(DATE_SUB(CURDATE(), INTERVAL 1 DAY), '%Y-%m-%d 23:59:59')
    GROUP BY category;
    
    -- 建立統計昨日銷售增長額result
    CREATE TEMPORARY TABLE IF NOT EXISTS result (
      category VARCHAR(255),
      diff DOUBLE
    );
    -- 寫入增長額資料至result
    INSERT INTO result (category, diff) SELECT temp_array2.category AS category, temp_array2.sales - temp_array1.sales AS diff FROM temp_array1 LEFT JOIN temp_array2 ON temp_array1.category = temp_array2.category;
    
    -- 查詢增長額暫存資料表result
    SELECT category, diff FROM result;

  2. 編輯完成SQL代碼後,單擊右側調度配置,在彈出的調度配置面板中配置如下內容。

    • 定時調度時間08:00

    • 調度資源群組:選擇已建立好的Serverless資源群組。

    • 依賴上遊節點:勾選使用工作空間根節點

    • 本節點輸出參數:在節點上下文參數下單擊本節點輸出參數後的添加賦值參數,添加輸出參數作為下遊節點輸入參數的取值。image

  3. 單擊image儲存查詢昨日銷售增長額節點。

步驟三:配置資料推送節點

通過節點上下文參數本節點輸入參數接收上遊查詢昨日銷售總額查詢昨日銷售增長額的輸出參數outputs,從而可以在本文中使用參數並將其推送至目標。

  1. 雙擊名為推送昨日銷售總額與增長額的資料推送節點,進入節點後,單擊調度配置,在調度配置面板中進行如下配置。

    參數類型

    參數內容

    圖示

    調度參數

    參數名

    curdate

    image

    參數值

    $[yyyymmddhh:mi:ss]

    時間屬性

    調度周期

    image

    定時調度時間

    08:00

    說明

    本實踐以8:00為例,確保資料能在08:00時推送至目標渠道。您可以根據實際需要配置其他時間點。

    重跑屬性

    運行成功或失敗後皆可重跑。

    資源屬性

    調度資源群組

    選擇已建立好的調度資源群組。

    說明

    初次使用資料推送節點,需先提工單升級調度資源群組。

    image

    節點上下文參數

    本節點輸入參數

    單擊添加新增本節點輸入參數:

    參數一

    • 參數名inputs1

    • 取值來源:選擇上遊節點查詢昨日銷售總額的輸出參數outputs

    參數二

    • 參數名inputs2

    • 取值來源:選擇上遊節點查詢昨日銷售增長額的輸出參數outputs

    image

  2. 調度配置完成後,即可配置資料推送內容,詳情如下。

    • 資料推送目標:下拉資料推送目標選擇所需的資料推送目標,若不存在,可單擊下拉框右下角的建立資料推送目標,建立推送目標。

    • 參數

      說明

      類型

      支援圈選DingTalk、飛書、企業微信、Teams以及郵件。

      對象名稱

      可按業務需求進行自訂。

      WebHook

      DingTalk、飛書、企業微信機器人或Teams的Webhook,以及郵件的SMTP需要在相應的目標平台上擷取。

      說明
    • 標題推送昨日銷售總額與增長額

    • 本文:按需求進行配置本文即可,詳情可參見配置推送內容

      說明

      本文中可直接使用上遊SQL查詢節點查詢的欄位名作為預留位置,以擷取上遊輸入的參數。

  3. 配置完成後,單擊image儲存推送昨日銷售總額與增長額推送節點。

步驟四:測試合并推送流程

在完成簡單推送流程配置後,才可對合并推送測試流程進行測試,以方便後續提交發布。

  1. 雙擊開啟資料推送Demo流程商務程序圖頁面。

  2. 選中推送昨日銷售總額與增長額節點,右鍵選擇運行到該節點,等待運行完成即可。

    說明

    如果出現任務運行失敗,選中需要查看的節點,單擊右鍵選擇查看日誌即可查看日誌。

    image

MaxCompute資料推送

步驟一:搭建推送流程

雙擊資料推送Demo流程後,在商務程序圖頁面,可以看到已建立的多個節點,需拖動離線同步節點作為賦值節點的上遊。將MySQL同步資料至ODPSODPS資料查詢以及ODPS資料推送三個節點依次串連起來。

步驟二:配置離線同步節點

通過配置MySQL同步資料至ODPS節點,實現將準備資料階段在MySQL中寫入的測試資料同步至ODPS資料來源,以供後續使用。

  1. 雙擊MySQL同步資料至ODPS節點進入離線同步節點的配置。

    配置項

    配置內容

    圖示

    資料來源側

    資料來源

    MySQL

    image

    資料來源名稱

    選擇您建立的MySQL資料來源。

    我的資源群組

    選擇Serverless 資源群組。

    資料去向側

    資料去向

    MaxCompute(ODPS)

    資料來源名稱

    選擇該空間綁定的MaxCompute資料來源。

    配置完成後,系統會自動進行資料來源與資源群組之間的連通性測試,待連通性展示為成功後即可單擊下一步,進行資料來源與去向的詳細配置。

  2. 配置資料來源與去向。

    配置項

    配置內容

    圖示

    資料來源

    資料來源

    保持預設:

    • MySQL。

    • 選擇您建立的MySQL資料來源。

    image

    選擇orders表。

    資料過濾

    可按需求進行配置,本實踐中置空即可。

    切分鍵

    您可以將來源資料表中某一列作為切分鍵,建議使用主鍵或有索引的列作為切分鍵。

    資料預覽

    預覽從MySQL資料來源中擷取到的資料,可用於判斷是否符合預期。

    資料去向

    資料來源

    保持預設:

    • MaxCompute(ODPS)。

    • 選擇該空間綁定的MaxCompute資料來源。

    image

    Tunnel資源群組

    即 Tunnel Quota,預設選擇“公用傳輸資源”,即MC的免費quota。

    單擊一鍵產生目標表結構,產生目標表。

    分區資訊

    如果您每日增量資料限定在對應日期的分區中,可以使用分區做每日增量,比如配置分區pt值為${bizdate}

    寫入模式

    入前清理已有資料(Insert Overwrite

  3. 配置好來源與去向後,將欄位進行同名映射即可。image

  4. 通道配置。

    • 任務期望最大並發數:由於資源原因或者任務本身特性等原因,實際執行時並發數可能小於等於此值,收費按照實際執行的並發數收費,本實踐選擇2個並發即可。

    • 同步速率:通過限流可以保護資料來源端或者資料去向端的讀寫壓力,不限流的情況下則會提供現有硬體環境下最大的傳輸效能,本實踐不進行限流。

    • 髒資料策略:不容忍髒資料。

    • 分散式處理能力:預設為關閉狀態,需要並發數大於等於8才能開啟分布式。

    image

  5. 調度配置。

    完成整合任務配置後,單擊右側調度配置,在彈出的調度面板中配置如下內容。

    • 調度參數

      • 參數名bizdate

      • 參數值$[yyyymmdd-1]

    • 定時調度時間08:00

    • 重跑屬性:下拉選擇運行成功或失敗後皆可重跑。

    • 調度資源群組:選擇已建立好的調度資源群組。

    • 調度依賴:勾選使用工作空間根節點,作為離線同步任務的上遊節點。

  6. 單擊image儲存MySQL同步資料至ODPS離線同步節點。

步驟三:配置賦值節點

ODPS資料來源不支援使用ODPS SQL節點進行查詢並通過節點上下文參數輸出資料至資料推送節點。需採用賦值節點對ODPS資料進行查詢處理,再通過節點上下文參數進行輸出至資料推送節點。

  1. 雙擊ODPS資料查詢節點,進入賦值節點。

    1. 在賦值節點編輯頁面上方的請選擇賦值語言的下拉框裡選擇ODPS SQL。

    2. 寫入ODPS SQL。

      -- 首先,我們建立一個帶有排名的子查詢,用於計算每個分區(pt)內按銷售額(sales)降序排列的順序。
      -- DENSE_RANK() 函數會為每個分區內的行提供一個唯一的排名,相等的銷售額會得到相同的排名,但不會跳過任何一個排名數字。
      --
      -- 子查詢部分:
      -- 1. 從'orders'表中選擇需要的列:order_id(訂單ID)、category(類別)、sales(銷售額)、datetime(時間)和pt(業務日期)。
      -- 2. 使用DENSE_RANK() OVER (PARTITION BY pt ORDER BY sales DESC):
         -- PARTITION BY pt 指定按照pt欄位進行分區,即對每個不同的pt值分別進行排名。
         -- ORDER BY sales DESC 表示在每個分區內按照sales欄位的降序來排序,即銷售額最高的排在前面。
         -- rank 列將會儲存每行在各自分區內的銷售排名。
      --
      -- 主查詢部分:
      -- 從上述帶有排名的子查詢中篩選出排名在前三名(rank <= 3)的記錄。
      -- 這意味著對於每個業務日期,我們只選取銷售額最高的前三個訂單的資訊。
      
      SELECT
        order_id,          -- 選擇訂單ID
        category,          -- 選擇商品類別
        sales,             -- 選擇銷售額
        datetime,          -- 選擇訂單時間
        pt                  -- 選擇業務日期
      FROM (
          SELECT
            order_id,
            category,
            sales,
            datetime,
            pt,
            DENSE_RANK() OVER (PARTITION BY pt ORDER BY sales DESC) AS rank  -- 計算每個pt分區內的銷售額排名
          FROM orders
          WHERE pt = '${bizdate}'  -- 篩選業務日期為${bizdate}的記錄
      ) AS ranked_orders
      WHERE rank <= 3  -- 只保留每個pt分區銷售額排名前三的記錄
  2. 編輯完成SQL代碼後,單擊右側調度配置,在彈出的調度配置面板中配置如下內容。

    • 定時調度時間08:00

    • 調度資源群組:選擇已建立好的Serverless資源群組。

    • 依賴上遊節點:檢查依賴的上遊是否為MySQL同步資料至ODPS的離線同步節點。

    • 節點上下文參數:單擊本節點輸出參數後的添加,添加輸出參數作為下遊節點輸入參數的取值。

      image

  3. 單擊image儲存ODPS資料查詢節點。

步驟四:配置資料推送節點

通過配置節點上下文參數本節點輸入參數,來擷取上遊賦值節點輸出的outputs參數,在本文中使用這些參數並將其推送至目標。

  1. 雙擊名為ODPS資料推送的資料推送節點,進入節點後,單擊調度配置,在調度配置面板中進行如下配置。

    參數類型

    參數內容

    圖示

    調度參數

    參數名

    curdate

    image

    參數值

    $[yyyymmddhh:mi:ss]

    時間屬性

    調度周期

    image

    定時調度時間

    08:00

    說明

    本實踐以8:00為例,確保資料能在08:00時推送至目標渠道。您可以根據實際需要配置其他時間點。

    重跑屬性

    運行成功或失敗後皆可重跑。

    資源屬性

    調度資源群組

    選擇資料推送節點功能上線日期後建立的資源群組,若資源群組為發布日期前,需提工單升級調度資源群組。

    說明

    資料推送節點發布日期為2024年6月28日,DataWorks更多發布記錄可參見:功能發布記錄

    image

    節點上下文參數

    本節點輸入參數

    單擊添加新增本節點輸入參數:

    • 參數名inputs

    • 取值來源:選擇上遊節點ODPS資料查詢的輸出參數outputs。

    image

  2. 調度配置完成後,即可配置資料推送內容,詳情如下。

    • 資料推送目標:下拉資料推送目標選擇所需的資料推送目標,若不存在,可單擊下拉框右下角的建立資料推送目標,建立推送目標。

    • 參數

      說明

      類型

      支援圈選DingTalk、飛書、企業微信、Teams以及郵件。

      對象名稱

      可按業務需求進行自訂。

      WebHook

      DingTalk、飛書、企業微信機器人或Teams的Webhook,以及郵件的SMTP需要在相應的目標平台上擷取。

      說明
    • 標題:ODPS資料推送

    • 本文:按需求進行配置即可,詳情可參見配置推送內容

      說明

      本文中可直接使用上遊賦值節點查詢的欄位名作為預留位置,以擷取上遊輸入的參數。

  3. 配置完成後,單擊image儲存ODPS資料推送節點。

步驟五:測試MaxCompute資料推送流程

在完成資料推送流程節點配置後,需要對資料推送流程進行測試,以便後續提交發布。

  1. 雙擊開啟資料推送Demo流程商務程序頁面。

  2. 選中MySQL同步資料至ODPS節點,右鍵選擇運行節點及下遊,等待運行完成即可。

    說明

    如果出現任務運行失敗,選中需要查看的節點,單擊右鍵選擇查看日誌即可查看日誌。

    image

提交與發布

在完成資料推送流程配置後,雙擊資料推送Demo樣本商務程序圖頁面,測試所有的資料推送流程是否能正常運行,測試成功後,需要提交發布。

  1. 在資料推送流程的編輯頁面,單擊image,運行商務程序。

  2. 待資料推送流程中的所有節點後出現image,單擊image提交運行成功的資料推送流程。

  3. 選擇提交對話方塊中需要提交的節點,勾選忽略輸入輸出不一致的警示

  4. 單擊提交

  5. 提交成功後,即可在發佈頁面發布流程節點,詳情可參見發布任務

後續步驟

資料推送流程將會隨著配置的調度周期而運行,我們在營運頁面可以對發行的資料推送的各個任務節點進行各種的營運操作,詳情請參見周期任務基本營運操作