本文介紹了基於LHM調度遷移工具將Azure Data Factory調度任務流遷移到DataWorks的方案與操作流程,包括三步,Azure Data Factory任務匯出、調度任務轉換、DataWorks任務匯入。
一、環境準備
1.1 運行環境準備
序號 | 項 | 規格 | 數量 | 備忘 |
1 | ECS | 4c16g以上 | 1 | 鏡像CentOS、AliyunOS均可 |
2 | jdk17 | |||
3 | 運行工具包 |
1.2 網路準備
ECS對應的VPC需要能訪問公網
1.3 帳號許可權準備
1.3.1 採用服務註冊認證的方式
註冊應用程式
擷取用戶端密鑰
1.3.2 配置Azure Data Factory許可權
1.3.3. 配置blob許可權
非必需,如果要讀取節點中blob中存在的檔案內容,需要使用ADF服務註冊授權同樣的方式來對blob進行授權
1.3.4 配置Databricks許可權
非必需,主要用來讀取Azure Data Factory 中關於Databricks節點具體外置指令檔
進入dbr工作台
點擊使用者佈建
進入左側開發人員介面
點擊產生新令牌
二、AzureDataFactory任務匯出
採用SDK方式匯出,依賴項如下
<dependency>
<groupId>com.azure.resourcemanager</groupId>
<artifactId>azure-resourcemanager-datafactory</artifactId>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob</artifactId>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>databricks-sdk-java</artifactId>
</dependency>2.1 設定檔
{
"schedule_datasource": {
"name": "name",
"type": "adf",
"properties": {
"isUseProxy": false,
"proxyHost": "proxy.msl.cn",
"proxyPort": "8080",
"AzureCloud": "AZURE_CHINA_CLOUD",
"apiMode": "sdk"
"endpoint": "https://management.azure.com",
"subscriptionId": "xxx",
"factory": "bigdata-adf-jiman",
"project": "bigdata-adf-jiman",
"resourceGroupName": "biadata",
"tenantId": "xxx",
"clientId": "xxx",
"clientSecretValue": "xxxx",
"dbr_endpoint": "https://adb-xxxxx.16.azuredatabricks.net",
"dbr_token": "xxxxx",
"pipelineNameWhite": "dbr-demo"
},
"operaterType": "AUTO"
}
}2.2 配置項說明
序號 | 參數名 | 是否必填 | 範例值 | 備忘 |
1 | isUseProxy | 否 | false | 當使用代理模式來訪問Azure時必需 |
2 | proxyHost | 否 | proxy.msl.cn | 當使用代理模式來訪問Azure時必需 |
3 | proxyPort | 否 | 8080 | 當使用代理模式來訪問Azure時必需 |
4 | AzureCloud | 否 | AZURE_CHINA_CLOUD | 可選項如下: 分別對應不同的地區,預設為AZURE_CHINA_CLOUD
|
5 | endpoint | 否 | https://portal.azure.com | 跟AzureCloud綁定,可不填 |
6 | subscriptionId | 是 | xxxxx | 訂閱ID,可以在adf對應的首頁概覽擷取 |
7 | resourceGroupName | 是 | xxxxx | 資源群組名字,可以在adf對應的首頁概覽擷取 |
8 | factory | 是 | bigdata-adf-jiman | 主要用來做標識 |
9 | project | 否 | bigdata-adf-jiman | 和factory保持一致即可 |
10 | tenantId | 是 | xxxxx | 在服務註冊步驟中可以擷取到 |
11 | clientId | 是 | xxxxx | 在服務註冊步驟中可以擷取到 |
12 | clientSecretValue | 是 | xxxxx | 在服務註冊步驟中可以擷取到 |
13 | dbr_endpoint | 否 | xxxxx | 在databricks首頁擷取 |
14 | dbr_token | 否 | xxxxx | 在databricks首頁擷取 |
15 | pipelineNameWhite | 否 | dbr-demo | 白名單,多個pipeline名稱用,分割 |
2.3 執行匯出命令
mkdir result
sh ./bin/run.sh read \
-c ./conf/<你的設定檔>.json \
-o ./data/1_ReaderOutput/<源端探查匯出包>.zip \
-t adf-reader命令參數說明
序號 | 參數名 | 是否必填 | 範例參數值 | 備忘 |
1 | -c | 是 | ./conf/<你的設定檔>.json | 設定檔路徑 |
2 | -o | 是 | ./data/1_ReaderOutput/<源端探查匯出包>.zip | 輸出路徑 |
3 | -t | 是 | adf-reader | 外掛程式類型(固定值) |
完整命令範例
mkdir result
sh ./bin/run.sh read -c ./conf/read.json -f ./result/temp.zip -o ./result/read_out.zip -t adf-reader2.4 查看匯出結果
開啟./data/1_ReaderOutput/下的產生包ReaderOutput.zip,可預覽匯出結果。其中,統計報表是任務流、節點、資源、函數、資料來源基本資料的匯總展示,而data/project檔案夾下是對調度資訊資料結構標準化後的結果。
統計報表提供了兩項特殊能力:
1、報表中工作流程、節點的部分屬性被允許更改,允許更改的欄位以藍色字型標識。在下一階段匯入DataWorks時,工具將擷取表格中的屬性變更並使其生效。
2、報表允許通過刪除工作流程子表中的行,使得在匯入DataWorks時跳過這些工作流程(工作流程黑名單)。注意!若工作流程存在相互依賴關係,相關聯的工作流程需要同批次匯入,不可通過黑名單進行分割。分割會產生異常!
三、調度任務轉換
3.1 設定檔
{
"conf": {
"locale": "zh_CN"
},
"self": {
"if.use.default.convert": false,
"if.use.migrationx.before": false,
"if.use.dataworks.newidea": true,
"filter.rule": []
},
"schedule_datasource": {
"name": "adf",
"type": "Adf"
},
"target_schedule_datasource": {
"name": "name",
"type": "DataWorks"
}
}3.2 配置項說明
目前預設即可
序號 | 參數名 | 是否必填 | 範例值 | 備忘 |
1 | filter.rule | 否 | [ { "type": "black", "element": "node", "field": "name", "value": "DataXNode" } ] | 過濾規則,可不填。 type標識黑白名單,可選項: BLACK| WHITE element標識篩選元素類型,支援workflow和node,可選項:NODE| WORKFLOW field標識按照名稱或者按照id來篩選,可選項:ID|NAME value標識具體的值,多個可用,分割 |
schedule_datasource.name標識資料來源,對應的是所有workflow的根目錄
3.3 節點內建轉換映射邏輯
3.3.1 常規節點映射邏輯
ADF節點類型 | DW節點類型 | 備忘 |
Copy | DI | 目前僅有原始的json指令碼,di具體的內容跟資料來源類型相關,需要拿到所有di相關的資料來源類型才能想辦法轉換 |
Delete | DIDE_SHELL | 拼接了刪除檔案路徑的shell |
SqlServerStoredProcedure | ODPS-SQL | 預設為odps節點,如果判斷邏輯是sqlserver,則節點類型為Sql Server |
DatabricksSparkJar | ODPS-SPARK | 用已經有的參數轉換了odps spark節點,可能存在遺漏 |
DatabricksSparkPython | ODPS-SPARK | 用已經有的參數轉換了odps spark節點,可能存在遺漏 |
DatabricksNotebook | NOTEBOOK | scala相關的cell未做處理 |
SynapseNotebook | NOTEBOOK | scala相關的cell未做處理 |
SparkJob | ODPS-SPARK | 暫時轉為虛擬節點 |
AppendVariable | CONTROLLER_ASSIGNMENT | 賦值節點 |
ExecutePipeline | SUB_PROCESS | |
Script | ODPS_SQL | |
Wait | DIDE_SHELL | |
WebActivity | DIDE_SHELL | |
IfCondition | CONTROLLER_BRANCH | 轉為分支+subprocess+歸併 |
FOREACH | CONTROLLER_TRAVERSE | Foreach |
Switch | CONTROLLER_BRANCH | 轉為分支+subprocess+歸併 |
Until | CONTROLLER_CYCLE | do-while節點 |
Lookup | CONTROLLER_ASSIGNMENT | |
Filter | CONTROLLER_ASSIGNMENT | |
GetMeta | CONTROLLER_ASSIGNMENT | |
SetVariable | CONTROLLER_ASSIGNMENT | |
HDInsightHive | ODPS-SQL | |
HDInsightSpark | ODPS_Spark | |
HDInsightMapReduce | ODPS_MR | |
其他 | VIRTUAL |
備忘:目前資源檔需要手工上傳
3.3.2 邏輯節點映射邏輯
ifcondition
foreach
switch
3.3.3 執行轉換命令
sh ./bin/run.sh convert \
-c ./conf/<你的設定檔>.json \
-f ./data/1_ReaderOutput/<源端探查匯出包>.zip \
-o ./data/2_ConverterOutput/<轉換結果輸出包>.zip \
-t wedata-dw-converter序號 | 參數名 | 是否必填 | 範例參數值 | 備忘 |
1 | -c | 是 | ./conf/<你的設定檔>.json | 設定檔路徑 |
2 | -f | 是 | ./data/1_ReaderOutput/<源端探查匯出包>.zip | 探查匯出包 |
3 | -o | 是 | ./result/convert_out.zip | 輸出包路徑 |
4 | -t | 是 | adf-dw-converter | 外掛程式類型(固定值) |
3.3.4 查看轉換結果
開啟./data/2_ConverterOutput/下的產生包ConverterOutput.zip,可預覽匯出結果。
其中,統計報表是對轉換結果任務流、節點、資源、函數、資料來源基本資料的匯總展示。
而data/project檔案夾是轉換完成的調度遷移包本體。
統計報表提供了兩項特殊能力:
1、報表中工作流程、節點的部分屬性被允許更改,允許更改的欄位以藍色字型標識。在下一階段匯入DataWorks時,工具將擷取表格中的屬性變更並使其生效。
2、報表允許通過刪除工作流程子表中的行,使得在匯入DataWorks時跳過這些工作流程(工作流程黑名單)。注意!若工作流程存在相互依賴關係,相關聯的工作流程需要同批次匯入,不可通過黑名單進行分割。分割會產生異常!
四、Dataworks任務匯入
LHM遷移工具異構轉換已將遷移源端的調度元素轉換為DataWorks調度格式,工具得以針對不同的遷移情境提供了統一的上傳入口,實現任務流匯入DataWorks。
匯入工具支援多輪刷寫,會自動選擇建立/更新任務流(OverWrite模式)。
1 前置條件
1.1 轉換成功
轉換工具運行完成,源端調度資訊被成功轉換為DataWorks調度資訊,ConverterOutput.zip被成功產生。
(可選,推薦)開啟轉換輸出包,查看統計報表,核對待遷移範圍是否被轉換成功。
1.2 DataWorks側配置
DataWorks側需進行以下動作:
1、建立工作空間。
2、建立AK、SK且保證AK、SK對工作空間具有管理員權限。(強烈建議建立與帳號有綁定關係的AK、SK,以便在寫入遇到問題時進行排查)
3、在工作空間中建立資料來源、綁定計算資源、建立資源群組。
4、在工作空間中上傳檔案資源、建立UDF。
1.3 網路連通性檢查
驗證能否串連DataWorks Endpoint。
服務存取點列表:
ping dataworks.aliyuncs.com2 匯入配置項
在工程目錄的conf檔案夾下建立匯出設定檔(JSON格式),如writer.json。
使用前請刪除json中的注釋。
{
"schedule_datasource": {
"name": "YourDataWorks", //給你的DataWorks資料來源起個名字!
"type": "DataWorks",
"properties": {
"endpoint": "dataworks.cn-hangzhou.aliyuncs.com", // 服務存取點
"project_id": "YourProjectId", // 工作空間ID
"project_name": "YourProject", // 工作空間名稱
"ak": "************", // AK
"sk": "************", // SK
},
"operaterType": "MANUAL"
},
"conf": {
"di.resource.group.identifier": "Serverless_res_group_***_***", // 調度資源群組
"resource.group.identifier": "Serverless_res_group_***_***", // Data Integration資源群組
"dataworks.node.type.xls": "/Software/bwm-client/conf/CodeProgramType.xls", // DataWorks節點類型表的路徑
"qps.limit": 5 // 向DataWorks發送API請求的QPS上限
}
}2.1 服務存取點
根據DataWorks所在Region選擇服務存取點,參考文檔:
2.2 工作空間ID與名稱
開啟DataWorks控制台,開啟工作空間詳情頁,從右側基本資料中擷取工作空間ID與名稱。
2.3 建立AK、SK並授權
在使用者頁建立AK、SK,要求對目標DataWorks工作空間擁有管理員讀寫權限。
許可權管理組件括兩處,如果帳號是RAM帳號,則需先對RAM帳號進行DataWorks操作授權。
權限原則頁面:https://ram.console.alibabacloud.com/policies
然後在DataWorks工作空間中,將工作空間許可權賦給帳號。
注意!AccessKey可設定網路訪問限制策略,請務必保證遷移工具所在機器的IP被允許訪問。
2.4 資源群組
由DataWorks工作空間詳情頁左側功能表列進入資源群組頁面,綁定資源群組,並擷取資源群組ID。
通用資源群組可用於節點調度,也可用於Data Integration。配置項中調度資源群組resource.group.identifier和Data Integration資源群組di.resource.group.identifier可以配置為同一通用資源群組。
2.5 QPS設定
工具通過調用DataWorks的API進行匯入操作。不同DataWorks版本中的讀、寫OpenAPI分別有相應的QPS限制和每日調用次數限制,詳見連結:使用限制。
DataWorks基礎版、標準版、專業版建議填寫"qps.limit": 5,企業版建議填寫"qps.limit": 20。
注意,請儘可能避免多個匯入工具同時運行。
2.6 DataWorks節點類型ID設定
在DataWorks中,部分節點類型在不同Region中被分配了不同的TypeId。具體TypeID以DataWorks資料開發實際介面為準。存在此特性的節點類型以資料庫節點為主:資料庫節點。
如:MySQL節點在杭州Region的NodeTypeId為1000039、在深圳Region的NodeTypeId為1000041。
為適應上述DataWorks不同Region的差異特性,工具提供了一種可配置的方式,允許使用者組態工具所使用的節點TypeId表。
表格通過匯入工具的配置項引入:
"conf": {
"dataworks.node.type.xls": "/Software/bwm-client/conf/CodeProgramType.xls" // DataWorks節點類型表的路徑
}從DataWorks資料開發介面上擷取節點類型Id的方法:在介面上建立一個工作流程,並在工作流程中建立一個節點,在點擊儲存後查看工作流程的Spec。
若節點類型配置錯誤,在任務流發布時將提示以下錯誤。
3 運行DataWorks匯入工具
轉換工具通過命令列調用,調用命令如下:
sh ./bin/run.sh write \
-c ./conf/<你的設定檔>.json \
-f ./data/2_ConverterOutput/<轉換結果輸出包>.zip \
-o ./data/4_WriterOutput/<匯入結果儲存包>.zip \
-t dw-newide-writer其中-c為設定檔路徑,-f為ConverterOutput包儲存路徑,-o為WriterOutput包儲存路徑,-t為提交外掛程式名稱。
例如,當前需要匯入DataWorks的專案A:
sh ./bin/run.sh write \
-c ./conf/projectA_write.json \
-f ./data/2_ConverterOutput/projectA_ConverterOutput.zip \
-o ./data/4_WriterOutput/projectA_WriterOutput.zip \
-t dw-newide-writer匯入工具運行中將列印過程資訊,請關注運行過程中是否有報錯。匯入完成後將在命令列中列印匯入成功與失敗的統計資訊。注意,部分節點的匯入失敗不會影響整體匯入流程,如遇少量節點匯入失敗,可在DataWorks中進行手動修改。
4 查看匯入結果
匯入完成後,可在DataWorks中查看匯入結果。匯入處理程序中亦可查看工作流程逐個匯入的過程,如發現問題需要終止匯入,可運行jps命令找到BwmClientApp,並使用kill -9終止匯入。
5 Q&A
5.1 源端持續在進行開發,這些增量與變更如何提交到DataWorks?
遷移工具為OverWrite模式,重新運行匯出、轉換、匯入可實現將源端增量提交到DataWorks的能力。請注意,工具將根據全路徑匹配任務流以選擇建立任務流/更新任務流。如需進行變更遷移,請勿移動任務流。
5.2 源端持續在進行開發,同時進行DataWorks上任務流改造與治理,增量遷移時是否會覆蓋DataWorks上的變更?
是的,遷移工具為OverWrite模式,建議您在完成遷移後再在DataWorks上進行後續改造。或者採用分批遷移的方式,已遷移等任務流再確認不再刷寫後開始DataWorks改造,不同批次之間互相不會影響。
5.3 整個包匯入耗時太長,能否只匯入一部分
可以,可手動裁剪待匯入包來實現部分匯入:將data/project/workflow檔案夾下需要匯入的任務流保留、其他任務流刪除,重新壓縮回壓縮包,再運行匯入工具。注意,存在相互依賴的任務流需要捆綁匯入,否則任務流間的節點血緣將會丟失。