全部產品
Search
文件中心

Migration Hub:AzureDataFactory(ADF) - DataWorks

更新時間:Dec 18, 2025

本文介紹了基於LHM調度遷移工具將Azure Data Factory調度任務流遷移到DataWorks的方案與操作流程,包括三步,Azure Data Factory任務匯出、調度任務轉換、DataWorks任務匯入。

一、環境準備

1.1 運行環境準備

序號

規格

數量

備忘

1

ECS

4c16g以上

1

鏡像CentOS、AliyunOS均可

2

jdk17

3

運行工具包

下載連結

1.2 網路準備

  • ECS對應的VPC需要能訪問公網

1.3 帳號許可權準備

1.3.1 採用服務註冊認證的方式

  • 註冊應用程式

  • 擷取用戶端密鑰

1.3.2 配置Azure Data Factory許可權

1.3.3. 配置blob許可權

非必需,如果要讀取節點中blob中存在的檔案內容,需要使用ADF服務註冊授權同樣的方式來對blob進行授權

1.3.4 配置Databricks許可權

非必需,主要用來讀取Azure Data Factory 中關於Databricks節點具體外置指令檔

  1. 進入dbr工作台

  2. 點擊使用者佈建

  3. 進入左側開發人員介面

  4. 點擊產生新令牌

二、AzureDataFactory任務匯出

採用SDK方式匯出,依賴項如下

<dependency>
      <groupId>com.azure.resourcemanager</groupId>
      <artifactId>azure-resourcemanager-datafactory</artifactId>
    </dependency>
    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-identity</artifactId>
    </dependency>
    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-storage-blob</artifactId>
    </dependency>
    <dependency>
      <groupId>com.databricks</groupId>
      <artifactId>databricks-sdk-java</artifactId>
    </dependency>

2.1 設定檔

{
  "schedule_datasource": {
    "name": "name",
    "type": "adf",
    "properties": {
      "isUseProxy": false,
      "proxyHost": "proxy.msl.cn",
      "proxyPort": "8080",
      "AzureCloud": "AZURE_CHINA_CLOUD",
      "apiMode": "sdk"
      "endpoint": "https://management.azure.com",
      "subscriptionId": "xxx",
      "factory": "bigdata-adf-jiman",
      "project": "bigdata-adf-jiman",
      "resourceGroupName": "biadata",
      "tenantId": "xxx",
      "clientId": "xxx",
      "clientSecretValue": "xxxx",
      "dbr_endpoint": "https://adb-xxxxx.16.azuredatabricks.net",
      "dbr_token": "xxxxx",
      "pipelineNameWhite": "dbr-demo"
    },
    "operaterType": "AUTO"
  }
}

2.2 配置項說明

序號

參數名

是否必填

範例值

備忘

1

isUseProxy

false

當使用代理模式來訪問Azure時必需

2

proxyHost

proxy.msl.cn

當使用代理模式來訪問Azure時必需

3

proxyPort

8080

當使用代理模式來訪問Azure時必需

4

AzureCloud

AZURE_CHINA_CLOUD

可選項如下:

分別對應不同的地區,預設為AZURE_CHINA_CLOUD

  1. AZURE_PUBLIC_CLOUD

  2. AZURE_CHINA_CLOUD

  3. AZURE_US_GOVERNMENT_CLOUD

5

endpoint

https://portal.azure.com

跟AzureCloud綁定,可不填

6

subscriptionId

xxxxx

訂閱ID,可以在adf對應的首頁概覽擷取

7

resourceGroupName

xxxxx

資源群組名字,可以在adf對應的首頁概覽擷取

8

factory

bigdata-adf-jiman

主要用來做標識

9

project

bigdata-adf-jiman

和factory保持一致即可

10

tenantId

xxxxx

在服務註冊步驟中可以擷取到

11

clientId

xxxxx

在服務註冊步驟中可以擷取到

12

clientSecretValue

xxxxx

在服務註冊步驟中可以擷取到

13

dbr_endpoint

xxxxx

在databricks首頁擷取

14

dbr_token

xxxxx

在databricks首頁擷取

15

pipelineNameWhite

dbr-demo

白名單,多個pipeline名稱用,分割

2.3 執行匯出命令

mkdir result
sh ./bin/run.sh read \
-c ./conf/<你的設定檔>.json \
-o ./data/1_ReaderOutput/<源端探查匯出包>.zip \
-t adf-reader

命令參數說明

序號

參數名

