This article describes best practices for Message Queue for Apache Kafka producers to help you reduce errors when you send messages. The best practices in this article are based on the Java client. Clients in other programming languages share the same basic concepts, but their implementation details may differ.

Message sending

Sample code for sending a message:

Future<RecordMetadata> metadataFuture = producer.send(new ProducerRecord<String, String>(
        topic,   // The topic of the message.
        null,   // The partition number. We recommend that you set this parameter to null so that the producer automatically selects a partition.
        System.currentTimeMillis(),   // The timestamp.
        String.valueOf(value.hashCode()),   // The key of the message.
        value   // The value of the message.
));

For more information about the complete sample code, see SDK overview.

Key and Value fields

Message Queue for Apache Kafka version 0.10.2.2 has the following two message fields:

  • Key: the identifier of a message.
  • Value: the content of a message.

To facilitate tracing, set a unique key for each message. When you need to track the sending and consumption of a message, you can use a unique key to query the sending and consumption logs of the message.
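For example, a globally unique key can be generated with java.util.UUID. This is a minimal sketch; the topic name and message body shown in the comment are placeholders:

```java
import java.util.UUID;

public class UniqueKeyExample {

    // Generate a globally unique message key that can later be used to
    // query the sending and consumption logs of the message.
    public static String newKey() {
        return UUID.randomUUID().toString();
    }

    public static void main(String[] args) {
        String key = newKey();
        // The key is then passed to the ProducerRecord, for example:
        // new ProducerRecord<String, String>("your-topic", key, "message body");
        System.out.println("key=" + key);
    }
}
```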

If you want to send a large number of messages, we recommend that you implement the sticky partitioning strategy instead of setting a key. For more information about the sticky partitioning strategy, see the Sticky partitioning strategy section of this article.

Notice Message Queue for Apache Kafka version 0.11.0 or later supports message headers. If you need to use headers, upgrade your broker to version 2.2.0 or later.

Retry

In a distributed environment, a message may fail to be sent due to network issues. The failure may occur after the message is sent but before the acknowledgment (ACK) is returned, or before the message is sent at all.

Message Queue for Apache Kafka uses a virtual IP address (VIP) network architecture where connections are closed after they are idle for more than 30 seconds. Therefore, inactive producers or consumers may receive the "Connection reset by peer" error message. In this case, we recommend that you resend the message.

You can set the following retry parameters based on your business needs:
  • retries: the number of retries. We recommend that you set this parameter to 3.
  • retry.backoff.ms: the interval between retries. We recommend that you set this parameter to 1000.
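The recommended settings map onto producer configuration properties. A minimal sketch using java.util.Properties follows; in a real application, these properties are passed to the KafkaProducer constructor:

```java
import java.util.Properties;

public class RetryConfigExample {

    public static Properties buildProps() {
        Properties props = new Properties();
        // Retry up to 3 times when a send fails for a retriable reason.
        props.put("retries", "3");
        // Wait 1000 ms between consecutive retries.
        props.put("retry.backoff.ms", "1000");
        return props;
    }

    public static void main(String[] args) {
        // These properties would be passed to new KafkaProducer<>(props).
        System.out.println(buildProps());
    }
}
```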

Asynchronous transmission

Messages are sent in asynchronous mode. To obtain the sending result, you can call the metadataFuture.get(timeout, TimeUnit.MILLISECONDS) method.
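The returned Future follows the standard java.util.concurrent.Future contract, so the blocking-with-timeout pattern can be sketched with a CompletableFuture standing in for the producer's metadata future. In the real client the value type is RecordMetadata; it is simplified to String here:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class AsyncSendExample {

    // Block for at most 5 seconds; a TimeoutException is thrown if the
    // result is not available within that window.
    public static String waitForResult(Future<String> metadataFuture) throws Exception {
        return metadataFuture.get(5000, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for the Future returned by producer.send(...).
        Future<String> fake = CompletableFuture.completedFuture("topic-0@42");
        System.out.println(waitForResult(fake));
    }
}
```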

Thread safety

Producers are thread-safe and can send messages to all topics. In most cases, one application needs only one producer.

ACKs

ACKs have the following settings:

  • acks=0: No response is returned from the broker. In this mode, the performance is high, but the risk of data loss is also high.
  • acks=1: A response is returned when data is written to the leader. In this mode, the performance and the risk of data loss are moderate. Data loss may occur if the leader fails.

  • acks=all: A response is returned when data is written to the leader and synchronized to the followers. In this mode, the performance is low, but the risk of data loss is also low. Data loss occurs only if the leader and the followers fail at the same time.

We recommend that you set acks=1 for regular services and set acks=all for key services.
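As a sketch, this recommendation can be expressed as a simple switch on the producer configuration; whether a service counts as a key service is, of course, your own judgment:

```java
import java.util.Properties;

public class AcksConfigExample {

    public static Properties buildProps(boolean keyService) {
        Properties props = new Properties();
        // Key services wait for all in-sync replicas to persist the message;
        // regular services only wait for the leader.
        props.put("acks", keyService ? "all" : "1");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildProps(true).getProperty("acks"));   // all
    }
}
```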

Capability improvement for message sending

A Message Queue for Apache Kafka topic has multiple partitions. Before the producer sends messages to the broker, it needs to select a partition of a topic to send the messages to. To send multiple messages to the same partition, the producer packages them into a batch and sends them to the broker in a single request. Batch processing incurs additional overhead: small batches generate a large number of requests, which queue on both the producer and the broker and drive up CPU utilization. This prolongs message sending and increases consumption latency. A suitable batch size reduces the number of requests from the producer to the broker, which increases throughput and lowers the latency of message sending.

The Message Queue for Apache Kafka producer manages batches based on two parameters:
  • batch.size: the volume of cached messages that are sent to each partition. This parameter specifies the total number of bytes in all of the messages in a batch, rather than the number of messages. When the volume of cached messages reaches the specified upper limit, a network request is triggered. Then, the producer sends the messages to the broker in a batch.
  • linger.ms: the maximum storage duration for each message in the cache. If a message is stored longer than the specified time limit in the cache, the producer immediately sends the message to the broker without considering the setting of the batch.size parameter.

Therefore, the batch.size and linger.ms parameters work together to determine when the Message Queue for Apache Kafka producer sends messages in batches to the broker. You can set these two parameters based on your business needs.
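A sketch of how the two parameters are set together follows. The 128 KB and 10 ms values are illustrative, not recommendations from this article; tune them for your own workload:

```java
import java.util.Properties;

public class BatchConfigExample {

    public static Properties buildProps() {
        Properties props = new Properties();
        // Send a batch once 128 KB of messages have accumulated for a partition...
        props.put("batch.size", "131072");
        // ...or once the oldest cached message has waited 10 ms,
        // whichever happens first.
        props.put("linger.ms", "10");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildProps());
    }
}
```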

Sticky partitioning strategy

Only messages to be sent to the same partition can be packaged into the same batch. The partitioning strategy of the Message Queue for Apache Kafka producer determines how to generate a batch. You can use the Partitioner class to select a suitable partition for the Message Queue for Apache Kafka producer based on your business needs. For messages that have a key, the default partitioning strategy of the Message Queue for Apache Kafka producer is to hash the key of each message, and then select a partition based on the hash result. Messages with the same key are sent to the same partition.

For messages that do not have a key, the default partitioning strategy of the Message Queue for Apache Kafka producer in versions earlier than 2.4 is to cycle through all partitions of a topic and send messages to each partition in a round-robin manner. However, this strategy may cause higher latency because it generates a large number of small batches. To address the inefficiency of round-robin partitioning for key-free messages, the sticky partitioning strategy was introduced in Message Queue for Apache Kafka version 2.4.

The sticky partitioning strategy reduces the number of small batches that are generated when key-free messages are scattered across partitions. When a batch is full, the producer randomly selects another partition and sends subsequent messages to it. Over a short period, messages are sent to the same partition, but over a longer period they are evenly distributed across all partitions. This strategy avoids partition skew while improving overall performance and lowering latency.

If you are using the Message Queue for Apache Kafka producer in version 2.4 or later, the producer uses the sticky partitioning strategy by default. If you are using the producer in a version earlier than 2.4, you can set the partitioner.class parameter to specify a partitioning strategy based on the principles of the sticky partitioning strategy.

To implement the sticky partitioning strategy, you can use the following Java sample code. The implementation logic of this code is to change partitions based on a specific interval.
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

public class MyStickyPartitioner implements Partitioner {

    // Record the time of the last partition change.
    private long lastPartitionChangeTimeMillis = 0L;
    // Record the current partition.
    private int currentPartition = -1;
    // The interval between partition changes. Set the interval based on your business needs.
    private long partitionChangeTimeGap = 100L;
    
    public void configure(Map<String, ?> configs) {}

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        // Query the information about all partitions.
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        if (keyBytes == null) {
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            int availablePartitionSize = availablePartitions.size();

            // Determine the available partitions.
            if (availablePartitionSize > 0) {
                handlePartitionChange(availablePartitionSize);
                return availablePartitions.get(currentPartition).partition();
            } else {
                handlePartitionChange(numPartitions);
                return currentPartition;
            }
        } else {
            // For messages that have a key, select a partition based on the hash value of the key.
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private void handlePartitionChange(int partitionNum) {
        long currentTimeMillis = System.currentTimeMillis();

        // If the interval between partition changes is longer than the specified time, select another partition. If not, select the same partition.
        if (currentTimeMillis - lastPartitionChangeTimeMillis >= partitionChangeTimeGap
            || currentPartition < 0 || currentPartition >= partitionNum) {
            lastPartitionChangeTimeMillis = currentTimeMillis;
            currentPartition = Utils.toPositive(ThreadLocalRandom.current().nextInt()) % partitionNum;
        }
    }

    public void close() {}

}
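A producer can then be pointed at this class through the partitioner.class property. The package name below is a placeholder; adjust it to wherever MyStickyPartitioner actually lives in your project:

```java
import java.util.Properties;

public class PartitionerConfigExample {

    public static Properties buildProps() {
        Properties props = new Properties();
        // Fully qualified name of the custom partitioner. The class must be
        // on the producer's classpath; "com.example" is a placeholder package.
        props.put("partitioner.class", "com.example.MyStickyPartitioner");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildProps().getProperty("partitioner.class"));
    }
}
```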

OOM

Based on the batch design of Message Queue for Apache Kafka, the producer caches messages and then sends them in batches. However, if too many messages are cached, an out of memory (OOM) error may occur.

  • When the total size of all cached messages exceeds the cache size that is specified by the buffer.memory parameter, the producer sends these messages to the broker. In this case, the settings of the batch.size and linger.ms parameters are ignored.
  • The default cache size that is specified by the buffer.memory parameter is 32 MB, which is sufficient for a single producer.
    Notice If you enable multiple producers on the same Java virtual machine (JVM), an OOM error may occur because each producer may occupy 32 MB of the cache space.
  • In most cases, you do not need multiple producers in production. For special scenarios that do require them, set the buffer.memory parameter to avoid OOM errors.
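One way to keep the combined caches bounded is to split a fixed memory budget across the producers in the JVM. This is a sketch; the 64 MB budget is illustrative, not a value recommended by this article:

```java
import java.util.Properties;

public class BufferMemoryConfigExample {

    public static Properties buildProps(int producerCount) {
        Properties props = new Properties();
        // Split an illustrative 64 MB budget evenly across all producers in
        // the JVM so that their combined caches stay bounded.
        long perProducerBytes = 64L * 1024 * 1024 / producerCount;
        props.put("buffer.memory", String.valueOf(perProducerBytes));
        return props;
    }

    public static void main(String[] args) {
        // Four producers sharing the JVM each get 16 MB of cache.
        System.out.println(buildProps(4).getProperty("buffer.memory"));
    }
}
```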

Partitionally ordered messages

In each partition, messages are stored in the order that they are sent, and therefore are ordered.

By default, to improve availability, Message Queue for Apache Kafka does not ensure the absolute order of messages in a single partition. A small number of messages may become out of order during an upgrade or downtime caused by a failover, because messages in a failed partition are moved to other partitions.

For Professional Edition instances that are billed in subscription mode, if your business requires messages to be strictly ordered in a partition, select local storage when you create a topic.