DataWorks提供任务搬站功能,支持将Oozie、Azkaban、Airflow、DolphinScheduler等开源调度引擎的任务快速迁移至DataWorks。本文为您介绍导出任务的文件要求等相关信息。
背景信息
您需要先导出开源调度引擎的任务至本地或OSS,再导入至DataWorks。导入的详情请参见导入开源引擎任务。
导出Oozie任务
导出要求
导出的文件需包含XML和配置项等信息,导出后即为一个Zip格式文件。
导出结构
Oozie的任务描述在HDFS的某个Path下。以Oozie官方的Examples为例,Examples包中的apps目录下,每个子目录都是一个Oozie的Workflow Job。该子目录包含Workflow的定义XML和配置项等信息。导出文件的结构如下。
导出Azkaban任务
下载工作流
Azkaban拥有自己的Web控制台,支持在界面下载某个工作流(Flow)。
登录Azkaban控制台的Projects页面。
进入相应的Project页面,单击Flows,为您展示该Project下所有的工作流。
单击页面右上方的Download,下载Project的导出文件。
Azkaban导出包的格式无特别限制,是原生Azkaban即可。导出的Zip文件包含Azkaban的某个Project下所有任务(Job)及其关系的信息。Azkaban页面导出的Zip文件可直接在调度引擎作业导入页面上传导入。
转换逻辑
Azkaban与DataWorks转换项的对应关系及逻辑说明如下。
Azkaban转换项 | DataWorks转换项 | 说明 |
Flow | 数据开发(DataStudio)的业务流程 | Flow里的Job作为节点会放至Flow对应的业务流程目录下。 嵌套Flow的内部Flow也会单独转换为一个业务流程,通过Flow转换后的业务流程会自动建立节点间的依赖关系。 |
Command类型的Job | Shell节点 | 若使用DataWorks on EMR模式,则转换为EMR SHELL节点。可在导入任务的高级设置进行配置。 若Command命令行调用其他脚本,会自动分析具体是哪个脚本文件。分析到的脚本文件会注册为DataWorks的资源文件,转换后的Shell代码里会引用该资源文件。 |
Hive类型的Job | EMR_HIVE节点 | 若使用DataWorks on MaxCompute模式,则转换为ODPS SQL节点。可在导入任务的高级设置进行配置。 |
其他DataWorks不支持的节点 | 虚拟节点或Shell节点 | 可在导入任务的高级设置进行配置。 |

