本文介紹了如何基於LHM調度遷移工具將使用者自行構造的DataWorks Spec匯入DataWorks。
一、DataWorks Spec的構造與解析
LHM遷移工具允許使用者按照DataWorks任務流標準結構(DataWorks Spec)自行構造描述檔案,並通過LHM匯入。作為匯入的前置動作,LHM將對DataWorks Spec進行解析,並產生遷移標準包。
產生遷移標準包後,無需進行進一步轉換,直接運行匯入工具向DataWorks寫入任務流。
1 DataWorks Spec定義
https://github.com/aliyun/dataworks-spec/tree/master
2 輸入包構建
按照DataWorks Spec定義,自行構造任務流定義並儲存為JSON檔案。
將所有任務流JSON檔案平鋪於檔案夾中,壓縮檔夾為zip格式。
{
"version": "1.1.0",
"kind": "CycleWorkflow",
"spec": {
"nodes": [],
"workflows": [
{
"id": "3451387436863448",
"outputs": {
"nodeOutputs": [
{
"artifactType": "NodeOutput",
"data": "3451387436863448",
"refTableName": "example_hive_operator"
}
]
},
"nodes": [
{
"id": "2105761738722077",
"name": "run_first",
"instanceMode": "T+1",
"rerunMode": "FailureAllowed",
"rerunTimes": 0,
"rerunInterval": 300000,
"trigger": {
"type": "Scheduler",
"startTime": "2025-03-30 00:00:00",
"timezone": "UTC"
},
"runtimeResource": {
"resourceGroup": "Serverless_res_group_651147510078336_710919704558240"
},
"script": {
"path": "run_first",
"runtime": {
"command": "EMR_HIVE"
},
"parameters": [],
"content": "\n create database if not exists airflow;\n use airflow;\n drop table if exists test_hive;\n create table test_hive(name string);\n insert into test_hive values('studio');\n "
},
"outputs": {
"nodeOutputs": [
{
"data": "2105761738722077"
},
{
"artifactType": "NodeOutput",
"data": "example_hive_operator.run_first.run_second",
"refTableName": "run_first"
}
]
},
"type": "EMR_HIVE"
},
{
"id": "3326558787158921",
"name": "run_second",
"instanceMode": "T+1",
"rerunMode": "FailureAllowed",
"rerunTimes": 0,
"rerunInterval": 300000,
"trigger": {
"type": "Scheduler",
"startTime": "2025-03-30 00:00:00",
"timezone": "UTC"
},
"runtimeResource": {
"resourceGroup": "Serverless_res_group_651147510078336_710919704558240"
},
"script": {
"path": "run_second",
"runtime": {
"command": "EMR_HIVE"
},
"parameters": [],
"content": "\n use airflow;\n add jar oss://emr-studio-example/hive-udf-1.0-SNAPSHOT.jar;\n create temporary function simpleudf AS 'com.aliyun.emr.hive.udf.SimpleUDFExample';\n show functions like '*udf';\n select simpleudf(name) from test_hive;\n "
},
"outputs": {
"nodeOutputs": [
{
"data": "3326558787158921"
}
]
},
"type": "EMR_HIVE"
}
],
"dependencies": [
{
"nodeId": "3326558787158921",
"depends": [
{
"type": "Normal",
"output": "2105761738722077",
"refTableName": "run_first"
}
]
}
],
"script": {
"path": "Airflow匯入_V3/example_hive_operator",
"runtime": {
"command": "WORKFLOW"
},
"parameters": []
},
"name": "example_hive_operator",
"trigger": {
"type": "Scheduler",
"cron": "0 0 0 * * ?",
"timezone": "UTC",
"delaySeconds": 0
},
"type": "CycleWorkflow",
"strategy": {
"timeout": 0,
"instanceMode": "T+1",
"rerunMode": "Allowed",
"rerunTimes": 0,
"rerunInterval": 0,
"failureStrategy": "Continue"
}
}
],
"flow": [
{
"nodeId": "3451387436863448",
"depends": []
}
]
}
}壓縮命令(參考)
zip -q -r -m -o <PackageName>.zip PackageName3 配置項構建
請使用如下配置項,內容無需更改。調度資訊由自訂DataWorks Spec描述,因此此處無需配置過多配置項。
{
"schedule_datasource": {
"name": "MySpec",
"type": "DataWorks",
"operaterType": "MANUAL"
},
"conf": {}
}4 運行調度解析工具
解析工具通過命令列調用,調用命令如下:
sh ./bin/run.sh read \
-c ./conf/<你的設定檔>.json \
-f ./data/0_OriginalPackage/<輸入包>.zip \
-o ./data/1_ReaderOutput/<遷移標準包>.zip \
-t dw-newidea-reader其中-c為設定檔路徑,-f為輸入包路徑,-o為遷移標準包的產生路徑,-t為探查外掛程式名稱。
例如,當前需要解析專案A:
sh ./bin/run.sh read \
-c ./conf/projectA_read.json \
-f ./data/0_OriginalPackage/projectA_DataworksSpec.zip \
-o ./data/1_ReaderOutput/projectA_ReaderOutput.zip \
-t dw-newidea-reader探查工具運行中將列印過程資訊,請關注運行過程中是否有報錯。
5 查看解析結果
開啟./data/1_ReaderOutput/下的產生包ReaderOutput.zip,可預覽解析結果。
其中,統計報表是對DataWorks Spec中任務流、節點、資源、函數、資料來源基本資料的匯總展示。
而data/project檔案夾下是對DataWorks Spec調度資訊資料結構標準化後的結果。
統計報表提供了兩項特殊能力:
1、報表中工作流程、節點的部分屬性被允許更改,允許更改的欄位以藍色字型標識。在下一階段匯入DataWorks時,工具將擷取表格中的屬性變更並使其生效。
2、報表允許通過刪除工作流程子表中的行,使得在匯入DataWorks時跳過這些工作流程(工作流程黑名單)。注意!若工作流程存在相互依賴關係,相關聯的工作流程需要同批次匯入,不可通過黑名單進行分割。分割會產生異常!
二、匯入DataWorks
匯入工具支援多輪刷寫,會自動選擇建立/更新任務流(OverWrite模式)。
1 前置條件
1.1 DataWorks Spec解析成功
DataWorks Spec解析完成,ReaderOutput.zip被成功產生。
(可選,推薦)開啟解析輸出包,查看統計報表,核對待遷移範圍是否被解析成功。
1.2 DataWorks側配置
DataWorks側需進行以下動作:
1、建立工作空間。
2、建立AK、SK且保證AK、SK對工作空間具有管理員權限。(強烈建議建立與帳號有綁定關係的AK、SK,以便在寫入遇到問題時進行排查)
3、在工作空間中建立資料來源、綁定計算資源、建立資源群組。
4、在工作空間中上傳檔案資源、建立UDF。
1.3 網路連通性檢查
驗證能否串連DataWorks Endpoint。
服務存取點列表:
ping dataworks.aliyuncs.com2 匯入配置項
在工程目錄的conf檔案夾下建立匯出設定檔(JSON格式),如writer.json。
使用前請刪除json中的注釋。
{
"schedule_datasource": {
"name": "YourDataWorks", //給你的DataWorks資料來源起個名字!
"type": "DataWorks",
"properties": {
"endpoint": "dataworks.cn-hangzhou.aliyuncs.com", // 服務存取點
"project_id": "YourProjectId", // 工作空間ID
"project_name": "YourProject", // 工作空間名稱
"ak": "************", // AK
"sk": "************", // SK
},
"operaterType": "MANUAL"
},
"conf": {
"di.resource.group.identifier": "Serverless_res_group_***_***", // 調度資源群組
"resource.group.identifier": "Serverless_res_group_***_***", // Data Integration資源群組
"dataworks.node.type.xls": "/Software/bwm-client/conf/CodeProgramType.xls", // DataWorks節點類型表的路徑
"qps.limit": 5 // 向DataWorks發送API請求的QPS上限
}
}2.1 服務存取點
根據DataWorks所在Region選擇服務存取點,參考文檔:
2.2 工作空間ID與名稱
開啟DataWorks控制台,開啟工作空間詳情頁,從右側基本資料中擷取工作空間ID與名稱。
2.3 建立AK、SK並授權
在使用者頁建立AK、SK,要求對目標DataWorks工作空間擁有管理員讀寫權限。
許可權管理組件括兩處,如果帳號是RAM帳號,則需先對RAM帳號進行DataWorks操作授權。
權限原則頁面:https://ram.console.alibabacloud.com/policies
然後在DataWorks工作空間中,將工作空間許可權賦給帳號。
注意!AccessKey可設定網路訪問限制策略,請務必保證遷移工具所在機器的IP被允許訪問。
2.4 資源群組
由DataWorks工作空間詳情頁左側功能表列進入資源群組頁面,綁定資源群組,並擷取資源群組ID。
通用資源群組可用於節點調度,也可用於Data Integration。配置項中調度資源群組resource.group.identifier和Data Integration資源群組di.resource.group.identifier可以配置為同一通用資源群組。
2.5 QPS設定
工具通過調用DataWorks的API進行匯入操作。不同DataWorks版本中的讀、寫OpenAPI分別有相應的QPS限制和每日調用次數限制,詳見連結:使用限制。
DataWorks基礎版、標準版、專業版建議填寫"qps.limit": 5,企業版建議填寫"qps.limit": 20。
注意,請儘可能避免多個匯入工具同時運行。
2.6 DataWorks節點類型ID設定
在DataWorks中,部分節點類型在不同Region中被分配了不同的TypeId。具體TypeID以DataWorks資料開發實際介面為準。存在此特性的節點類型以資料庫節點為主:資料庫節點。
如:MySQL節點在杭州Region的NodeTypeId為1000039、在深圳Region的NodeTypeId為1000041。
為適應上述DataWorks不同Region的差異特性,工具提供了一種可配置的方式,允許使用者組態工具所使用的節點TypeId表。
表格通過匯入工具的配置項引入:
"conf": {
"dataworks.node.type.xls": "/Software/bwm-client/conf/CodeProgramType.xls" // DataWorks節點類型表的路徑
}從DataWorks資料開發介面上擷取節點類型Id的方法:在介面上建立一個工作流程,並在工作流程中建立一個節點,在點擊儲存後查看工作流程的Spec。
若節點類型配置錯誤,在任務流發布時將提示以下錯誤。
3 運行DataWorks匯入工具
轉換工具通過命令列調用,調用命令如下:
sh ./bin/run.sh write \
-c ./conf/<你的設定檔>.json \
-f ./data/1_ReaderOutput/<解析結果輸出包>.zip \
-o ./data/4_WriterOutput/<匯入結果儲存包>.zip \
-t dw-newide-writer其中-c為設定檔路徑,-f為ReaderOutput包儲存路徑,-o為WriterOutput包儲存路徑,-t為提交外掛程式名稱。
例如,當前需要匯入DataWorks的專案A:
sh ./bin/run.sh write \
-c ./conf/projectA_write.json \
-f ./data/1_ReaderOutput/projectA_ReaderOutput.zip \
-o ./data/4_WriterOutput/projectA_WriterOutput.zip \
-t dw-newide-writer匯入工具運行中將列印過程資訊,請關注運行過程中是否有報錯。匯入完成後將在命令列中列印匯入成功與失敗的統計資訊。注意,部分節點的匯入失敗不會影響整體匯入流程,如遇少量節點匯入失敗,可在DataWorks中進行手動修改。
4 查看匯入結果
匯入完成後,可在DataWorks中查看匯入結果。匯入處理程序中亦可查看工作流程逐個匯入的過程,如發現問題需要終止匯入,可運行jps命令找到BwmClientApp,並使用kill -9終止匯入。
5 Q&A
5.1 源端持續在進行開發,這些增量與變更如何提交到DataWorks?
遷移工具為OverWrite模式,重新運行匯出、轉換、匯入可實現將源端增量提交到DataWorks的能力。請注意,工具將根據全路徑匹配任務流以選擇建立任務流/更新任務流。如需進行變更遷移,請勿移動任務流。
5.2 源端持續在進行開發,同時進行DataWorks上任務流改造與治理,增量遷移時是否會覆蓋DataWorks上的變更?
是的,遷移工具為OverWrite模式,建議您在完成遷移後再在DataWorks上進行後續改造。或者採用分批遷移的方式,已遷移等任務流再確認不再刷寫後開始DataWorks改造,不同批次之間互相不會影響。
5.3 整個包匯入耗時太長,能否只匯入一部分
可以,可手動裁剪待匯入包來實現部分匯入:將data/project/workflow檔案夾下需要匯入的任務流保留、其他任務流刪除,重新壓縮回壓縮包,再運行匯入工具。注意,存在相互依賴的任務流需要捆綁匯入,否則任務流間的節點血緣將會丟失。