全部產品
Search
文件中心

DataWorks:開發部署擴充程式:Function Compute方式

更新時間:Feb 08, 2025

在DataWorks擴充程式中,您可以自訂邏輯以監管使用者的操作行為,例如攔截和阻斷不當行為,通過擴充程式對特定事件進行訊息通知與流程管控。本文為您介紹如何通過Function Compute方式開發部署擴充程式。

背景資訊

Function Compute是事件驅動的全託管計算服務,您可以將擴充程式部署在Function Compute執行環境中,DataWorks 會將擴充點事件訊息推送至 ExtensionRequest 類。在您的擴充程式碼中,通過實現 PojoRequestHandler 介面的 handleRequest 方法,您可以構造 ExtensionRequest 對象,以接收 DataWorks 推送的擴充點事件訊息及上下文(Context),您可以定義擴充程式的處理邏輯,並通過 ExtensionResponse 類返回擴充程式處理結果。DataWorks 將自動從 ExtensionResponse 中擷取處理結果來決定本次擴充點操作流程是否阻塞。

使用限制

  • 僅支援DataWorks企業版。

  • 支援地區:華北2(北京)、華東1(杭州)、華東2(上海)、華北3(張家口)、華南1(深圳)、西南1(成都)、美國(矽谷)、美國(維吉尼亞)、德國(法蘭克福)、日本(東京)、中國(香港)、新加坡

注意事項

  • 許可權控制:僅開放平台管理員租用戶系統管理員、阿里雲主帳號或者擁有AliyunDataWorksFullAccess許可權的RAM使用者擁有開發人員背景讀寫權限,許可權控制詳情請參見全域級模組許可權控制產品及控制台許可權控制詳情:RAM Policy

  • 版本限制:如果您使用的企業版DataWorks到期,所有擴充程式將會失效,無法再觸發事件檢查。已觸發且未達終態的檢查會自動通過。

  • 節點限制:包含內部節點的組合類別節點(例如:機器學習(PAI)節點do-while節點for-each節點)觸發檢查時,需內部節點均檢查通過才可繼續進行後續操作。

  • 觸發說明:多個擴充程式可關聯同一個擴充點事件,即同一個事件支援觸發多個擴充程式。

  • 事件節流:通過Function Compute方式部署的擴充程式目前僅支援事件為:資料下載前置事件資產上架\下架前置事件資料上傳前置事件

處理流程

以下為通過Function Compute開發部署的擴充程式處理擴充點事件啟動並執行基本流程。

image
說明

擴充點事件觸發後,在等待擴充程式回調API訊息結果時,觸發擴充事件的流程會變成檢查中狀態,直至擴充程式將處理結果返回給DataWorks,DataWorks會根據反饋結果狀態決定是否阻塞本次流程。

使用者側

在進行Function Compute部署擴充程式前,需先完成擴充程式的開發,使用fc-java-core庫運行請求處理常式,範例程式碼為fc_dataworks_demo01-1.0-SNAPSHOT.jar,操作詳情請參見事件請求處理常式(Event Handler)

步驟一:配置擴充程式依賴

在開發擴充程式時,需要添加以下依賴進pom.xml檔案。

DataWorks依賴庫

<dependency>
 <groupId>com.aliyun</groupId>
 <artifactId>dataworks_public20200518</artifactId>
 <version>5.6.0</version>
</dependency>

Function Compute依賴庫

<!-- https://mvnrepository.com/artifact/com.aliyun.fc.runtime/fc-java-core -->
<dependency>
    <groupId>com.aliyun.fc.runtime</groupId>
    <artifactId>fc-java-core</artifactId>
    <version>1.4.1</version>
</dependency>

<!-- https://mvnrepository.com/artifact/com.aliyun.fc.runtime/fc-java-event -->
<dependency>
    <groupId>com.aliyun.fc.runtime</groupId>
    <artifactId>fc-java-event</artifactId>
    <version>1.2.0</version>
</dependency>

打包依賴庫

