DataWorks支持对成功提交至开发环境的标准模式工作空间节点,在发布到生产环境前,进行待发布状态检查。您可以通过DataWorks提供的检查器机制,根据业务需求判断目标节点是否可以继续后续的发布流程,保障能够发布符合要求的节点任务。本文为您介绍如何使用开放消息和POP接口检查待发布节点。

前提条件

背景信息

消息队列Kafka版,用于大数据领域的日志收集、监控数据聚合、流式数据处理、在线和离线分析,详情请参见消息队列Kafka

DataWorks为您提供了开放消息功能,您可以订阅文件发布检查事件,快速获取并识别事件消息的状态变更信息。当您在DataWorks标准模式工作空间的数据开发页面创建的节点提交成功后,该节点需要发布到生产环境,如果您的DataWorks工作空间配置了检查器,则该节点将进入待发布的检查状态。DataWorks会根据检查器的配置信息将目标节点本次发布的相关信息通过Kafka消息下发至您的服务。您需要根据事件内容判断该节点是否可以继续发布,并调用CheckFileDeployment接口将节点的文件发布检查事件的检查结果返回给DataWorks。当检查校验通过后才可以继续后续的发布流程。校验流程如下图所示。校验流程
  1. DataWorks数据开发页面创建的节点提交成功,该节点进入待发布状态。DataWorks根据工作空间关联的检查器发起检查流程,节点自动进入待检查状态。

    由于DataWorks的检查器未对外开放,您需要提交工单将检查器的配置信息交由管理员,由管理员统一配置。配置检查器,详情请参见配置检查器

  2. DataWorks的检查器发送文件发布检查事件至Kafka。
  3. Kafka将文件发布检查事件转发至用户的服务,进行后续的审批处理。
  4. 您可以根据收到的消息进行判断,审批目标节点的发布状态,并通过调用CheckFileDeployment接口,发送审批结果至DataWorks。CheckFileDeployment接口的配置信息,详情请参见CheckFileDeployment

    您可以通过消息中携带的工作空间ID、节点ID、节点版本等信息,调用GetFileVersion接口查询本次发布的内容,并对该内容进行自动扫描,根据扫描结果判断是否可以正常发布。如果无法通过自动扫描确定是否可以正常发布,则需要使用人工审批流程,进行后续的人工审核。GetFileVersion接口的配置,详情请参见GetFileVersion

    说明 您可以登录DataWorks控制台,鼠标悬停至顶部菜单栏右侧的用户头像,单击AccessKey管理,获取调用DataWorks OpenAPI需要使用的AccessKey IDAccessKey Secret
  5. DataWorks根据收到的检查结果,修改目标节点的最终发布状态。发布状态如下:
    • OK:表示文件检查通过,可以发布。
    • WARN:表示文件检查通过,但是存在警告,文件可以发布。
    • FAIL:表示文件检查未通过,不能发布。
说明 如果一个DataWorks工作空间关联了多个检查器,当有待发布的节点时,DataWorks的每一个检查器会启动一个检查流程,每个检查流程均会生成一个checkerInstanceId(即文件检查器所属的实例ID)。每个检查器会分别发送一条Kafka消息至用户,用户需要通过CheckFileDeployment接口,把所有检查器生成的检查结果返回给DataWorks,DataWorks将检查结果汇总,给出最终的检查状态。当所有检查器的检查结果均为通过时,最终检查结果才为通过,本次发布正常进行;只要有检查器的检查结果为不通过,则最终检查结果为不通过,本次发布会被拦截。
DataWorks的检查器机制,常用的功能场景示例如下:
  • 检查目标节点的代码中是否包含表变更的语句。您可以通过变更内容,判断该变更是否会修改核心表的结构。如果涉及修改核心表的结构,则您可以拦截此次发布。
  • 检查目标节点代码的SQL性能。例如,执行节点任务时,扫描全部表会降低代码性能,如果您希望执行的节点任务SQL性能较好,则可以检查代码中是否包含全表扫描的SQL语句。当包含相关语句时,您可以拦截此次发布。
  • 检查目标节点中是否包含账号及密码等敏感信息,当包含敏感时,您可以拦截此次发布,保证敏感信息不被泄露。
