ApsaraMQ for Kafka instances in a virtual private cloud (VPC) expose a default endpoint for plaintext communication. Use this endpoint to connect a Java client and send or receive messages without configuring SSL certificates or SASL credentials.
Only resources deployed in the same VPC as your ApsaraMQ for Kafka instance can reach the default endpoint.
For encrypted or authenticated access, use the SSL endpoint or the SASL endpoint instead.
Prerequisites
Before you begin, ensure that you have:
- An ApsaraMQ for Kafka instance with a topic and a consumer group already created. See Step 3: Create resources.
- JDK 1.8 or later installed. See Java Downloads.
- Maven 2.5 or later installed. See Downloading Apache Maven.
- An IDE or text editor (this tutorial uses IntelliJ IDEA).
Set up the demo project
Download the aliware-kafka-demos project from GitHub and decompress the package.
Open the kafka-java-demo folder in IntelliJ IDEA or your preferred IDE. The project includes helper classes such as JavaKafkaConfigurer, which loads connection properties from kafka.properties.
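The helper's behavior can be approximated with standard java.util.Properties. The following is a sketch, not the demo's actual implementation; the class name KafkaPropertiesLoader is hypothetical, and the real JavaKafkaConfigurer may differ:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Properties;

public class KafkaPropertiesLoader {
    // Reads connection settings from a stream in java.util.Properties
    // format (key=value lines), as kafka.properties uses.
    public static Properties load(InputStream in) {
        Properties props = new Properties();
        try {
            props.load(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }
}
```

In the demo itself, the stream would typically come from the classpath, for example via getClassLoader().getResourceAsStream("kafka.properties").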
Add Java dependencies
Verify that the following dependencies are present in pom.xml. If not, add them:
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.6</version>
</dependency>
```

Match the kafka-clients version to the major version of your ApsaraMQ for Kafka instance. Check the major version on the Instance Details page in the ApsaraMQ for Kafka console.
Configure connection properties
Open kafka.properties in the demo project and set the following parameters:
```
bootstrap.servers=<your-default-endpoint>
topic=<your-topic-name>
group.id=<your-group-id>
```

| Parameter | Description | Where to find it |
|---|---|---|
| bootstrap.servers | The default endpoint of your instance | Endpoint Information section on the Instance Details page |
| topic | The name of the topic | Topics page |
| group.id | The ID of the consumer group | Groups page |
All values are available in the ApsaraMQ for Kafka console.
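A missing or blank key in kafka.properties typically surfaces later as an opaque client error, so a quick sanity check before creating clients can save debugging time. The ConfigCheck class below is an illustrative helper, not part of the demo project:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class ConfigCheck {
    // Returns the required kafka.properties keys that are missing or blank.
    // The key list mirrors the file shown above; group.id is needed only
    // by consumers.
    public static List<String> missingKeys(Properties props, boolean isConsumer) {
        List<String> required = new ArrayList<>(Arrays.asList("bootstrap.servers", "topic"));
        if (isConsumer) {
            required.add("group.id");
        }
        List<String> missing = new ArrayList<>();
        for (String key : required) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```

Calling missingKeys(props, true) before constructing a consumer, and failing fast if the list is non-empty, makes configuration mistakes obvious at startup.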
group.id is required only for consumers. Producers do not need it.
Send messages
Compile and run KafkaProducerDemo.java to connect to the default endpoint and send messages to the specified topic.
The following sample code sends 100 messages:
```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaProducerDemo {
    public static void main(String[] args) {
        // Load kafka.properties from the demo project.
        Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();
        Properties props = new Properties();
        // Default endpoint (bootstrap.servers from kafka.properties).
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                kafkaProperties.getProperty("bootstrap.servers"));
        // Serialize both keys and values as strings.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // Maximum wait time for a request: 30 seconds.
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
        // Retry up to 5 times on transient failures, with a 3-second backoff.
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3000);
        // Create a thread-safe producer.
        // One producer per process is sufficient. For higher throughput, use up to five.
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        String topic = kafkaProperties.getProperty("topic");
        try {
            List<Future<RecordMetadata>> futures = new ArrayList<>(128);
            for (int i = 0; i < 100; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(topic, "message value: " + i);
                futures.add(producer.send(record));
            }
            producer.flush();
            for (Future<RecordMetadata> future : futures) {
                try {
                    RecordMetadata metadata = future.get();
                    System.out.println("Produce ok: " + metadata.toString());
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
        } catch (Exception e) {
            // Troubleshoot if messages still fail after retries.
            System.out.println("Failed to send messages.");
            e.printStackTrace();
        }
    }
}
```
Verify producer output
After a successful run, the console prints one Produce ok line per message with the topic, partition, and offset:
```
Produce ok: <your-topic>-0@15
```
Receive messages
The demo project includes consumer examples that subscribe to a topic through the default endpoint. Two subscription approaches are available:
| Approach | Description | When to use |
|---|---|---|
| Subscribe by topic | The consumer group protocol assigns partitions automatically | Most scenarios. Supports automatic rebalancing when consumers join or leave the group |
| Assign specific partitions | Manually assign one or more partitions to the consumer | When you need fine-grained control over which partitions a consumer reads |
The demo project provides consumer examples for both subscription approaches. See the source files in the kafka-java-demo folder for the complete consumer code. For consumer code examples in other languages, see SDK overview.
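A subscribe-by-topic consumer can be sketched as follows. This is patterned on the demo but is not the demo's exact code; it reuses the JavaKafkaConfigurer helper and the kafka.properties keys shown earlier, and requires a reachable broker to run:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConsumerDemoSketch {
    public static void main(String[] args) {
        // Load kafka.properties from the demo project.
        Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                kafkaProperties.getProperty("bootstrap.servers"));
        // group.id is required for consumers (see the note above).
        props.put(ConsumerConfig.GROUP_ID_CONFIG,
                kafkaProperties.getProperty("group.id"));
        // Deserialize both keys and values as strings, matching the producer.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe by topic: the group protocol assigns partitions automatically.
        consumer.subscribe(Collections.singletonList(
                kafkaProperties.getProperty("topic")));
        try {
            while (true) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Consumed: partition=" + record.partition()
                            + ", offset=" + record.offset()
                            + ", value=" + record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
```

For the assign-specific-partitions approach, replace the subscribe call with consumer.assign(...) over a list of TopicPartition objects; see the demo sources in kafka-java-demo for the complete versions of both consumers.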
Troubleshooting
If errors occur during connection or message delivery, see What do I do if an error is reported on a client when I use ApsaraMQ for Kafka?