<build>
        <plugins>
              <plugin>
                  <groupId>org.apache.maven.plugins</groupId>
                  <artifactId>maven-shade-plugin</artifactId>
                  <version>3.2.1</version>
                  <executions>
                    <execution>
                      <phase>package</phase>
                      <goals>
                        <goal>shade</goal>
                      </goals>
                      <configuration>
                        <filters>
                          <filter>
                            <artifact>*:*</artifact>
                            <excludes>
                              <exclude>META-INF/*.SF</exclude>
                              <exclude>META-INF/*.DSA</exclude>
                              <exclude>META-INF/*.RSA</exclude>
                            </excludes>
                          </filter>
                        </filters>
                      </configuration>
                    </execution>
                  </executions>
              </plugin>
        </plugins>
</build>

您可以使用Apache Maven Shade外掛程式或Apache Maven Assembly外掛程式,以上樣本僅以Apache Maven Shade外掛程式為例。

步驟二:開發擴充程式碼

在Function Compute上開發擴充程式,需要使用介面PojoRequestHandler,來實現方法handleRequest接收ExtensionRequest 類的請求和Context上下文並定義擴充程式接收處理邏輯,最後通過ExtensionResponse 類返回擴充程式處理結果。

  1. 開發程式碼注意事項。

    解析訊息內容

    DataWorks推送的事件訊息格式可參考開發參考:事件列表與訊息格式。在訊息格式中messageBody為具體的訊息內容,在實際開發時,可通過訊息格式中的messageBody.eventCode欄位確認訊息類型,並通過messageId欄位擷取訊息詳情。

    編寫處理邏輯

    請根據業務情境需要,針對該類型訊息進行邏輯處理。在擴充程式開發過程中,您可通過以下方式提高開發效率和應用效果。

    說明

    MessageId對應訊息中的id欄位,詳情可參考附錄:DataWorks發送給EventBridge的訊息格式

    返回處理結果至DataWorks

    在封裝返回結果ExtensionResponse時,需要指定訊息處理結果CheckResult。DataWorks會自動讀取最終的處理結果CheckResult,判斷流程是否失敗。

    • OK:擴充程式對本次擴充點事件檢查通過。

    • FAIL:擴充程式對本次擴充點事件檢查不通過。您需要查看並及時處理報錯,以免影響後續程式的正常執行。

    • WARN:擴充程式對本次擴充點事件檢查通過,但存在警告。

    您可下載此範例程式碼fc_dataworks_demo01-1.0-SNAPSHOT.jar上傳至Function Compute用於驗證測試,代碼詳情如下:

    代碼詳情

    APP

    該代碼實現了使用Function Compute介面PojoRequestHandler來實現方法 handleRequest 接收 ExtensionRequest 類型的請求和 Context上下文並定義擴充程式接收處理邏輯,最後通過 ExtensionResponse 類型返回擴充程式處理結果。

    該程式碼範例為禁止手動往ADS表上傳資料。

    package com.aliyun.example;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.aliyun.dataworks.ExtensionRequest;
    import com.aliyun.dataworks.ExtensionResponse;
    import com.aliyun.fc.runtime.Context;
    import com.aliyun.fc.runtime.PojoRequestHandler;
    
    
    public class App implements PojoRequestHandler<ExtensionRequest, ExtensionResponse> {
    
        public ExtensionResponse handleRequest(ExtensionRequest extensionRequest, Context context) {
            // 列印請求內容,用於調試
             System.out.println(JSON.toJSONString(extensionRequest));
            // 建立響應對象
            ExtensionResponse extensionResponse = new ExtensionResponse();
            // 判斷 eventType 是否為 'upload-data-to-table'
            if ("upload-data-to-table".equals(extensionRequest.getEventType())) {
                try {
                    // 將 messageBody 轉換為字串然後解析為 JSONObject
                    String messageBodyStr = JSON.toJSONString(extensionRequest.getMessageBody());
                    JSONObject messageBody = JSON.parseObject(messageBodyStr);
                    String tableGuid = messageBody.getString("tableGuid");
                    // 判斷 tableGuid 是否包含 'ads'
                    if (tableGuid != null && tableGuid.contains("ads")) {
                        extensionResponse.setCheckResult("FAIL");
                    } else {
                        extensionResponse.setCheckResult("OK");
                    }
                } catch (Exception e) {
                    extensionResponse.setCheckResult("FAIL");
                    extensionResponse.setErrorMessage("Error processing request: " + e.getMessage());
                    return extensionResponse;
                }
            } else {
                extensionResponse.setCheckResult("FAIL");
            }
    
            // 設定錯誤訊息,作為反饋資訊
            extensionResponse.setErrorMessage("This is a test!");
    
            // 返迴響應對象
            return extensionResponse;
        }
    }
    
    

    ExtensionRequest

    定義擴充程式請求結構,用於封裝DataWorks發送的事件訊息。

    public class ExtensionRequest {
        private Object messageBody;
        private String messageId;
        private String extensionBizId;
        private String extensionBizName;
        private String eventType;
        private String eventCategoryType;
        private Boolean blockBusiness;
    
        public ExtensionRequest() {
        }
    
        public Object getMessageBody() {
            return this.messageBody;
        }
    
        public void setMessageBody(Object messageBody) {
            this.messageBody = messageBody;
        }
    
        public String getMessageId() {
            return this.messageId;
        }
    
        public void setMessageId(String messageId) {
            this.messageId = messageId;
        }
    
        public String getExtensionBizId() {
            return this.extensionBizId;
        }
    
        public void setExtensionBizId(String extensionBizId) {
            this.extensionBizId = extensionBizId;
        }
    
        public String getExtensionBizName() {
            return this.extensionBizName;
        }
    
        public void setExtensionBizName(String extensionBizName) {
            this.extensionBizName = extensionBizName;
        }
    
        public String getEventType() {
            return this.eventType;
        }
    
        public void setEventType(String eventType) {
            this.eventType = eventType;
        }
    
        public String getEventCategoryType() {
            return this.eventCategoryType;
        }
    
        public void setEventCategoryType(String eventCategoryType) {
            this.eventCategoryType = eventCategoryType;
        }
    
        public Boolean getBlockBusiness() {
            return this.blockBusiness;
        }
    
        public void setBlockBusiness(Boolean blockBusiness) {
            this.blockBusiness = blockBusiness;
        }
    }

    ExtensionResponse

    定義擴充程式響應結構,用於封裝擴充程式的處理結果。

    public class ExtensionResponse {
        private String checkResult;
        private String errorMessage;
    
        public ExtensionResponse() {
        }
    
        public String getCheckResult() {
            return this.checkResult;
        }
    
        public void setCheckResult(String checkResult) {
            this.checkResult = checkResult;
        }
    
        public String getErrorMessage() {
            return this.errorMessage;
        }
    
        public void setErrorMessage(String errorMessage) {
            this.errorMessage = errorMessage;
        }
    }
  2. 代碼開發完成後,需將程式打包為可啟動並執行.jar包或.zip包,以供後續配置Function Compute使用。

    開啟代碼開發編輯器的命令列視窗,切換至根目錄,執行mvn clean package命令進行打包。

    • 如果顯示編譯失敗,請根據輸出的編譯錯誤資訊調整代碼。

    • 如果編譯成功,編譯後的JAR包位於專案檔夾內的target目錄,並根據pom.xml內的artifactIdversion欄位命名為java-example-1.0-SNAPSHOT.jar

    說明

    macOS與Linux作業系統下,打包前請確保代碼檔案具有可讀可執行許可權。

Function Compute側

步驟一:部署擴充程式

  1. 登入Function Compute控制台2.0,在左側導覽列單擊任務,切換至任務頁面。

  2. 單擊建立函數建立Function Compute服務及所需函數,進而部署擴充程式。開發的擴充程式後續會將特定事件訊息直接下發至該服務。操作請參見建立服務建立函數。詳情配置如下:image

    • 運行環境需根據您的代碼選擇,步驟一提供的樣本包(fc_dataworks_demo01-1.0-SNAPSHOT.jar)對應Java8環境。您需將fc_dataworks_demo01-1.0-SNAPSHOT.jar下載至本地並上傳至程式碼封裝中。實際使用時,請根據需要配置。

    • 更多函數的相關操作,請參見管理函數

  3. 修改函數入口

    在Function Compute控制台可配置請求處理常式(函數入口),對於Java語言的函數,您的請求處理常式需配置為[包名].[類名]::[方法名],例如:

    • 包名為example

    • 類型為HelloFC

    • 方法名為handleRequest

    請求處理常式可配置為example.HelloFC::handleRequest

    說明

    Function Compute預設程式入口為example.App::handleRequest,您需要根據您實際代碼情況修改函數入口,更多關於函數入口的說明,詳情請參見:請求處理常式

步驟二:測試函數

完成註冊後,可在函數詳情頁面切換至函數代碼頁簽單擊測試函數,對已建立的函數進行測試,當返回執行成功後,即可回到DataWorks側註冊擴充程式。

DataWorks側

步驟一:註冊擴充程式

擴充程式在Function ComputeFC上部署完成後,您需在DataWorks註冊擴充程式,操作步驟如下。

  1. 進入開放平台頁面。

    登入DataWorks控制台,切換至目標地區後,單擊左側導覽列的更多 > 開放平台,進入開放平台的開發人員後台頁面。

  2. 註冊擴充程式。

    1. 在左側導覽列單擊擴充程式,進入擴充程式頁面。

    2. 單擊擴充程式列表 > 註冊擴充程式,選擇通過Function Compute部署並配置擴充程式資訊。

    您可根據需要配置所需參數,參數說明如下。

    參數說明

    參數

    如何配置

    擴充程式名稱

    自訂擴充程式的名稱,用於標識擴充程式。

    Function Compute服務及函數

    選擇擴充程式需部署在Function Compute的哪個服務及函數上。後續開發的擴充程式會將特定事件訊息直接下發至該服務。

    處理的擴充點

    目前僅支援處理資料下載前置事件資產上架\下架前置事件資料上傳前置事件觸發的訊息。

    說明

    選擇完成後,配置介面會自動根據選擇結果匹配所屬事件適用模組,無需手動設定。

    負責人

    擴充程式的負責人,方便擴充程式使用者遇到問題時能及時聯絡到負責人。

    測試用工作空間

    選擇測試擴充程式的工作空間。擴充程式無需進行上線操作,即可在測試工作空間中生效。

    擴充程式上線前,開發人員可在測試空間進行完整鏈路的測實驗證,通過觸發事件,測試DataWorks通過EventBridge發送訊息、擴充程式接收訊息並進行訊息審核與回調。

    說明

    處理的擴充點選擇租戶級擴充點事件,無需配置測試用工作空間

    擴充程式詳情地址

    輸入介紹擴充程式詳情的地址,協助擴充程式使用者更好地理解和使用此擴充程式。

    您可在開發部署擴充程式時,開發一個擴充程式的詳情展示頁面,將頁面地址配置在此處,以便使用者在觸發擴充程式校正時,可通過連結查看完整的校正過程。例如,此次擴充程式檢查鏈路和阻塞的原因。

    擴充程式文檔地址

    輸入擴充程式的協助文檔地址,供擴充程式的使用者閱讀。

    您可在開發部署擴充程式時,開發一個擴充程式的協助文檔頁面,將頁面地址配置在此處,以便使用者學習瞭解擴充程式的校正邏輯與屬性。

    擴充程式參數配置

    DataWorks支援在擴充程式開發過程中使用參數來提高擴充程式開發和應用效率,在擴充程式碼開發時,將需要應用的參數添加至此處即可。

    您可直接使用DataWorks提供的典型應用情境的內建參數,也可自訂參數。

    支援添加多個參數,一行一個參數,參數格式為key=value。更多參數的使用,請參見進階應用程式:擴充程式參數配置

    擴充程式選項配置

    輸入提供給擴充程式使用者使用的功能配置項,可實現該擴充程式在不同工作空間進行個人化管控。擴充程式開發人員需在此介面通過JSON字串定義選項。

    例如,可通過選項配置讓擴充程式使用者自行管控SQL長度。JSON格式可參考進階應用程式:擴充程式選項配置

  3. 完成擴充程式註冊。

    單擊確定,完成擴充程式註冊。註冊完成後,在擴充程式列表,查看此註冊成功的擴充程式。

步驟二:上線擴充程式

擴充程式開發、部署並在DataWorks上註冊完成後,您還需通過測試、審批、上線流程,之後擴充程式責任人之外的其他管理者可在管理中心中啟用該擴充程式,操作詳情請參見:應用擴充程式

附錄:DataWorks發送給Function Compute的訊息格式

以下為Function Compute方式各類型訊息通用格式,其中messageBody包含DataWorks事件訊息詳細資料,訊息類型不同,訊息內容也不同。

{
	"blockBusiness": true,
	"eventCategoryType": "resources-download",//事件類別目錄
	"eventType": "upload-data-to-table",//事件類型
	"extensionBizId": "job_6603643923728775070",
	"messageBody": {
             //訊息類型不同,訊息內容也不同。以下為訊息中固定的兩個欄位。
             "tenantId": 28378****10656,//租戶id,DataWorks每個阿里雲主帳號對應一個租戶,每個租戶有自己的租戶id,該值您可在DataWorks資料開發右上方使用者資訊查看。
             "eventCode": "xxxx"//
	},
	"messageId": "52d44ee7-b51f-4d4d-afeb-*******"//事件ID。標識事件的唯一值。
}
說明

messageBody欄位下因為訊息類型不同,訊息內容也不同,各事件訊息請參見:開發參考:事件列表與訊息格式

相關文檔