本文介绍DataWorks的事件类型、消息格式、以及消息中各字段含义,本文档可帮助您快速获取和查询相应的事件列表以及消息格式的信息。
背景介绍
在DataWorks中,事件根据是否能在DataWorks内部形成卡点,以及是否能在被自建服务订阅后返回处理结果,被划分为普通事件和扩展点事件。
普通事件:用户可订阅此类事件消息,但不支持阻断DataWorks内部操作,您可以通过OpenEvent订阅此类事件消息,详情请参见开放事件(OpenEvent)。
扩展点事件:用户订阅这类事件消息后,您可以通过DataWorks开放平台的扩展程序功能自定义事件消息的响应。即当使用扩展程序(Extensions)对扩展点的操作进行管控时,DataWorks平台使用者在平台侧针对扩展点的操作将被中断,直至您自定义的扩展程序返回处理结果。
本列表按照模块将各模块支持的事件及该事件所属类型(普通事件或扩展点事件)进行划分,并列出EventBridge 事件类型(Type)以及扩展程序事件编码(eventCode)。
EventBridge 事件类型(Type):用于过滤事件消息的字段。具体详情请参见开启消息订阅。
扩展程序事件编码(eventCode):事件类型对应的事件编码,在本地开发时,该编码可从DataWorks发出的消息内容中获取,用来判断目标事件类型。
空间级事件
空间级模块生成的是事件为空间级事件,例如数据开发模块中对节点运行产生的事件消息、运维中心模块周期实例状态变更产生的事件消息等。您可通过下文事件列表了解各个模块支持的事件消息中,哪些为普通事件,哪些为扩展点事件,以及对应事件的发送的消息格式。
以下消息格式页签仅包含部分内容,发送给EventBridge或函数计算的完整消息,您需要结合附录:消息格式。
数据开发事件列表
事件列表
事件类型 | 事件(及产生事件的操作) | 普通事件 | 扩展点事件 | EventBridge事件类型(Type) | 扩展程序事件编码(eventCode) |
节点变更 | 新增节点
|
|
| ||
更新节点
|
|
| |||
文件变更
| 删除文件 开发环境删除节点 说明 删除的文件将进入回收站。 |
|
| ||
提交文件 |
|
| |||
发布文件 |
|
| |||
运行文件 |
|
| |||
表变更 | 表提交开发环境 |
|
| ||
表发布生产环境 |
|
|
消息格式
节点变更事件(新增、更新)
节点新增、修改、删除事件的消息实体格式(即事件消息中data
字段的内容)示例如下。
{
"datacontenttype": "application/json;charset=utf-8",
"data": {
"nodeName": "****",
"programType": "ODPS_SQL",
"cronExpress": "00 04 00 * * ?",
"blockBusiness": false,
"schedulerType": "NORMAL",
"ownerId": "19****735",
"priority": 1,
"baselineId": 70***287,
"operator": "19***735", //本次操作人
"eventCode": "node-change-created",
"repeatability": true,
"modifyTime": 17***864,
"createTime": 17***864,
"tenantId": 28***656,
"nodeId": 70***003,
"projectId": 9***4
}
}
字段说明如下:
字段名称 | 字段类型 | 说明 |
nodeName | String | 节点名称。 |
programType | String | 文件的代码类型。可以调用ListFileType接口来查询文件的代码类型,或查阅支持的节点类型查看文件代码类型。 |
cronExpress | String | 周期调度的cron表达式。 该参数与DataWorks控制台中,数据开发任务的调度配置 > 时间属性 > cron表达式对应。 配置完调度周期及定时调度时间后,DataWorks会自动生成相应cron表达式。示例如下:
说明 cron表达式的限制如下:
|
schedulerType | String | 任务实例的调度类型,取值如下:
|
ownerId | String | 节点责任人的阿里云用户ID。如果该参数为空,则默认使用调用者的阿里云用户ID。 |
priority | Integer | 任务优先级,取值为1、3、5、7、8。数值越大,优先级越高。 |
baselineId | Long | 基线ID。 |
repeatability | Boolean | 节点是否可以重复运行:
|
modifyTime | Long | 节点最近一次的修改时间。 |
createTime | Long | 节点的创建时间。 |
nodeId | Long | 节点ID。 |
projectId | Long | 节点所在的项目空间ID。 |
tenantId | Long | 节点所属的租户ID。 |
operator | String | 新增、修改或删除节点的用户UID。 |
eventCode | String | 扩展程序事件编码。 |
文件变更事件(提交、发布、运行、删除)
文件提交、发布事件的消息实体格式(即事件消息中
data
字段的内容)示例如下。{ "datacontenttype": "application/json;charset=utf-8", "data": { "fileName": "******", "extensionBizId": "eb******9ce", "changeType": "0", "blockBusiness": false, "dataSourceName": "0_******engine", "operator": "19***735", "eventCode": "commit-file", "fileCreateTime": "2024-07-12 11:08:50", "tenantId": 28***656, "fileOwner": "19***735", "fileVersion": 1, "projectId": 9***4, "fileType": 10, "fileId": 50***830, "resourceType": 1 } }
字段说明如下:
字段名称
字段类型
说明
operator
String
提交或发布文件的用户UID。
projectId
Long
文件所属项目空间的ID。
tenantId
Long
租户ID。
nodeId
Long
调度节点ID。
fileType
Long
文件的代码类型。可以调用ListFileType接口来查询文件的代码类型,或查阅支持的节点类型查看文件代码类型。
fileName
String
文件名称。
fileOwner
String
文件所有者。
extensionBizId
String
扩展程序卡点流程ID。
changeType
String
文件的变更类型:
0:创建文件。
1:更新文件。
2:删除文件。
fileCreateTime
String
文件的创建时间,格式为
yyyy-MM-dd HH:mm:ss
。fileId
Long
文件ID。
fileVersion
Long
文件版本。
dataSourceName
String
数据源名称。
eventCode
String
扩展程序事件编码。
文件删除、运行事件的消息实体格式(即事件消息中
data
字段的内容)示例如下。{ "datacontenttype": "application/json;charset=utf-8", "data": { "fileName": "***", "extensionBizId": "bf******6e3", "blockBusiness": false, "operator": "19***735", "eventCode": "delete-file", "fileCreateTime": "2024-07-12 11:08:50", //文件创建时间 "tenantId": 28***656, "fileOwner": "19***735", //文件所有人 "nodeId": 70***003, "projectId": 9***4, "fileType": 10, "fileId": 50***830, "resourceType": 1 } }
字段说明如下:
字段名称
字段类型
说明
operator
String
删除或运行文件的用户UID。
projectId
Long
文件所属项目空间的ID。
tenantId
Long
租户ID。
nodeId
Long
调度节点ID。
fileType
Long
文件的代码类型。可以调用ListFileType接口来查询文件的代码类型,或查阅支持的节点类型查看文件代码类型。
fileName
String
文件名称。
fileOwner
String
文件所有者。
extensionBizId
String
扩展程序卡点流程ID。
fileCreateTime
String
文件的创建时间,格式为
yyyy-MM-dd HH:mm:ss
。fileId
Long
文件ID。
eventCode
String
扩展程序事件编码。
表变更事件(提交表至开发环境、发布表至生产环境)
{
"datacontenttype": "application/json;charset=utf-8",
"data": {
"operator": "**************",
"projectId": 12*****56,
"tenantId": 12******56,
"extensionBizId": "12***56",
"tableName":"table1",
"tableType":"ODPS",
"maxComputeProject":"project1"
}
}
字段说明如下:
字段名称 | 字段类型 | 说明 |
operator | String | 提交或发布表的用户UID。 |
projectId | Long | 项目空间ID。 |
tenantId | Long | 租户ID。 |
extensionBizId | String | 扩展程序卡点流程ID。 |
tableName | String | 表名称。 |
tableType | String | 表类型,取值为ODPS。 |
maxComputeProject | String | 对应MaxCompute的项目名称。 |
数据集成事件列表
事件列表
事件类型 | 事件(及产生事件的操作) | 普通事件 | 扩展点事件 | EventBridge事件类型 (Type) | 扩展程序事件类型(eventCode) |
节点变更事件 | 开启任务 |
|
| ||
批量开启任务 |
|
|
消息格式
节点变更事件-开启任务
节点变更事件-开启任务事件的消息实体格式(即事件消息中data
字段的内容)示例如下。
## message v1任务启动表没有超过500张
{
"datacontenttype": "application/json;charset=utf-8",
"data": {
"eventCode": "start-diJob",
"extensionBizId": "0a4***b8ae",
"extensionBizName": "sync_mysql_to_odps_20240726_192307",
"appId": 293624,
"showTableMapping": true,
"tenantId": 28***656,
"blockBusiness": true,
"id": 5280,
"projectId": 9***4,
"tableMapping": [
{
"srcTable": "xb_test_116",
"dstDatasourceName": "odps_first",
"srcDatabaseName": "xiaobo_sharding_79fz",
"srcDatasourceName": "mysql_3357_pub_ip_1",
"dstTable": "ods_xb_test_116"
},
{
"srcTable": "xb_test_117",
"dstDatasourceName": "odps_first",
"srcDatabaseName": "xiaobo_sharding_79fz",
"srcDatasourceName": "mysql_3357_pub_ip_1",
"dstTable": "ods_xb_test_117"
},
{
"srcTable": "xb_test_118",
"dstDatasourceName": "odps_first",
"srcDatabaseName": "xiaobo_sharding_79fz",
"srcDatasourceName": "mysql_3357_pub_ip_1",
"dstTable": "ods_xb_test_118"
},
{
"srcTable": "xb_test_135",
"dstDatasourceName": "odps_first",
"srcDatabaseName": "xiaobo_sharding_79fz",
"srcDatasourceName": "mysql_3357_pub_ip_1",
"dstTable": "ods_xb_test_135"
}
],
"operator": "1504650005316516"
}
}
## message v2任务启动表没有超过500张
{
"datacontenttype": "application/json;charset=utf-8",
"data": {
"extensionBizId": "59d***50fc",
"extensionBizName": "sync_mysql_to_holo_20240911_170801",
"blockBusiness": true,
"operator": "19***735",
"setting": {
"lastStartPosition": "2024-09-11 12:00:00"
},
"eventCode": "start-diJob",
"jobId": 5777,
"forceRun": false,
"appId": 293624,
"showTableMapping": true,
"tenantId": 28***656,
"startAsV2": false,
"tableMapping": [
{
"srcTable": "test_verify1",
"dstDatasourceName": "molin_db",
"srcDatabaseName": "di_test",
"srcDatasourceName": "mysql_public",
"dstTable": "test_verify1"
},
{
"srcTable": "test_verify1_dst",
"dstDatasourceName": "molin_db",
"srcDatabaseName": "di_test",
"srcDatasourceName": "mysql_public",
"dstTable": "test_verify1_dst"
},
{
"srcTable": "mysql_0_timetest2",
"dstDatasourceName": "molin_db",
"srcDatabaseName": "di_test",
"srcDatasourceName": "mysql_public",
"dstTable": "mysql_0_timetest2"
}
]
}
}
##message v2任务启动表超过500张 ,"showTableMapping": false
{
"datacontenttype": "application/json;charset=utf-8",
"data": {
"eventCode": "start-diJob",
"jobId": 5502,
"forceRun": false,
"extensionBizId": "f4c***7cbc",
"extensionBizName": "sync_mysql_to_holo_20240412_213634",
"appId": 330914,
"showTableMapping": false,
"tenantId": 28***656,
"blockBusiness": true,
"startAsV2": false,
"operator": "19***735",
"setting": {
"lastStartPosition": "2024-04-12 22:07:02",
"startDateTime": "2024-09-10 17:00:00",
"timeZone": "Asia/Shanghai"
}
}
}
##正则
{
"datacontenttype": "application/json;charset=utf-8",
"data": {
"eventCode": "start-diJob",
"jobId": 5778,
"forceRun": false,
"extensionBizId": "a5d***75ba",
"extensionBizName": "sync_mysql_to_holo_20240912_170517",
"appId": 293624,
"showTableMapping": true,
"tenantId": 28***656,
"blockBusiness": true,
"startAsV2": false,
"tableMapping": [
{
"srcTable": "test.*",
"dstDatasourceName": "molin_db",
"srcDatabaseName": ".*",
"srcDatasourceName": "mysql_public",
"dstTable": "aaa"
},
{
"srcTable": "shard.*",
"dstDatasourceName": "molin_db",
"srcDatabaseName": ".*",
"srcDatasourceName": "mysql_public",
"dstTable": "vvv"
}
],
"operator": "19***735"
}
}
字段说明如下:
字段名称 | 字段类型 | 说明 |
projectId | Long | DataWorks项目空间ID。 |
operator | String | 操作用户UID。 |
extensionBizName | String | 解决方案名称。 |
showTableMapping | Boolean | 是否返回表Mapping信息。
表超过500张,showTableMapping默认为 |
tableMapping | JSONArray | 表映射关系。 |
srcDatasourceName | String | 源数据源名称。 |
srcDatabaseName | String | 源数据库名称。 |
srcTable | String | 源表名称。 |
dstDatasourceName | String | 目标数据源名称。 |
dstTable | String | 目标表名称。 |
tenantId | Long | 租户ID。 |
eventCode | String | 扩展程序事件编码。 |
节点变更事件-批量开启任务
节点变更事件-批量开启任务事件的消息实体格式(即事件消息中data
字段的内容)示例如下。
{
"datacontenttype": "application/json;charset=utf-8",
"data": {
"needErrorMessage": false,
"extensionBizId": "2de***c4c6",
"extensionBizName": "sync_mysql_to_holo_20240911_170801,sync_mysql_to_odps_20240726_192307",
"errorMessageOnlyFailedFileIds": false,
"blockBusiness": true,
"env": "prod",
"operator": "15***516",
"setting": {
"startDateTime": "2024-09-12 14:00:00",
"timeZone": "Asia/Shanghai"
},
"jobIds": [
5777,
5679
],
"eventCode": "batch-start-diJob",
"tableMappings": [
{
"extensionBizName": "sync_mysql_to_holo_20240911_170801",
"id": 5777,
"tableMapping": [
{
"srcTable": "test_verify1",
"dstDatasourceName": "molin_db",
"srcDatabaseName": "di_test",
"srcDatasourceName": "mysql_public",
"dstTable": "test_verify1"
},
{
"srcTable": "test_verify1_dst",
"dstDatasourceName": "molin_db",
"srcDatabaseName": "di_test",
"srcDatasourceName": "mysql_public",
"dstTable": "test_verify1_dst"
},
{
"srcTable": "mysql_0_timetest2",
"dstDatasourceName": "molin_db",
"srcDatabaseName": "di_test",
"srcDatasourceName": "mysql_public",
"dstTable": "mysql_0_timetest2"
}
]
},
{
"extensionBizName": "sync_mysql_to_odps_20240726_192307",
"id": 5679,
"tableMapping": [
{
"srcTable": "xb_test_116",
"dstDatasourceName": "odps_first",
"srcDatabaseName": "xiaobo_sharding_79fz",
"srcDatasourceName": "mysql_3357_pub_ip_1",
"dstTable": "ods_xb_test_116"
},
{
"srcTable": "xb_test_117",
"dstDatasourceName": "odps_first",
"srcDatabaseName": "xiaobo_sharding_79fz",
"srcDatasourceName": "mysql_3357_pub_ip_1",
"dstTable": "ods_xb_test_117"
},
{
"srcTable": "xb_test_118",
"dstDatasourceName": "odps_first",
"srcDatabaseName": "xiaobo_sharding_79fz",
"srcDatasourceName": "mysql_3357_pub_ip_1",
"dstTable": "ods_xb_test_118"
},
{
"srcTable": "xb_test_135",
"dstDatasourceName": "odps_first",
"srcDatabaseName": "xiaobo_sharding_79fz",
"srcDatasourceName": "mysql_3357_pub_ip_1",
"dstTable": "ods_xb_test_135"
}
]
}
],
"appId": 293624,
"showTableMapping": true,
"tenantId": 52***018,
"projectId": 9***4
}
}
字段说明如下:
字段名称 | 字段类型 | 说明 |
projectId | Long | DataWorks项目空间ID。 |
operator | String | 操作用户UID。 |
extensionBizName | String | 解决方案名称。 |
setting | JSONObject | 启动配置项。 |
startDateTime | String | 启动时间。 |
timeZone | String | 启动时区。 |
tableMapping | JSONArray | 表映射关系。 |
srcDatasourceName | String | 源数据源名称。 |
srcDatabaseName | String | 源数据库名称。 |
srcTable | String | 源表名称。 |
dstDatasourceName | String | 目标数据源名称。 |
dstTable | String | 目标表名称。 |
showTableMapping | Boolean | 是否返回表Mapping信息。
表超过500张,showTableMapping默认为 |
tenantId | Long | 租户ID。 |
eventCode | String | 扩展程序事件编码。 |
运维中心事件列表
事件列表
事件类型 | 事件(及产生事件的操作) | 普通事件 | 扩展点事件 | EventBridge事件类型 (Type) | 扩展程序事件类型(eventCode) |
节点变更 | 删除节点
|
|
| ||
下线节点 生产环境周期任务直接下线节点 说明 该操作会同步删除数据开发中的节点,并放入回收站。 |
|
| |||
冻结节点 |
|
| |||
解冻节点 |
|
| |||
节点补数据 | 节点补数据 |
|
| ||
任务状态变更 | 调度任务状态变更 |
|
| ||
实例变更 | 冻结实例 |
|
| ||
解冻实例 |
|
| |||
终止实例 |
|
| |||
重跑实例
|
|
| |||
置成功实例 |
|
| |||
移除实例指定上游依赖关系 |
|
| |||
冻结实例前置事件 |
|
| |||
解冻实例前置事件 |
|
| |||
重跑实例前置事件 |
|
| |||
置成功实例前置事件 |
|
| |||
杀死实例前置事件 |
|
| |||
删除过期实例 |
|
| |||
工作流状态变更 | 工作流状态变更
|
|
| ||
监控告警 | 监控告警 |
|
|
消息格式
节点变更事件(删除、下线、冻结、解冻)
节点删除事件的消息实体格式(即事件消息中
data
字段的内容)示例如下。{ "datacontenttype": "application/json;charset=utf-8", "data": { "nodeName": "", "programType": "ODPS_SQL", "cronExpress": "00 20 00 * * ?", "schedulerType": "NORMAL", "ownerId": "19****735", "priority": 1, "baselineId": 117801853, "repeatability": true, "modifyTime": 1646364549642, "createTime": 1646364549642, "datasource": "odps_source", "tenantId": 28378****10656, "nodeId": 100***150, "projectId": 30**95, "operator": "19***735" //本次操作人 } }
字段说明如下:
字段名称
字段类型
说明
nodeName
String
节点名称。
programType
String
文件的代码类型。可以调用ListFileType接口来查询文件的代码类型,或查阅支持的节点类型查看文件代码类型。
cronExpress
String
周期调度的cron表达式。
该参数与DataWorks控制台中,数据开发任务的调度配置 > 时间属性 > cron表达式对应。
配置完调度周期及定时调度时间后,DataWorks会自动生成相应cron表达式。示例如下:
每天凌晨5点30分定时调度:
00 30 05 * * ?
每个小时的第15分钟定时调度:
00 15 * * * ?
每隔十分钟调度一次:
00 00/10 * * * ?
每天8点到17点,每隔十分钟调度一次:
00 00-59/10 8-17 * * * ?
每月的1日0点20分自动调度:
00 20 00 1 * ?
从1月1日0点10分开始,每过3个月调度一次:
00 10 00 1 1-12/3 ?
每周二、周五的0点5分自动调度:
00 05 00 * * 2,5
说明cron表达式的限制如下:
最短调度间隔时间为5分钟。
每天最早调度时间为0点5分。
schedulerType
String
任务实例的调度类型,取值如下:
0:NORMAL,正常调度任务。该任务会被日常调度。
1:MANUAL,手动任务。该任务不会被日常调度。
2:PAUSE,冻结任务。该任务被日常调度,但启动调度时直接被置为失败状态。
3:SKIP,空跑任务。该任务被日常调度,但启动调度时直接被置为成功状态。
4:SKIP_UNCHOOSE,临时工作流中未选择的任务,仅存在于临时工作流中,启动调度时直接被置为成功状态。
5:SKIP_CYCLE,未到运行周期的周或月任务。该任务被日常调度,但启动调度时直接被置为成功状态。
6:CONDITION_UNCHOOSE,上游实例中有分支(IF)节点,但是该下游节点未被分支节点选中,直接置为空跑任务。
7:REALTIME_DEPRECATED,实时生成的已经过期的周期实例,该类型任务直接被置为成功状态。
ownerId
String
节点责任人的阿里云用户ID。如果该参数为空,则默认使用调用者的阿里云用户ID。
priority
Integer
任务优先级,取值为1、3、5、7、8。数值越大,优先级越高。
baselineId
Long
基线ID。
repeatability
Boolean
节点是否可以重复运行:
true:可以重复运行。
false:不可以重复运行。
modifyTime
Long
节点最近一次的修改时间。
createTime
Long
节点的创建时间。
nodeId
Long
节点ID。
projectId
Long
节点所在的项目空间ID。
tenantId
Long
节点所属的租户ID。
operator
String
新增、修改或删除节点的用户UID。
节点冻结、解冻、下线事件的消息实体格式(即事件消息中
data
字段的内容)示例如下。{ "datacontenttype": "application/json;charset=utf-8", "data": { "operator": "19***735", "projectId": 12***56, "tenantId": 28***656, "nodeIds":[1,2,3], "extensionBizId": "12***56" } }
字段说明如下:
字段名称
字段类型
说明
operator
String
冻结、解冻、下线节点的用户UID。
projectId
Long
节点所在项目空间ID。
tenantId
Long
租户ID。
nodeIds
Array
被操作的节点ID列表。
extensionBizId
String
扩展程序卡点流程ID。
节点补数据
补数据操作事件的消息实体格式(即事件消息中data
字段的内容)示例如下。
{
"datacontenttype": "application/json;charset=utf-8",
"data": {
"excludeNodeIds":[],
"rootNodeId": 1000****271,
"startFutureInstanceImmediately": false,
"useMultipleTimePeriods": true,
"operator": "19***735",
"eventCode": "backfill-data",
"multipleTimePeriods": "[{\"bizBeginTime\":\"2022-04-17\",\"bizEndTime\":\"2022-04-17\"}]",
"parallelGroup": 1,
"rootNodeProjectId": 12*****8,
"isParallel": false,
"name": "P_fff_20220418_215404",
"tenantId": 16935*****3377,
"includeNodeIds":
[
10***271
],
"projectId": 9***4,
"order": "asc",
"extensionBizId": "12***56"
}
}
字段说明如下:
字段名称 | 字段类型 | 说明 |
name | String | 补数据工作流的名称。 |
rootNodeId | Long | 补数据工作流根节点ID。 |
rootNodeProjectId | Long | 补数据工作流根节点所在的项目空间ID。 |
includeNodeIds | Array | 补数据的节点ID列表。 |
excludeNodeIds | Array | 无需补数据的节点ID列表。该列表中的节点会生成空跑实例,空跑实例被调度后直接运行成功,不会执行脚本内容。 |
bizBeginTime | String | 任务的开始时间。仅小时调度任务需要设置该参数,格式为 |
bizEndTime | String | 任务的结束时间。仅小时调度任务需要设置该参数,格式为 |
isParallel | Boolean | 补数据操作是否可以并行运行:
|
parallelGroup | Integer | 并行运行的分组数,1表示不分组。 |
startFutureInstanceImmediately | Boolean | 是否跳过调度时间,立即调度运行未来时间的实例:
|
order | String | 根据业务日期制定补数据运行的顺序:
|
multipleTimePeriods | String | 分段选取业务日期。示例, |
tenantId | Long | 租户ID。 |
projectId | Long | 补数据操作所在的项目ID。 |
operator | String | 执行补数据操作的用户ID。 |
extensionBizId | String | 扩展程序卡点流程ID。 |
eventCode | String | 扩展程序事件编码。 |
任务状态变更事件
调度任务状态变更事件的消息实体格式(即事件消息中data
字段的内容)示例如下。
{
"datacontenttype": "application/json;charset=utf-8",
"data": {
"beginWaitTimeTime": 1652700576000,
"dagId": 446***330,
"dagType": 0,
"eventCode": "instance-status-changes",
"taskType": 0,
"modifyTime": 1652700577000,
"createTime": 1652543233000,
"appId": 3*****2,
"tenantId": 235454***432001,
"opCode": 31,
"flowId": 1,
"nodeId": 100***219,
"beginWaitResTime": 1652700577000,
"taskId": 453***169,
"status": 3
}
}
字段说明如下:
字段名称 | 字段类型 | 说明 |
finishTime | Long | 调度任务实例运行完成的具体时间。 |
beginWaitTimeTime | Long | 调度任务实例开始等待运行的具体时间。 |
beginRunningTime | Long | 调度任务实例开始运行的具体时间。 |
dagId | Long | 根据 |
dagType | Integer | Dag的类型,取值如下:
|
taskType | Integer | 任务实例的调度类型,取值如下:
|
modifyTime | Long | 任务实例最近一次的修改时间。 |
createTime | Long | 任务实例的创建时间。 |
appId | Long | 工作空间的ID。您可以调用ListProjects查看空间ID信息。 |
tenantId | Long | 调度任务实例所在工作空间的租户ID。 |
opCode | Integer | 调度任务实例的操作码。该字段可忽略。 |
flowId | Long | 业务流程的ID。
|
nodeId | Long | 调度任务实例对应的节点ID。 |
beginWaitResTime | Long | 调度任务实例开始等待资源的具体时间。 |
taskId | Long | 调度任务实例的ID。 |
status | Integer | 任务的状态,取值如下:
|
eventCode | String | 扩展程序事件编码。 |
实例变更事件(冻结、解冻、终止、重跑、置成功)
实例变更事件的消息实体格式(即事件消息中data
字段的内容)示例如下。
{
"datacontenttype": "application/json;charset=utf-8",
"data": {
"eventCode": "freeze-instance",
"operator": "19***735",
"projectId": 12***8,
"projectType": "PROD",
"taskIds": [
523***9736
],
"tenantId": 28***656
}
}
字段说明如下
字段名称 | 字段类型 | 说明 |
operator | String | 操作实例(冻结、解冻、终止、重跑、置成功等操作)用户的UID。 |
projectType | String | 运行环境。
|
taskIds | List<Long> | 实例的ID集合。 |
projectId | Long | DataWorks项目空间ID。 |
tenantId | Long | 实例所在工作空间的租户ID。 |
eventCode | String | 事件编码。 |
实例变更前置事件(冻结、解冻、重跑、置成功、杀死实例)
实例变更前置事件的消息实体格式(即事件消息中data
字段的内容)示例如下。
{
"datacontenttype": "application/json;charset=utf-8",
"data": {
"eventCode": "pre-freeze-instance",
"extensionBizId": "055***afaa",
"extensionBizName": "节点名",
"projectId": 9***4,
"taskIds": [
523536569736
],
"tenantId": 28***656,
"operator": "19***735"
}
}
字段说明如下
字段名称 | 字段类型 | 说明 |
extensionBizId | String | 扩展程序卡点流程ID。 |
extensionBizName | String | 操作对象名,多个对象会用”对象名...“进行描述。 |
projectId | Long | DataWorks项目空间ID。 |
taskIds | List<Long> | 实例的ID集合。 |
tenantId | String | 实例所在工作空间的租户ID。 |
operator | Long | 操作实例的用户UID。 |
eventCode | String | 扩展点事件编码。 |
实例变更事件-删除过期实例
删除过期实例事件的消息实体格式(即事件消息中data
字段的内容)示例如下。
{
"datacontenttype": "application/json;charset=utf-8",
"data": {
"eventCode": "expired-task-instances-deleted",
"deletedTaskInstanceIds": [
524***035,
524***498,
524***637
],
"appId": 307303,
"tenantId": 28***656,
"blockBusiness": false,
"owner": "1107***538",
"operationTime": 1734505954897
}
}
字段说明如下
字段名称 | 字段类型 | 说明 |
deletedTaskInstanceIds | List | 被删除的实例ID列表。 |
owner | String | 实例owner用户ID。 |
operationTime | Long | 操作时间。 |
blockBusiness | Boolean | 存在扩展程序时,是否停止流程。
|
appId | Long | DataWorks项目空间ID。 |
tenantId | String | 实例所在工作空间的租户ID。 |
operator | Long | 操作实例的用户UID。 |
eventCode | String | 扩展点事件编码。 |
移除实例指定上游依赖关系事件
移除实例指定上游依赖关系的消息实体格式(即事件消息中data
字段的内容)示例如下:
{
"datacontenttype": "application/json;charset=utf-8",
"aliyunaccountid": "110******38",
"aliyunpublishtime": "2024-12-18T07:12:35.463Z",
"data": {
"eventCode": "delete-task-instance-dependencies",
"upstreamTaskInstanceIds": [
52******35,
52******98,
52******37
],
"appId": 3***03,
"tenantId": 52******36,
"blockBusiness": false,
"taskInstanceId": 52******49,
"operator": "19***735",
"operationTime": 1734505954897
}
}
字段说明如下:
字段名称 | 字段类型 | 说明 |
taskInstanceId | Long | 下游实例ID。 |
upstreamTaskInstanceIds | List | 移除依赖关系的上游实例ID列表。 |
operator | String | 操作用户UID。 |
operationTime | Long | 操作时间。 |
工作流状态变更事件
工作流状态变更事件的消息实体格式(即事件消息中data
字段的内容)示例如下。
{
"datacontenttype": "application/json;charset=utf-8",
"data": {
"bizDate": "2022-11-07 00:00:00",
"createTime": "2022-11-08 10:56:32",
"dagId": 500358972116,
"dagName": "P_test_spark_true_copy_20221108_105631",
"eventCode": "dag-status-changes",
"dagType": 3,
"flowId": 1,
"flowName": "ATCLOUD_FLOW",
"operator": "11****538",
"projectEnv": "PROD",
"projectId": 25***63,
"status": 6,
"tenantId": 52***736
}
}
字段说明如下:
字段名称 | 字段类型 | 说明 |
bizDate | String | 工作流的业务日期。格式为 |
createTime | String | 工作流的创建时间。格式为 |
dagId | Long | DagId,根据DagId可获取Dag详情。 |
dagName | String | 工作流的名称。 |
dagType | Integer | Dag的类型,取值如下:
|
flowId | Integer | 工作流对应的业务流程ID。 |
flowName | String | 工作流对应的业务流程名称。 |
operator | String | 创建工作流的用户UID。 |
projectEnv | String | 工作流所属环境,取值如下:
|
tenantId | Long | 调度任务实例所在工作空间的租户ID。 |
projectId | Long | 工作空间ID。 |
status | Integer | 工作流中任务的状态,取值如下:
|
eventCode | String | 扩展程序事件编码。 |
监控告警
基线告警的消息实体格式(即消息中
data
字段的内容)示例如下。{ "datacontenttype": "application/json;charset=utf-8", "data": { "eventCode": "workbench-monitor-alert", "alarmType": "SLA_ALERT", "baselineId": 137***723, "baselineName": "ods层check任务专用 -- 小时级", "baselineStatus": 3, "bizDate": 1654444800000, "inGroupId": 14, "nodeId": 1000***8734, "projectId": 76***34, "taskId": 307***3778, "tenantId": 28***656 } }
字段说明如下表。
字段名称
字段类型
说明
alarmType
String
告警类型。取值如下:
SLA_ALERT
REMIND_ALERT
TOPIC_ALERT
baselineId
Long
基线ID。
baselineName
String
基线名称。
baselineStatus
Integer
基线的状态。取值如下:
-1:异常。
1:安全。
2:预警。
3:破线。
bizDate
Long
业务日期时间戳。
inGroupId
Integer
基线实例的周期号。天基线为
1
,小时基线的取值范围为[1,24]
。nodeId
Long
导致基线异常的节点ID。
projectId
Long
基线所属项目空间ID。
taskId
Long
导致基线异常的实例ID。
tenantId
Long
租户ID。
eventCode
String
扩展程序事件编码。
事件告警的消息实体格式(即消息中
data
字段的内容)示例如下。{ "datacontenttype": "application/json;charset=utf-8", "data": { "alarmType": "TOPIC_ALERT", "nodeId": 1000***315, "projectId": 91***09, "taskId": 307***0357, "taskStatus": 5, "tenantId": 28***656, "topicId": 1084769 } }
字段说明如下表。
字段名称
字段类型
说明
alarmType
String
告警类型。取值如下:
SLA_ALERT
REMIND_ALERT
TOPIC_ALERT
topicId
Long
事件的ID。
taskStatus
String
触发事件的节点实例状态。
nodeId
Integer
触发事件的节点ID。
projectId
Long
触发事件的节点所属项目空间ID。
taskId
Long
触发事件的节点实例ID。
tenantId
Long
租户ID。
任务规则告警的消息实体格式(即消息中
data
字段的内容)示例如下。说明通常,规则对象为任务节点、基线、工作空间、业务流程等。
{ "datacontenttype": "application/json;charset=utf-8", "data": { "alarmType": "REMIND_ALERT", "nodeIds": "1000***5472,1000***5473,1000***5474", "projectId": 9***4, "remindId": 7605, "remindName": "出错报警", "remindType": "ERROR", "remindUnit": "NODE", "taskIds": "307***0896,307***0870,307***0855", "tenantId": 28***656 } }
字段说明如下表。
字段名称
字段类型
说明
alarmType
String
告警类型。取值如下:
SLA_ALERT
REMIND_ALERT
TOPIC_ALERT
nodeIds
String
触发规则告警的节点列表。
remindId
Long
规则ID。
remindType
Stirng
规则触发条件。取值如下:
FINISHED:完成。
UNFINISHED:未完成。
ERROR:运行出错。
CYCLE_UNFINISHED:周期未完成。
TIMEOUT:运行超时。
projectId
Long
触发规则的节点所属项目空间ID。
remindUnit
String
触发规则的对象类型。取值如下:
NODE:任务节点。
GATEWAY_RES:独享调度资源组。
DI_RES:数据集成资源组。
tenantId
Long
租户ID。
taskId
String
触发规则告警的实例列表。
remindName
String
规则名称。
资源组规则告警的消息实体格式(即消息中
data
字段的内容)示例如下。{ "datacontenttype": "application/json;charset=utf-8", "data": { "alarmType": "REMIND_ALERT", "projectId": 9***4, "remindId": 200***186, "remindName": "独享资源组告警", "remindType": "RES_GROUP_THRESHOLD", "remindUnit": "GATEWAY_RES", "resourceGroupIdentifier": "S_res_group_195820716552192_1650965857744", "resourceGroupName": "emr_exclusive_scheduld", "resourceGroupType": "GATEWAY", "tenantId": 28***656 } }
字段说明如下表。
字段名称
字段类型
说明
alarmType
String
告警类型。取值如下:
SLA_ALERT
REMIND_ALERT
TOPIC_ALERT
remindId
Long
规则ID。
remindType
Stirng
规则触发条件。取值如下:
FINISHED:完成。
UNFINISHED:未完成。
ERROR:运行出错。
CYCLE_UNFINISHED:周期未完成。
TIMEOUT:运行超时。
RES_GROUP_THRESHOL:资源组利用率。
RES_GROUP_WAIT_AMOUNT:资源组等待资源实例数。
projectId
Long
触发规则的节点所属项目空间ID。
remindUnit
String
触发规则的对象类型。取值如下:
NODE:任务节点。
GATEWAY_RES:独享调度资源组。
DI_RES:数据集成资源组。
tenantId
Long
租户ID。
remindName
String
规则名称。
resourceGroupIdentifier
String
资源组的唯一标识符。
resourceGroupName
String
资源组名称。
resourceGroupType
String
资源组类型。取值如下:
GATEWAY:调度资源组。
DI:数据集成资源组。
安全中心事件列表
事件列表
事件类型 | 事件(及产生事件的操作) | 普通事件 | 扩展点事件 | EventBridge事件类型 (Type) | 扩展程序消息类型(eventCode) |
审批中心 | 创建权限申请单前置事件 |
|
| ||
完成申请单 |
|
| |||
安全中心(表权限申请) | 表权限申请前置事件 |
|
|
消息格式
审批中心(创建、完成申请单)
创建权限申请单前置事件的消息实体格式(即事件消息中
data
字段的内容)示例如下。{ "datacontenttype": "application/json;charset=utf-8", "data": { "appId":194209, "assignee":"286098539641742899", "assigneeName":"yupeng.sunyp", "createTime":1652094363000, "eventType":"approval", "process":{ "applicant":"286098539641742899", "applicantName":"yupeng.sunyp", "approvalContent":{ "applyPeriod":"2997964800000", "applyReason":"测试", "arrayData":[ { "ownerBaseId":"1822***45", "objectType":"TABLE", "odpsTable":"loghub_070103", "envType":1, "projectGuid":"odps.b_mc1", "objectGuid":"odps.b_mc1.loghub_070103", "tenantId":28***656, "objectName":"loghub_070103", "ownerAccountName":"ALIYUN$******(******)", "odpsProject":"B_MC1", "projectName":"B_MC1", "actions":[ "Select", "Describe" ], "projectId":9***4, "workspaceId":"9***4" } ], "contentType":"application/json", "granteeAccounts":[ { "granteeId":"2860985***99", "granteeTypeSub":103, "granteeType":1, "granteeName":"RAM$dataworks_3h1_1:yupeng.sunyp" }, { "granteeId":"237857631119109360", "granteeTypeSub":105, "granteeType":1, "granteeName":"RAM$dataworks_3h1_1:dev" } ], "odpsProjectName":"B_MC1", "projectEnv":"1", "resourceSummary":"loghub_070103", "tenantId":28***656, "workspaceId":194209 }, "assignmentCategory":"MaxCompute", "createTime":1652094363000, "processDefinitionId":"definition-3dcc9ce7-d29d-435d-a908-60d4355ff5e2", "processId":"528535869984706", "status":"Pending", "title":"MaxComputeTable", "updateTime":1652094363000 }, "processId":"528535869984706", "status":"Submit", "eventCode": "approval-change-created", "taskId":"528535870015424", "tenantId":28***656, "updateTime":1652094364000 } }{ "datacontenttype": "application/json;charset=utf-8", "data": { "appId":227859, "assignee":"286098539641742899", "eventCode": "approval-change-finished", "assigneeName":"******.******", "comments":"ces", "createTime":1652095981000, "eventType":"approval", "process":{ "applicant":"286098***2899", "applicantName":"yupeng.sunyp", "approvalContent":{ "applyPeriod":"2997964800000", "applyReason":"测试", "arrayData":[ { "ownerBaseId":"2382***884", "objectType":"TABLE", "odpsTable":"cdd", "objectNameCn":"******", "envType":1, "projectGuid":"odps.da_simple_202112", "objectGuid":"odps.da_simple_202112.cdd", "tenantId":0, "objectName":"cdd", "ownerAccountName":"RAM$******(******)", "odpsProject":"da_simple_202112", "projectName":"da_simple_202112", "actions":[ "Select", "Describe" ], "projectId":9***4, "workspaceId":"9***4" } ], "contentType":"application/json", "granteeAccounts":[ { "granteeId":"286***899", "granteeTypeSub":103, "granteeType":1, "granteeName":"RAM$dataworks_3h1_1:yupeng.sunyp" } ], "odpsProjectName":"da_simple_202112", "projectEnv":"1", "resourceSummary":"cdd", "tenantId":28***656, "workspaceId":227859 }, "assignmentCategory":"MaxCompute", "createTime":1652095981000, "processDefinitionId":"definition-6e6418e6-c65f-4f26-a673-88576b1c1e4a", "processId":"528***192", "status":"Pending", "title":"MaxComputeTable", "updateTime":1652095981000 }
字段说明如下:
字段名称
字段类型
说明
appId
Long
工作空间ID。
assignee
String
审批单处理人的BaseId。
assigneeName
String
审批单处理人的名称。
comments
String
备注信息。
createTime
Long
审批单的创建时间戳。
processId
String
审批单的ID。
status
String
审批单的状态。
taskId
String
审批任务的ID。
tenantId
String
租户ID。
updateTime
String
审批单更新的时间戳。
eventType
String
事件类型。
process
Object
审批任务对象。
applicant
String
申请BaseID。
applicantName
String
申请人名称。
assignmentCategory
String
申请内容类型。
createTime
String
审批单创建的时间戳。
processDefinitionId
String
工作流定义ID。
processId
String
工作流ID。
status
String
工作流状态。
title
String
工作流标题。
updateTime
Long
审批单更新的时间戳。
approvalContent
Object
审批内容对象。
applyPeriod
String
审批单的申请时长。
applyReason
String
审批单的申请原因。
contentType
String
审批内容类型。
odpsProjectName
String
审批项目名称。
resourceSummary
String
资源描述。
tenantId
Long
租户ID。
workspaceId
Long
工作空间ID。
projectEnv
String
审批项目所属环境。
granteeAccounts
Array
授权列表。
.granteeId
String
授权主体ID。
granteeType
String
授权类型。
granteeTypeSub
String
授权主体子类型。
granteeName
String
授权主体名称。
arrayData
Array
授权内容列表,详情请参见开发参考:事件列表与消息格式。
eventType
String
事件类型。
完成审批单事件的消息实体格式(即事件消息中
data
字段的内容)示例如下。{ "datacontenttype": "application/json;charset=utf-8", "data": { "appId": 227859, "assignee": "286098539641742899", "eventCode": "approval-change-finished", "assigneeName": "******.******", "comments": "ces", "createTime": 1652095981000, "eventType": "approval", "process": { "applicant": "2860****899", "applicantName": "yupeng.sunyp", "approvalContent": { "applyPeriod": "2997***0000", "applyReason": "测试", "arrayData": [ { "ownerBaseId": "2382***8*884", "objectType": "TABLE", "odpsTable": "cdd", "objectNameCn": "******", "envType": 1, "projectGuid": "odps.da_simple_202112", "objectGuid": "odps.da_simple_202112.cdd", "tenantId": 0, "objectName": "cdd", "ownerAccountName": "RAM$******(******)", "odpsProject": "da_simple_202112", "projectName": "da_simple_202112", "actions": [ "Select", "Describe" ], "projectId": 9***4, "workspaceId": "9***4" } ], "contentType": "application/json", "granteeAccounts": [ { "granteeId": "286***899", "granteeTypeSub": 103, "granteeType": 1, "granteeName": "RAM$dataworks_3h1_1:yupeng.sunyp" } ], "odpsProjectName": "da_simple_202112", "projectEnv": "1", "resourceSummary": "cdd", "tenantId": 28***656, "workspaceId": 227859 }, "assignmentCategory": "MaxCompute", "createTime": 1652095981000, "processDefinitionId": "definition-6e6418e6-c65f-4f26-a673-88576b1c1e4a", "processId": "528***192", "status": "Pending", "title": "MaxComputeTable", "updateTime": 1652095981000 } } }
字段说明如下:
字段名称
字段类型
说明
appId
Long
工作空间ID。
assignee
String
审批单处理人BaseId。
assigneeName
String
审批单处理人名称。
comments
String
备注信息。
createTime
Long
审批单的创建时间戳。
processId
String
审批单ID。
status
String
审批单状态。
taskId。
String
审批任务ID。
tenantId
String
租户ID。
updateTime
String
审批单最近一次更新的时间戳。
eventType
String
事件类型。
process
Object
审批任务对象。
applicant
String
申请BaseID。
applicantName
String
申请人名称。
assignmentCategory
String
申请内容类型。
createTime
String
审批单创建的时间戳。
processDefinitionId
String
工作流定义ID。
processId
String
工作流ID。
status
String
工作流状态。
title
String
工作流标题。
updateTime
Long
审批单更新的时间戳。
approvalContent
Object
审批内容对象。
applyPeriod
String
申请时长。
applyReason
String
申请原因。
contentType
String
内容类型。
odpsProjectName
String
项目名称。
resourceSummary
String
资源描述。
tenantId
Long
租户ID。
workspaceId
Long
工作空间ID。
projectEnv
String
所属环境。
granteeAccounts
Array
授权列表。
granteeId
String
授权主体ID。
granteeType
String
授权类型。
granteeTypeSub
String
授权主体子类型。即授权账号类型,具体如下:
生产云账号(生产调度使用的账号):ACCOUNT_PRD(101)
应用云账号:ACCOUNT_APP(102)
个人云账号:ACCOUNT_USER(103)
部门云账号:ACCOUNT_DEPT(104)
MOCK账号:ACCOUNT_MOCK(106)
他人云账号:ACCOUNT_OTHER_USER(105)
granteeName
String
授权主体名称。
arrayData
Array
授权内容列表,详情请参见开发参考:事件列表与消息格式。
eventType
String
事件类型。
安全中心(表权限申请前置事件)
表权限申请前置事件的消息实体格式(即事件消息中data
字段的内容)示例如下
{
"datacontenttype": "application/json;charset=utf-8",
"data": {
"eventType": "approval-create-before",
"operator":"19***735",
"order":{
"applyReason":"测试",
"deadlineDate":"1",
"deadlineType":"month",
"granteeObjectList":[
{
"granteeId":"1239****8872"
}
],
"projectMeta":{
"envCode":1,
"labelSecurity":false,
"objectMetaList":[
{
"action":[
"Select",
"Describe"
],
"name":"tablei",
"projectGuid":"odps.d11aa"
}
],
"projectId":2****0,
"projectName":"d11aa"
}
},
"projectId":2****0,
"tenantId":5564****6465
}
}
字段说明如下:
字段名称 | 字段类型 | 说明 |
operator | String | 执行表权限申请前置操作的用户UID。 |
projectId | Long | 执行表权限申请前置操作的工作空间ID。 |
tenantId | Long | 执行表权限申请前置操作的租户ID。 |
order | Object | 申请单信息。 |
applyReason | String | 申请原因。 |
deadlineDate | String | 申请单有效期。 |
deadlineType | String | 申请单有效期的时间单位。例如,Day、Month。 |
granteeObjectList | Object | 授权对象列表。 |
granteeId | String | 授权对象ID,即RAM用户ID。 |
projectMeta | Object | 工作空间信息。 |
envCode | Integer | 工作空间环境标识,0表示开发环境,1表示生产环境。 |
labelSecurity | Boolean | 是否开启Label级别管控:
|
objectMetaList | Array | 授权对象列表。 |
action | Array | 授权动作。例如,授权拥有表的Select或Describe权限。 |
name | String | 待申请权限的表名称。 |
projectGuid | String | 表所在工作空间Guid。 |
projectName | String | 表所在工作空间名称。 |
eventType | String | 事件类型。 |
附录:授权内容列表(MaxCompute)
DataWorks在安全中心事件中,触发审批和表权限申请时,触发的事件消息发送至事件总线(EventBridge)消息之中会增加arrayData
数据,当assignmentCategory
类型为MaxCompute
时,arrayData
的数据内容如下:
{
"ownerBaseId":"1822931104031845",
"objectType":"TABLE",
"odpsTable":"oracle_************",
"envType":1,
"projectGuid":"odps.***********",
"objectGuid":"odps.******.******",
"tenantId":0,
"objectName":"oracle_******",
"ownerAccountName":"ALIYUN***************",
"odpsProject":"dataworks******",
"projectName":"dataworks******",
"actions":[
"Select",
"Describe"
],
"projectId":9***4,
"workspaceId":"9***4"
}
字段说明如下:
字段名称 | 字段类型 | 说明 |
ownerBaseId | String | 表所有者的BaseID。 |
ownerAccountName | String | 表所有者的名称。 |
objectType | String | 对象类型。 |
odpsTable | String | 表名称。 |
envType | String | 表所属环境。 |
projectGuid | String | 项目Guid。 |
objectGuid | String | 对象Guid。 |
objectName | String | 对象名称。 |
odpsProject | String | ODPS项目名称。 |
projectName | String | 项目代码。 |
projectId | Long | 项目ID。 |
workspaceId | String | 工作空间ID。 |
actions | Array | 操作类型列表。 |
附录:授权内容列表(DataService)
DataWorks在安全中心事件中,触发审批以及表权限申请时,发送至事件总线(EventBridge)消息中会增加arrayData
数据,当assignmentCategory
类型为DataService
时,arrayData
的数据内容如下
{
"resourceId":"DsApiDeploy/******/workspaceId/******/dsDeployId/******",
"ownerName":"******",
"resourceVersion":1,
"name":"api_api",
"dsDeployId":"******",
"workspaceName":"da_******",
"id":"******",
"type":1,
"ownerId":"19****735",
"url":"https://******.data.aliyun.com/?projectId=******&type=api&id=******&version=***&defaultProjectId=******",
"workspaceId":"******"
}
字段说明如下:
字段名称 | 字段类型 | 说明 |
resourceId | String | 资源ID。 |
ownerName | String | 资源所有者的名称。 |
resourceVersion | Long | 资源版本。 |
name | String | 资源名称。 |
dsDeployId | String | 数据服务发布ID。 |
workspaceName | String | 工作空间名称。 |
id | String | 资源唯一ID。 |
type | String | 资源类型,取值如下:
|
ownerId | String | 资源责任人BaseID。 |
url | String | 数据服务链接地址。 |
workspaceId | String | 工作空间ID。 |
数据质量事件列表
事件列表
事件类型 | 事件(及产生事件的操作) | 普通事件 | 扩展点事件 | EventBridge事件类型 (Type) | 扩展程序事件类型(eventCode) |
数据质量校验 | 校验结果反馈
|
| | ||
校验完成
|
|
| |||
数据质量规则前置 | 批量创建数据质量规则前置事件 |
|
| ||
批量更新数据质量规则前置事件 |
|
| |||
批量删除数据质量规则前置事件 |
|
| |||
更新数据质量规则前置事件 |
|
| |||
数据质量监控前置 | 创建数据质量监控前置事件 |
|
| ||
更新数据质量监控前置事件 |
|
| |||
克隆数据质量监控前置事件 |
|
| |||
批量删除数据质量监控前置事件 |
|
| |||
数据质量监控告警订阅前置 | 创建数据质量监控告警订阅前置事件 |
| create-data-quality-evaluation-task-notification | ||
更新数据质量监控告警订阅前置事件 |
|
| |||
删除数据质量监控告警订阅前置事件 |
|
|
消息格式
数据质量校验
数据质量校验结果反馈事件的消息实体格式(即事件消息中
data
字段的内容)示例如下。{ "datacontenttype": "application/json;charset=utf-8", "data": { "ruleCheckId": 521771452, "feedbackContent": "跳过", "ruleId": 28610334, "createUser": "110755000425****", "taskId": "167644814****9a26ecf4063a88797", "beginTime": "1676448145000", "envType": "ODPS", "projectName": "test_mc_2303_kongjian", "projectId": 9***4, "tenantId": 28***656 } }
重要字段说明如下表。
字段名称
字段类型
说明
ruleId
Long
数据质量规则ID。
ruleCheckId
Long
校验结果自增ID。
feedbackContent
String
反馈内容。
createUser
String
反馈人用户ID。
taskid
String
数据质量任务的ID。
beginTime
String
反馈时间。
envType
String
规则关联的表所属数据源类型,包括:ODPS、EMR、HOLO。
projectName
String
规则关联的表所属的数据源唯一标识。
projectId
Long
DataWorks项目空间ID。
tenantId
Long
DataWorks租户ID。
数据质量校验完成事件的消息实体格式(即事件消息中
data
字段的内容)示例如下。{ "datacontenttype": "application/json;charset=utf-8", "data": { "projectId": 9***4, "tenantId": 28***656, "id": 52177****, "taskId": "1671***7a6", "entityId": 1562***, "ruleId": 28610334, "property": "-", "bizdate": "2023-02-09 00:00:00", "dateType": "YMD", "actualExpression": "ds\u003d20230210", "matchExpression": "ds\u003d$[yyyymmdd]", "blockType": 1, "checkResult": 0, "eventCode": "dqc-check-finished-event", "checkResultStatus": 0, "methodName": "table_count", "beginTime": "2023-02-15 20:14:48", "endTime": "2023-02-15 20:14:55", "timeConsuming": "7s", "externalType": "CWF2", "externalId": "triggerByManual", "discrete": false, "fixedCheck": true, "referenceValue": [ { "bizDate": "3000-12-31 00:00:00", "discreteProperty": "表行数,1天差值", "value": 0.0, "singleCheckResult": 0 } ], "sampleValue": [ { "bizDate": "2023-02-09 00:00:00", "value": 3.0 } ], "trend": "\u003e\u003d", "expectValue": 0.0, "op": "\u003e\u003d", "projectName": "test_mc_2303_kongjian", "tableName": "sx_dim_1209_001", "templateId": 47, "checkerType": 0, "ruleName": "前一天差值", "isPrediction": false, "feedbackStatus": 0, "whetherToFilterDirtyData": false } }
字段说明如下:
字段名称
字段类型
说明
id
Long
本次校验流程的主键ID。每触发一次规则校验,都会新增一条主键ID记录。
projectId
Long
DataWorks项目空间ID
tenantId
Long
DataWorks租户ID
taskId
String
校验任务的ID。
entityId
Long
分区表达式ID。
ruleId
Long
规则ID。
property
String
规则属性的字段,即被校验数据源表的column名称。
bizdate
Long
业务日期。如果被校验的业务主体为离线数据,则业务日期通常为执行校验操作的前一天。
dateType
String
调度周期的类型。通常为YMD,即年任务、月任务、天任务。
actualExpression
String
被校验的数据源表的实际分区。
matchExpression
String
分区表达式。
blockType
Integer
校验规则的强弱。强弱表示规则的重要程度。取值如下:
1,表示强规则。
0,表示弱规则。
您可以根据实际需求设置重要的规则为强规则。如果使用强规则并触发了红色告警,则会阻塞调度任务。
checkResult
Integer
校验结果状态。取值如下:
-1:校验异常
0:校验通过
1:触发橙色阈值
2:触发红色阈值
methodName
String
采集样本数据的方法包括:avg、count、sum、min、max、count_distinct、user_defined、table_count、table_size、table_dt_load_count、table_dt_refuseload_count、null_value、null_value/table_count、(table_count-count_distinct)/table_count、table_count-count_distinct等。
beginTime
Long
执行校验操作的开始时间。
endTime
Long
查询校验结果的截止时间。
timeConsuming
String
执行校验任务花费的时间。
externalType
String
调度系统的类型。目前仅支持CWF2。如果externalType为空则表示是手动试跑任务。
externalId
String
externalType为CWF2时,表示调度任务的节点ID。
externalType为空时,取值为triggerByManual,表示为手动触发任务。
discrete
Boolean
是否为离散校验。取值如下:
true:是离散校验。
false:非离散校验。
fixedCheck
Boolean
是否为固定值校验。取值如下:
true:是固定值校验。
false:非固定值校验。
referenceValue
历史样本值。
bizDate
Long
业务日期。如果被校验的业务主体为离线数据,则业务日期通常为执行校验操作的前一天。
discreteProperty
String
通过group by分组后的样本字段取值。例如group by性别字段,则DiscreteProperty为男生、女生和null。
value
Decimal
样本值。
singleCheckResult
Integer
校验结果的字符串。
sampleValue
当前使用的样本。
bizDate
Long
业务日期。如果被校验的业务主体为离线数据,则业务日期通常为执行校验操作的前一天。
value
Decimal
样本值。
trend
String
校验结果的趋势。
expectValue
Double
期望值。
op
String
比较符。
projectName
String
需要进行数据质量校验的引擎或者数据源名称。
tableName
String
进行校验的表名称。
templateId
Integer
使用的校验模板的ID。
checkerType
Integer
校验器的类型。
ruleName
String
规则的名称。
isPrediction
Boolean
是否为预测的结果。取值如下:
true:是预测的结果。
false:不是预测的结果。
comment
String
校验规则的描述。
eventCode
String
扩展程序事件编码。
数据质量规则前置事件(批量创建、更新、删除)
批量创建数据质量规则前置事件的消息实体格式(即事件消息中
data
字段的内容)示例如下。{ "datacontenttype": "application/json;charset=utf-8", "data": { "eventCode": "batch-create-data-quality-rules", "projectId": 30***03, "tenantId": 28***656, "operator": "19***735", "operationTime": 1734505954897, "dataQualityTaskId": 1001, "target": { "databaseType": "maxcompute", "tableGuid": "odps.project_demo.tb_table_demo" }, "rules": [ { "name": "表行数大于0", "enabled": true, "severity": "High", "description": "数据质量规则创建操作检查测试", "templateCode": "system:table:table_count:fixed:0", "samplingConfig": { "metric": "count" }, "checkingConfig": { "type": "fixed", "thresholds": { "expected": { "expression": "$checkValue > 0" }, "critical": { "expression": "$checkValue <= 0" } } } } ] } }
重要字段说明如下表。
字段名称
字段类型
说明
projectId
List
DataWorks项目空间ID。
operator
String
操作用户UID。
operationTime
Long
操作时间。
dataQualityTaskId
Long
规则关联的数据质量监控ID,可为空。
target
DataQualityTarget
数据质量规则的监控对象。
databaseType
String
表类型的数据集,表所属的数据库类型。
MaxCompute
EMR
CDH
Hologres
AnalyticDB for PostgreSQL
tableGuid
String
表在数据地图中的唯一ID。
rules
List<DataQualityRule>
数据质量规则列表。
name
String
规则名称。
enabled
Boolean
规则是否启用。
severity
String
规则对于业务的等级(对应页面上的强弱规则)。
High
Normal
description
String
规则描述信息。
templateCode
String
创建规则时所引用的规则模板
samplingConfig
SamplingConfig
样本采集所需的设置。
metric
String
采样的指标名称。
Count:表行数
Min:字段最小值
Max:字段最大值
Avg:字段均值
DistinctCount:字段唯一值个数
DistinctPercent:字段唯一值个数与数据行数占比
DuplicatedCount:字段重复值个数
DuplicatedPercent:字段重复值个数与数据行数占比
TableSize:表大小
NullValueCount:字段为空的行数
NullValuePercent:字段为空的比例
GroupCount:按字段值聚合后每个值与对应的数据行数
CountNotIn:枚举值不匹配行数
CountDistinctNotIn:枚举值不匹配唯一值个数
UserDefinedSql:通过自定义SQL做样本采集
checkingConfig
CheckingConfig
样本校验设置。
type
String
阈值计算方式
Fixed
Fluctation
FluctationDiscreate
Auto
Average
Variance
thresholds
Thresholds
阈值设置。
expected
Threshold
期望的阈值设置。
expression
String
阈值表达式。
critical
Threshold
严重警告的阈值设置。
批量更新数据质量规则前置事件的消息实体格式(即事件消息中
data
字段的内容)示例如下。{ "datacontenttype": "application/json;charset=utf-8", "data": { "eventCode": "update-data-quality-rule", "projectId": 30***03, "tenantId": 28***656, "operator": "19***735", "operationTime": 1734505954897, "id": 100001, "name": "表行数大于0", "enabled": true, "severity": "High", "description": "数据质量规则创建操作检查测试", "templateCode": "system:table:table_count:fixed:0", "samplingConfig": { "metric": "count" }, "checkingConfig": { "type": "fixed", "thresholds": { "expected": { "expression": "$checkValue > 0" }, "critical": { "expression": "$checkValue <= 0" } } } } }
重要字段说明如下表。
字段名称
字段类型
说明
eventCode
String
事件编码。
projectId
Long
项目空间ID。
tenantId
Long
租户ID。
operator
String
用户UID。
operationTime
Long
操作时间。
id
Long
规则ID。
name
String
规则名称。
enabled
Boolean
规则是否启用。
true:启用。
false:不启用。
severity
String
规则对于业务的等级(对应页面上的强弱规则)。
High
Normal
description
String
规则描述信息。
templateCode
String
分区表的分区设置。
samplingConfig
SamplingConfig
样本采集所需的设置。
metric
String
采样的指标名称。
Count:表行数
Min:字段最小值
Max:字段最大值
Avg:字段均值
DistinctCount:字段唯一值个数
DistinctPercent:字段唯一值个数与数据行数占比
DuplicatedCount:字段重复值个数
DuplicatedPercent:字段重复值个数与数据行数占比
TableSize:表大小
NullValueCount:字段为空的行数
NullValuePercent:字段为空的比例
GroupCount:按字段值聚合后每个值与对应的数据行数
CountNotIn:枚举值不匹配行数
CountDistinctNotIn:枚举值不匹配唯一值个数
UserDefinedSql:通过自定义SQL做样本采集
checkingConfig
CheckingConfig
样本校验设置。
type
String
阈值计算方式
Fixed
Fluctation
FluctationDiscreate
Auto
Average
Variance
thresholds
Thresholds
阈值设置。
expected
Threshold
期望的阈值设置。
expression
String
阈值表达式。
critical
Threshold
严重警告的阈值设置。
批量删除数据质量规则前置事件的消息实体格式(即事件消息中
data
字段的内容)示例如下。{ "datacontenttype": "application/json;charset=utf-8", "data": { "eventCode": "batch-delete-data-quality-rules", "ids": [ 10***01, 10***02, 10***03 ], "projectId": 30***03, "tenantId": 524***4736, "operator": "19***735", "operationTime": 1734505954897 } }
重要字段说明如下表。
字段名称
字段类型
说明
projectId
List
DataWorks项目空间ID。
operator
String
操作用户UID。
operationTime
Long
操作时间。
ids
List<Long>
被删除的数据质量规则ID列表。
tenantId
String
租户ID。
eventCode
String
事件编码。
更新数据质量规则前置事件的消息实体格式(即事件消息中
data
字段的内容)示例如下。{ "datacontenttype": "application/json;charset=utf-8", "data": { "eventCode": "update-data-quality-rule", "projectId": 30***03, "tenantId": 524***4736, "operator": "110***3538", "operationTime": 1734505954897, "id": 100001, "name": "表行数大于0", "enabled": true, "severity": "High", "description": "数据质量规则创建操作检查测试", "templateCode": "system:table:table_count:fixed:0", "samplingConfig": { "metric": "count" }, "checkingConfig": { "type": "fixed", "thresholds": { "expected": { "expression": "$checkValue > 0" }, "critical": { "expression": "$checkValue <= 0" } } } } }
重要字段说明如下表。
字段名称
字段类型
说明
eventCode
String
事件编码。
projectId
Long
项目空间ID。
tenantId
Long
租户ID。
operator
String
用户UID。
operationTime
Long
操作时间。
id
Long
规则ID。
name
String
规则名称。
enabled
Boolean
规则是否启用。
true:启用。
false:不启用。
severity
String
规则对于业务的等级(对应页面上的强弱规则)。
High
Normal
description
String
规则描述信息。
templateCode
String
分区表的分区设置。
samplingConfig
SamplingConfig
样本采集所需的设置。
metric
String
采样的指标名称。
Count:表行数
Min:字段最小值
Max:字段最大值
Avg:字段均值
DistinctCount:字段唯一值个数
DistinctPercent:字段唯一值个数与数据行数占比
DuplicatedCount:字段重复值个数
DuplicatedPercent:字段重复值个数与数据行数占比
TableSize:表大小
NullValueCount:字段为空的行数
NullValuePercent:字段为空的比例
GroupCount:按字段值聚合后每个值与对应的数据行数
CountNotIn:枚举值不匹配行数
CountDistinctNotIn:枚举值不匹配唯一值个数
UserDefinedSql:通过自定义SQL做样本采集
checkingConfig
CheckingConfig
样本校验设置。
type
String
阈值计算方式
Fixed
Fluctation
FluctationDiscreate
Auto
Average
Variance
thresholds
Thresholds
阈值设置。
expected
Threshold
期望的阈值设置。
expression
String
阈值表达式。
critical
Threshold
严重警告的阈值设置。
数据质量监控前置事件(创建、更新、克隆、批量删除)
创建、更新数据质量监控前置事件的消息实体格式(即事件消息中
data
字段的内容)示例如下。{ "datacontenttype": "application/json;charset=utf-8", "data": { "eventCode": "create-data-quality-evaluation-task", "name": "表数据准确性监控", "description": "向表中写入数据的调度实例成功执行后,触发监控,检查产出数据是否符合预期", "target": { "databaseType": "emr", "tableGuid": "an-emr-table-guid" }, "trigger": { "type": "ByScheduledTaskInstance", "taskIds": [ 1001, 1002 ] }, "dataSourceId": 201, "runtimeConf": "{ \"queue\": \"default\", \"sqlEngine\": \"HIVE_SQL\" }", "rules": [ { "name": "表行数大于0", "enabled": true, "severity": "High", "description": "数据质量规则创建操作检查测试", "templateCode": "system:table:table_count:fixed:0", "samplingConfig": { "metric": "count" }, "checkingConfig": { "type": "fixed", "thresholds": { "expected": { "expression": "$checkValue > 0" }, "critical": { "expression": "$checkValue <= 0" } } } }, { "id": 100002 "name": "表行数大于100", "checkingConfig": { "type": "fixed", "thresholds": { "expected": { "expression": "$checkValue > 100" }, "critical": { "expression": "$checkValue <= 100" } } } } ], "hooks": [ { "type": "BlockTaskInstance", "condition": "(${severity} == \"High\" AND ${status} == \"Critical\") OR (${severity} == \"High\" AND ${status} == \"Error\")" } ], "notifications": { "condition": "(${severity} == \"High\" AND ${status} == \"Warned\") OR (${severity} == \"Normal\" AND ${status} == \"Critical\") OR (${severity} == \"Normal\" AND ${status} == \"Warned\") OR (${severity} == \"Normal\" AND ${status} == \"Error\")", "notifications": [ { "channels": [ "Mail", "Sms" ], "notificaionReceivers": [ { "receiverType": "AliUid", "receiverValues": [ "1107550004253538", "51107550004253538" ] } ] }, { "channels": [ "Dingding" ], "notificaionReceivers": [ { "receiverType": "DingdingUrl", "receiverValues": [ "https://api.dingding.com/message-boxes/b1/messages", "https://api.dingding.com/message-boxes/b2/messages" ], "extension": "{ \"atAll\": true }" } ] } ] }, "projectId": 30***03, "tenantId": 524***4736, "operator": "110***3538", "operationTime": 1734505954897 } }
重要字段说明如下表。
字段名称
字段类型
说明
eventCode
String
事件编码。
name
String
质量监控任务名称。
description
String
数据质量校验任务描述。
target
DataQualityTarget
数据质量校验任务的监控对象。
databaseType
String
表类型的数据集,表所属的数据库类型。
MaxCompute
EMR
CDH
Hologres
AnalyticDB for PostgreSQL
tableGuid
String
表在数据地图中的唯一ID。
trigger
DataQualityEvaluationTaskTrigger
数据质量校验任务的触发配置。
type
String
何种事件可以触发质量校验任务执行。
ByScheduledTaskInstance:调度实例运行成功,只支持公共云场景。
ByManual:手动触发。
taskIds
Array<Long>
type=ByScheduledTaskInstance时生效,具体指明哪些调度节点的实例执行成功后可以触发。
dataSourceId
Long
数据质量校验任务执行时应使用的数据源。
runtimeConf
String
使用数据源时的设置,目前只支持指定EMR的yarn队列、采集EMR表时把SQL引擎指定为SPARK-SQL。
rules
List<DataQualityRule>
数据质量规则列表。更多参数详情请参见数据质量规则前置事件参数。
hooks
Array<DataQualityEvaluationTaskHook>
数据质量校验任务实例生命周期中的回调设置,目前只支持一个阻塞调度任务的Hook。
type
String
后续处理动作类型。
BlockTaskInstance:阻塞DataWorks任务实例执行。
condition
String
Hook触发条件。
notifications
Array<Notification>
具体的消息通知设置。
projectId
Long
项目空间ID。
tenantId
Long
租户ID。
operator
String
操作用户UID。
operationTime
Long
操作时间。
克隆数据质量监控前置事件的消息实体格式(即事件消息中
data
字段的内容)示例如下。{ "datacontenttype": "application/json;charset=utf-8", "data": { "eventCode": "clone-data-quality-evaluation-task", "id": 10001, "targets": [ { "databaseType": "emr", "tableGuid": "an-emr-table-guid" }, { "databaseType": "emr", "tableGuid": "another-emr-table-guid" } ], "projectId": 9***4, "tenantId": 28***656, "operator": "19***735", "operationTime": 1734505954897 } }
重要字段说明如下表。
字段名称
字段类型
说明
projectId
Long
DataWorks项目空间ID。
tenantId
String
租户ID。
operator
String
操作用户UID。
operationTime
Long
操作时间。
targets
List<DataQualityTarget>
要克隆到的数据质量监控目标。
databaseType
String
表类型的数据集,表所属的数据库类型。
MaxCompute
EMR
CDH
Hologres
AnalyticDB for PostgreSQL
tableGuid
String
表在数据地图中的唯一ID。
id
Long
用来克隆的源数据质量监控ID。
eventCode
String
事件编码。
批量删除数据质量监控前置事件的消息实体格式(即事件消息中
data
字段的内容)示例如下。{ "datacontenttype": "application/json;charset=utf-8", "data": { "eventCode": "batch-delete-data-quality-evaluation-tasks", "ids": [ 10001, 10002 ], "projectId": 9***4, "tenantId": 28***656, "operator": "19***735", "operationTime": 1734505954897 } }
重要字段说明如下表。
字段名称
字段类型
说明
projectId
Long
DataWorks项目空间ID。
operator
String
操作用户UID。
operationTime
Long
操作时间。
ids
List<Long>
被删除的数据质量监控ID列表。
tenantId
String
租户ID。
eventCode
String
事件编码。
数据质量监控告警订阅前置事件(创建、更新、删除)
创建、删除数据质量监控告警订阅前置事件的消息实体格式(即事件消息中
data
字段的内容)示例如下。{ "datacontenttype": "application/json;charset=utf-8", "data": { "eventCode": "create-data-quality-evaluation-task-notification", "dataQualityEvaluationTaskId": 10001, "channel": "sms", "receiverValue": "1107***38", "projectId": 30***03, "operator": "110***3538", "operationTime": 1734505954897 } }
重要字段说明如下表。
字段名称
字段类型
说明
projectId
Long
DataWorks项目空间ID。
operator
String
操作用户UID。
operationTime
Long
操作时间。
dataQualityEvaluationTaskId
Long
数据质量监控ID。
channel
String
订阅消息发送目标渠道类型。
Mail - 邮件
Sms - 短信
Phone - 电话
Feishu - 飞书
Weixin - 微信
Dingding - 钉钉
Webhook - 自定义Webhook
receiverValue
String
订阅消息发送的具体目标。
eventCode
String
事件编码。
创建数据质量监控前置事件的消息实体格式(即事件消息中
data
字段的内容)示例如下。{ "datacontenttype": "application/json;charset=utf-8", "data": { "eventCode": "update-data-quality-evaluation-task-notification", "dataQualityEvaluationTaskId": 10001, "currentChannel": "sms", "currentReceiverValue": "1107***538", "updatedChannel": "sms", "updatedReceiverValue": "1107***538", "projectId": 30***03, "operator": "110***3538", "operationTime": 1734505954897 }
重要字段说明如下表。
字段名称
字段类型
说明
projectId
Long
DataWorks项目空间ID。
operator
String
操作用户UID。
operationTime
Long
操作时间。
dataQualityEvaluationTaskId
Long
数据质量监控ID。
currentChannel
String
被更新的订阅消息发送目标渠道类型。
Mail - 邮件
Sms - 短信
Phone - 电话
Feishu - 飞书
Weixin - 微信
Dingding - 钉钉
Webhook - 自定义Webhook
currentReceiverValue
String
被更新的订阅消息发送的具体目标。
updatedChannel
String
更新后的订阅消息发送目标渠道类型。
Mail - 邮件
Sms - 短信
Phone - 电话
Feishu - 飞书
Weixin - 微信
Dingding - 钉钉
Webhook - 自定义Webhook
updatedReceiverValue
String
更新后的订阅消息发送的具体目标。
eventCode
String
事件编码。
租户级事件
租户级模块生成的是租户级事件,例如管控台删除空间所产生的事件消息。您可通过下文事件列表了解各个模块支持的事件消息中,哪些为普通事件,哪些为扩展点事件,以及对应事件的发送的消息格式。
以下消息格式页签仅包含部分内容,发送给EventBridge或函数计算的完整消息,您需要结合附录:消息格式。
管控台事件列表
事件列表
事件类型 | 事件(及产生事件的操作) | 普通事件 | 扩展点事件 | EventBridge事件类型(Type) | 扩展程序事件类型(eventCode) |
删除项目空间 | 删除空间前置 |
|
| ||
删除空间后置 |
|
|
消息格式
消息实体格式:删除项目空间事件
租户删除项目空间前置事件的消息实体格式(即事件消息中
data
字段的内容)示例如下。{ "data": { "eventCode": "delete-project", "projectId": 7***7, // 工作空间ID "tenantId": 2807****0784, // 租户ID "operator": "19***735 }
字段说明如下:
字段名称
字段类型
说明
operator
String
删除DataWorks工作空间的用户UID。
projectId
Long
待删除的DataWorks工作空间ID。
tenantId
Long
待删除工作空间所属的租户ID。
eventCode
String
扩展程序事件编码。
租户删除项目空间后置事件的消息实体格式(即事件消息中
data
字段的内容)示例如下。{ "data": { "eventCode": "project-deleted", "tenantId": 28***656, "blockBusiness": false, "projectName": "test2", "projectId": 9***4, "operator": "19***735", "timestamp": 1702260556896 } }
字段说明如下:
字段名称
字段类型
说明
operator
String
删除DataWorks工作空间的用户UID。
projectId
Long
待删除的DataWorks工作空间ID。
projectName
String
待删除的DataWorks工作空间名称。
tenantId
Long
待删除工作空间所属的租户ID。
timestamp
Long
发送消息的时间戳。
eventCode
String
扩展程序事件编码。
上传与下载事件列表
事件列表
事件类型 | 事件(及产生事件的操作) | 普通事件 | 扩展点事件 | EventBridge事件类型(Type) | 扩展程序事件类型(eventCode) |
数据下载与上传 | 数据下载前置-文件生成 | dataworks:ResourcesDownload:DownloadResources | download-resources | ||
数据下载前置-文件下载 | dataworks:ResourcesDownload:DownloadResourcesExecute | download-resources-execute | |||
数据上传前置 | dataworks:ResourcesUpload:UploadDataToTable | upload-data-to-table |
消息格式
数据下载与上传
数据下载前置事件-文件生成的消息实体格式(即事件消息中
data
字段的内容)示例如下。{ "data": { "eventCode": "download-resources", "moduleType": "sqlx_query", "operatorBaseId": "123936573******", "operatorUid": "14931896037*******", "fileName": "文件名称.csv", "fileSize": 10241024, "datasourceId": "1111", "datasourceName": "odps_first", "queryDwProjectId": "9***4", "queryDwProjectName": "test_project", "dataRowSize": "123456", "sqlText": "select sku_code, sku_name from dim_sku", }
字段说明如下:
字段名称
字段类型
说明
moduleType
String
下载数据的来源:
entity_transfer:在安全中心 > 安全策略 > 实体转交 > 转交日志下载的数据。
develop_query:在数据开发(DataStudio)使用SQL语句查询并下载的数据。
sqlx_query:在数据分析 > SQL查询使用SQL语句查询并下载的数据。
dw_excel:在数据分析 > 电子表格下载的数据。
operatorBaseId
String
执行下载操作的用户BaseID。
operatorUid
String
执行下载操作的用户UID。
fileName
String
下载文件的名称。
fileSize
Long
下载文件的大小。
datasourceId
String
下载的数据所属的数据源ID。
datasourceName
String
下载的数据所属的数据源名称。
queryDwProjectId
String
下载的数据所属的DataWorks工作空间ID。
queryDwProjectName
String
下载的数据所属的DataWorks工作空间标识。
dataRowSize
Long
下载的数据条数。
sqlText
String
通过SQL语句查询并下载数据,此处为使用的SQL代码。
eventCode
String
扩展程序事件类型。
数据下载前置事件-文件下载的消息实体格式(即事件消息中
data
字段的内容)示例如下。{ "datacontenttype": "application/json;charset=utf-8", "aliyunaccountid": "1493189603770213", "aliyunpublishtime": "2023-12-11T02:10:00.194Z", "data": { "eventCode": "download-resources-execute", "moduleType": "sqlx_query", "operatorBaseId": "123936573******", "operatorUid": "14931896037*******", "fileName": "文件名称.csv", "fileSize": 10241024, "datasourceId": "1111", "datasourceName": "odps_first", "queryDwProjectId": "9***4", "queryDwProjectName": "test_project", "dataRowSize": "123456", "sqlText": "select sku_code, sku_name from dim_sku", "ip": "198.10.X.X" }, "aliyunoriginalaccountid": "149318960******", "specversion": "1.0", "aliyuneventbusname": "default", "id": "2c3e41e5-3486-40ce-87d4-910f989cf2a7", "source": "acs.dataworks", "time": "2023-12-11T10:10:00.117Z", "aliyunregionid": "cn-shanghai", "type": "dataworks:ResourcesDownload:DownloadResourcesExecute" }
重要字段说明如下表。
字段名称
字段类型
说明
moduleType
String
下载数据的来源:
entity_transfer:在安全中心 > 安全策略 > 实体转交 > 转交日志下载的数据。
develop_query:在数据开发(DataStudio)使用SQL语句查询并下载的数据。
sqlx_query:在数据分析 > SQL查询使用SQL语句查询并下载的数据。
dw_excel:在数据分析 > 电子表格下载的数据。
operatorBaseId
String
执行下载操作的用户BaseID。
operatorUid
String
执行下载操作的用户UID。
fileName
String
下载文件的名称。
fileSize
Long
下载文件的大小。
datasourceId
String
下载的数据所属的数据源ID。
datasourceName
String
下载的数据所属的数据源名称。
queryDwProjectId
String
下载的数据所属的DataWorks工作空间ID。
queryDwProjectName
String
下载的数据所属的DataWorks工作空间标识。
dataRowSize
Long
下载的数据条数。
sqlText
String
通过SQL语句查询并下载数据,此处为使用的SQL代码。
ip
String
执行下载操作的用户的IP地址。
eventCode
String
扩展程序事件类型。
数据上传前置事件的消息实体格式(即事件消息中
data
字段的内容)示例如下。{ "datacontenttype": "application/json;charset=utf-8", "aliyunaccountid": "1493189603770213", "aliyunpublishtime": "2023-12-11T02:10:00.194Z", "data": { "eventCode": "upload-data-to-table", "uploadSourceType": "LOCAL", "optTableType": "CREATE", "targetEngineType": "MAXCOMPUTE", "writeType": "OVERWRITE", "conflictMode": "IGNORE", "operatorBaseId": "12312*****", "operatorUid": "1222222*****", "datasourceId": "1111", "datasourceName": "odps_first", "tableGuid": "odps.mc_project.test_table", "queryDwProjectId": "9***4", "queryDwProjectName": "test_project", "fileSize": 123456 }, "aliyunoriginalaccountid": "149318960******", "specversion": "1.0", "aliyuneventbusname": "default", "id": "2c3e41e5-3486-40ce-87d4-910f989cf2a7", "source": "acs.dataworks", "time": "2023-12-11T10:10:00.117Z", "aliyunregionid": "cn-shanghai", "type": "dataworks:ResourcesUpload:UploadDataToTable" }
字段说明如下:
字段名称
字段类型
说明
uploadSourceType
String
上传数据的来源:
LOCAL:上传本地文件数据。
OSS:上传OSS文件数据。
DW_EXCEL:上传DataWorks的数据分析 > 电子表格数据。
HTTP:上传HTTP文件数据。
optTableType
String
选择将数据上传至指定引擎数据源的已有表或新建表。取值如下:
CREATE:上传数据至新建表。
IMPORT:上传数据至已有表。
targetEngineType
String
选择将数据上传至哪类引擎。取值如下:
MaxCompute
EMR Hive
Hologres
writeType
String
数据写入至目标表的方式。取值如下:
OVERWRITE:覆盖原有表数据。
APPEND:将上传的数据追加至目标表中。
conflictMode
String
上传数据时,目标表的主键冲突处理策略:
IGNORE:存在主键冲突时,忽略上传的数据。
REPLACE:存在主键冲突时,先删除冲突数据所在的行,再将上传的数据插入该行。上传数据未指定的字段会写为NULL。
UPDATE:上传的数据覆盖原表数据,只覆盖上传数据指定的字段,未指定字段数据保持不变。
operatorBaseId
String
执行上传操作的用户BaseID。
operatorUid
String
执行上传操作的用户UID。
datasourceId
String
上传数据至哪个数据源,此处填写数据源ID。
datasourceName
String
上传数据至哪个数据源,此处填写数据源名称。
tableGuid
String
表的Guid。示例如下:
MaxCompute:
odps.maxcomputeProject.tableName
。EMR_Hive:
emr_hive.emr集群id.schema.tableName
。Holo:
holo.hologres实例id.database
。
queryDwProjectId
String
表所属的DataWorks工作空间ID。
queryDwProjectName
String
表所属的DataWorks工作空间名称。
fileSize
Long
上传文件的大小,单位为字节。
eventCode
String
扩展程序事件编码。
附录:消息格式
发送给EventBridge的消息格式
在配置开放事件(OpenEvent)添加事件分发通道后,若在DataWorks内触发空间级或租户级事件后,会按照在事件总线(EventBridge)中的配置的事件类型(Type)进行过滤,以下为DataWorks通过事件分发通道发送至事件总线EventBridge的消息格式。
{
"datacontenttype": "application/json;charset=utf-8",//参数data的内容形式。datacontenttype只支持application/json格式。
"data": {
//消息类型不同,消息内容也不同。以下为消息中固定的两个字段。各事件消息请参见上文。
"tenantId": 28378****10656,//租户ID,DataWorks每个阿里云主账号对应一个租户,每个租户有自己的租户ID,该值您可在DataWorks数据开发右上角用户信息查看。
"eventCode": "xxxx"//
},
"id": "539fd8f4-4ea1-4625-aa8b-6c906674****",//事件ID。标识事件的唯一值。
"source": "acs.dataworks",//事件源,提供事件的服务,说明此消息由dataworks推送。
"specversion": "1.0",
"subject": "",
"time": "2020-11-19T21:04:41+08:00",//事件产生时间。
"type": "dataworks:InstanceStatusChanges:InstanceStatusChanges",//事件类型。该事件类型可在EventBridge控制台对DataWorks推送的全量消息进行过滤,每个事件的事件Type值不同,请参见上文获取不同事件消息类型。
"aliyunaccountid": "123456789098****",//阿里云主账号ID
"aliyunpublishtime": "2020-11-19T21:04:42.179PRC",//EventBridge接收事件的时间。
"aliyuneventbusname": "default",//用于接收DataWorks事件消息的EventBridge事件总线名称。
"aliyunregionid": "cn-hangzhou",//接收事件的地域。
"aliyunpublishaddr": "172.25.XX.XX"
}
一个完整的事件消息包括消息的实体格式,以及消息的ID、来源、产生时间等基础信息。其中,重要字段的详细说明如下表。
字段名称 | 字段类型 | 说明 |
data | object | 消息实体格式。不同类型事件的消息格式及其包含的字段含义存在差异,详情请参见: 数据开发事件消息格式:数据开发事件列表。 数据集成事件消息格式:数据集成事件列表。 运维中心事件消息格式:运维中心事件列表。 安全中心事件消息格式:安全中心事件列表。 数据质量事件消息格式:数据质量事件列表。 |
id | String | 事件消息的唯一ID。用于定位事件消息。 |
type | String | 事件类型。用于描述事件源相关的事件类型。取值示例如下:
该事件类型可在EventBridge控制台对DataWorks推送的全量消息进行过滤,每个事件的事件Type值不同,请参见上文获取不同事件消息类型。 |
发送给函数计算的消息格式
在DataWorks上配置扩展程序(Extensions):函数计算方式时,DataWorks会将触发对应的扩展点事件时的消息以JSON的形式将消息发送至函数计算,以下为发送至函数计算的消息格式。
{
"blockBusiness": true,
"eventCategoryType": "resources-download",//事件类别
"eventType": "upload-data-to-table",//事件类型
"extensionBizId": "job_6603***070",
"messageBody": {
//消息类型不同,消息内容也不同。以下为消息中固定的两个字段。各事件消息请参见上文。
"tenantId": 28378****10656,//租户ID,DataWorks每个阿里云主账号对应一个租户,每个租户有自己的租户ID,该值您可在DataWorks数据开发右上角用户信息查看。
"eventCode": "xxxx"//
},
"messageId": "52d44ee7-b51f-4d4d-afeb-*******"//事件ID。标识事件的唯一值。
}
重要字段详细说明如下:
字段名称 | 字段类型 | 说明 | |
messageId | String | 事件ID,标识事件的唯一值。 | |
messageBody | DataWorks推送的具体事件消息,您可在开发扩展程序中使用,消息类型不同,此部分内容也不同。 | ||
tenantId | 租户ID,DataWorks每个阿里云主账号对应一个租户,每个租户有自己的租户ID,该值您可在DataWorks数据开发右上角用户信息查看 | ||
eventCode | 事件编码。用于标识某一类事件消息,各事件类型对应的eventCode请查阅开发参考:事件列表与消息格式表格中扩展程序事件类型(eventCode)列。 |