ApsaraMQ for Kafka:Best practices for producers

Last Updated: Sep 02, 2025

This topic describes the best practices for ApsaraMQ for Kafka producers to help you reduce message sending errors. These best practices are based on the Java client. The basic concepts are similar for clients in other programming languages, but the implementation details may differ.

Send a message

The following sample code shows how to send a message:

Future<RecordMetadata> metadataFuture = producer.send(new ProducerRecord<String, String>(
        topic,   // The message topic.
        null,   // The partition number. Set this to null to let the producer assign a partition.
        System.currentTimeMillis(),   // The timestamp.
        String.valueOf(value.hashCode()),   // The message key.
        value   // The message value.
));
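
The preceding snippet assumes that a producer object has already been created. The following minimal sketch shows one way to construct such a producer. The endpoint is a placeholder, and any security settings that your instance requires are omitted.

Properties props = new Properties();
// Placeholder endpoint; replace it with the endpoint of your ApsaraMQ for Kafka instance.
props.put("bootstrap.servers", "your-kafka-endpoint:9092");
// String serializers match the ProducerRecord<String, String> used in the snippet above.
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);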

For the complete sample code, see SDK Overview.

Key and Value

Messages in ApsaraMQ for Kafka 0.10.2 have the following two fields:

  • Key: The identifier of a message.

  • Value: The content of a message.

To simplify tracking, set a unique key for each message. You can use the key to track a message and print sending and consumption logs to check its status.

If you send a large number of messages, do not set a key. Instead, use the sticky partitioning strategy. For more information, see Sticky partitioning strategy.

Important

ApsaraMQ for Kafka 0.11.0 and later versions support headers. If you want to use headers, you must upgrade the server to version 2.2.0.
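
If your instance meets this version requirement, you can attach headers to a message before you send it. The following sketch only illustrates the client API; the header key and value are arbitrary examples.

ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);
// Example header; the key and value are arbitrary and only illustrate the API.
record.headers().add("trace-id", "12345".getBytes(StandardCharsets.UTF_8));
producer.send(record);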

Retry failed attempts

In a distributed environment, message sending can occasionally fail because of network issues. A failure can occur because the message was sent but the acknowledgment (ACK) failed, or because the message was not sent successfully.

ApsaraMQ for Kafka uses a virtual IP address (VIP) network architecture. In this architecture, connections that are idle for a long time are automatically closed. Therefore, inactive clients often receive a connection reset by peer error. If this error occurs, you must retry sending the message.

You can set the following retry parameters as needed (a configuration sketch follows the list):

  • retries: The number of retries if a message fails to be sent.

  • retry.backoff.ms: The retry interval for a failed message. We recommend that you set this parameter to 1000. The unit is milliseconds.
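
The following sketch shows one way to apply these settings in the producer configuration. The values are examples, not mandatory settings.

// Retry a failed send up to 3 times.
props.put("retries", 3);
// Wait 1,000 milliseconds between retries, as recommended above.
props.put("retry.backoff.ms", 1000);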

Send messages asynchronously

The send() method is asynchronous. To receive the sending result, you can call metadataFuture.get(timeout, TimeUnit.MILLISECONDS) or register a callback.
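
The following sketch shows both ways to obtain the sending result: registering a callback that is invoked when the server responds, and blocking on the returned future. The 3,000 millisecond timeout is only an example.

// Option 1: register a callback that is invoked when the send completes.
producer.send(new ProducerRecord<String, String>(topic, value), new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // The send failed. Log the error and retry if necessary.
            exception.printStackTrace();
        } else {
            System.out.println("Sent to partition " + metadata.partition() + " at offset " + metadata.offset());
        }
    }
});

// Option 2: block until the result is available or the timeout elapses.
Future<RecordMetadata> metadataFuture = producer.send(new ProducerRecord<String, String>(topic, value));
RecordMetadata sendResult = metadataFuture.get(3000, TimeUnit.MILLISECONDS);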

Thread safety

A producer is thread-safe and can send messages to any topic. Typically, one application uses one producer.

Acks

The following list describes the `acks` settings:

  • acks=0: No response is required from the server. This setting provides high performance but has a high risk of data loss.

  • acks=1: A response is returned after the data is written to the primary node. This setting provides medium performance and a medium risk of data loss. Data loss can occur if the primary node goes down.

  • acks=all: A response is returned only after the data is written to the primary node and synchronized to the replica nodes. This setting provides low performance but high data security. Data is lost only if both the primary and replica nodes go down.

To improve sending performance, you can set acks=1.
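
The following sketch applies this setting in the producer configuration. Use acks=all instead if data durability matters more than throughput for your business.

// acks=1: the primary node acknowledges the write without waiting for the replica nodes.
props.put("acks", "1");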

Improve sending performance by reducing fragmented requests

Typically, an ApsaraMQ for Kafka topic has multiple partitions. When an ApsaraMQ for Kafka producer client sends a message to the server, it must first determine which partition of the topic to send the message to. When you send multiple messages to the same partition, the producer client packages the messages into a batch and sends them to the server. The producer client incurs extra overhead when it processes a batch. Small batches can cause the producer client to generate many requests. This leads to request queuing on the client and server, increases CPU usage, and increases the overall message sending and consumption latency. A suitable batch size reduces the number of requests from the client to the server, which improves overall throughput and reduces sending latency.

The ApsaraMQ for Kafka producer controls the batching mechanism with two main parameters:

  • batch.size: The size of the message cache for each partition. This is the total number of bytes of the message content, not the number of messages. When the cache reaches this size, a network request is triggered to send the batch to the server. If batch.size is too small, sending performance and stability can be affected. We recommend that you keep the default value of 16384. The unit is bytes.

  • linger.ms: The maximum time a message can stay in the cache. If this time is exceeded, the producer client ignores the batch.size limit and immediately sends the message to the server. You can set linger.ms to a value between 100 and 1000, as needed. The unit is milliseconds.

Therefore, the timing of when the ApsaraMQ for Kafka producer sends a batch of messages to the server is determined by both batch.size and linger.ms. You can adjust these parameters as needed. To improve sending performance and ensure service stability, we recommend that you set batch.size=16384 and linger.ms=1000.
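
The following sketch applies the recommended values in the producer configuration. A batch is sent as soon as either condition is met: the bytes cached for a partition reach batch.size, or a message has waited in the cache for linger.ms.

// Send a batch when 16384 bytes are cached for a partition...
props.put("batch.size", 16384);
// ...or when a message has waited in the cache for 1000 milliseconds, whichever comes first.
props.put("linger.ms", 1000);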

In addition, we recommend that you use a client of version 2.4 or later to send messages. This version enables the sticky partitioning strategy by default. This strategy significantly reduces fragmented sending and improves overall sending performance. For the Kafka Improvement Proposal (KIP) and performance report about the sticky partitioning strategy, see KIP-480: Sticky Partitioner - Apache Kafka - Apache Software Foundation.

Sticky partitioning strategy

Only messages sent to the same partition are placed in the same batch. Therefore, the partitioning strategy that is set on the ApsaraMQ for Kafka producer is a factor that determines how a batch is formed. The ApsaraMQ for Kafka producer lets you select a suitable partition for your business by setting the partitioner implementation class. For messages with a specified key, the default strategy of the ApsaraMQ for Kafka producer is to hash the message key and then select a partition based on the hash result. This ensures that messages with the same key are sent to the same partition.

For messages without a specified key, the default strategy in ApsaraMQ for Kafka versions earlier than 2.4 is to cycle through all partitions of the topic. Messages are sent to each partition in a round-robin manner. However, this default strategy results in poor batching performance. In practice, it can generate many small batches, which increases latency. Because of the low partitioning efficiency for messages without a key, ApsaraMQ for Kafka introduced the sticky partitioning strategy in version 2.4.

The sticky partitioning strategy solves the problem of small batches that are caused by messages without a key being scattered across different partitions. The main strategy is to randomly select another partition after a partition's batch is complete. Subsequent messages then use that new partition as much as possible. In the short term, this strategy sends messages to the same partition. Over a longer runtime, messages are still evenly distributed across all partitions. This approach prevents message partition skew, reduces latency, and improves overall service performance.

If you use an ApsaraMQ for Kafka producer client of version 2.4 or later, the sticky partitioning strategy is used by default. If you use a producer client earlier than version 2.4, you can implement your own partitioning strategy based on the principles of the sticky partitioning strategy. Then, you can set the desired strategy using the partitioner.class parameter.

For an implementation of the sticky partitioning strategy, see the following Java code. The logic of this code is to switch partitions at a specific interval.

import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

public class MyStickyPartitioner implements Partitioner {

    // Records the time of the last partition switch.
    private long lastPartitionChangeTimeMillis = 0L;
    // Records the current partition.
    private int currentPartition = -1;
    // The partition switch interval. Set the interval as needed.
    private long partitionChangeTimeGap = 100L;
    
    public void configure(Map<String, ?> configs) {}

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        // Get all partition information.
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        if (keyBytes == null) {
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            int availablePartitionSize = availablePartitions.size();

            // Check the current active partitions.
            if (availablePartitionSize > 0) {
                handlePartitionChange(availablePartitionSize);
                return availablePartitions.get(currentPartition).partition();
            } else {
                handlePartitionChange(numPartitions);
                return currentPartition;
            }
        } else {
            // For a message with a key, select a partition based on the key's hash value.
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private void handlePartitionChange(int partitionNum) {
        long currentTimeMillis = System.currentTimeMillis();

        // If the time since the last switch exceeds the switch interval, switch to the next partition. Otherwise, use the current partition.
        if (currentTimeMillis - lastPartitionChangeTimeMillis >= partitionChangeTimeGap
            || currentPartition < 0 || currentPartition >= partitionNum) {
            lastPartitionChangeTimeMillis = currentTimeMillis;
            currentPartition = Utils.toPositive(ThreadLocalRandom.current().nextInt()) % partitionNum;
        }
    }

    public void close() {}

}
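
To use a custom partitioner such as the one above, register its fully qualified class name in the producer configuration. The package name below is an assumption; replace it with the package of your own implementation.

// Register the custom partitioner. The package name is only an example.
props.put("partitioner.class", "com.example.MyStickyPartitioner");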

OOM

Because of the batching design of ApsaraMQ for Kafka, the producer caches messages and sends them in batches. If too many messages are cached, an out-of-memory (OOM) error can occur.

  • buffer.memory: The size of the memory pool for sending messages. If the memory pool is too small, requesting memory can take a long time. This can affect sending performance and even cause sending timeouts. We recommend that you set buffer.memory to be greater than or equal to `batch.size × number of partitions × 2`. The unit is bytes. A sizing example follows this list.

  • The default value of buffer.memory is 32 MB. This is sufficient for a single producer to ensure adequate performance.

    Important

    If you start multiple producers in the same Java Virtual Machine (JVM), each producer might occupy 32 MB of cache space. This can trigger an OOM error.

  • In a production environment, it is usually not necessary to start multiple producers. If special circumstances require it, you must consider the size of buffer.memory to avoid OOM errors.
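
The following sketch applies the sizing rule above to a hypothetical topic with 50 partitions and the default batch.size of 16384 bytes.

// batch.size * number of partitions * 2 = 16384 * 50 * 2 = 1638400 bytes (about 1.6 MB).
// The default buffer.memory of 32 MB (33554432 bytes) is already larger, so it can be kept.
long minBufferMemory = 16384L * 50 * 2;
props.put("buffer.memory", Math.max(minBufferMemory, 33554432L));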

Partition order

Within a single partition, messages are stored and consumed in the order in which they are sent. Therefore, messages are ordered in most scenarios.

By default, to improve availability, ApsaraMQ for Kafka does not guarantee strict order within a single partition. During an upgrade or a failure, a small number of messages can become disordered because the messages in a failed partition are failed over to another partition.

If your business requires strict ordering within a partition, you can select local storage when you create the topic.