导出Airflow任务
使用限制
导出Airflow任务仅支持Airflow 1.10.x,且依赖Python 3.6及以上版本。
操作步骤
进入Airflow的执行环境。
使用Airflow的Python库,加载在Airflow上调度的Dag Folder。Dag Folder为您的Dag Python文件所在的目录。
使用导出工具,在内存中通过Airflow的Python库读取Dag Python文件的内部任务信息及其依赖关系,将生成的Dag信息写入JSON文件进行导出。
您可进入DataWorks的页面,下载导出工具。进入调度引擎作业导出的步骤请参考进入引擎作业导出。
工具操作说明
导出工具操作说明如下:
使用如下语句解压airflow-exporter.tgz。
tar zxvf airflow-exporter.tgz设置PYTHONPATH为Airflow的Python lib目录。示例语句如下。
export PYTHONPATH=/usr/local/lib/python3.6/site-packages导出Airflow任务。示例语句如下。
cd airflow-exporter python3.6 ./parser -d /path/to/airflow/dag/floder/ -o output.json使用如下语句,将导出的output.json文件生成Zip文件。
zip out.zip output.json
Zip文件生成后,您可进入DataWorks页面导入任务,详情请参见导入开源引擎任务。
导出DolphinScheduler任务
支持将任务导入到(旧版)数据开发(DataStudio)和(新版)数据开发(Data Studio)。
原理介绍
DataWorks导出工具通过调用DolphinScheduler的批量导出工作流定义API来获取工作流定义的JSON配置,并生成一个Zip文件。然后,使用dolphinscheduler_to_dataworks转换器将该Zip文件转换为DataWorks支持的文件/任务类型,并通过新建的DolphinScheduler导入任务解析和转换Zip文件中的代码和依赖关系,最终将其导入到DataWorks空间中。
使用限制
版本限制:只能将1.3.x、2.x、3.x版本的DolphinScheduler任务通过任务导出工具导入到DataWorks。
转换限制:
SQL任务:仅支持转换部分引擎的SQL节点,具体请以实际使用为准。并且转换过程中SQL代码不做语法转换、不进行修改。
Cron表达式:部分场景存在表达式剪裁或表达式功能不支持等情况,您需自行检查调度配置的定时时间是否满足要求。调度时间介绍,详情请参见时间属性配置说明。
Python节点:DataWorks没有单独的Python节点,Python节点目前是转换为Python文件资源和一个调用该Python资源的Shell节点,调度参数传递可能存在问题,您需自行调试检查。调度参数介绍,详情请参见调度参数配置。
Depend节点:暂不支持转换跨周期依赖。定义的依赖属性转换为DataWorks同周期调度依赖的本节点输入、本节点输出。同周期依赖配置,详情请参见配置同周期调度依赖。
配置任务类型映射
您可以通过以下步骤完成DataWorks导出工具的下载及任务类型映射的配置。
下载导出工具。
配置任务类型映射。
进入到导出工具目录下,可看到
lib、bin、conf目录。您需在conf目录的dataworks-transformer-config.json文件中修改映射关系。配置参数说明:
以下将以转换为
ODPS类型为例,为您说明dataworks-transformer-config.json文件中的参数信息。{ "format": "WORKFLOW", "locale": "zh_CN", "skipUnSupportType": true, "transformContinueWithError": true, "specContinueWithError": true, "processFilter": { "releaseState": "ONLINE", "includeSubProcess": true } "settings": { "workflow.converter.shellNodeType": "DIDE_SHELL", "workflow.converter.commandSqlAs": "ODPS_SQL", "workflow.converter.sparkSubmitAs": "ODPS_SPARK", "workflow.converter.target.unknownNodeTypeAs": "DIDE_SHELL", "workflow.converter.mrNodeType": "ODPS_MR", "workflow.converter.target.engine.type": "ODPS", "workflow.converter.dolphinscheduler.sqlNodeTypeMapping": { "POSTGRESQL": "POSTGRESQL", "MYSQL": "MYSQL" } }, "replaceMapping": [ { "taskType": "SHELL", "desc": "$[yyyyMMdd-1]", "pattern": "\$\[yyyyMMdd-1\]", "target": "\${dt}", "param": "dt=$[yyyyMMdd-1]" }, { "taskType": "PYTHON", "desc": "$[yyyyMMdd-1]", "pattern": "\$\[yyyyMMdd-1\]", "target": "\${dt}", "param": "dt=$[yyyyMMdd-1]" } ] }配置参数
配置说明
format
在进行数据迁移时,需要根据目标环境的版本设置相应的参数。具体规则如下:
如果目标环境是参加数据开发(Data Studio)(新版)公测的工作空间,需将该参数设置为
WORKFLOW。如果目标环境是未参加数据开发(Data Studio)(新版)公测的工作空间,需将该参数设置为
SPEC。
基本配置项,必填。
locale
指定语言环境,默认为
zh_CN。skipUnSupportType
任务类型转换遇到不支持的类型时是否跳过。
设置为
true,则跳过不支持的类型。设置为
false,则失败退出。
transformContinueWithError
在转换过程中遇到错误时是否继续执行。
设置为
true,则继续执行。设置为
false,则停止执行。
specContinueWithError
任务类型转换失败时是否继续转换。
设置为
true,则继续转换。设置为
false,则停止转换。
processFilter
releaseState
过滤条件,处理状态为在线的工作流。
支持转换过滤,若要对迁移任务进行过滤导入,请配置该参数。
includeSubProcess
过滤条件,是否包含状态为在线的子流程。
settings
workflow.converter.shellNodeType
源系统中的Shell节点映射到目标系统的类型(如DIDE_SHELL)。
配置映射信息,必填。
workflow.converter.commandSqlAs
SQL命令节点在目标系统中的执行引擎(如ODPS_SQL)。
workflow.converter.sparkSubmitAs
Spark提交任务的目标执行引擎(如ODPS_SPARK)。
workflow.converter.target.unknownNodeTypeAs
未知节点类型的默认映射(如DIDE_SHELL)。
workflow.converter.mrNodeType
MapReduce节点的目标执行引擎(如ODPS_MR)。
workflow.converter.target.engine.type
默认使用的计算引擎(如ODPS)。
workflow.converter.dolphinscheduler.sqlNodeTypeMapping
DolphinScheduler中SQL节点数据库类型到目标系统的映射。
"POSTGRESQL": "POSTGRESQL"。"MYSQL": "MYSQL"。
replaceMapping
taskType
规则适用的任务类型(如Shell或Python)。
支持正则替换匹配到的节点内容。
desc
描述(信息性字段,不参与实际处理,可为空)。
pattern
需要替换的正则表达式模式(如$[yyyyMMdd-1])。
target
替换后的目标字符串(如${dt})。
param
为替换后的目标字符串赋值。
例如:通过代码变量为节点变量赋值。(如 dt=$[yyyyMMdd-1])。
导出DolphinSchedule任务
您可以按照以下步骤,使用导出工具将DolphinScheduler作业导出为一个zip文件。示例命令如下,您需根据实际情况配置相应的参数信息。执行该命令后,DolphinScheduler的资源将会按照-f指定的文件名保存在当前目录下。
python3 bin/reader.py \
-a dolphinscheduler \
-e http://xxx.xxx.xxx.xxx:12345 \
-t {token} \
-v 1.3.9 \
-p 123456,456256 \
-f project_dp01.zip\
-sr false;配置参数 | 配置说明 |
-a | 待读取的系统类型,此处为固定值 |
-e | DolphinScheduler公网访问地址。 |
-t | DolphinScheduler应用 |
-v | 待导出的DolphinScheduler版本。 |
-p | 待导出的DolphinScheduler空间名称,支持配置多个,可用逗号隔开。 |
-f | 导出后的压缩文件名称,仅支持zip格式压缩包。 |
-sr | 是否skip资源下载,默认不下载,参数值为 说明
|
转换任务类型
您可以配置并执行以下脚本,通过dolphinscheduler_to_dataworks转换器,结合配置任务类型映射中定义的dataworks-transformer-config.json配置文件,将DolphinScheduler空间文件转换为对应的DataWorks文件或任务类型。
python3 bin/transformer.py \
-a dolphinscheduler_to_dataworks \
-c conf/dataworks-transformer-config.json \
-s project_dp01.zip \
-t project_dw01.zip;配置参数 | 配置说明 |
-a | 转换器名称,默认为 |
-c | 指定的转换配置文件。默认为配置任务类型映射时所配置的 |
-s | 待转换的DolphinScheduler文件名称。导出DolphinSchedule任务步骤中的导出结果文件。 |
-t | 转换为DataWorks文件格式后的结果文件名称。该文件将以 |
导入DolphinSchedule任务
您可通过配置执行以下脚本任务,将转换好的文件导入DataWorks空间。
python3 bin/writer.py \
-a dataworks \
-e dataworks.cn-shanghai.aliyuncs.com \
-i $ALIYUN_ACCESS_KEY_ID \
-k $ALIYUN_ACCESS_KEY_SECRET \
-p $ALIYUN_DATAWORKS_WORKSPACE_ID \
-r cn-shanghai \
-f project_dw01.zip \
-t SPEC;配置参数 | 配置说明 |
-a | 待写入的系统类型,此处默认为 |
-e | DataWorks OpenAPI的服务接入点Endpoint。您可在服务接入点Endpoint中根据您DataWorks工作空间所在地域获取该参数信息。 |
-i | 阿里云Access ID,需要有导入空间的权限。 |
-k | 阿里云Access Key,需要有导入空间的权限。 |
-p | DataWorks工作空间ID,即指定执行此py文件数据写入的工作空间。 |
-r | DataWorks工作空间所在地域ID,可通过服务接入点获取所在地域ID。 |
-f | 具体要导入DataWorks工作空间的本地文件。转换任务类型步骤中转换结果文件。 |
-t | 指定导入环境,导入目标为参加数据开发(Data Studio)(新版)公测的工作空间时,无需设置该参数。 |
完成上述操作后,您可前往目标DataWorks工作空间,查看任务的迁移情况。
导出其它开源引擎任务
DataWorks为您提供标准模板便于导出除Oozie、Azkaban、Airflow、DolphinScheduler外的开源引擎任务。导出任务前,您需要下载标准格式模板并参考模板的文件结构修改内容。下载模板及目录结构的介绍请进入开源引擎导出页面进行查询:
登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的,在下拉框中选择对应工作空间后单击进入数据开发。
单击左上方的
图标,选择。在左侧导航栏,单击,进入调度引擎导出方案选择页面。
单击标准模板。
在标准模板页签下,单击标准格式模板进行下载。
根据模板中的格式修改内容后,即可生成导出包。