DataWorks allows you to use a custom extension to process extension point events to which you subscribe and send processing results to DataWorks by using the OpenAPI module. This topic describes how to develop a custom extension and the precautions that you must take during the procedure.

Prerequisites

The message subscription feature is enabled, a custom extension is registered, and the information that is required to develop the extension is obtained. For more information, see Make preparations.

Develop a custom extension

The following figure shows how a custom extension processes a file commit event in DataStudio.

When you develop code for a custom extension, take note of the following items:
  1. Subscribe to and parse event messages.
  2. Obtain the snapshots of extension point events and process events based on the snapshots.
    • Pass the messageId parameter to the API operation that you call to obtain the snapshots of extension point events.
      Note
      • You can call the GetIDEEventDetail operation to obtain the snapshots of extension point events in DataStudio.
      • You can obtain the value of the messageId parameter by parsing event messages.
    • You can use parameters to make extension code easier to develop and apply. For example, you can use the built-in parameter extension.project.disabled of a custom extension to specify that the extension does not take effect for the specified workspaces. For more information about the built-in parameters that you can use to develop extension code, see Configure extension parameters.
    • For more information about the sample extension development code that is used to process different types of extension point events, see Develop the message processing code.
  3. Send processing results to DataWorks.
    Call an API operation from the extension to return the processing results of extension point events to DataWorks.
    Note You can call the UpdateIDEEventResult operation to return the processing results of extension point events in DataStudio to DataWorks.
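
The three steps above can be sketched as a small, dependency-free flow. This is only an illustration under stated assumptions: SnapshotFetcher and ResultSender are hypothetical stand-ins for the DataWorks API calls (for example, GetIDEEventDetail and UpdateIDEEventResult), and the regular expression is a simplification that assumes messageId appears as a plain string field. A real extension would use a JSON library and the DataWorks SDK instead.

```java
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Sketch of the flow: parse the message, fetch and check the snapshot, return a result. */
final class ExtensionFlowSketch {

    // Simplification: matches "messageId":"<value>" in the raw message text.
    private static final Pattern MESSAGE_ID =
            Pattern.compile("\"messageId\"\\s*:\\s*\"([^\"]*)\"");

    /** Hypothetical stand-in for the call that obtains an event snapshot. */
    interface SnapshotFetcher { String fetch(String messageId); }

    /** Hypothetical stand-in for the call that returns the check result to DataWorks. */
    interface ResultSender { void send(String messageId, String checkResult); }

    /** Step 1: extract messageId from the event message. Returns null if it is absent. */
    static String extractMessageId(String rawMessage) {
        Matcher m = MESSAGE_ID.matcher(rawMessage);
        return m.find() ? m.group(1) : null;
    }

    /** Steps 2 and 3: fetch the snapshot, apply the check, and send OK or FAIL. */
    static String handle(String rawMessage, SnapshotFetcher fetcher,
                         Predicate<String> check, ResultSender sender) {
        String messageId = extractMessageId(rawMessage);
        if (messageId == null) {
            return null; // Nothing to report without a messageId.
        }
        String snapshot = fetcher.fetch(messageId);
        String result = check.test(snapshot) ? "OK" : "FAIL";
        sender.send(messageId, result);
        return result;
    }

    public static void main(String[] args) {
        String result = handle(
                "{\"messageId\":\"example-id\"}",
                id -> "snapshot of " + id,          // Placeholder snapshot lookup.
                snapshot -> !snapshot.isEmpty(),    // Placeholder check logic.
                (id, r) -> System.out.println(id + " -> " + r));
        System.out.println(result);
    }
}
```

Keeping the API calls behind small interfaces makes the check logic testable without a Kafka connection or DataWorks credentials.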

Sample code used to develop a custom extension

The following sample code shows how to develop a custom extension in Java. You can choose a different programming language based on your business requirements.

  1. Sample code used to add the Kafka and API dependencies to the Project Object Model (POM) file:
    <!-- Add Kafka dependencies. -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.2.0</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <!-- Add API dependencies. -->
    <dependency>
      <groupId>com.aliyun</groupId>
      <artifactId>aliyun-java-sdk-core</artifactId>
      <version>4.5.1</version>
    </dependency>
    <dependency>
      <groupId>com.aliyun</groupId>
      <artifactId>aliyun-java-sdk-dataworks-public</artifactId>
      <version>3.4.14</version>
    </dependency>
  2. Sample code used to configure a custom extension to process extension point events and return processing results to DataWorks:
    package com.aliyun.openservices.kafka.ons;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    
    import com.alibaba.fastjson.JSONObject;
    import com.aliyuncs.DefaultAcsClient;
    import com.aliyuncs.IAcsClient;
    import com.aliyuncs.dataworks_public.model.v20200518.UpdateWorkbenchEventResultRequest;
    import com.aliyuncs.dataworks_public.model.v20200518.UpdateWorkbenchEventResultResponse;
    import com.aliyuncs.exceptions.ClientException;
    import com.aliyuncs.exceptions.ServerException;
    import com.aliyuncs.profile.DefaultProfile;
    import com.google.gson.Gson;
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.config.SaslConfigs;
    import org.apache.kafka.common.config.SslConfigs;
    
    public class KafkaConsumerDemo {
    
        public static void main(String args[]) {
            // Configure the path of the Simple Authentication and Security Layer (SASL) file.
            JavaKafkaConfigurer.configureSasl();
    
            // Load the kafka.properties configuration file.
            Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();
    
            Properties props = new Properties();
            // Specify an SSL endpoint. You can obtain the SSL endpoint of the corresponding Kafka topic in the DataWorks console.
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
            // Specify the path of the SSL root certificate. The path is read from the ssl.truststore.location property in kafka.properties.
            // Do not package the SSL root certificate file into the JAR file.
            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
            // The password of the truststore in the root certificate store. Use the default value.
            props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
            // The access protocol. Set the value to SASL_SSL.
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            // The SASL authentication method. Use the default value.
            props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
            // The session timeout period. If the consumer does not send a heartbeat before the session times out, the broker determines that the consumer is not alive. Then, the broker removes the consumer from the consumer group and triggers rebalancing. The default value is 30s.
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
            // The maximum amount of data that can be fetched at a time. These settings have a significant impact if data is transmitted over the Internet.
            props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);
            props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);
            // The maximum number of messages that can be polled at a time.
            // Do not set the number to an excessively large value. If a large number of messages are polled but are not completely consumed before the next polling cycle starts, rebalancing is triggered, which stalls consumption.
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
            // The deserialization format of the message.
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            // The consumer group to which the current consumer instance belongs. Enter the consumer group that you created in the DataWorks console.
            // Consumer instances that belong to the same consumer group consume messages in load balancing mode.
            props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));
            // Set the hostname verification algorithm to an empty string to disable hostname verification.
            props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
    
            // Create a consumer instance.
            KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props);
            // Specify the topics to which the consumer group subscribes. One consumer group can subscribe to multiple topics.
            // We recommend that you set the topic to the same value for the consumer instances that have the same GROUP_ID_CONFIG value.
            List<String> subscribedTopics = new ArrayList<String>();
            // If you want to subscribe to multiple topics, you can add the topics here.
            // You must create the topics in the DataWorks console before you add them.
            subscribedTopics.add(kafkaProperties.getProperty("topic"));
            consumer.subscribe(subscribedTopics);
    
            // Consume messages cyclically.
            while (true) {
                try {
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    // The messages must be completely consumed before the next polling cycle starts. The total duration cannot exceed the timeout interval that is specified by SESSION_TIMEOUT_MS_CONFIG.
                    // We recommend that you create a separate thread pool to consume messages and return the results in an asynchronous manner.
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(String.format("Consume partition:%d offset:%d", record.partition(), record.offset()));
                        System.out.println("Consume message:" + record.key() + "," + record.value());
                        // 1. Parse the event message. For more information about the message format, see https://www.alibabacloud.com/help/dataworks/latest/appendix-message-formats
                        JSONObject messageContent = JSONObject.parseObject(record.value());
                        String messageId = messageContent.getString("messageId");
                        // 2. Process the extension point event by using your custom logic.
                        //doSomeThing();
    
                        // 3. Call an API operation to return the processing results to DataWorks. The sample code is provided only for reference. You can write code based on best practices.
                        DefaultProfile profile = DefaultProfile.getProfile("cn-shanghai", "<your-access-key-id>", "<your-access-key-secret>");
                        /** use STS Token
                         DefaultProfile profile = DefaultProfile.getProfile(
                         "<your-region-id>",           // The region ID
                         "<your-access-key-id>",       // The AccessKey ID of the RAM account
                         "<your-access-key-secret>",   // The AccessKey Secret of the RAM account
                         "<your-sts-token>");          // STS Token
                         **/
                        IAcsClient client = new DefaultAcsClient(profile);
                        String extensioncode = "xxxx";
                        UpdateWorkbenchEventResultRequest request = new UpdateWorkbenchEventResultRequest();
                        request.setMessageId(messageId);
                        request.setExtensionCode(extensioncode);
                        // To let the event pass the check, set the value of the parameter to OK. Otherwise, set the value of the parameter to FAIL.
                        request.setCheckResult("OK");
    
                        try {
                            UpdateWorkbenchEventResultResponse response = client.getAcsResponse(request);
                            System.out.println(new Gson().toJson(response));
                        } catch (ServerException e) {
                            e.printStackTrace();
                        } catch (ClientException e) {
                            System.out.println("ErrCode:" + e.getErrCode());
                            System.out.println("ErrMsg:" + e.getErrMsg());
                            System.out.println("RequestId:" + e.getRequestId());
                        }
    
                    }
                } catch (Exception e) {
                    try {
                        Thread.sleep(1000);
                    } catch (Throwable ignore) {
                    }
                    // See common errors: https://www.alibabacloud.com/help/function-compute/latest/faq-faq
                    e.printStackTrace();
                }
            }
        }
    }
                        
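The comments in the sample recommend consuming messages in a separate thread pool and returning results asynchronously, so that the polling loop is not blocked. A minimal sketch of that idea follows, using only the Java standard library. AsyncEventWorker and EventHandler are hypothetical names introduced for illustration. The handler is assumed to wrap your own check logic and the call that returns the result to DataWorks. The sketch does not address offset management or retries.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hands event messages to a fixed thread pool so that the polling loop returns quickly. */
final class AsyncEventWorker {

    /** Hypothetical callback: checks one event message and returns OK or FAIL. */
    interface EventHandler { String handle(String messageValue); }

    private final ExecutorService pool;
    private final EventHandler handler;

    AsyncEventWorker(int threads, EventHandler handler) {
        this.pool = Executors.newFixedThreadPool(threads);
        this.handler = handler;
    }

    /** Queues one message value for processing and returns immediately. */
    Future<String> submit(String messageValue) {
        Callable<String> task = () -> handler.handle(messageValue);
        return pool.submit(task);
    }

    /** Waits for a result. Returns null if the handler failed or the wait was interrupted. */
    static String resultOf(Future<String> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            return null;
        }
    }

    /** Stops accepting work and waits up to 30 seconds for queued tasks to finish. */
    void shutdown() {
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Placeholder handler: treats an empty message as a failed check.
        AsyncEventWorker worker = new AsyncEventWorker(4, v -> v.isEmpty() ? "FAIL" : "OK");
        Future<String> pending = worker.submit("{\"messageId\":\"example-id\"}");
        System.out.println(resultOf(pending));
        worker.shutdown();
    }
}
```

In the polling loop of the sample, each record.value() would be passed to submit instead of being processed inline.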

Deploy a custom extension

After you develop and debug the code of a custom extension, you can deploy the extension to your on-premises machine or to an Alibaba Cloud Elastic Compute Service (ECS) instance as a service application. You can also develop a web page that allows users to view the processing progress and status of each event, and a help documentation page that helps users understand the business logic of the extension. You can add the page URLs and the parameters that you use in the code to the registration information of the extension in the To register your Extension program (Extension) dialog box.

The following table describes the parameters in the To register your Extension program (Extension) dialog box.
Parameter Description
Extension name The name of the extension.
Extension point of processing The type of extension point event that you want the extension to check. For more information about supported extension point events, see Supported extension point events.
Note After you set this parameter, the system automatically sets the Event and Applicable module parameters.
Head The person in charge of the extension. Users of the extension can contact this person if they encounter problems.
Test workspace The workspace that is used to test the extension.

You do not need to publish the extension to verify that it works. The test workspace supports end-to-end testing so that you can check whether the extension performs as expected.

In the test workspace, developers can trigger events to check whether DataWorks sends related messages to the extension by using Kafka and whether the extension checks the events and sends the check results to DataWorks.

Extensions Details Address The URL of the extension details page.

After you develop and deploy an extension, you can develop a web page to display how the extension works. Set this parameter to the page URL so that users can visit this web page to better understand and use the extension. For example, users can visit the web page to check why a node is blocked by the extension.

Extension document address The URL of the extension documentation page.

After you develop and deploy an extension, you can develop a help documentation page. Set this parameter to the page URL so that users can learn about the business logic and properties of the extension.

Extensions parameter configuration The extension parameters. To accelerate extension development, you can enter both built-in parameters for typical scenarios and custom parameters that your extension code references.

You can enter multiple parameters, each occupying one line and defined in the format of key=value. For more information about how to use these parameters, see Set extension parameters.
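
Because the parameters are entered one per line in key=value format, extension code can read them with java.util.Properties. The following is a minimal sketch, not the documented way to retrieve the parameters: ExtensionParams is a hypothetical helper, the parameter text is assumed to already be available to the extension as a string, maxRows is a made-up custom parameter, and the value of extension.project.disabled is assumed to be a comma-separated list of workspace IDs. Check the parameter documentation for the actual value format.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Properties;

/** Parses extension parameters that are given as key=value lines. */
final class ExtensionParams {

    private final Properties props = new Properties();

    ExtensionParams(String text) {
        try {
            // Properties.load accepts one key=value pair per line.
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns the value of a parameter, or the default value if it is not set. */
    String get(String key, String defaultValue) {
        return props.getProperty(key, defaultValue);
    }

    /** Assumption: extension.project.disabled holds comma-separated workspace IDs. */
    boolean isDisabledFor(String workspaceId) {
        String raw = props.getProperty("extension.project.disabled", "");
        return Arrays.stream(raw.split(","))
                .map(String::trim)
                .anyMatch(workspaceId::equals);
    }

    public static void main(String[] args) {
        ExtensionParams params = new ExtensionParams(
                "extension.project.disabled=1001,1002\nmaxRows=500");
        System.out.println(params.isDisabledFor("1002")); // true
        System.out.println(params.get("maxRows", "100")); // 500
    }
}
```

Note that Properties treats backslashes as escape characters, so values that contain them need a different parser.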