ApsaraMQ for Kafka:Use instance endpoints to send and receive messages

Last Updated: Oct 15, 2025

You can use an instance endpoint to connect your application to an ApsaraMQ for Kafka cluster to send and receive messages. ApsaraMQ for Kafka provides default, Secure Sockets Layer (SSL), and Simple Authentication and Security Layer (SASL) endpoints to meet your connection and security requirements.

Prerequisites

  • JDK 1.8 or later is installed. For more information, see Java SE Downloads.

  • Maven 2.5 or later is installed. For more information, see Download Apache Maven.

  • An integrated development environment (IDE) is installed. This topic uses IntelliJ IDEA Ultimate as an example.

  • An ApsaraMQ for Kafka instance is purchased and deployed.

    • VPC-connected instance: Only the default endpoint is available. You can access this type of instance only from within a VPC.

    • Internet- and VPC-connected instance: The default and SSL endpoints are available. You can access this type of instance over the Internet or from within a VPC.

    Note
    • You can switch between VPC-connected instances and Internet- and VPC-connected instances by upgrading or downgrading the instance configurations. For more information, see Upgrade or downgrade instance configurations.

    • By default, SASL endpoints are disabled for instances and are not displayed. If you want to use a SASL endpoint, you must enable it. For more information, see Grant permissions to a SASL user.

    • For more information about the scenarios for each endpoint, see Endpoint comparison.

Install Java dependencies

The following code shows the dependencies required to use the Java SDK to connect to an ApsaraMQ for Kafka instance. These dependencies are included in the pom.xml file in the kafka-java-demo folder and do not need to be added manually.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.6</version>
</dependency>
Note

We recommend that you use a client library version that is consistent with the major version of your ApsaraMQ for Kafka instance. You can view the major version of your ApsaraMQ for Kafka instance on the Instance Details page in the ApsaraMQ for Kafka console.

Prepare configurations

  1. (Optional) Download the SSL root certificate. This step is required only if you use an SSL endpoint.

  2. Go to aliware-kafka-demos, click Download to obtain the demo project, and then decompress the package.

  3. In the decompressed demo project, find the kafka-java-demo folder and import it into IntelliJ IDEA.

  4. (Optional) If you use an SSL or SASL endpoint to access the instance, modify the kafka_client_jaas.conf configuration file. For more information about the instance endpoints, see Endpoint comparison.

    KafkaClient {
      org.apache.kafka.common.security.plain.PlainLoginModule required
      username="xxxx"
      password="xxxx";
    }; 
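
    If you use the SCRAM mechanism, the JAAS file references ScramLoginModule instead of PlainLoginModule. The following is a sketch of kafka_client_jaas_scram.conf based on the standard Kafka JAAS format; the file shipped with the demo may differ slightly. Replace the username and password with your SCRAM user credentials.

    KafkaClient {
      org.apache.kafka.common.security.scram.ScramLoginModule required
      username="xxxx"
      password="xxxx";
    };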

    If your ApsaraMQ for Kafka instance is a VPC-connected instance, only resources in the same VPC can access the instance. This ensures the security and privacy of data transmission. In scenarios that require high security, you can enable the access control list (ACL) feature. After you enable this feature, messages are transmitted over a secure channel only after successful SASL identity authentication. You can select the PLAIN or SCRAM mechanism for identity authentication based on your security requirements. For more information, see Enable the ACL feature.

    If your ApsaraMQ for Kafka instance is an Internet- and VPC-connected instance, messages must be authenticated and encrypted when transmitted over the Internet. The SASL PLAIN mechanism must be used with SSL at the transport layer. Use the SASL_SSL protocol to prevent messages from being transmitted in plaintext.

    The username and password in the example are the SASL username and password for the instance.

    • If your instance has Internet access enabled but does not use the ACL feature, you can obtain the username and password for the default user from the Configuration Information section of the Instance Details page in the ApsaraMQ for Kafka console.

    • If the ACL feature is enabled for the instance, ensure that the SASL user is of the PLAIN type and has permissions to send and receive messages. For more information, see Grant permissions to a SASL user.

  5. Modify the kafka.properties configuration file.

    ##==============================Common configuration parameters==============================
    bootstrap.servers=xxxxxxxxxxxxxxxxxxxxx
    topic=xxx
    group.id=xxx
    ##=======================Configure the following parameters as needed========================
    ##SSL endpoint configuration
    ssl.truststore.location=/xxxx/only.4096.client.truststore.jks
    ##The ssl.truststore.password must be KafkaOnsClient. Do not modify.
    ssl.truststore.password=KafkaOnsClient
    ##Hostname verification. Keep it empty. Do not modify.
    ssl.endpoint.identification.algorithm=
    java.security.auth.login.config=/xxxx/kafka_client_jaas.conf
    ##SASL endpoint PLAIN mechanism configuration
    java.security.auth.login.config.plain=/xxxx/kafka_client_jaas_plain.conf
    ##SASL endpoint SCRAM mechanism configuration
    java.security.auth.login.config.scram=/xxxx/kafka_client_jaas_scram.conf

    The following list describes the parameters:

    • bootstrap.servers: The endpoint of the ApsaraMQ for Kafka instance. You can obtain the endpoint from the Endpoint Information section of the Instance Details page in the ApsaraMQ for Kafka console.

    • topic: The name of the topic on the instance. You can obtain the topic name on the Topics page in the ApsaraMQ for Kafka console.

    • group.id: The ID of the group on the instance. You can obtain the group ID on the Groups page in the ApsaraMQ for Kafka console.

      Note: If the application runs KafkaProducerDemo.java to send messages, this parameter is optional. If the application runs KafkaConsumerDemo.java to subscribe to messages, this parameter is required.

    • ssl.truststore.location: The path of the SSL root certificate. Save the SSL certificate file that you downloaded in Step 1 of the "Prepare configurations" section to a local path. Then, replace xxxx in the sample code with the local path. Example: /home/ssl/only.4096.client.truststore.jks.

      Important: This parameter is required only if you use an SSL endpoint. It is not required for a default or SASL endpoint.

    • ssl.truststore.password: The password for the server certificate. The value is fixed as KafkaOnsClient. Do not modify this parameter.

    • ssl.endpoint.identification.algorithm: The hostname verification algorithm. Leave this value empty to disable hostname verification. Do not modify this parameter.

    • java.security.auth.login.config: The path of the JAAS configuration file. Save the kafka_client_jaas.conf file from the demo project to a local path. Then, replace xxxx in the sample code with the local path. Example: /home/ssl/kafka_client_jaas.conf.

      Important: This parameter is required only if you use an SSL or SASL endpoint. It is not required for a default endpoint.
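
The sample code below loads these settings through a helper class named JavaKafkaConfigurer, which reads kafka.properties from the classpath and sets the java.security.auth.login.config system property. If you build your own project instead of using the demo, the following minimal sketch shows what such a helper can look like. It is an assumption based on how the sample code calls the helper, not the demo's exact source.

import java.util.Properties;

public class JavaKafkaConfigurer {

    private static Properties properties;

    // Load kafka.properties from the classpath once and cache it.
    public static synchronized Properties getKafkaProperties() {
        if (properties == null) {
            Properties props = new Properties();
            try {
                props.load(JavaKafkaConfigurer.class.getClassLoader()
                        .getResourceAsStream("kafka.properties"));
            } catch (Exception e) {
                throw new RuntimeException("Failed to load kafka.properties", e);
            }
            properties = props;
        }
        return properties;
    }

    // Point the JVM at the JAAS file for SSL endpoints (PLAIN over SASL_SSL).
    public static void configureSasl() {
        if (System.getProperty("java.security.auth.login.config") == null) {
            System.setProperty("java.security.auth.login.config",
                    getKafkaProperties().getProperty("java.security.auth.login.config"));
        }
    }

    // The PLAIN and SCRAM variants point at their own JAAS files.
    public static void configureSaslPlain() {
        if (System.getProperty("java.security.auth.login.config") == null) {
            System.setProperty("java.security.auth.login.config",
                    getKafkaProperties().getProperty("java.security.auth.login.config.plain"));
        }
    }

    public static void configureSaslScram() {
        if (System.getProperty("java.security.auth.login.config") == null) {
            System.setProperty("java.security.auth.login.config",
                    getKafkaProperties().getProperty("java.security.auth.login.config.scram"));
        }
    }
}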

Send messages

Compile and run KafkaProducerDemo.java to send messages.

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/*
* If you use an SSL or SASL endpoint, uncomment the following two lines.
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
*/

public class KafkaProducerDemo {

    public static void main(String args[]) {
          
       /*
        * If you use an SSL endpoint, uncomment the following line.
        * Set the path of the JAAS configuration file.
        JavaKafkaConfigurer.configureSasl();
        */
         
       /*
        * If you use a SASL endpoint with the PLAIN mechanism, uncomment the following line.
        * Set the path of the JAAS configuration file.
        JavaKafkaConfigurer.configureSaslPlain();
        */
       
       /*
        * If you use a SASL endpoint with the SCRAM mechanism, uncomment the following line.
        * Set the path of the JAAS configuration file.
        JavaKafkaConfigurer.configureSaslScram();
        */

        // Load kafka.properties.
        Properties kafkaProperties =  JavaKafkaConfigurer.getKafkaProperties();

        Properties props = new Properties();
        // Set the endpoint. Obtain the endpoint for the topic from the console.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
         
       /*
        * If you use an SSL endpoint, uncomment the following four lines.
        * Similar to the SASL path, this file cannot be packaged into the JAR file.
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
        * The password for the root certificate store. Keep the value unchanged.
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        * The access protocol. SASL_SSL is supported.
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        * The SASL authentication method. Keep the value unchanged.
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        */

       /*
        * If you use a SASL endpoint with the PLAIN mechanism, uncomment the following two lines.
        * Access protocol.
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        * PLAIN mechanism.
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        */

       /*
        * If you use a SASL endpoint with the SCRAM mechanism, uncomment the following two lines.
        * Access protocol.
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        * SCRAM mechanism.
        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
        */

        // The serialization method for ApsaraMQ for Kafka messages.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // The maximum wait time for a request.
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
        // The number of retries on the client.
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        // The retry interval on the client.
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3000);
         
       /*
        * If you use an SSL endpoint, uncomment the following line.
        * Set hostname verification to empty.
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
        */

        // Construct a producer object. This object is thread-safe. Typically, one producer object is sufficient for a process.
        // To improve performance, you can create more producer objects, but do not create too many. The number of producer objects should not exceed five.
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        // Construct an ApsaraMQ for Kafka message.
        String topic = kafkaProperties.getProperty("topic"); // The topic to which the message belongs. After you create the topic in the console, enter the topic name here.
        String value = "this is the message's value"; // The content of the message.

        try {
            // Obtaining future objects in batches can improve performance. Do not use excessively large batches.
            List<Future<RecordMetadata>> futures = new ArrayList<Future<RecordMetadata>>(128);
            for (int i =0; i < 100; i++) {
                // Send the message and obtain a future object.
                ProducerRecord<String, String> kafkaMessage =  new ProducerRecord<String, String>(topic, value + ": " + i);
                Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);
                futures.add(metadataFuture);

            }
            producer.flush();
            for (Future<RecordMetadata> future: futures) {
                // Synchronously obtain the result of the future object.
                try {
                    RecordMetadata recordMetadata = future.get();
                    System.out.println("Produce ok:" + recordMetadata.toString());
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
        } catch (Exception e) {
            // If the message fails to be sent after several retries on the client, your application must handle the error.
            System.out.println("error occurred");
            e.printStackTrace();
        }
    }
}
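
The demo resolves each Future after flush() to check delivery results. If you prefer fully asynchronous error handling, producer.send() also accepts a Callback that the client invokes when the broker acknowledges the record. A minimal sketch, assuming the producer, topic, and value defined above:

// Asynchronous send: the callback runs on the producer's I/O thread,
// so keep it lightweight and never block inside it.
producer.send(new ProducerRecord<String, String>(topic, value),
        new org.apache.kafka.clients.producer.Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    // Delivery failed after the client-side retries were exhausted.
                    exception.printStackTrace();
                } else {
                    System.out.println("Produce ok:" + metadata.toString());
                }
            }
        });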

Subscribe to messages

Choose one of the following ways to subscribe to messages.

Use a single consumer to subscribe to messages

Compile and run KafkaConsumerDemo.java to subscribe to messages.

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;


import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
/*
* If you use an SSL endpoint, uncomment the following three lines. If you use a SASL endpoint, uncomment the first two lines.
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
*/

public class KafkaConsumerDemo {

    public static void main(String args[]) {

        // Set the path of the JAAS configuration file.
        /*
         * If you use an SSL endpoint, uncomment the following line.
        JavaKafkaConfigurer.configureSasl();
         */
                        
        /*
         * If you use a SASL endpoint with the PLAIN mechanism, uncomment the following line.
        JavaKafkaConfigurer.configureSaslPlain();
         */
                        
        /*
        * If you use a SASL endpoint with the SCRAM mechanism, uncomment the following line.
        JavaKafkaConfigurer.configureSaslScram();
        */

        // Load kafka.properties.
        Properties kafkaProperties =  JavaKafkaConfigurer.getKafkaProperties();

        Properties props = new Properties();
        // Set the endpoint. Obtain the endpoint for the topic from the console.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));

        // If you use an SSL endpoint, comment out the following line.
        // Set this value based on your data pulling requirements and client version. The default value is 30s.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        /*
         * If you use an SSL endpoint, uncomment the following six lines.
         * Set the path of the SSL root certificate. Remember to replace XXX with your path.
         * Similar to the SASL path, this file cannot be packaged into the JAR file.
         props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
         * The password for the root certificate store. Keep the value unchanged.
         props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
         * The access protocol. SASL_SSL is supported.
         props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
         * The SASL authentication method. Keep the value unchanged.
         props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
         * The maximum interval allowed between two polls.
         * If a consumer does not send a heartbeat within this interval, the server considers the consumer dead. The server then removes the consumer from the group and triggers a rebalance. The default value is 30s.
         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
         * The amount of data to pull in a single request. This parameter has a significant impact when you access the service over the Internet.
         props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);
         props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);
         */

        // If you use a SASL endpoint with the PLAIN mechanism, comment out the following line.
       // Set this value based on your data pulling requirements and client version. The default value is 30s.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        /*
         * If you use a SASL endpoint with the PLAIN mechanism, uncomment the following three lines.
         * Access protocol.
         props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
         * PLAIN mechanism.
         props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
         * The maximum interval allowed between two polls.
         * If a consumer does not send a heartbeat within this interval, the server considers the consumer dead. The server then removes the consumer from the group and triggers a rebalance. The default value is 30s.
         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
         */

        // If you use a SASL endpoint with the SCRAM mechanism, comment out the following line.
       // Set this value based on your data pulling requirements and client version. The default value is 30s.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        /*
         * If you use a SASL endpoint with the SCRAM mechanism, uncomment the following three lines.
         * Access protocol.
         props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
         * SCRAM mechanism.
         props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
         * The maximum interval allowed between two polls.
         * If a consumer does not send a heartbeat within this interval, the server considers the consumer dead. The server then removes the consumer from the group and triggers a rebalance. The default value is 30s.
         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
         */

        // The maximum number of records to return in a single call to poll().
        // Do not set this value too high. If you poll too much data and cannot process it before the next poll, a rebalance is triggered, which may cause stuttering.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
        // The deserialization method for messages.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // The consumer group to which this consumer instance belongs. Create a group in the console and enter its ID here.
        // Consumer instances in the same group consume messages in a load-balanced manner.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));
        
        // If you use an SSL endpoint, uncomment the following line.
        // Set hostname verification to empty.
        //props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");

        // Construct a message object, which is a consumer instance.
        KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props);
        // Set the topics to which the consumer group subscribes. You can subscribe to multiple topics.
        // If the GROUP_ID_CONFIG is the same for multiple consumers, we recommend that you set them to subscribe to the same topics.
        List<String> subscribedTopics =  new ArrayList<String>();
        
        // If you use an SSL endpoint, comment out the first five lines and uncomment the sixth line.
        // To subscribe to multiple topics, add them here.
        // Each topic must be created in the console first.
        String topicStr = kafkaProperties.getProperty("topic");
        String[] topics = topicStr.split(",");
        for (String topic: topics) {
            subscribedTopics.add(topic.trim());
        }
        //subscribedTopics.add(kafkaProperties.getProperty("topic"));
        consumer.subscribe(subscribedTopics);

        // Consume messages in a loop.
        while (true){
            try {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                // The data must be consumed before the next poll. The total time spent must not exceed SESSION_TIMEOUT_MS_CONFIG.
                // We recommend that you use a separate thread pool to consume messages and return the results asynchronously.
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format("Consume partition:%d offset:%d", record.partition(), record.offset()));
                }
            } catch (Exception e) {
                try {
                    Thread.sleep(1000);
                } catch (Throwable ignore) {

                }
          
                e.printStackTrace();
            }
        }
    }
}
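
The comments in the loop above recommend handing records to a separate thread pool so that processing does not delay the next poll(). The following sketch shows one way to do that with a bounded executor; it replaces the body of the consume loop in the demo and deliberately simplifies offset management. The pool size and queue capacity are illustrative values, not recommendations from the demo.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// A bounded pool with CallerRunsPolicy: when the queue fills up, the polling
// thread runs the task itself, which naturally slows down polling.
ExecutorService workers = new ThreadPoolExecutor(
        4, 4, 0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>(1000),
        new ThreadPoolExecutor.CallerRunsPolicy());

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (final ConsumerRecord<String, String> record : records) {
        workers.submit(new Runnable() {
            @Override
            public void run() {
                // Process the record off the polling thread.
                System.out.println(String.format("Consume partition:%d offset:%d",
                        record.partition(), record.offset()));
            }
        });
    }
}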

Use multiple consumers to subscribe to messages

Compile and run KafkaMultiConsumerDemo.java to subscribe to messages.

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
/*
* If you use an SSL endpoint, uncomment the first three lines. If you use a SASL endpoint, uncomment the first two lines.
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
*/
import org.apache.kafka.common.errors.WakeupException;

/**
 * This demo shows how to start multiple consumers in one process to consume a topic at the same time.
 * Note: The total number of consumers must not exceed the total number of partitions in the subscribed topics.
 */
public class KafkaMultiConsumerDemo {

    public static void main(String args[]) throws InterruptedException {
        
        // Set the path of the JAAS configuration file.
        /* 
         * If you use an SSL endpoint, uncomment the following line.
         JavaKafkaConfigurer.configureSasl();
         */
                            
        /* 
         * If you use a SASL endpoint with the PLAIN mechanism, uncomment the following line.
         JavaKafkaConfigurer.configureSaslPlain(); 
         */
                            
        /* 
         * If you use a SASL endpoint with the SCRAM mechanism, uncomment the following line.
         JavaKafkaConfigurer.configureSaslScram();
         */


        // Load kafka.properties.
        Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();

        Properties props = new Properties();
        // Set the endpoint. Obtain the endpoint for the topic from the console.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
        
        /*
         * If you use an SSL endpoint, uncomment the following four lines.
         * Similar to the SASL path, this file cannot be packaged into the JAR file.
         props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
         * The password for the root certificate store. Keep the value unchanged.
         props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
         * The access protocol. SASL_SSL is supported.
         props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
         * The SASL authentication method. Keep the value unchanged.
         props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
         */
        
        /*
         * If you use a SASL endpoint with the PLAIN mechanism, uncomment the following two lines.
         * Access protocol.
         props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
         * PLAIN mechanism.
         props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
         */

        /* 
         * If you use a SASL endpoint with the SCRAM mechanism, uncomment the following two lines.
         * Access protocol.
         props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
         * SCRAM mechanism.
         props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
         */

        // The maximum interval allowed between two polls.
        // If a consumer does not send a heartbeat within this interval, the server considers the consumer dead. The server then removes the consumer from the group and triggers a rebalance. The default value is 30s.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        // The maximum number of records to return in a single call to poll().
        // Do not set this value too high. If you poll too much data and cannot process it before the next poll, a rebalance is triggered, which may cause stuttering.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
        // The deserialization method for messages.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // The consumer group to which this consumer instance belongs. Create a group in the console and enter its ID here.
        // Consumer instances in the same group consume messages in a load-balanced manner.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));

        /* 
         * If you use an SSL endpoint, uncomment the following line.
         * Construct a consumer object, which is a consumer instance.
         * Set hostname verification to empty.
         props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
         */

        int consumerNum = 2;
        Thread[] consumerThreads = new Thread[consumerNum];
        for (int i = 0; i < consumerNum; i++) {
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

            List<String> subscribedTopics = new ArrayList<String>();
            subscribedTopics.add(kafkaProperties.getProperty("topic"));
            consumer.subscribe(subscribedTopics);

            KafkaConsumerRunner kafkaConsumerRunner = new KafkaConsumerRunner(consumer);
            consumerThreads[i] = new Thread(kafkaConsumerRunner);
        }

        for (int i = 0; i < consumerNum; i++) {
            consumerThreads[i].start();
        }

        for (int i = 0; i < consumerNum; i++) {
            consumerThreads[i].join();
        }
    }

    static class KafkaConsumerRunner implements Runnable {
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final KafkaConsumer consumer;

        KafkaConsumerRunner(KafkaConsumer consumer) {
            this.consumer = consumer;
        }

        @Override
        public void run() {
            try {
                while (!closed.get()) {
                    try {
                        ConsumerRecords<String, String> records = consumer.poll(1000);
                        // The data must be consumed before the next poll. The total time spent must not exceed SESSION_TIMEOUT_MS_CONFIG.
                        for (ConsumerRecord<String, String> record : records) {
                            System.out.println(String.format("Thread:%s Consume partition:%d offset:%d", Thread.currentThread().getName(), record.partition(), record.offset()));
                        }
                    } catch (Exception e) {
                        try {
                            Thread.sleep(1000);
                        } catch (Throwable ignore) {

                        }
                        e.printStackTrace();
                    }
                }
            } catch (WakeupException e) {
                // Ignore the exception if the consumer is closed.
                if (!closed.get()) {
                    throw e;
                }
            } finally {
                consumer.close();
            }
        }
        // A shutdown hook that can be called by another thread.
        public void shutdown() {
            closed.set(true);
            consumer.wakeup();
        }
    }
}
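
The KafkaConsumerRunner class exposes a shutdown() method, but the main method above never calls it. One way to wire it up is a JVM shutdown hook, as in the following sketch; it assumes you keep a reference to each runner created in the loop in main().

// Collect the runners so a shutdown hook can stop them gracefully.
final List<KafkaConsumerRunner> runners = new ArrayList<KafkaConsumerRunner>();
// ... add each KafkaConsumerRunner to runners where it is created in main() ...

Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
    @Override
    public void run() {
        for (KafkaConsumerRunner runner : runners) {
            // Sets the closed flag and wakes the consumer out of poll().
            runner.shutdown();
        }
    }
}));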

FAQ

How do I configure the SASL_SSL certificate for ApsaraMQ for Kafka?

Download the SSL certificate from the link in Step 1 of the "Prepare configurations" section and save it to a local path. Then, set the ssl.truststore.location parameter in the kafka.properties file of the demo project to this local path.

Can I bind my own SSL certificate when I use the Java SDK to send and receive messages through an instance endpoint?

No, you cannot. You must use the SSL certificate provided by ApsaraMQ for Kafka when you use the Java SDK to access the instance endpoint to send and receive messages.