是否必填

範例參數值

備忘

1

-c

./conf/<你的設定檔>.json

設定檔路徑

2

-o

./data/1_ReaderOutput/<源端探查匯出包>.zip

輸出路徑

3

-t

adf-reader

外掛程式類型(固定值)

完整命令範例

mkdir result
sh ./bin/run.sh read -c ./conf/read.json -f ./result/temp.zip -o ./result/read_out.zip  -t adf-reader

2.4 查看匯出結果

開啟./data/1_ReaderOutput/下的產生包ReaderOutput.zip,可預覽匯出結果。其中,統計報表是任務流、節點、資源、函數、資料來源基本資料的匯總展示,而data/project檔案夾下是對調度資訊資料結構標準化後的結果。

統計報表提供了兩項特殊能力:

1、報表中工作流程、節點的部分屬性被允許更改,允許更改的欄位以藍色字型標識。在下一階段匯入DataWorks時,工具將擷取表格中的屬性變更並使其生效。

2、報表允許通過刪除工作流程子表中的行,使得在匯入DataWorks時跳過這些工作流程(工作流程黑名單)。注意!若工作流程存在相互依賴關係,相關聯的工作流程需要同批次匯入,不可通過黑名單進行分割。分割會產生異常!

詳見:使用調度遷移中的概覽報表補充修改調度屬性

三、調度任務轉換

3.1 設定檔

{
  "conf": {
    "locale": "zh_CN"
  },
  "self": {
    "if.use.default.convert": false,
    "if.use.migrationx.before": false,
    "if.use.dataworks.newidea": true,
    "filter.rule": []
  },
  "schedule_datasource": {
    "name": "adf",
    "type": "Adf"
  },
  "target_schedule_datasource": {
    "name": "name",
    "type": "DataWorks"
  }
}

3.2 配置項說明

目前預設即可

序號

參數名

是否必填

範例值

備忘

1

filter.rule

[

{

"type": "black",

"element": "node",

"field": "name",

"value": "DataXNode"

}

]

過濾規則,可不填。

type標識黑白名單,可選項: BLACK| WHITE

element標識篩選元素類型,支援workflow和node,可選項:NODE| WORKFLOW

field標識按照名稱或者按照id來篩選,可選項:ID|NAME

value標識具體的值,多個可用,分割

schedule_datasource.name標識資料來源,對應的是所有workflow的根目錄

3.3 節點內建轉換映射邏輯

3.3.1 常規節點映射邏輯

ADF節點類型

DW節點類型

備忘

Copy

DI

目前僅有原始的json指令碼,di具體的內容跟資料來源類型相關,需要拿到所有di相關的資料來源類型才能想辦法轉換

Delete

DIDE_SHELL

拼接了刪除檔案路徑的shell

SqlServerStoredProcedure

ODPS-SQL

預設為odps節點,如果判斷邏輯是sqlserver,則節點類型為Sql Server

DatabricksSparkJar

ODPS-SPARK

用已經有的參數轉換了odps spark節點,可能存在遺漏

DatabricksSparkPython

ODPS-SPARK

用已經有的參數轉換了odps spark節點,可能存在遺漏

DatabricksNotebook

NOTEBOOK

scala相關的cell未做處理

SynapseNotebook

NOTEBOOK

scala相關的cell未做處理

SparkJob

ODPS-SPARK

暫時轉為虛擬節點

AppendVariable

CONTROLLER_ASSIGNMENT

賦值節點

ExecutePipeline

SUB_PROCESS

Script

ODPS_SQL

Wait

DIDE_SHELL

WebActivity

DIDE_SHELL

IfCondition

CONTROLLER_BRANCH

轉為分支+subprocess+歸併

FOREACH

CONTROLLER_TRAVERSE

Foreach

Switch

CONTROLLER_BRANCH

轉為分支+subprocess+歸併

Until

CONTROLLER_CYCLE

do-while節點

Lookup

CONTROLLER_ASSIGNMENT

Filter

CONTROLLER_ASSIGNMENT

GetMeta

CONTROLLER_ASSIGNMENT

SetVariable

CONTROLLER_ASSIGNMENT

HDInsightHive

ODPS-SQL

HDInsightSpark

ODPS_Spark

HDInsightMapReduce

ODPS_MR

其他

VIRTUAL

備忘:目前資源檔需要手工上傳

3.3.2 邏輯節點映射邏輯

  • ifcondition

  • foreach

  • switch