使用开放消息和POP接口检查待发布文件的校验步骤如下:

使用限制

  • DataWorks仅支持企业版及以上版本使用开放消息和POP接口检查待发布节点。
  • DataWorks仅支持检查标准模式工作空间中的待发布节点。

开启并配置消息订阅

  1. 进入开放平台开启消息订阅服务。
    1. 登录DataWorks控制台
    2. 在左侧导航栏,单击开放平台
    3. 开放消息页签,打开启动消息订阅开关,即可开启消息订阅服务。
      说明 开启消息订阅服务后,DataWorks将会把指定事件类型的消息推送到您创建的Topic中,若关闭启动消息订阅开关,则会停止所有Topic的消息推送。
  2. 新建消息分类Topic。
    1. 开放消息页签,单击新建Topic
    2. 新建Topic对话框,输入Topic名称及描述信息。
      说明 该Topic主要描述消息的主题,用于消息分类。Topic名称一旦创建,无法修改。
    3. 单击确定,完成Topic的创建。
  3. 配置Topic的订阅信息。
    1. 在所创建Topic的操作列,单击订阅设置
    2. Topic订阅内容设置对话框,选择需要订阅的工作空间,并勾选订阅事件类型文件发布检查事件订阅配置
      说明 一个Topic最多可以订阅5个工作空间。

配置检查器

由于DataWorks的检查器未对外开放,您需要提交工单将检查器的配置信息交由管理员,由管理员统一配置。
配置检查器需要提供的配置项信息如下表所示。
配置项 描述
检查器的唯一标识符 标识符自定义。检查流程中的Kafka消息会携带该标识符,方便您使用该标识符来判断所使用的检查器。

检查器的唯一标识符需要以英文字母开头,只能包含英文字母、数字及下划线(_),最多支持64个字符,并且确定后不可变更。

检查器的唯一标识符对应收到的Kafka消息中的checkerIdentify字段。Kafka消息的字段,详情请参见DataWorks向用户发送文件发布检查事件

检查器的名称 名称自定义。用于在检查详情页,查看本次发布被哪个检查器所拦截。
检查器责任人 阿里云的账号ID。您可以登录DataWorks控制台,鼠标悬停至顶部菜单栏右侧的用户头像,查看账号ID。方便当节点任务被检查器拦截时,可以定位到对应的检查器责任人。
检查器描述信息 最多支持600个字符。
检查器检查流程页面地址 如果您使用的是非全自动的检查流程,而是在检查流程发起之后,需要人为执行审批操作的检查流程,则需要提供人工审核的页面地址。

该地址会配置在检查页面的进入详情页面,引导用户提交审批流程。跳转地址格式为您所提供的人工审核页面地址+checkerInstanceId。例如,提供的人工审核的页面地址为https://www.aliyun.com/,本次检查器的实例ID(checkerInstanceId)为82_1235776432_3,则最终的跳转地址为https://www.aliyun.com/82_1235776432_3

checkerInstanceId为每次自动检查待发布节点的检查器流程ID,用于串联一次发布流程,系统自动生成,不可修改。

需要拦截的文件类型列表 系统会根据配置的文件类型选择性拦截,不配置则默认拦截所有类型的文件。
如果您配置了检查器,当目标节点提交成功后,您可以执行如下操作:
  1. 进入任务发布页面,查看节点的检查状态。进入任务发布页面,详情请参见发布标准模式工作空间的节点节点检查状态
  2. 单击目标节点状态,查看节点的检查详情。

    您可以查看DataWorks工作空间关联的哪个检查器阻碍了发布流程。如果使用的是您配置的检查器,单击目标检查器后的进入详情页面,则会跳转至您配置检查器时所提供的检查器检查流程页面地址

    节点检查详情

DataWorks向用户发送文件发布检查事件

