全部產品
Search
文件中心

Migration Hub:自訂DataWorks Spec匯入DataWorks

更新時間:Sep 02, 2025

本文介紹了如何基於LHM調度遷移工具將使用者自行構造的DataWorks Spec匯入DataWorks。

一、DataWorks Spec的構造與解析

LHM遷移工具允許使用者按照DataWorks任務流標準結構(DataWorks Spec)自行構造描述檔案,並通過LHM匯入。作為匯入的前置動作,LHM將對DataWorks Spec進行解析,並產生遷移標準包。

產生遷移標準包後,無需進行進一步轉換,直接運行匯入工具向DataWorks寫入任務流。

1 DataWorks Spec定義

https://github.com/aliyun/dataworks-spec/tree/master

2 輸入包構建

  1. 按照DataWorks Spec定義,自行構造任務流定義並儲存為JSON檔案。

  2. 將所有任務流JSON檔案平鋪於檔案夾中,壓縮檔夾為zip格式。

{
  "version": "1.1.0",
  "kind": "CycleWorkflow",
  "spec": {
    "nodes": [],
    "workflows": [
      {
        "id": "3451387436863448",
        "outputs": {
          "nodeOutputs": [
            {
              "artifactType": "NodeOutput",
              "data": "3451387436863448",
              "refTableName": "example_hive_operator"
            }
          ]
        },
        "nodes": [
          {
            "id": "2105761738722077",
            "name": "run_first",
            "instanceMode": "T+1",
            "rerunMode": "FailureAllowed",
            "rerunTimes": 0,
            "rerunInterval": 300000,
            "trigger": {
              "type": "Scheduler",
              "startTime": "2025-03-30 00:00:00",
              "timezone": "UTC"
            },
            "runtimeResource": {
              "resourceGroup": "Serverless_res_group_651147510078336_710919704558240"
            },
            "script": {
              "path": "run_first",
              "runtime": {
                "command": "EMR_HIVE"
              },
              "parameters": [],
              "content": "\n    create database if not exists airflow;\n    use airflow;\n    drop table if exists test_hive;\n    create table test_hive(name string);\n    insert into test_hive values('studio');\n    "
            },
            "outputs": {
              "nodeOutputs": [
                {
                  "data": "2105761738722077"
                },
                {
                  "artifactType": "NodeOutput",
                  "data": "example_hive_operator.run_first.run_second",
                  "refTableName": "run_first"
                }
              ]
            },
            "type": "EMR_HIVE"
          },
          {
            "id": "3326558787158921",
            "name": "run_second",
            "instanceMode": "T+1",
            "rerunMode": "FailureAllowed",
            "rerunTimes": 0,
            "rerunInterval": 300000,
            "trigger": {
              "type": "Scheduler",
              "startTime": "2025-03-30 00:00:00",
              "timezone": "UTC"
            },
            "runtimeResource": {
              "resourceGroup": "Serverless_res_group_651147510078336_710919704558240"
            },
            "script": {
              "path": "run_second",
              "runtime": {
                "command": "EMR_HIVE"
              },
              "parameters": [],
              "content": "\n    use airflow;\n    add jar oss://emr-studio-example/hive-udf-1.0-SNAPSHOT.jar;\n    create temporary function simpleudf AS 'com.aliyun.emr.hive.udf.SimpleUDFExample';\n    show functions like '*udf';\n    select simpleudf(name) from test_hive;\n    "
            },
            "outputs": {
              "nodeOutputs": [
                {
                  "data": "3326558787158921"
                }
              ]
            },
            "type": "EMR_HIVE"
          }
        ],
        "dependencies": [
          {
            "nodeId": "3326558787158921",
            "depends": [
              {
                "type": "Normal",
                "output": "2105761738722077",
                "refTableName": "run_first"
              }
            ]
          }
        ],
        "script": {
          "path": "Airflow匯入_V3/example_hive_operator",
          "runtime": {
            "command": "WORKFLOW"
          },
          "parameters": []
        },
        "name": "example_hive_operator",
        "trigger": {
          "type": "Scheduler",
          "cron": "0 0 0 * * ?",
          "timezone": "UTC",
          "delaySeconds": 0
        },
        "type": "CycleWorkflow",
        "strategy": {
          "timeout": 0,
          "instanceMode": "T+1",
          "rerunMode": "Allowed",
          "rerunTimes": 0,
          "rerunInterval": 0,
          "failureStrategy": "Continue"
        }
      }
    ],
    "flow": [
      {
        "nodeId": "3451387436863448",
        "depends": []
      }
    ]
  }
}

壓縮命令(參考)