3.3.3 執行轉換命令

sh ./bin/run.sh convert \
-c ./conf/<你的設定檔>.json \
-f ./data/1_ReaderOutput/<源端探查匯出包>.zip \
-o ./data/2_ConverterOutput/<轉換結果輸出包>.zip \
-t wedata-dw-converter

序號

參數名

是否必填

範例參數值

備忘

1

-c

./conf/<你的設定檔>.json

設定檔路徑

2

-f

./data/1_ReaderOutput/<源端探查匯出包>.zip

探查匯出包

3

-o

./result/convert_out.zip

輸出包路徑

4

-t

adf-dw-converter

外掛程式類型(固定值)

3.3.4 查看轉換結果

開啟./data/2_ConverterOutput/下的產生包ConverterOutput.zip,可預覽匯出結果。

其中,統計報表是對轉換結果任務流、節點、資源、函數、資料來源基本資料的匯總展示。

而data/project檔案夾是轉換完成的調度遷移包本體。

統計報表提供了兩項特殊能力:

1、報表中工作流程、節點的部分屬性被允許更改,允許更改的欄位以藍色字型標識。在下一階段匯入DataWorks時,工具將擷取表格中的屬性變更並使其生效。

2、報表允許通過刪除工作流程子表中的行,使得在匯入DataWorks時跳過這些工作流程(工作流程黑名單)。注意!若工作流程存在相互依賴關係,相關聯的工作流程需要同批次匯入,不可通過黑名單進行分割。分割會產生異常!

詳見:使用調度遷移中的概覽報表補充修改調度屬性

四、Dataworks任務匯入

LHM遷移工具異構轉換已將遷移源端的調度元素轉換為DataWorks調度格式,工具得以針對不同的遷移情境提供了統一的上傳入口,實現任務流匯入DataWorks。

匯入工具支援多輪刷寫,會自動選擇建立/更新任務流(OverWrite模式)。

1 前置條件

1.1 轉換成功

轉換工具運行完成,源端調度資訊被成功轉換為DataWorks調度資訊,ConverterOutput.zip被成功產生。

(可選,推薦)開啟轉換輸出包,查看統計報表,核對待遷移範圍是否被轉換成功。

1.2 DataWorks側配置

DataWorks側需進行以下動作:

1、建立工作空間。

2、建立AK、SK且保證AK、SK對工作空間具有管理員權限。(強烈建議建立與帳號有綁定關係的AK、SK,以便在寫入遇到問題時進行排查)

3、在工作空間中建立資料來源、綁定計算資源、建立資源群組。

4、在工作空間中上傳檔案資源、建立UDF。

1.3 網路連通性檢查

驗證能否串連DataWorks Endpoint。

服務存取點列表:

服務存取點

ping dataworks.aliyuncs.com

2 匯入配置項

在工程目錄的conf檔案夾下建立匯出設定檔(JSON格式),如writer.json。

  • 使用前請刪除json中的注釋。

{
  "schedule_datasource": {
    "name": "YourDataWorks", //給你的DataWorks資料來源起個名字!
    "type": "DataWorks",
    "properties": {
      "endpoint": "dataworks.cn-hangzhou.aliyuncs.com", // 服務存取點
      "project_id": "YourProjectId", // 工作空間ID
      "project_name": "YourProject", // 工作空間名稱
      "ak": "************", // AK
      "sk": "************", // SK
    },
    "operaterType": "MANUAL"
  },
  "conf": {
    "di.resource.group.identifier": "Serverless_res_group_***_***", // 調度資源群組
    "resource.group.identifier": "Serverless_res_group_***_***", // Data Integration資源群組
    "dataworks.node.type.xls": "/Software/bwm-client/conf/CodeProgramType.xls", // DataWorks節點類型表的路徑
    "qps.limit": 5 // 向DataWorks發送API請求的QPS上限
  }
}

2.1 服務存取點

根據DataWorks所在Region選擇服務存取點,參考文檔:

服務存取點

2.2 工作空間ID與名稱

開啟DataWorks控制台,開啟工作空間詳情頁,從右側基本資料中擷取工作空間ID與名稱。

2.3 建立AK、SK並授權

在使用者頁建立AK、SK,要求對目標DataWorks工作空間擁有管理員讀寫權限。

許可權管理組件括兩處,如果帳號是RAM帳號,則需先對RAM帳號進行DataWorks操作授權。

權限原則頁面:https://ram.console.alibabacloud.com/policies