DataWorks向Kafka发送文件发布检查事件后,Kafka会将其转发至用户的服务,用于后续用户进行审批操作。事件的参数及描述信息格式如下。
{
    "isContentChange": true,                     
    "baseId": "**************",                     
    "appId": ******,                         
    "tenantId": ***************,                     
    "cloudUuid": *************,                     
    "type": ***,                             
    "fileName": "********",                     
    "owner": "****************",                     
    "projectOwner": "*************",                 
    "projectName": "***********",                     
    "checkerIdentify": "************",                 
    "checkerInstanceId": "***************",                    
    "changeType": "UPDATE",                        
    "useType": 0,                            
    "fileCreateTime": "2021-01-15 14:03:02",            
    "fileId": **********,                        
    "connName": "*******",                        
    "fileVersion": 3                        
}
参数描述如下表所示。
参数 描述
isContentChange 相较于前一次文件内容是否有变动。取值如下:
  • true:文件内容变动。
  • false:文件内容未变动。
baseId 提交节点任务的用户ID。
appId DataWorks工作空间的ID。

您可以调用GetFileVersion获取目标节点本次发布的详细内容,appIdfileIdfileVersion可以作为GetFileVersion接口的入参信息。

tenantId 租户ID。
cloudUuid 调度节点的ID。
type 提交节点的节点类型。
fileName 提交节点的节点名称。
owner 提交节点对应节点责任人的阿里云账号ID。
projectOwner DataWorks工作空间的Owner。
projectName DataWorks工作空间的名称。
checkerIdentify 检查器的唯一标识。
checkerInstanceId 检查器实例的唯一标识。
changeType 提交节点的变更类型。取值如下:
  • UPDATE:节点内容有更新。
  • DELETE:节点内容有删除。
  • CREATE:节点有新建的内容。
useType 功能模块。例如,数据开发页面的业务流程(0)手动业务流程(2)临时查询(10)等。
fileCreateTime 提交节点的创建时间。
fileId 提交节点的节点ID。

您可以调用GetFileVersion获取目标节点本次发布的详细内容,appIdfileIdfileVersion可以作为GetFileVersion接口的入参信息。

connName 提交的节点所使用的数据源。
fileVersion 提交节点的版本。

您可以调用GetFileVersion获取目标节点本次发布的详细内容,appIdfileIdfileVersion可以作为GetFileVersion接口的入参信息。

用户审核文件发布检查事件并返回最终检查结果

