After you enable the message subscription feature in DataWorks Open Platform and configure a Kafka topic, you can develop a client based on the Message Queue for Apache Kafka SDK to consume DataWorks event messages. This topic describes how to develop such a client.
Prerequisites
The message subscription feature is enabled and the relevant configurations are complete. For more information, see Enable message subscription in DataWorks.
The Kafka configurations are obtained. For more information, see View the Kafka configurations.
Limits
You can publish and subscribe to OpenEvent messages only by using SSL endpoints of Message Queue for Apache Kafka that are authenticated based on the PLAIN mechanism.
The OpenEvent module supports only Message Queue for Apache Kafka V2.2.0. We recommend that the versions of the Message Queue for Apache Kafka client and server be consistent.
References
Message Queue for Apache Kafka used by DataWorks allows you to develop a client by using SDKs for different programming languages. The supported programming languages include Java, Python, C++, Go, PHP, Ruby, Node.js, and C#. The following table provides links to the topics that describe how to develop a client by using SDKs for different programming languages.
Before you develop a client, you must obtain the Kafka configurations such as the Kafka topic, consumer group, user, and SSL endpoints from the Open Platform page in the DataWorks console.
If you need to specify the version of Message Queue for Apache Kafka when you configure dependencies, set the version to V2.2.0.
You do not need to develop a producer. For information about the preparations and consumer group configurations required by SDKs for different programming languages, see the relevant topics.
| Programming language | Network type | Protocol | Port | References |
| --- | --- | --- | --- | --- |
| Java | Internet and VPC | SASL_SSL | 9093 | Send and consume messages by using an SSL endpoint with PLAIN authentication |
| Python | Internet and VPC | SASL_SSL | 9093 | |
| C++ | Internet and VPC | SASL_SSL | 9093 | |
| Go | Internet and VPC | SASL_SSL | 9093 | |
| PHP | Internet and VPC | SASL_SSL | 9093 | |
| Ruby | Internet and VPC | SASL_SSL | 9093 | |
| Node.js | Internet and VPC | SASL_SSL | 9093 | |
| C# | Internet and VPC | SASL_SSL | 9093 | |
Message parsing
The message formats vary based on the types of DataWorks events. For more information about the message formats for different types of DataWorks events, see Appendix: Message formats.
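As a minimal sketch of how a consumer might inspect a received event message, the following Java snippet extracts a top-level string field from a JSON payload. The field names used here (such as eventCode) are hypothetical; the actual fields depend on the event type and are documented in Appendix: Message formats. The regex-based extraction is only for illustration; in production code, use a JSON library such as Gson or Jackson.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class EventMessageParser {

    // Extracts the value of a top-level string field from a JSON event message.
    // This is a simplistic regex-based sketch, not a full JSON parser.
    static String extractField(String json, String field) {
        Pattern p = Pattern.compile("\"" + Pattern.quote(field) + "\"\\s*:\\s*\"([^\"]*)\"");
        Matcher m = p.matcher(json);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        // A hypothetical event payload; see Appendix: Message formats for the real fields.
        String message = "{\"eventCode\":\"alert\",\"appId\":\"12345\"}";
        System.out.println(extractField(message, "eventCode"));
    }
}
```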
Sample code for event message subscription
The sample code is written in Java. You can refer to it to write event message subscription code in the programming language that suits your business requirements.
Add Project Object Model (POM) dependencies and Kafka dependencies.
```xml
<!-- Add Kafka dependencies. -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.2.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- Add API dependencies. -->
<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>aliyun-java-sdk-core</artifactId>
    <version>4.5.1</version>
</dependency>
<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>aliyun-java-sdk-dataworks-public</artifactId>
    <version>3.4.14</version>
</dependency>
```

Create a Java Authentication and Authorization Service (JAAS) configuration file named kafka_client_jaas.conf.
```
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="xxxx"
    password="xxxx";
};
```

Create a configuration file named kafka.properties for Message Queue for Apache Kafka.
```
## The SSL endpoint, which can be obtained in the DataWorks console.
bootstrap.servers=xxxx
## The Kafka topic, which can be created in the DataWorks console.
topic=xxxx
## The consumer group, which can be created in the DataWorks console.
group.id=xxxx
## The SSL root certificate.
ssl.truststore.location=/xxxx/kafka.client.truststore.jks
## The JAAS configuration file.
java.security.auth.login.config=/xxxx/kafka_client_jaas.conf
```

You can view the consumer group ID, Kafka topic, SSL endpoint, username, and password in the DataWorks console. For more information, see View the Kafka configurations. Set the bootstrap.servers parameter to the SSL endpoint that is displayed in the DataWorks console. You can publish and subscribe to OpenEvent messages only by using SSL endpoints of Message Queue for Apache Kafka that are authenticated based on the PLAIN mechanism.
Write code for event message subscription.
```java
package com.aliyun.openservices.kafka.ons;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConsumerDemo {

    public static void main(String[] args) {
        // Configure the path of the Simple Authentication and Security Layer (SASL) file.
        JavaKafkaConfigurer.configureSasl();

        // Load the kafka.properties configuration file.
        Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();

        Properties props = new Properties();
        // Specify an SSL endpoint. Obtain the SSL endpoint of the corresponding Kafka topic in the DataWorks console.
        props.put("bootstrap.servers", kafkaProperties.getProperty("bootstrap.servers"));
        // Specify the path of the SSL root certificate. Replace the placeholder with the actual path.
        // Do not compress the SSL root certificate file into a JAR package.
        props.put("ssl.truststore.location", kafkaProperties.getProperty("ssl.truststore.location"));
        // The password of the truststore in the root certificate store. Use the default value.
        props.put("ssl.truststore.password", "KafkaOnsClient");
        // The access protocol. Set the value to SASL_SSL.
        props.put("security.protocol", "SASL_SSL");
        // The SASL authentication mechanism. Use the default value.
        props.put("sasl.mechanism", "PLAIN");
        // The session timeout period. If the consumer does not send a heartbeat before the session
        // times out, the broker determines that the consumer is not alive, removes the consumer from
        // the consumer group, and triggers rebalancing. The default value is 30s.
        props.put("session.timeout.ms", 30000);
        // The maximum amount of data returned per partition and per poll operation.
        // These values matter most when data is transmitted over the Internet.
        props.put("max.partition.fetch.bytes", 32000);
        props.put("fetch.max.bytes", 32000);
        // The maximum number of messages that can be polled at a time.
        // Do not set this to an excessively large value. If too many messages are polled but are not
        // completely consumed before the next poll starts, rebalancing is triggered, which causes delays.
        props.put("max.poll.records", 30);
        // The deserializers for message keys and values.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // The consumer group to which the current consumer instance belongs. Enter the consumer group
        // that you created in the DataWorks console. Consumer instances that belong to the same
        // consumer group consume messages in load balancing mode.
        props.put("group.id", kafkaProperties.getProperty("group.id"));
        // Set the algorithm for hostname verification to an empty value to disable it.
        props.put("ssl.endpoint.identification.algorithm", "");

        // Create a consumer instance.
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        // Specify the topics to which the consumer group subscribes. One consumer group can subscribe
        // to multiple topics. We recommend that consumer instances with the same group.id subscribe to
        // the same topics. You must create the topics in the DataWorks console before you add them.
        List<String> subscribedTopics = new ArrayList<String>();
        subscribedTopics.add(kafkaProperties.getProperty("topic"));
        consumer.subscribe(subscribedTopics);

        // Poll for event messages in a loop and consume each record.
        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Received message: " + record.value());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
```
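The sample code above calls JavaKafkaConfigurer, a helper class from the Kafka demo project that is not shown in this topic. The following is a minimal sketch of what that helper might look like; the lookup of the kafka.properties file via a kafka.properties.path system property is an assumption for illustration, and your project may instead load the file from the classpath.

```java
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;

public class JavaKafkaConfigurer {

    private static Properties properties;

    // Loads kafka.properties once and caches it. The file path is read from the
    // kafka.properties.path system property (an assumption of this sketch);
    // adjust the lookup to match how your project packages the file.
    public static synchronized Properties getKafkaProperties() {
        if (properties == null) {
            Properties props = new Properties();
            String path = System.getProperty("kafka.properties.path", "kafka.properties");
            try (InputStream in = new FileInputStream(path)) {
                props.load(in);
            } catch (Exception e) {
                throw new RuntimeException("Failed to load kafka.properties from " + path, e);
            }
            properties = props;
        }
        return properties;
    }

    // Points the JVM at the JAAS configuration file referenced in kafka.properties,
    // unless the java.security.auth.login.config system property is already set.
    public static void configureSasl() {
        if (System.getProperty("java.security.auth.login.config") == null) {
            System.setProperty("java.security.auth.login.config",
                    getKafkaProperties().getProperty("java.security.auth.login.config"));
        }
    }
}
```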