zip -q -r -m -o <PackageName>.zip PackageName

3 配置項構建

請使用如下配置項,內容無需更改。調度資訊由自訂DataWorks Spec描述,因此此處無需配置過多配置項。

{
  "schedule_datasource": {
    "name": "MySpec",
    "type": "DataWorks",
    "operaterType": "MANUAL"
  },
  "conf": {}
}

4 運行調度解析工具

解析工具通過命令列調用,調用命令如下:

sh ./bin/run.sh read \
-c ./conf/<你的設定檔>.json \
-f ./data/0_OriginalPackage/<輸入包>.zip \
-o ./data/1_ReaderOutput/<遷移標準包>.zip \
-t dw-newidea-reader

其中-c為設定檔路徑,-f為輸入包路徑,-o為遷移標準包的產生路徑,-t為探查外掛程式名稱。

例如,當前需要解析專案A:

sh ./bin/run.sh read \
-c ./conf/projectA_read.json \
-f ./data/0_OriginalPackage/projectA_DataworksSpec.zip \
-o ./data/1_ReaderOutput/projectA_ReaderOutput.zip \
-t dw-newidea-reader

探查工具運行中將列印過程資訊,請關注運行過程中是否有報錯。

5 查看解析結果

開啟./data/1_ReaderOutput/下的產生包ReaderOutput.zip,可預覽解析結果。

其中,統計報表是對DataWorks Spec中任務流、節點、資源、函數、資料來源基本資料的匯總展示。

而data/project檔案夾下是對DataWorks Spec調度資訊資料結構標準化後的結果。

統計報表提供了兩項特殊能力:

1、報表中工作流程、節點的部分屬性被允許更改,允許更改的欄位以藍色字型標識。在下一階段匯入DataWorks時,工具將擷取表格中的屬性變更並使其生效。

2、報表允許通過刪除工作流程子表中的行,使得在匯入DataWorks時跳過這些工作流程(工作流程黑名單)。注意!若工作流程存在相互依賴關係,相關聯的工作流程需要同批次匯入,不可通過黑名單進行分割。分割會產生異常!

詳見:使用調度遷移中的概覽報表補充修改調度屬性

二、匯入DataWorks

匯入工具支援多輪刷寫,會自動選擇建立/更新任務流(OverWrite模式)。

1 前置條件

1.1 DataWorks Spec解析成功

DataWorks Spec解析完成,ReaderOutput.zip被成功產生。

(可選,推薦)開啟解析輸出包,查看統計報表,核對待遷移範圍是否被解析成功。

1.2 DataWorks側配置

DataWorks側需進行以下動作:

1、建立工作空間。

2、建立AK、SK且保證AK、SK對工作空間具有管理員權限。(強烈建議建立與帳號有綁定關係的AK、SK,以便在寫入遇到問題時進行排查)

3、在工作空間中建立資料來源、綁定計算資源、建立資源群組。

4、在工作空間中上傳檔案資源、建立UDF。

1.3 網路連通性檢查

驗證能否串連DataWorks Endpoint。

服務存取點列表:

服務存取點

ping dataworks.aliyuncs.com

2 匯入配置項

在工程目錄的conf檔案夾下建立匯出設定檔(JSON格式),如writer.json。

  • 使用前請刪除json中的注釋。

{
  "schedule_datasource": {
    "name": "YourDataWorks", //給你的DataWorks資料來源起個名字!
    "type": "DataWorks",
    "properties": {
      "endpoint": "dataworks.cn-hangzhou.aliyuncs.com", // 服務存取點
      "project_id": "YourProjectId", // 工作空間ID
      "project_name": "YourProject", // 工作空間名稱
      "ak": "************", // AK
      "sk": "************", // SK
    },
    "operaterType": "MANUAL"
  },
  "conf": {
    "di.resource.group.identifier": "Serverless_res_group_***_***", // 調度資源群組
    "resource.group.identifier": "Serverless_res_group_***_***", // Data Integration資源群組
    "dataworks.node.type.xls": "/Software/bwm-client/conf/CodeProgramType.xls", // DataWorks節點類型表的路徑
    "qps.limit": 5 // 向DataWorks發送API請求的QPS上限
  }
}

2.1 服務存取點

根據DataWorks所在Region選擇服務存取點,參考文檔:

服務存取點

2.2 工作空間ID與名稱

開啟DataWorks控制台,開啟工作空間詳情頁,從右側基本資料中擷取工作空間ID與名稱。

2.3 建立AK、SK並授權

在使用者頁建立AK、SK,要求對目標DataWorks工作空間擁有管理員讀寫權限。