当您接收到Kafka推送的文件发布检查事件后,会对该事件的内容进行审核,并调用CheckFileDeployment接口,将审核结果及检查器的检查流程页面地址返回给DataWorks,DataWorks根据收到的审核结果修改目标节点的最终发布状态。当最终检查结果为通过时,则本次发布正常进行;当最终检查结果为不通过时,则本次发布会被拦截。CheckFileDeployment接口的配置,详情请参见CheckFileDeployment
说明 如果一个DataWorks工作空间关联了多个检查器,当有待发布的节点时,DataWorks的每一个检查器会启动一个检查流程,每个检查流程均会生成一个checkerInstanceId(即文件检查器所属的实例ID)。每个检查器会分别发送一条Kafka消息至用户,用户需要通过CheckFileDeployment接口,把所有检查器生成的检查结果返回给DataWorks,DataWorks将检查结果汇总,给出最终的检查状态。当所有检查器的检查结果均为通过时,最终检查结果才为通过,本次发布正常进行;只要有检查器的检查结果为不通过,则最终检查结果为不通过,本次发布会被拦截。
如下代码为您展示了,当用户收到Kafka发送的文件发布检查事件后,检查本次发布中是否包含表变更,并调用CheckFileDeployment接口返回审批结果至DataWorks的流程示例。
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.dataworks_public.model.v20200518.CheckFileDeploymentRequest;
import com.aliyuncs.dataworks_public.model.v20200518.GetFileVersionRequest;
import com.aliyuncs.dataworks_public.model.v20200518.GetFileVersionResponse;
import com.aliyuncs.profile.DefaultProfile;
import com.google.gson.Gson;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class DataStudioKafkaChecker {

    private static final String KAFKA_TOPIC; //表示在DataWorks控制台注册的Kafka消息Topic。
    private static final String DW_API_AK_ID; //调用DataWorks OpenAPI所需的AccessKey ID。
    private static final String DW_API_AK_SECRET; //调用DataWorks OpenAPI所需的AccessKey Secret。
    
    public static void main(String[] args) {
        /*
         * 初始化并启动Kafka消息的Consumer。
         */
        KafkaConsumer<String, String> consumer; //初始化Consumer。
        //设置消费组(GROUP_ID_CONFIG)订阅的Topic,您可以订阅多个Topic。
        //如果消费组相同,则订阅的Topic也建议设置为相同的Topic。
        List<String> subscribedTopics =  new ArrayList<String>();
        //如果需要订阅多个Topic,则使用Add语法添加即可。
        //您需要登录DataWorks控制台创建Topic。
        subscribedTopics.add(KAFKA_TOPIC);
        consumer.subscribe(subscribedTopics);

        //初始化DataWorks OpenAPI的客户端(如下代码以杭州地域示例)。
        DefaultProfile.addEndpoint("cn-hangzhou", "dataworks-public", "dataworks.cn-hangzhou.aliyuncs.com");
        IAcsClient client = new DefaultAcsClient(DefaultProfile.getProfile("cn-hangzhou", DW_API_AK_ID, DW_API_AK_SECRET));
        
        Gson gson = new Gson();

        //循环消费消息。
        while (true){
            try {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                //必须在下次poll之前消费完这些数据,且总耗时不得超过SESSION_TIMEOUT_MS_CONFIG。
                //建议使用单独的线程池来消费消息,并异步返回结果。
                for (ConsumerRecord<String, String> record : records) {
                    KafkaMessage kafkaMessage = gson.fromJson(record.value(), KafkaMessage.class);
                    CheckerMessage checkerMessage = kafkaMessage.messageBody;
                    
                    //根据消息中的信息检查本次发布的内容。
                    GetFileVersionRequest getFileVersionRequest = new GetFileVersionRequest();
                    getFileVersionRequest.setFileId(checkerMessage.fileId);
                    getFileVersionRequest.setFileVersion(checkerMessage.fileVersion);
                    getFileVersionRequest.setProjectId(checkerMessage.appId);
                    GetFileVersionResponse getFileVersionResponse = client.getAcsResponse(getFileVersionRequest);
                    System.out.println(getFileVersionResponse.getData().getFileContent());
                    
                    //检查发布节点的代码中是否包含表变更的代码。
                    //如果包含create table语句,则检查不通过
                    //如果包含alter table语句,则会产生告警。
                    //如果不包含create tablealter table语句,则检查通过。
                    //执行的具体业务规则,可以根据您的业务需求定制。
                    CheckFileDeploymentRequest checkFileDeploymentRequest = new CheckFileDeploymentRequest();
                    checkFileDeploymentRequest.setCheckerInstanceId(checkerMessage.checkerInstanceId);
                    if (getFileVersionResponse.getData().getFileContent().toLowerCase().contains("create table")) {
                        checkFileDeploymentRequest.setStatus("FAIL");
                    } else if (getFileVersionResponse.getData().getFileContent().toLowerCase().contains("alter table")) {
                        checkFileDeploymentRequest.setStatus("WARN");
                    } else {
                        checkFileDeploymentRequest.setStatus("OK");
                    }
                    client.getAcsResponse(checkFileDeploymentRequest);
                }
            } catch (Exception e) {
                try {
                    Thread.sleep(1000);
                } catch (Throwable ignore) {

                }
                e.printStackTrace();
            }
        }
    }

    private class KafkaMessage {
        public CheckerMessage messageBody;
    }

    private class CheckerMessage {
        public String owner;
        public String fileName;
        public String checkerIdentify;
        public String changeType;
        public String connName;
        public Long cloudUuid;
        public String baseId;
        public Integer type;
        public Integer useType;
        public boolean isContentChange;
        public String fileCreateTime;
        public String checkerInstanceId;
        public Long appId;
        public Long tenantId;
        public Integer fileVersion;
        public Long fileId;
    }
}