This topic describes how to use the SDK for Java to connect to the default endpoint of ApsaraMQ for Kafka from a Java client and send and receive messages in a virtual private cloud (VPC).
Prerequisites
JDK 1.8 or later is installed. For more information, see Java Downloads.
Maven 2.5 or later is installed. For more information, see Downloading Apache Maven.
An integrated development environment (IDE) is installed.
In this example, IntelliJ IDEA Ultimate is used.
Install Java dependencies
Create a Java project in IntelliJ IDEA.
Add the following dependencies to the pom.xml file:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.6</version>
</dependency>
Note: We recommend that you keep your client version consistent with the major version of your ApsaraMQ for Kafka instance. You can view the major version of the instance on the Instance Details page in the ApsaraMQ for Kafka console.
Prepare configuration files
(Optional) Download the SSL root certificate. If you use the SSL endpoint to connect to your ApsaraMQ for Kafka instance, you must install the certificate.
Go to the aliware-kafka-demos page and download the demo project to your on-premises machine. Then, decompress the downloaded package. In the decompressed demo project, find the kafka-java-demo folder and import it into IntelliJ IDEA.
(Optional) If you use the SSL endpoint or the SASL endpoint to access your ApsaraMQ for Kafka instance, you must modify the kafka_client_jaas.conf configuration file. For information about the differences among endpoints, see Comparison among endpoints.
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="xxxx"
    password="xxxx";
};
If your ApsaraMQ for Kafka instance is a VPC-connected instance, only resources deployed in the same VPC can access the instance. This ensures the security and privacy of data transmission. In scenarios that require high security, you can enable the access control list (ACL) feature. After you enable this feature, messages are transmitted over a secure channel only after SASL identity authentication succeeds. You can select the PLAIN or SCRAM mechanism for identity authentication based on your security requirements. For more information, see Enable the ACL feature.
If your ApsaraMQ for Kafka instance is an Internet- and VPC-connected instance, messages transmitted over the Internet must be authenticated and encrypted. The PLAIN mechanism of SASL must be used together with SSL to ensure that messages are not transmitted in plaintext.
In this topic, the values of the username and password parameters are the SASL username and password of the instance.
If you enable Internet access but not ACL for the instance, you can obtain the username and password of the default user in the Configuration Information section of the Instance Details page in the ApsaraMQ for Kafka console.
If you enable ACL for the instance, make sure that the SASL user that you use is of the PLAIN type and granted the required permissions on message sending and receiving. For more information, see Grant permissions to SASL users.
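If you use the SCRAM mechanism, the demo project provides a separate kafka_client_jaas_scram.conf file. As a sketch, it has the same shape as the PLAIN configuration above but uses Kafka's ScramLoginModule; replace the placeholders with the SASL username and password of a SCRAM-type user:

```
KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="xxxx"
    password="xxxx";
};
```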
Modify the kafka.properties configuration file.
##==============================Common parameters==============================
bootstrap.servers=xxxxxxxxxxxxxxxxxxxxx
topic=xxx
group.id=xxx
##=======================Configure the following parameters based on your actual environment.========================
## The SSL endpoint.
ssl.truststore.location=/xxxx/only.4096.client.truststore.jks
java.security.auth.login.config=/xxxx/kafka_client_jaas.conf
## The PLAIN mechanism of the SASL endpoint.
java.security.auth.login.config.plain=/xxxx/kafka_client_jaas_plain.conf
## The SCRAM mechanism of the SASL endpoint.
java.security.auth.login.config.scram=/xxxx/kafka_client_jaas_scram.conf
Parameter
Description
bootstrap.servers
The endpoint information. You can obtain the endpoint in the Endpoint Information section of the Instance Details page in the ApsaraMQ for Kafka console.
topic
The name of the topic on the instance. You can obtain the topic name on the Topics page in the ApsaraMQ for Kafka console.
group.id
The ID of the group on the instance. You can obtain the group ID on the Groups page in the ApsaraMQ for Kafka console.
Note: If the client runs the producer demo (KafkaProducerDemo.java) to send messages, this parameter is optional. If the client runs the consumer demo to subscribe to messages, this parameter is required.
ssl.truststore.location
The path to which the SSL root certificate is saved. You must save the SSL certificate file that you downloaded in the "Prepare configuration files" section to a local path and then replace xxxx in the sample code with the local path. Example: /home/ssl/only.4096.client.truststore.jks.
Important: If you use the default endpoint or the SASL endpoint to access the instance, this parameter is not required. If you use the SSL endpoint to access the instance, this parameter is required.
java.security.auth.login.config
The path to which the JAAS configuration file is saved. You must save the kafka_client_jaas.conf file in the demo project to a local path and then replace xxxx in the sample code with the local path. Example: /home/ssl/kafka_client_jaas.conf.
Important: If you use the default endpoint to access the instance, this parameter is not required. If you use the SSL endpoint or the SASL endpoint to access the instance, this parameter is required.
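The sample code in the next section calls a JavaKafkaConfigurer helper that ships with the demo project to load kafka.properties and register the JAAS file. If you build your project from scratch instead of importing the demo, a minimal stand-in could look like the following sketch. The class and method names mirror how they are called in this topic; loading kafka.properties from the classpath is an assumption, so adjust the lookup to match where you place the file:

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Minimal stand-in for the demo's JavaKafkaConfigurer helper.
public class JavaKafkaConfigurer {

    private static volatile Properties properties;

    // Load kafka.properties from the classpath once and cache it.
    public static Properties getKafkaProperties() {
        if (properties == null) {
            synchronized (JavaKafkaConfigurer.class) {
                if (properties == null) {
                    Properties props = new Properties();
                    try (InputStream in = JavaKafkaConfigurer.class
                            .getClassLoader().getResourceAsStream("kafka.properties")) {
                        if (in != null) {
                            props.load(in);
                        }
                    } catch (IOException e) {
                        throw new RuntimeException("Failed to load kafka.properties", e);
                    }
                    properties = props;
                }
            }
        }
        return properties;
    }

    // Point the JVM at the JAAS file for the SSL endpoint.
    public static void configureSasl() {
        setJaas(getKafkaProperties().getProperty("java.security.auth.login.config"));
    }

    // Point the JVM at the JAAS file for the PLAIN mechanism of the SASL endpoint.
    public static void configureSaslPlain() {
        setJaas(getKafkaProperties().getProperty("java.security.auth.login.config.plain"));
    }

    // Point the JVM at the JAAS file for the SCRAM mechanism of the SASL endpoint.
    public static void configureSaslScram() {
        setJaas(getKafkaProperties().getProperty("java.security.auth.login.config.scram"));
    }

    private static void setJaas(String jaasPath) {
        // Set the system property only if it was not already set on the command line.
        if (System.getProperty("java.security.auth.login.config") == null && jaasPath != null) {
            System.setProperty("java.security.auth.login.config", jaasPath);
        }
    }
}
```

If you import the demo project, use the JavaKafkaConfigurer class that it provides instead of this sketch.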
Send messages
The following sample code provides an example of how to compile and run KafkaProducerDemo.java to send messages:
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
// If you use the SSL endpoint or the SASL endpoint to access the instance, comment out the first line of the following code:
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/*
* If you use the SSL endpoint or the SASL endpoint to access the instance, uncomment the following two lines of code:
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
*/
public class KafkaProducerDemo {
    public static void main(String[] args) {
        /*
         * If you use the SSL endpoint to access the instance, uncomment the following line of code.
         * Specify the path of the JAAS configuration file.
        JavaKafkaConfigurer.configureSasl();
         */
        /*
         * If you use the PLAIN mechanism of the SASL endpoint to access the instance, uncomment the following line of code.
         * Specify the path of the JAAS configuration file.
        JavaKafkaConfigurer.configureSaslPlain();
         */
        /*
         * If you use the SCRAM mechanism of the SASL endpoint to access the instance, uncomment the following line of code.
         * Specify the path of the JAAS configuration file.
        JavaKafkaConfigurer.configureSaslScram();
         */
        // Load the kafka.properties file.
        Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();
        Properties props = new Properties();
        // Specify the endpoint. You can obtain the endpoint on the Instance Details page in the ApsaraMQ for Kafka console.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
        /*
         * If you use the SSL endpoint to access the instance, uncomment the following four lines of code.
         * The path of the SSL root certificate. Do not package the certificate file into a JAR package.
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
         * The password of the truststore in the root certificate. Use the default value.
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
         * The access protocol. Set this parameter to SASL_SSL.
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
         * The SASL authentication mechanism. Use the default value.
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
         */
        /*
         * If you use the PLAIN mechanism of the SASL endpoint to access the instance, uncomment the following two lines of code.
         * The access protocol.
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
         * The PLAIN mechanism.
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
         */
        /*
         * If you use the SCRAM mechanism of the SASL endpoint to access the instance, uncomment the following two lines of code.
         * The access protocol.
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
         * The SCRAM mechanism.
        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
         */
        // The method that is used to serialize messages in ApsaraMQ for Kafka.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // The maximum waiting time for a request.
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
        // The maximum number of retries for messages on the client.
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        // The interval between two consecutive retries for messages on the client.
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3000);
        /*
         * If you use the SSL endpoint to access the instance, uncomment the following line of code.
         * Set the algorithm for hostname verification to an empty value.
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
         */
        // Construct a thread-safe producer object. One producer object per process is sufficient.
        // To improve performance, you can construct multiple producer objects. We recommend that you construct no more than five.
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        // Construct an ApsaraMQ for Kafka message.
        String topic = kafkaProperties.getProperty("topic"); // The topic to which the message belongs. Enter the topic that you created in the ApsaraMQ for Kafka console.
        String value = "this is the message's value"; // The message content.
        try {
            // Obtain multiple future objects at a time to improve efficiency. However, do not obtain too many future objects at a time.
            List<Future<RecordMetadata>> futures = new ArrayList<Future<RecordMetadata>>(128);
            for (int i = 0; i < 100; i++) {
                // Send the message and obtain a future object.
                ProducerRecord<String, String> kafkaMessage = new ProducerRecord<String, String>(topic, value + ": " + i);
                Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);
                futures.add(metadataFuture);
            }
            producer.flush();
            for (Future<RecordMetadata> future : futures) {
                // Obtain the result of the future object in a synchronous manner.
                try {
                    RecordMetadata recordMetadata = future.get();
                    System.out.println("Produce ok:" + recordMetadata.toString());
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
        } catch (Exception e) {
            // If the message still fails to be sent after the maximum number of retries is reached, troubleshoot the error.
            System.out.println("error occurred");
            e.printStackTrace();
        }
    }
}
Subscribe to messages
You can subscribe to messages by using one of the following methods.
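As a reference, the following is a minimal sketch of a consumer that subscribes over the default endpoint. The class name KafkaConsumerDemoSketch and the buildProps helper are illustrative and are not part of the demo project; for the SSL or SASL endpoints, add the same security settings as in the producer example above:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConsumerDemoSketch {
    // Build the consumer configuration from the values in kafka.properties.
    static Properties buildProps(Properties kafkaProperties) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
        // The consumer group ID. This parameter is required when you subscribe to messages.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));
        // The session timeout period. If the broker receives no heartbeat within this period,
        // the consumer is removed from the group and a rebalance is triggered.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        // The method that is used to deserialize messages.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Load the kafka.properties file, in the same way as the producer demo.
        Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(buildProps(kafkaProperties));
        // Subscribe to the topic that is specified in kafka.properties.
        consumer.subscribe(Collections.singletonList(kafkaProperties.getProperty("topic")));
        while (true) {
            // Poll for new messages and print each message that is received.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Consume: " + record.topic() + ": " + record.value());
            }
        }
    }
}
```

This sketch requires the kafka-clients dependency that you added to the pom.xml file earlier in this topic.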