After you configure event message subscription in the OpenEvent module, you can locally develop the message processing code and register an extension in the Extensions module. This way, the extension can respond to specific operations that are performed in the specified workspaces. The extension subscribes to the Message Queue for Apache Kafka topic provided by DataWorks. After specific operations are performed, DataWorks sends related event messages to the extension by using the topic, and determines whether to allow the operations based on the results returned by the extension.

Prerequisites

The following operations are complete:

  • The configurations related to Message Queue for Apache Kafka are complete. DataWorks event messages can be sent as expected. For more information, see Enable message subscription in DataWorks.
  • A consumer program is developed for the extension to receive messages from DataWorks. For more information about the sample code, see Develop consumer programs on your clients.
  • You understand how an extension receives and processes event messages, and returns the processing results to DataWorks by calling a callback API operation. For more information, see Develop and deploy an extension.

Background information

DataWorks suspends an extension point event until it receives a response from the extension. This guards against failures caused by limited data development experience or weak awareness of data quality assurance. For example, developers may publish modified code without strict testing, or mistakenly perform high-risk operations such as deleting key tables, resources, and functions. To resolve such issues, you can develop extensions locally and register them in DataWorks so that manual review is replaced by programmatic approval.

Figure: How an extension receives and processes event messages

The following table describes the supported extension points and provides links to examples on how to develop the event processing code.
Extension point | References
Extension point for file commitment | Sample code for processing file commitment or deployment events
Extension point for file deployment | Sample code for processing file commitment or deployment events
Extension point for table commitment | Sample code for processing table commitment or deployment events
Extension point for table deployment | Sample code for processing table commitment or deployment events
Extension point for file deletion | Sample code for processing file deletion events
Extension point for code execution | Sample code for processing code execution events

Limits

The account that is used to call DataWorks API operations must be granted the AliyunDataWorksFullAccess permission. For more information about how to grant the AliyunDataWorksFullAccess permission to a RAM user by using your Alibaba Cloud account, see Overview.

Sample code for processing file commitment or deployment events

Sample scenario: If the code of the node contains the CREATE TABLE keyword, a warning is returned, but the process is not blocked. If the code of the node contains the DROP TABLE keyword, the process is blocked.
String messageId = ""; // The ID of the message. The value can be parsed from the event message.
String extensionCode = ""; // The code of the extension. The value can be obtained from the DataWorks console.
Long projectId = 0L;  // The ID of the workspace. The value can be parsed from the event message.
IAcsClient client = null; // Initialize the POP client.

// Query the event details, which include a snapshot of the committed file.
GetIDEEventDetailRequest detailRequest = new GetIDEEventDetailRequest();
detailRequest.setMessageId(messageId);
detailRequest.setProjectId(projectId);
GetIDEEventDetailResponse response = client.getAcsResponse(detailRequest);
GetIDEEventDetailResponse.EventDetail.CommittedFile dataSnapshot = response.getEventDetail().getCommittedFile();
String checkResult = "OK";
String checkResultTip = "Success";
// CREATE TABLE triggers a warning but does not block the process.
if (dataSnapshot.getContent().contains("CREATE TABLE")) {
    checkResult = "WARN";
    checkResultTip = "We recommend that you do not include CREATE TABLE statements in the code.";
}
// DROP TABLE blocks the process. This check runs last so that FAIL overrides WARN.
if (dataSnapshot.getContent().contains("DROP TABLE")) {
    checkResult = "FAIL";
    checkResultTip = "Do not include DROP TABLE statements in the code.";
}

// Return the check result to DataWorks.
UpdateIDEEventResultRequest resultRequest = new UpdateIDEEventResultRequest();
resultRequest.setCheckResult(checkResult);
resultRequest.setExtensionCode(extensionCode);
resultRequest.setCheckResultTip(checkResultTip);
resultRequest.setMessageId(messageId);
client.getAcsResponse(resultRequest);
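
The contains checks in the sample above are case-sensitive and match only a single space between the two keywords, so code such as drop table or DROP  TABLE would pass unnoticed. The following is a minimal sketch of the same rule written as a standalone function that can be tested without a DataWorks client. It is not part of the DataWorks SDK: the names SqlKeywordRule, CheckOutcome, and evaluate are hypothetical, and only the OK, WARN, and FAIL values and the tip texts are taken from the sample.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of the DataWorks SDK.
final class SqlKeywordRule {

    /** Holds the two values that the sample passes to UpdateIDEEventResultRequest. */
    static final class CheckOutcome {
        final String checkResult;
        final String checkResultTip;

        CheckOutcome(String checkResult, String checkResultTip) {
            this.checkResult = checkResult;
            this.checkResultTip = checkResultTip;
        }
    }

    // Case-insensitive, tolerant of any whitespace between the two keywords.
    private static final Pattern CREATE_TABLE =
            Pattern.compile("\\bCREATE\\s+TABLE\\b", Pattern.CASE_INSENSITIVE);
    private static final Pattern DROP_TABLE =
            Pattern.compile("\\bDROP\\s+TABLE\\b", Pattern.CASE_INSENSITIVE);

    /** Evaluates node code; FAIL takes precedence over WARN, as in the sample. */
    static CheckOutcome evaluate(String content) {
        if (content == null) {
            // Assumption: empty or missing code has nothing to check.
            return new CheckOutcome("OK", "Success");
        }
        if (DROP_TABLE.matcher(content).find()) {
            return new CheckOutcome("FAIL", "Do not include DROP TABLE statements in the code.");
        }
        if (CREATE_TABLE.matcher(content).find()) {
            return new CheckOutcome("WARN",
                    "We recommend that you do not include CREATE TABLE statements in the code.");
        }
        return new CheckOutcome("OK", "Success");
    }
}
```

The returned checkResult and checkResultTip can then be set on UpdateIDEEventResultRequest as in the sample. This sketch still matches keywords that appear inside comments or string literals; excluding those would require parsing the SQL.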
                

Sample code for processing table commitment or deployment events

Sample scenario: A lifecycle must be specified before a table can be committed or deployed.
String messageId = ""; // The ID of the message. The value can be parsed from the event message.
String extensionCode = ""; // The code of the extension. The value can be obtained from the DataWorks console.
Long projectId = 0L;  // The ID of the workspace. The value can be parsed from the event message.
IAcsClient client = null; // Initialize the POP client.

// Query the event details, which include a snapshot of the table model.
GetIDEEventDetailRequest detailRequest = new GetIDEEventDetailRequest();
detailRequest.setMessageId(messageId);
detailRequest.setProjectId(projectId);
GetIDEEventDetailResponse response = client.getAcsResponse(detailRequest);
GetIDEEventDetailResponse.EventDetail.TableModel dataSnapshot = response.getEventDetail().getTableModel();
String checkResult = "OK";
String checkResultTip = "Success";
// Block the operation if no lifecycle is specified for the table.
if (dataSnapshot.getLifeCycle() == null || dataSnapshot.getLifeCycle() == 0) {
    checkResult = "FAIL";
    checkResultTip = "You must specify a lifecycle for the table.";
}

// Return the check result to DataWorks.
UpdateIDEEventResultRequest resultRequest = new UpdateIDEEventResultRequest();
resultRequest.setCheckResult(checkResult);
resultRequest.setExtensionCode(extensionCode);
resultRequest.setCheckResultTip(checkResultTip);
resultRequest.setMessageId(messageId);
client.getAcsResponse(resultRequest);
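
The lifecycle rule can also be isolated from the API calls so that it can be tested on its own. The following is a minimal sketch under stated assumptions: the lifecycle is passed as an Integer, which the null check in the sample suggests, and the names TableLifecycleRule and evaluate are hypothetical. Unlike the sample, the sketch also treats negative values as a missing lifecycle, which is an assumption rather than documented behavior.

```java
// Hypothetical helper, not part of the DataWorks SDK.
final class TableLifecycleRule {

    /**
     * Returns a two-element array: [checkResult, checkResultTip].
     * A null, zero, or negative lifecycle is treated as "not specified".
     */
    static String[] evaluate(Integer lifeCycle) {
        if (lifeCycle == null || lifeCycle <= 0) {
            return new String[] {"FAIL", "You must specify a lifecycle for the table."};
        }
        return new String[] {"OK", "Success"};
    }
}
```

For example, the value returned by dataSnapshot.getLifeCycle() in the sample could be passed to evaluate, and the two array elements set as the check result and the check result tip.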

Sample code for processing file deletion events

Sample scenario: No files, including resources, functions, and nodes, can be deleted in the workspace. If the message indicates that a file is to be deleted in the workspace, the process is blocked.
String messageId = ""; // The ID of the message. The value can be parsed from the event message.
String extensionCode = ""; // The code of the extension. The value can be obtained from the DataWorks console.
Long projectId = 0L;  // The ID of the workspace. The value can be parsed from the event message.
IAcsClient client = null; // Initialize the POP client.

String checkResult = "FAIL";
String checkResultTip = "You are not allowed to delete files.";

UpdateIDEEventResultRequest request = new UpdateIDEEventResultRequest();
request.setCheckResult(checkResult);
request.setExtensionCode(extensionCode);
request.setCheckResultTip(checkResultTip);
request.setMessageId(messageId);
client.getAcsResponse(request);

Sample code for processing code execution events

Sample scenario: If the code of the node contains the CREATE TABLE keyword, a warning is returned before the node is run. If the code of the node contains the DROP TABLE keyword, the node is blocked from running.
String messageId = ""; // The ID of the message. The value can be parsed from the event message.
String extensionCode = ""; // The code of the extension. The value can be obtained from the DataWorks console.
Long projectId = 0L;  // The ID of the workspace. The value can be parsed from the event message.
IAcsClient client = null; // Initialize the POP client.

// Query the event details, which include a snapshot of the code to be run.
GetIDEEventDetailRequest detailRequest = new GetIDEEventDetailRequest();
detailRequest.setMessageId(messageId);
detailRequest.setProjectId(projectId);
GetIDEEventDetailResponse response = client.getAcsResponse(detailRequest);
GetIDEEventDetailResponse.EventDetail.FileExecutionCommand dataSnapshot = response.getEventDetail().getFileExecutionCommand();
String checkResult = "OK";
String checkResultTip = "Success";
// CREATE TABLE triggers a warning before the node is run.
if (dataSnapshot.getContent().contains("CREATE TABLE")) {
    checkResult = "WARN";
    checkResultTip = "We recommend that you do not include CREATE TABLE statements in the code.";
}
// DROP TABLE blocks the node. This check runs last so that FAIL overrides WARN.
if (dataSnapshot.getContent().contains("DROP TABLE")) {
    checkResult = "FAIL";
    checkResultTip = "Do not include DROP TABLE statements in the code.";
}

// Return the check result to DataWorks.
UpdateIDEEventResultRequest resultRequest = new UpdateIDEEventResultRequest();
resultRequest.setCheckResult(checkResult);
resultRequest.setExtensionCode(extensionCode);
resultRequest.setCheckResultTip(checkResultTip);
resultRequest.setMessageId(messageId);
client.getAcsResponse(resultRequest);