然後在DataWorks工作空間中,將工作空間許可權賦給帳號。

注意!AccessKey可設定網路訪問限制策略,請務必保證遷移工具所在機器的IP被允許訪問。

2.4 資源群組

由DataWorks工作空間詳情頁左側功能表列進入資源群組頁面,綁定資源群組,並擷取資源群組ID。

通用資源群組可用於節點調度,也可用於Data Integration。配置項中調度資源群組resource.group.identifier和Data Integration資源群組di.resource.group.identifier可以配置為同一通用資源群組。

2.5 QPS設定

工具通過調用DataWorks的API進行匯入操作。不同DataWorks版本中的讀、寫OpenAPI分別有相應的QPS限制和每日調用次數限制,詳見連結:使用限制

DataWorks基礎版、標準版、專業版建議填寫"qps.limit": 5,企業版建議填寫"qps.limit": 20。

注意,請儘可能避免多個匯入工具同時運行。

2.6 DataWorks節點類型ID設定

在DataWorks中,部分節點類型在不同Region中被分配了不同的TypeId。具體TypeID以DataWorks資料開發實際介面為準。存在此特性的節點類型以資料庫節點為主:資料庫節點

如:MySQL節點在杭州Region的NodeTypeId為1000039、在深圳Region的NodeTypeId為1000041。

為適應上述DataWorks不同Region的差異特性,工具提供了一種可配置的方式,允許使用者組態工具所使用的節點TypeId表。

表格通過匯入工具的配置項引入:

"conf": {
    "dataworks.node.type.xls": "/Software/bwm-client/conf/CodeProgramType.xls" // DataWorks節點類型表的路徑
 }

從DataWorks資料開發介面上擷取節點類型Id的方法:在介面上建立一個工作流程,並在工作流程中建立一個節點,在點擊儲存後查看工作流程的Spec。

若節點類型配置錯誤,在任務流發布時將提示以下錯誤。

3 運行DataWorks匯入工具

轉換工具通過命令列調用,調用命令如下:

sh ./bin/run.sh write \
-c ./conf/<你的設定檔>.json \
-f ./data/2_ConverterOutput/<轉換結果輸出包>.zip \
-o ./data/4_WriterOutput/<匯入結果儲存包>.zip \
-t dw-newide-writer

其中-c為設定檔路徑,-f為ConverterOutput包儲存路徑,-o為WriterOutput包儲存路徑,-t為提交外掛程式名稱。

例如,當前需要匯入DataWorks的專案A:

sh ./bin/run.sh write \
-c ./conf/projectA_write.json \
-f ./data/2_ConverterOutput/projectA_ConverterOutput.zip \
-o ./data/4_WriterOutput/projectA_WriterOutput.zip \
-t dw-newide-writer

匯入工具運行中將列印過程資訊,請關注運行過程中是否有報錯。匯入完成後將在命令列中列印匯入成功與失敗的統計資訊。注意,部分節點的匯入失敗不會影響整體匯入流程,如遇少量節點匯入失敗,可在DataWorks中進行手動修改。

4 查看匯入結果

匯入完成後,可在DataWorks中查看匯入結果。匯入處理程序中亦可查看工作流程逐個匯入的過程,如發現問題需要終止匯入,可運行jps命令找到BwmClientApp,並使用kill -9終止匯入。

5 Q&A

5.1 源端持續在進行開發,這些增量與變更如何提交到DataWorks?

遷移工具為OverWrite模式,重新運行匯出、轉換、匯入可實現將源端增量提交到DataWorks的能力。請注意,工具將根據全路徑匹配任務流以選擇建立任務流/更新任務流。如需進行變更遷移,請勿移動任務流。

5.2 源端持續在進行開發,同時進行DataWorks上任務流改造與治理,增量遷移時是否會覆蓋DataWorks上的變更?

是的,遷移工具為OverWrite模式,建議您在完成遷移後再在DataWorks上進行後續改造。或者採用分批遷移的方式,已遷移等任務流再確認不再刷寫後開始DataWorks改造,不同批次之間互相不會影響。

5.3 整個包匯入耗時太長,能否只匯入一部分

可以,可手動裁剪待匯入包來實現部分匯入:將data/project/workflow檔案夾下需要匯入的任務流保留、其他任務流刪除,重新壓縮回壓縮包,再運行匯入工具。注意,存在相互依賴的任務流需要捆綁匯入,否則任務流間的節點血緣將會丟失。