許可權管理組件括兩處,如果帳號是RAM帳號,則需先對RAM帳號進行DataWorks操作授權。

權限原則頁面:https://ram.console.alibabacloud.com/policies

然後在DataWorks工作空間中,將工作空間許可權賦給帳號。

注意!AccessKey可設定網路訪問限制策略,請務必保證遷移工具所在機器的IP被允許訪問。

2.4 資源群組

由DataWorks工作空間詳情頁左側功能表列進入資源群組頁面,綁定資源群組,並擷取資源群組ID。

通用資源群組可用於節點調度,也可用於Data Integration。配置項中調度資源群組resource.group.identifier和Data Integration資源群組di.resource.group.identifier可以配置為同一通用資源群組。

2.5 QPS設定

工具通過調用DataWorks的API進行匯入操作。不同DataWorks版本中的讀、寫OpenAPI分別有相應的QPS限制和每日調用次數限制,詳見連結:使用限制

DataWorks基礎版、標準版、專業版建議填寫"qps.limit": 5,企業版建議填寫"qps.limit": 20。

注意,請儘可能避免多個匯入工具同時運行。

2.6 DataWorks節點類型ID設定

在DataWorks中,部分節點類型在不同Region中被分配了不同的TypeId。具體TypeID以DataWorks資料開發實際介面為準。存在此特性的節點類型以資料庫節點為主:資料庫節點

如:MySQL節點在杭州Region的NodeTypeId為1000039、在深圳Region的NodeTypeId為1000041。

為適應上述DataWorks不同Region的差異特性,工具提供了一種可配置的方式,允許使用者組態工具所使用的節點TypeId表。

表格通過匯入工具的配置項引入:

"conf": {
    "dataworks.node.type.xls": "/Software/bwm-client/conf/CodeProgramType.xls" // DataWorks節點類型表的路徑
 }

從DataWorks資料開發介面上擷取節點類型Id的方法:在介面上建立一個工作流程,並在工作流程中建立一個節點,在點擊儲存後查看工作流程的Spec。

若節點類型配置錯誤,在任務流發布時將提示以下錯誤。

3 運行DataWorks匯入工具

轉換工具通過命令列調用,調用命令如下:

sh ./bin/run.sh write \
-c ./conf/<你的設定檔>.json \
-f ./data/1_ReaderOutput/<解析結果輸出包>.zip \
-o ./data/4_WriterOutput/<匯入結果儲存包>.zip \
-t dw-newide-writer

其中-c為設定檔路徑,-f為ReaderOutput包儲存路徑,-o為WriterOutput包儲存路徑,-t為提交外掛程式名稱。

例如,當前需要匯入DataWorks的專案A:

sh ./bin/run.sh write \
-c ./conf/projectA_write.json \
-f ./data/1_ReaderOutput/projectA_ReaderOutput.zip \
-o ./data/4_WriterOutput/projectA_WriterOutput.zip \
-t dw-newide-writer

匯入工具運行中將列印過程資訊,請關注運行過程中是否有報錯。匯入完成後將在命令列中列印匯入成功與失敗的統計資訊。注意,部分節點的匯入失敗不會影響整體匯入流程,如遇少量節點匯入失敗,可在DataWorks中進行手動修改。

4 查看匯入結果

匯入完成後,可在DataWorks中查看匯入結果。匯入處理程序中亦可查看工作流程逐個匯入的過程,如發現問題需要終止匯入,可運行jps命令找到BwmClientApp,並使用kill -9終止匯入。

5 Q&A

5.1 源端持續在進行開發,這些增量與變更如何提交到DataWorks?

遷移工具為OverWrite模式,重新運行匯出、轉換、匯入可實現將源端增量提交到DataWorks的能力。請注意,工具將根據全路徑匹配任務流以選擇建立任務流/更新任務流。如需進行變更遷移,請勿移動任務流。

5.2 源端持續在進行開發,同時進行DataWorks上任務流改造與治理,增量遷移時是否會覆蓋DataWorks上的變更?

是的,遷移工具為OverWrite模式,建議您在完成遷移後再在DataWorks上進行後續改造。或者採用分批遷移的方式,已遷移等任務流再確認不再刷寫後開始DataWorks改造,不同批次之間互相不會影響。

5.3 整個包匯入耗時太長,能否只匯入一部分

可以,可手動裁剪待匯入包來實現部分匯入:將data/project/workflow檔案夾下需要匯入的任務流保留、其他任務流刪除,重新壓縮回壓縮包,再運行匯入工具。注意,存在相互依賴的任務流需要捆綁匯入,否則任務流間的節點血緣將會丟失。