When a Kafka producer sends high volumes of messages, misconfigured retries, batching, or acknowledgment settings can cause data loss, excessive latency, or out-of-memory errors. This topic explains how to configure your ApsaraMQ for Kafka producer for reliable sends and optimal throughput. All examples use the Java client. The core concepts apply to other languages, but parameter names and implementation details may differ.
Send a message
Every producer interaction starts with producer.send(), which accepts a ProducerRecord containing the topic, partition, timestamp, key, and value.

    Future<RecordMetadata> metadataFuture = producer.send(new ProducerRecord<String, String>(
            topic,                            // The message topic.
            null,                             // The partition number. Set this to null to let the producer assign a partition.
            System.currentTimeMillis(),       // The timestamp.
            String.valueOf(value.hashCode()), // The message key.
            value                             // The message value.
    ));

The send() method is asynchronous. To block until the result is available, call:

    RecordMetadata metadata = metadataFuture.get(timeout, TimeUnit.MILLISECONDS);

For complete SDK examples, see SDK overview.
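The blocking call can time out, fail, or be interrupted, and each outcome usually needs different handling. The following is a minimal sketch of that handling, not part of the SDK: the class name SendResultSketch and the helper await are hypothetical, and CompletableFuture is used only as a stand-in so the example runs without a broker. Because the helper accepts any java.util.concurrent.Future, the future returned by send() could be passed to it in the same way.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SendResultSketch {

    /** Hypothetical helper: waits for a result and reports the outcome as a string. */
    public static <T> String await(Future<T> future, long timeoutMs) {
        try {
            T result = future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return "OK: " + result;
        } catch (TimeoutException e) {
            // No result within the timeout. The operation may still complete later.
            return "TIMEOUT";
        } catch (ExecutionException e) {
            // The operation failed. The cause carries the underlying error.
            return "FAILED: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can react to it.
            Thread.currentThread().interrupt();
            return "INTERRUPTED";
        }
    }

    public static void main(String[] args) {
        // Stand-in futures that simulate the three common outcomes.
        System.out.println(await(CompletableFuture.completedFuture("partition-0@42"), 1000));
        System.out.println(await(new CompletableFuture<String>(), 50));

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("broker unavailable"));
        System.out.println(await(failed, 1000));
    }
}
```

A timeout here only means that the caller stopped waiting. It does not by itself tell you whether the message reached the broker.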
Key and value
Messages in ApsaraMQ for Kafka 0.10.2 have two fields:
Key -- The message identifier. Set a unique key per message to trace its lifecycle through sending and consumption logs.
Value -- The message content.
For high-volume sends that do not require key-based routing, omit the key and rely on the sticky partitioning strategy. This improves batching efficiency.
ApsaraMQ for Kafka 0.11.0 and later support headers. To use headers, upgrade the server to version 2.2.0.
Thread safety
A producer instance is thread-safe and can send messages to any topic. Use a single producer per application.
Configure retries
In a distributed environment, sends can fail due to network issues. The failure may occur at two points: the message reached the broker but the acknowledgment (ACK) was lost, or the message never reached the broker at all.
ApsaraMQ for Kafka uses a virtual IP address (VIP) network architecture that automatically closes idle connections. Inactive clients often receive a connection reset by peer error. Retries handle these transient failures.
| Parameter | Description | Recommended value |
|---|---|---|
| retries | Maximum number of retry attempts for a failed send. | Use the default (set by client version). |
| retry.backoff.ms | Delay between retries, in milliseconds. | 1000 |
Acknowledgments
The acks parameter controls how many replicas must confirm a write before the broker responds. Choose a setting based on your durability and performance requirements.
| Setting | Behavior | Throughput | Durability |
|---|---|---|---|
| acks=0 | No broker response required. | Highest | Lowest -- data loss likely on any failure |
| acks=1 | Response after the primary node writes the data. | Medium | Medium -- data loss if the primary node fails before replication |
| acks=all | Response after the primary node and all in-sync replica (ISR) nodes write the data. | Lowest | Highest -- data loss only if both primary and replica nodes fail simultaneously |
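If sending performance matters more than the strongest durability guarantee, set acks=1. Use acks=all when acknowledged messages must not be lost.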
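The retry and acknowledgment settings discussed above can be collected in a Properties object. The following is a minimal sketch: the class name ReliabilityConfigSketch is hypothetical, and plain string keys are used so the example runs without the Kafka client on the classpath. In a real application, the same object would also need the connection and serializer settings before it is passed to the producer constructor.

```java
import java.util.Properties;

public class ReliabilityConfigSketch {

    /** Builds the retry and acknowledgment settings recommended in this topic. */
    public static Properties build() {
        Properties props = new Properties();
        // "retries" is deliberately not set, so the default of the client version applies.
        props.put("retry.backoff.ms", "1000"); // Wait 1 second between retries.
        props.put("acks", "1");                // Respond after the primary node writes the data.
        return props;
    }

    public static void main(String[] args) {
        Properties props = build();
        System.out.println("retry.backoff.ms=" + props.getProperty("retry.backoff.ms"));
        System.out.println("acks=" + props.getProperty("acks"));
        System.out.println("retries set explicitly: " + props.containsKey("retries"));
    }
}
```

Switching to acks=all is a one-line change in this sketch, which makes it easy to compare throughput under both settings.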
Optimize batching
The producer groups messages destined for the same partition into batches before sending. Larger batches reduce network requests, lower CPU usage, and improve throughput and latency. Small batches cause request queuing on both the client and server.
Two parameters control batching:
| Parameter | Description | Default | Recommendation |
|---|---|---|---|
| batch.size | Maximum batch size per partition, in bytes. A network request is triggered when the batch reaches this size. | 16384 (16 KB) | Keep the default of 16384. Setting this too low degrades performance and stability. |
| linger.ms | Maximum time a message waits in the buffer before being sent, in milliseconds. When this time expires, the producer sends the batch regardless of batch.size. | 0 | Set between 100 and 1000. |
A batch is sent when either threshold is reached, whichever comes first. For a good balance of throughput and latency, set batch.size=16384 and linger.ms=1000.
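The "whichever comes first" rule can be expressed as a single condition. The following sketch is a simplified model for illustration only, not the client's internal logic: the class name BatchFlushSketch and the method shouldSend are hypothetical, and the constants mirror the batch.size and linger.ms values suggested above.

```java
public class BatchFlushSketch {

    static final int BATCH_SIZE_BYTES = 16384; // batch.size
    static final long LINGER_MS = 1000;        // linger.ms

    /** Returns true when a batch is full or has waited at least linger.ms. */
    public static boolean shouldSend(int batchBytes, long waitedMs) {
        return batchBytes >= BATCH_SIZE_BYTES || waitedMs >= LINGER_MS;
    }

    public static void main(String[] args) {
        // Full batch after 10 ms: sent immediately because of batch.size.
        System.out.println(shouldSend(16384, 10));  // prints true
        // Small batch that has waited 1000 ms: sent because linger.ms expired.
        System.out.println(shouldSend(2048, 1000)); // prints true
        // Small batch that has waited 200 ms: keeps accumulating messages.
        System.out.println(shouldSend(2048, 200));  // prints false
    }
}
```

Under heavy traffic the size threshold dominates and linger.ms adds almost no delay. Under light traffic linger.ms bounds how long a message can wait.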
Use a client of version 2.4 or later, which enables the sticky partitioning strategy by default to further reduce fragmented sends.
Sticky partitioning strategy
Only messages sent to the same partition are grouped into a batch, so the partitioning strategy directly affects batching efficiency.
Keyed messages
For messages with a key, the producer hashes the key and selects a partition based on the hash result. Messages with the same key always go to the same partition.
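The key-to-partition mapping can be sketched as a hash followed by a modulo. The example below is an illustration under stated assumptions: the class name KeyedPartitionSketch is hypothetical, and Arrays.hashCode is a stand-in so the code runs without the Kafka client. The Java client uses a murmur2 hash, so the actual partition numbers will differ from the ones this sketch produces.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class KeyedPartitionSketch {

    /** Maps a key to a partition: hash the key bytes, make the hash non-negative, take the modulo. */
    public static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);       // Stand-in hash (assumption); the client uses murmur2.
        return (hash & 0x7fffffff) % numPartitions; // Clear the sign bit, then pick a partition.
    }

    public static void main(String[] args) {
        int numPartitions = 12;
        int first = partitionFor("order-1001", numPartitions);
        int second = partitionFor("order-1001", numPartitions);
        // The same key maps to the same partition on every call.
        System.out.println(first == second); // prints true
    }
}
```

Because the result depends on numPartitions, the mapping for a given key can change when partitions are added to a topic.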
Keyless messages
Before Kafka 2.4, the default strategy for keyless messages was round-robin: each message went to the next partition in sequence. This scattered messages across all partitions, producing many small batches and increasing latency.
Kafka 2.4 introduced the sticky partitioning strategy (KIP-480) to solve this problem. Instead of rotating partitions per message, the producer sticks to one partition until the current batch is full, then randomly selects a new partition. Over time, messages are still evenly distributed across all partitions, but batches are much larger. This approach prevents message partition skew, reduces latency, and improves overall service performance.
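The difference in batch counts can be illustrated with a small simulation. This is a simplified model, not the client's implementation: it assumes all messages arrive within one linger window, every message has the same size, and a batch holds a fixed number of messages. The class and method names are hypothetical.

```java
import java.util.Random;

public class StickyVsRoundRobinSketch {

    /** Round-robin: message i goes to partition i % partitions, then every partition flushes. */
    public static int roundRobinBatches(int messages, int partitions, int capacity) {
        int[] pending = new int[partitions];
        for (int i = 0; i < messages; i++) {
            pending[i % partitions]++;
        }
        int batches = 0;
        for (int count : pending) {
            batches += (count + capacity - 1) / capacity; // ceil(count / capacity)
        }
        return batches;
    }

    /** Sticky: stay on one partition until the batch is full, then pick a new one at random. */
    public static int stickyBatches(int messages, int capacity, int[] sentPerPartition, Random random) {
        int partitions = sentPerPartition.length;
        int current = random.nextInt(partitions);
        int inCurrentBatch = 0;
        int batches = 0;
        for (int i = 0; i < messages; i++) {
            sentPerPartition[current]++;
            inCurrentBatch++;
            if (inCurrentBatch == capacity) {
                batches++; // The batch is full and gets sent.
                inCurrentBatch = 0;
                current = random.nextInt(partitions);
            }
        }
        if (inCurrentBatch > 0) {
            batches++; // A final, partially filled batch.
        }
        return batches;
    }

    public static void main(String[] args) {
        // 64 messages of 1 KB each, 16 partitions, 16 messages per 16 KB batch.
        int messages = 64, partitions = 16, capacity = 16;
        System.out.println(roundRobinBatches(messages, partitions, capacity)); // prints 16
        System.out.println(stickyBatches(messages, capacity, new int[partitions], new Random(42))); // prints 4
    }
}
```

In this model, round-robin sends 16 batches of 4 messages each, while the sticky strategy sends 4 full batches for the same 64 messages.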
Enable sticky partitioning
Client version 2.4 or later: Sticky partitioning is the default. No configuration needed.
Client version earlier than 2.4: Implement a custom partitioner and set it via partitioner.class. The following example switches partitions at a configurable time interval:

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ThreadLocalRandom;

    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.utils.Utils;

    public class MyStickyPartitioner implements Partitioner {
        // Records the time of the last partition switch.
        private long lastPartitionChangeTimeMillis = 0L;
        // Records the current partition.
        private int currentPartition = -1;
        // The partition switch interval. Set the interval as needed.
        private long partitionChangeTimeGap = 100L;

        @Override
        public void configure(Map<String, ?> configs) {}

        /**
         * Compute the partition for the given record.
         *
         * @param topic The topic name
         * @param key The key to partition on (or null if no key)
         * @param keyBytes serialized key to partition on (or null if no key)
         * @param value The value to partition on or null
         * @param valueBytes serialized value to partition on or null
         * @param cluster The current cluster metadata
         */
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            // Get all partition information.
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();
            if (keyBytes == null) {
                List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
                int availablePartitionSize = availablePartitions.size();
                // Check the current active partitions.
                if (availablePartitionSize > 0) {
                    // Use the returned index rather than re-reading the shared field.
                    return availablePartitions.get(handlePartitionChange(availablePartitionSize)).partition();
                } else {
                    return handlePartitionChange(numPartitions);
                }
            } else {
                // For a message with a key, select a partition based on the key's hash value.
                return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
            }
        }

        // Synchronized because a shared producer can call partition() from several threads.
        private synchronized int handlePartitionChange(int partitionNum) {
            long currentTimeMillis = System.currentTimeMillis();
            // If the time since the last switch exceeds the switch interval, or the current
            // partition is unset or out of range, switch to a randomly selected partition.
            // Otherwise, keep the current partition.
            if (currentTimeMillis - lastPartitionChangeTimeMillis >= partitionChangeTimeGap
                    || currentPartition < 0 || currentPartition >= partitionNum) {
                lastPartitionChangeTimeMillis = currentTimeMillis;
                currentPartition = ThreadLocalRandom.current().nextInt(partitionNum);
            }
            return currentPartition;
        }

        @Override
        public void close() {}
    }

Prevent out-of-memory (OOM) errors
The producer caches messages in memory before sending batches. If the cache grows too large, an out-of-memory (OOM) error occurs.
| Parameter | Description | Default | Recommendation |
|---|---|---|---|
| buffer.memory | Total memory available for the producer's send buffer, in bytes. If the buffer is too small, memory allocation can take a long time, which affects sending performance and may cause sending timeouts. | 33554432 (32 MB) | Set to at least batch.size × number of partitions × 2. The default of 32 MB is sufficient for a single producer. |
Running multiple producers in the same Java Virtual Machine (JVM) multiplies memory usage. Each producer allocates its own buffer.memory -- four producers with the default 32 MB each consume 128 MB of heap. In production, a single producer per application is usually sufficient. If you must run multiple producers, reduce buffer.memory for each or increase the JVM heap size accordingly.
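The sizing rule and the multi-producer arithmetic above can be checked with a few lines of code. This is a minimal sketch: the class and method names are hypothetical, and "number of partitions" is assumed to mean the partitions that the producer writes to.

```java
public class BufferMemorySketch {

    static final long DEFAULT_BUFFER_MEMORY = 33_554_432L; // 32 MB

    /** Minimum buffer.memory suggested by the rule: batch.size x number of partitions x 2. */
    public static long minBufferMemory(int batchSizeBytes, int partitions) {
        return (long) batchSizeBytes * partitions * 2;
    }

    /** Buffer memory that several producers in one JVM can consume in total. */
    public static long totalBufferMemory(int producers, long bufferMemoryBytes) {
        return producers * bufferMemoryBytes;
    }

    public static void main(String[] args) {
        // 100 partitions with the default batch.size of 16384 bytes.
        System.out.println(minBufferMemory(16384, 100));  // prints 3276800
        // The rule reaches the 32 MB default at 1024 partitions.
        System.out.println(minBufferMemory(16384, 1024) == DEFAULT_BUFFER_MEMORY); // prints true
        // Four producers with the default buffer.memory.
        System.out.println(totalBufferMemory(4, DEFAULT_BUFFER_MEMORY) / (1024 * 1024) + " MB"); // prints 128 MB
    }
}
```

With the default batch.size, the rule only calls for more than 32 MB once a producer writes to more than 1024 partitions.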
Partition ordering
Under normal operation, messages within a single partition are stored and consumed in the order they are sent.
By default, however, ApsaraMQ for Kafka does not guarantee strict ordering within a partition. During an upgrade or failover, a small number of messages may arrive out of order because they are rerouted to another partition. This trade-off favors availability over ordering.
To enforce strict ordering within a partition, select local storage when creating the topic.