
Develop a client based on Message Queue for Apache Kafka SDK

Last Updated: Sep 10, 2024

After you enable the message subscription feature in DataWorks Open Platform and configure a Kafka topic, you can develop a client based on Message Queue for Apache Kafka SDK to consume DataWorks event messages. This topic describes how to develop such a client.

Prerequisites

  • The message subscription feature is enabled in DataWorks Open Platform and a Kafka topic is configured.

Limits

  • You can publish and subscribe to OpenEvent messages only by using SSL endpoints of Message Queue for Apache Kafka that are authenticated based on the PLAIN mechanism.

  • The OpenEvent module supports only Message Queue for Apache Kafka V2.2.0. We recommend that the versions of the Message Queue for Apache Kafka client and server be consistent.

References

Message Queue for Apache Kafka used by DataWorks allows you to develop a client by using SDKs for different programming languages. The supported programming languages include Java, Python, C++, Go, PHP, Ruby, Node.js, and C#. The following table provides links to the topics that describe how to develop a client by using SDKs for different programming languages.

Important
  • Before you develop a client, you must obtain the Kafka configurations such as the Kafka topic, consumer group, user, and SSL endpoints from the Open Platform page in the DataWorks console.

  • If you need to specify the version of Message Queue for Apache Kafka when you configure dependencies, set the version to V2.2.0.

  • You do not need to develop a producer. For information about the preparations and consumer group configurations required by SDKs for different programming languages, see the topics referenced in the following table.

| Programming language | Network type | Protocol | Port | References |
| --- | --- | --- | --- | --- |
| Java | Internet and VPC | SASL_SSL | 9093 | Send and consume messages by using an SSL endpoint with PLAIN authentication |
| Python | Internet and VPC | SASL_SSL | 9093 | Use the SDK for Python to send and receive messages |
| C++ | Internet and VPC | SASL_SSL | 9093 | Use the SDK for C++ to send and receive messages |
| Go | Internet and VPC | SASL_SSL | 9093 | Use the SDK for Go to send and receive messages |
| PHP | Internet and VPC | SASL_SSL | 9093 | Use the SDK for PHP to send and receive messages |
| Ruby | Internet and VPC | SASL_SSL | 9093 | Use the SDK for Ruby to send and subscribe to messages |
| Node.js | Internet and VPC | SASL_SSL | 9093 | Use the SDK for Node.js to send and receive messages |
| C# | Internet and VPC | SASL_SSL | 9093 | Use the SDK for C# to send and receive messages |

Message parsing

The message formats vary based on the types of DataWorks events. For more information about the message formats for different types of DataWorks events, see Appendix: Message formats.
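
DataWorks event messages arrive as JSON strings in the Kafka record value. As an illustration only, the following sketch extracts an assumed top-level eventCode field with a regular expression so that a consumer can dispatch on the event type; the field name is an assumption here, and in real code you should use a JSON library such as Jackson and the exact field names from Appendix: Message formats.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class EventMessageParser {

    // Matches a top-level "eventCode":"..." pair. The field name is a
    // placeholder; check the actual message format in the appendix.
    private static final Pattern EVENT_CODE =
            Pattern.compile("\"eventCode\"\\s*:\\s*\"([^\"]+)\"");

    // Returns the event code of a message, or null if the field is absent.
    public static String eventCode(String message) {
        Matcher m = EVENT_CODE.matcher(message);
        return m.find() ? m.group(1) : null;
    }
}
```

A consumer can call EventMessageParser.eventCode(record.value()) on each polled record and route the message to the matching handler.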

Sample code for event message subscription

The sample code is written in Java. You can refer to it to implement event message subscription in another programming language based on your business requirements.

  1. Add Project Object Model (POM) dependencies and Kafka dependencies.

     <!-- Add Kafka dependencies. -->
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
       <version>2.2.0</version>
       <exclusions>
         <exclusion>
           <groupId>org.slf4j</groupId>
           <artifactId>slf4j-api</artifactId>
         </exclusion>
       </exclusions>
     </dependency>
     <!-- Add API dependencies. -->
     <dependency>
       <groupId>com.aliyun</groupId>
       <artifactId>aliyun-java-sdk-core</artifactId>
       <version>4.5.1</version>
     </dependency>
     <dependency>
       <groupId>com.aliyun</groupId>
       <artifactId>aliyun-java-sdk-dataworks-public</artifactId>
       <version>3.4.14</version>
     </dependency>
  2. Create a Java Authentication and Authorization Service (JAAS) configuration file named kafka_client_jaas.conf.

    KafkaClient {
      org.apache.kafka.common.security.plain.PlainLoginModule required
      username="xxxx"
      password="xxxx";
    };  
  3. Create a configuration file named kafka.properties for Message Queue for Apache Kafka.

    ## The SSL endpoint, which can be obtained in the DataWorks console. 
    bootstrap.servers=xxxx
    ## The Kafka topic, which can be created in the DataWorks console. 
    topic=xxxx
    ## The consumer group, which can be created in the DataWorks console. 
    group.id=xxxx
    ## The SSL root certificate. 
    ssl.truststore.location=/xxxx/kafka.client.truststore.jks
    ## The JAAS configuration file. 
    java.security.auth.login.config=/xxxx/kafka_client_jaas.conf    

    You can view the consumer group ID, Kafka topic, SSL endpoint, username, and password in the DataWorks console. For more information, see View the Kafka configurations. You can set the bootstrap.servers parameter to a value based on the SSL endpoint that is displayed in the DataWorks console. You can publish and subscribe to OpenEvent messages only by using SSL endpoints of Message Queue for Apache Kafka that are authenticated based on the PLAIN mechanism.

  4. Write code for event message subscription.

    package com.aliyun.openservices.kafka.ons;
    
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    public class KafkaConsumerDemo {
    
        public static void main(String[] args) {
            // Configure the path of the Simple Authentication and Security Layer (SASL) file.
            JavaKafkaConfigurer.configureSasl();
    
            // Load the kafka.properties configuration file.
            Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();
    
            Properties props = new Properties();
            // Specify an SSL endpoint. Obtain the SSL endpoint of the corresponding Kafka topic in the DataWorks console.
            props.put("bootstrap.servers", kafkaProperties.getProperty("bootstrap.servers"));
            // Specify the path of the SSL root certificate. Do not package the certificate file into a JAR file.
            props.put("ssl.truststore.location", kafkaProperties.getProperty("ssl.truststore.location"));
            // The password of the truststore in the root certificate store. Use the default value.
            props.put("ssl.truststore.password", "KafkaOnsClient");
            // The access protocol. Set the value to SASL_SSL.
            props.put("security.protocol", "SASL_SSL");
            // The SASL authentication mechanism. Use the default value PLAIN.
            props.put("sasl.mechanism", "PLAIN");
            // The session timeout period. If the consumer does not send a heartbeat before the session times out, the broker determines that the consumer is no longer alive, removes the consumer from the consumer group, and triggers rebalancing. The default value is 30s.
            props.put("session.timeout.ms", 30000);
            // The maximum amount of data returned for a single partition and for a single fetch request. Use small values such as the following if data is transmitted over the Internet.
            props.put("max.partition.fetch.bytes", 32000);
            props.put("fetch.max.bytes", 32000);
            // The maximum number of messages that can be polled at a time.
            // Do not set this parameter to an excessively large value. If too many messages are polled but are not completely consumed before the next poll starts, rebalancing is triggered, which causes freezing.
            props.put("max.poll.records", 30);
            // The deserializers for message keys and values.
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // The consumer group to which the current consumer instance belongs. Enter the consumer group that you created in the DataWorks console.
            // Consumer instances that belong to the same consumer group consume messages in load balancing mode.
            props.put("group.id", kafkaProperties.getProperty("group.id"));
            // Set the algorithm for hostname verification to an empty value to disable the verification.
            props.put("ssl.endpoint.identification.algorithm", "");
    
            // Create a consumer instance.
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
            // Specify the topics to which the consumer group subscribes. One consumer group can subscribe to multiple topics.
            // We recommend that you set the topic to the same value for consumer instances that have the same group.id value.
            List<String> subscribedTopics = new ArrayList<String>();
            // If you want to subscribe to multiple topics, add them here.
            // You must create the topics in the DataWorks console before you add them.
            subscribedTopics.add(kafkaProperties.getProperty("topic"));
            consumer.subscribe(subscribedTopics);
    
            // Cyclically poll for and consume messages.
            while (true) {
                try {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println("Receive message: " + record.value());
                    }
                } catch (Exception e) {
                    // Log and continue. In production, handle consumption failures as required.
                    e.printStackTrace();
                }
            }
        }
    }
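
The sample above calls a JavaKafkaConfigurer helper that is not shown. A minimal sketch of such a helper follows; it loads kafka.properties from a path given by a kafka.client.config system property (the property name and fallback path are assumptions of this sketch) and sets java.security.auth.login.config so that the JVM can find the JAAS file with the PLAIN credentials.

```java
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;

public class JavaKafkaConfigurer {

    private static volatile Properties properties;

    // Load kafka.properties from the path in the "kafka.client.config"
    // system property; fall back to the current working directory.
    public static Properties getKafkaProperties() {
        if (properties == null) {
            String path = System.getProperty("kafka.client.config", "kafka.properties");
            Properties props = new Properties();
            try (InputStream in = new FileInputStream(path)) {
                props.load(in);
            } catch (Exception e) {
                throw new RuntimeException("Failed to load " + path, e);
            }
            properties = props;
        }
        return properties;
    }

    // Point the JVM at the JAAS file so that the PLAIN credentials in
    // kafka_client_jaas.conf are picked up during the SASL handshake.
    public static void configureSasl() {
        if (System.getProperty("java.security.auth.login.config") == null) {
            System.setProperty("java.security.auth.login.config",
                    getKafkaProperties().getProperty("java.security.auth.login.config"));
        }
    }
}
```

Call configureSasl() once before you construct the consumer, as shown in the sample code.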