ApsaraMQ for Kafka: Send and receive messages by using the default endpoint

Last Updated: Jan 24, 2024

This topic describes how to use the SDK for Java to connect to the default endpoint of ApsaraMQ for Kafka from a Java client and send and receive messages in a virtual private cloud (VPC).

Prerequisites

Install Java dependencies

  1. Create a Java project in IntelliJ IDEA.

  2. Add the following dependencies to the pom.xml file:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.6</version>
    </dependency>
    Note

    We recommend that your client version be consistent with the major version of your ApsaraMQ for Kafka instance. You can view the major version of your ApsaraMQ for Kafka instance on the Instance Details page in the ApsaraMQ for Kafka console.

Prepare a configuration file

  1. Optional: Download the SSL root certificate. If you use an SSL endpoint to access the ApsaraMQ for Kafka instance, you must install the certificate.

  2. Go to the Aliware-kafka-demos page, click the download icon to download the demo project to your on-premises machine, and then decompress the demo project.

  3. In the decompressed demo project, find the kafka-java-demo folder and import the files in the folder to IntelliJ IDEA.

  4. Optional: If you use an SSL endpoint or a Simple Authentication and Security Layer (SASL) endpoint to access the ApsaraMQ for Kafka instance, modify the kafka_client_jaas.conf configuration file. For information about different endpoints of an instance, see Comparison among endpoints.

    KafkaClient {
      org.apache.kafka.common.security.plain.PlainLoginModule required
      username="xxxx"
      password="xxxx";
    }; 

    Enter the username and password of the instance in the username and password fields.

    • If the access control list (ACL) feature is disabled for the instance, you can obtain the default username and password in the Configuration Information section of the Instance Details page in the ApsaraMQ for Kafka console.

    • If the ACL feature is enabled for the instance, make sure that the SASL user that you want to use is of the PLAIN type and that the user is authorized to send and receive messages. For more information, see Grant permissions to SASL users.

  5. Modify the kafka.properties configuration file.

    ##==============================Common configuration parameters==============================
    bootstrap.servers=xxxxxxxxxxxxxxxxxxxxx
    topic=xxx
    group.id=xxx
    ##=======================Configure the following parameters based on your business requirements.========================
    ## Configure the SSL endpoint.
    ssl.truststore.location=/xxxx/kafka.client.truststore.jks
    java.security.auth.login.config=/xxxx/kafka_client_jaas.conf
    ## Configure the PLAIN mechanism for the SASL endpoint.
    java.security.auth.login.config.plain=/xxxx/kafka_client_jaas_plain.conf
    ## Configure the Salted Challenge Response Authentication Mechanism (SCRAM) for the SASL endpoint.
    java.security.auth.login.config.scram=/xxxx/kafka_client_jaas_scram.conf

    Parameter

    Description

    bootstrap.servers

    The endpoint information. You can obtain the information in the Endpoint Information section of the Instance Details page in the ApsaraMQ for Kafka console.

    topic

    The name of the topic that you created in the instance. You can obtain the name of the topic on the Topics page in the ApsaraMQ for Kafka console.

    group.id

    The ID of the consumer group that you created in the instance. You can obtain the ID of the consumer group on the Groups page in the ApsaraMQ for Kafka console.

    Note

    If the client only sends messages, for example by running KafkaProducerDemo.java, this parameter is optional. If the client subscribes to messages, this parameter is required.

    ssl.truststore.location

    The path of the SSL root certificate. Replace xxxx with the actual value. Example: /home/doc/project/kafka-java-demo/ssl/src/main/resources/kafka.client.truststore.jks.

    Note

    If the default endpoint or an SASL endpoint is used, this parameter is not required. If an SSL endpoint is used, this parameter is required.

    java.security.auth.login.config

    The path where the JAAS configuration file is saved. Replace xxxx with the actual value. Example: /home/doc/project/kafka-java-demo/ssl/src/main/resources/kafka_client_jaas_scram.conf.

    Note

    If the default endpoint is used, this parameter is not required. If an SSL endpoint or an SASL endpoint is used, this parameter is required.
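
The requirements in the preceding table can be checked before a client starts. The following is a minimal sketch that parses kafka.properties content and reports which parameters are missing for a given endpoint type. The class KafkaPropertiesCheck, its methods, the EndpointType enum, and the broker addresses are hypothetical names used for illustration. This is not the JavaKafkaConfigurer class from the demo project, whose implementation may differ.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper for illustration only; not part of the demo project.
class KafkaPropertiesCheck {

    // Endpoint types from this topic, mapped to the JAAS path key used in kafka.properties.
    enum EndpointType {
        DEFAULT(null),
        SSL("java.security.auth.login.config"),
        SASL_PLAIN("java.security.auth.login.config.plain"),
        SASL_SCRAM("java.security.auth.login.config.scram");

        final String jaasKey;

        EndpointType(String jaasKey) {
            this.jaasKey = jaasKey;
        }
    }

    // Parses kafka.properties content from any Reader (file, classpath resource, or string).
    static Properties load(Reader reader) {
        Properties props = new Properties();
        try {
            props.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns the required parameters that are missing or blank.
    static List<String> missingKeys(Properties props, EndpointType type, boolean consumer) {
        List<String> required = new ArrayList<>();
        required.add("bootstrap.servers");
        required.add("topic");
        if (consumer) {
            required.add("group.id"); // Required only for clients that subscribe to messages.
        }
        if (type == EndpointType.SSL) {
            required.add("ssl.truststore.location");
        }
        if (type.jaasKey != null) {
            required.add(type.jaasKey);
        }
        List<String> missing = new ArrayList<>();
        for (String key : required) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Placeholder values; replace them with the values of your instance.
        String content = "bootstrap.servers=broker-1:9092,broker-2:9092\ntopic=demo-topic\n";
        Properties props = load(new StringReader(content));
        System.out.println(missingKeys(props, EndpointType.DEFAULT, false));
        System.out.println(missingKeys(props, EndpointType.SSL, true));
    }
}
```

With the placeholder content above, the first call prints an empty list because a producer that uses the default endpoint needs only bootstrap.servers and topic. The second call prints [group.id, ssl.truststore.location, java.security.auth.login.config].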

Send messages

Compile and run the following KafkaProducerDemo.java sample code to send messages:

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
// If you use an SSL endpoint or SASL endpoint to access the ApsaraMQ for Kafka instance, comment out the first line of the following code: 
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/*
*If you use an SSL endpoint or SASL endpoint to access the ApsaraMQ for Kafka instance, uncomment the following two lines of code: 
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
*/

public class KafkaProducerDemo {

    public static void main(String[] args) {
          
       /*
        * If you use an SSL endpoint to access the ApsaraMQ for Kafka instance, uncomment the following line of code: 
        Specify the path of the JAAS configuration file. 
        JavaKafkaConfigurer.configureSasl();
        */
         
       /*
        * If you use an SASL endpoint that uses the PLAIN mechanism to access the ApsaraMQ for Kafka instance, uncomment the following line of code: 
        Specify the path of the JAAS configuration file. 
        JavaKafkaConfigurer.configureSaslPlain();
        */
       
       /*
        * If you use an SASL endpoint that uses the SCRAM mechanism to access the ApsaraMQ for Kafka instance, uncomment the following line of code: 
        Specify the path of the JAAS configuration file. 
        JavaKafkaConfigurer.configureSaslScram();
        */

        // Load the kafka.properties file. 
        Properties kafkaProperties =  JavaKafkaConfigurer.getKafkaProperties();

        Properties props = new Properties();
        // Specify the endpoint. You can obtain the endpoint in the Endpoint Information section of the Instance Details page in the ApsaraMQ for Kafka console. 
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
         
       /*
        * If you use an SSL endpoint to access the ApsaraMQ for Kafka instance, uncomment the following four lines of code: 
        * The path of the SSL root certificate. Do not package the certificate file into a JAR package. 
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
        * The password of the root certificate truststore. Use the default value. 
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        * The access protocol. Set this parameter to SASL_SSL. 
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        * The SASL authentication method. Use the default value. 
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        */

       /*
        * If you use an SASL endpoint that uses the PLAIN mechanism to access the ApsaraMQ for Kafka instance, uncomment the following two lines of code: 
        * The access protocol. 
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        * The PLAIN mechanism. 
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        */

       /*
        * If you use an SASL endpoint that uses the SCRAM mechanism to access the ApsaraMQ for Kafka instance, uncomment the following two lines of code: 
        * The access protocol. 
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        * The SCRAM mechanism. 
        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
        */

        // The method that is used to serialize messages in ApsaraMQ for Kafka. 
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // The maximum time, in milliseconds, that send() and partitionsFor() can block. 
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
        // The maximum number of times that the client retries sending a message. 
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        // The interval between two consecutive retries, in milliseconds. 
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3000);
         
       /*
        * If you use an SSL endpoint to access the ApsaraMQ for Kafka instance, uncomment the following line of code: 
        * Set the algorithm for hostname verification to an empty value. 
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
        */

        // Construct a producer object. The producer is thread-safe, so in most cases one producer object per process is sufficient. 
        // To improve performance, you can construct multiple producer objects. We recommend that you construct no more than five producer objects. 
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        // Construct an ApsaraMQ for Kafka message. 
        String topic = kafkaProperties.getProperty("topic"); // The topic to which the message belongs. Enter the topic that you created in the ApsaraMQ for Kafka console. 
        String value = "this is the message's value"; // The content of the message. 

        try {
            // Obtaining multiple future objects at a time can help improve efficiency. However, do not obtain a large number of future objects at a time. 
            List<Future<RecordMetadata>> futures = new ArrayList<Future<RecordMetadata>>(128);
            for (int i = 0; i < 100; i++) {
                // Send the message and obtain a future object. 
                ProducerRecord<String, String> kafkaMessage = new ProducerRecord<String, String>(topic, value + ": " + i);
                Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);
                futures.add(metadataFuture);

            }
            producer.flush();
            for (Future<RecordMetadata> future: futures) {
                // Obtain the results of the future object in a synchronous manner. 
                try {
                    RecordMetadata recordMetadata = future.get();
                    System.out.println("Produce ok:" + recordMetadata.toString());
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
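        } catch (Exception e) {
            // If the message still fails to be sent after retries, troubleshoot the error. 
            System.out.println("error occurred");
            e.printStackTrace();
        } finally {
            // Close the producer to release its network connections and buffers. 
            producer.close();
        }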
    }
}
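
The comment in the sample code advises obtaining multiple future objects at a time, but not a large number of them. One way to follow that advice, shown here only as a sketch, is to send messages in fixed-size batches and wait for each batch to complete before sending the next one. The class BoundedFutureBatches and the sendInBatches method are hypothetical. The sender function stands in for a call such as producer.send(record), and a thread pool simulates the asynchronous send so that the example runs without a Kafka instance.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical illustration; `sender` stands in for a call such as producer.send(record).
class BoundedFutureBatches {

    // Sends messages in chunks of at most batchSize and waits for each chunk
    // to finish before sending the next one. Returns the number of successful sends.
    static <T, R> int sendInBatches(List<T> messages, int batchSize, Function<T, Future<R>> sender) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        int succeeded = 0;
        for (int start = 0; start < messages.size(); start += batchSize) {
            int end = Math.min(start + batchSize, messages.size());
            // At most batchSize futures are held at any time.
            List<Future<R>> pending = new ArrayList<>(end - start);
            for (T message : messages.subList(start, end)) {
                pending.add(sender.apply(message));
            }
            for (Future<R> future : pending) {
                try {
                    future.get(); // Block until this send completes.
                    succeeded++;
                } catch (ExecutionException e) {
                    System.err.println("Send failed: " + e.getCause());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return succeeded;
                }
            }
        }
        return succeeded;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<String> messages = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                messages.add("message " + i);
            }
            // Simulated asynchronous send that returns the message length.
            Function<String, Future<Integer>> sender =
                    m -> CompletableFuture.supplyAsync(m::length, pool);
            System.out.println("Succeeded: " + sendInBatches(messages, 4, sender));
        } finally {
            pool.shutdown();
        }
    }
}
```

In this simulation every send succeeds, so the program prints Succeeded: 10. With a real producer, a suitable batch size depends on message size and available memory, so treat the value 4 as a placeholder.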

Subscribe to messages

You can subscribe to messages by using one of the following methods: