全部产品
Search
文档中心

ApsaraMQ for Kafka:Praktik terbaik untuk produsen

更新时间:Nov 11, 2025

Topik ini menjelaskan praktik terbaik untuk produsen ApsaraMQ for Kafka guna membantu Anda mengurangi kesalahan pengiriman pesan. Praktik terbaik ini didasarkan pada klien Java. Konsep dasarnya serupa untuk klien dalam bahasa pemrograman lain, meskipun detail implementasinya mungkin berbeda.

Kirim pesan

Kode contoh berikut menunjukkan cara mengirim pesan:

Future<RecordMetadata> metadataFuture = producer.send(new ProducerRecord<String, String>(
        topic,   // Topik pesan.
        null,   // Nomor partisi. Atur ke null agar produsen menetapkan partisi.
        System.currentTimeMillis(),   // Stempel waktu.
        String.valueOf(value.hashCode()),   // Kunci pesan.
        value   // Nilai pesan.
));

Untuk kode contoh lengkapnya, lihat Ikhtisar SDK.

Kunci dan Nilai

Pesan dalam ApsaraMQ for Kafka 0.10.2 memiliki dua bidang berikut:

  • Kunci: Pengidentifikasi sebuah pesan.

  • Nilai: Konten sebuah pesan.

Untuk menyederhanakan pelacakan, tetapkan kunci unik untuk setiap pesan. Anda dapat menggunakan kunci tersebut untuk melacak pesan serta mencatat log pengiriman dan konsumsi guna memeriksa statusnya.

Jika Anda mengirim banyak pesan, jangan tetapkan kunci. Sebagai gantinya, gunakan strategi partisi sticky. Untuk informasi selengkapnya tentang strategi partisi sticky, lihat Strategi partisi sticky.

Penting

ApsaraMQ for Kafka versi 0.11.0 dan yang lebih baru mendukung header. Jika ingin menggunakan header, Anda harus meningkatkan server ke versi 2.2.0.

Ulangi upaya yang gagal

Dalam lingkungan terdistribusi, pengiriman pesan kadang-kadang gagal karena masalah jaringan. Kegagalan dapat terjadi baik karena pesan telah dikirim tetapi acknowledgment (ACK) gagal diterima, maupun karena pesan tidak berhasil dikirim.

ApsaraMQ for Kafka menggunakan arsitektur jaringan alamat IP virtual (VIP). Dalam arsitektur ini, koneksi yang tidak aktif dalam waktu lama akan ditutup secara otomatis. Oleh karena itu, klien yang tidak aktif sering menerima kesalahan connection reset by peer. Jika kesalahan ini terjadi, Anda harus mengulangi pengiriman pesan.

Anda dapat mengatur parameter pengulangan berikut sesuai kebutuhan:

  • retries: Jumlah pengulangan jika pengiriman pesan gagal.

  • retry.backoff.ms: Interval pengulangan untuk pesan yang gagal. Kami menyarankan Anda mengatur parameter ini ke 1000. Satuannya adalah milidetik.

Kirim pesan secara asinkron

Antarmuka pengiriman bersifat asinkron. Untuk menerima hasil pengiriman, Anda dapat memanggil metadataFuture.get(timeout, TimeUnit.MILLISECONDS).

Keamanan thread

Sebuah produsen bersifat thread-safe dan dapat mengirim pesan ke topik apa pun. Biasanya, satu aplikasi menggunakan satu produsen.

Acks

Daftar berikut menjelaskan pengaturan `acks`:

  • acks=0: Tidak diperlukan tanggapan dari server. Pengaturan ini memberikan kinerja tinggi tetapi memiliki risiko tinggi terhadap kehilangan data.

  • acks=1: Tanggapan dikembalikan setelah data ditulis ke node primer. Pengaturan ini memberikan kinerja menengah dan risiko kehilangan data menengah. Kehilangan data dapat terjadi jika node primer mati.

  • acks=all: Tanggapan hanya dikembalikan setelah data ditulis ke node primer dan disinkronkan ke node replika. Pengaturan ini memberikan kinerja rendah tetapi keamanan data tinggi. Data hanya hilang jika node primer dan replika sama-sama mati.

Untuk meningkatkan kinerja pengiriman, Anda dapat mengatur acks=1.

Tingkatkan kinerja pengiriman dengan mengurangi permintaan terfragmentasi

Umumnya, sebuah topik ApsaraMQ for Kafka memiliki beberapa partisi. Saat klien produsen ApsaraMQ for Kafka mengirim pesan ke server, klien tersebut harus terlebih dahulu menentukan ke partisi mana pesan tersebut akan dikirim. Ketika Anda mengirim beberapa pesan ke partisi yang sama, klien produsen mengemas pesan-pesan tersebut menjadi satu batch dan mengirimkannya ke server. Klien produsen mengalami overhead tambahan saat memproses batch. Batch kecil dapat menyebabkan klien produsen menghasilkan banyak permintaan. Hal ini menyebabkan antrian permintaan di sisi klien dan server, meningkatkan penggunaan CPU, serta meningkatkan latensi keseluruhan pengiriman dan konsumsi pesan. Ukuran batch yang sesuai dapat mengurangi jumlah permintaan dari klien ke server, sehingga meningkatkan throughput dan mengurangi latensi keseluruhan pengiriman pesan.

Produsen ApsaraMQ for Kafka mengontrol mekanisme batching dengan dua parameter utama:

  • batch.size: Ukuran cache pesan untuk setiap partisi. Ini adalah jumlah total byte dari konten pesan, bukan jumlah pesan. Saat cache mencapai ukuran ini, permintaan jaringan dipicu untuk mengirim batch ke server. Jika batch.size terlalu kecil, kinerja dan stabilitas pengiriman dapat terpengaruh. Kami menyarankan Anda mempertahankan nilai default 16384. Satuannya adalah byte.

  • linger.ms: Waktu maksimum sebuah pesan dapat tetap berada di cache. Jika waktu ini terlampaui, klien produsen mengabaikan batas batch.size dan segera mengirim pesan ke server. Anda dapat mengatur linger.ms ke nilai antara 100 hingga 1000, sesuai kebutuhan. Satuannya adalah milidetik.

Oleh karena itu, waktu pengiriman batch pesan oleh produsen ApsaraMQ for Kafka ke server ditentukan oleh kedua parameter batch.size dan linger.ms. Anda dapat menyesuaikan parameter-parameter ini sesuai kebutuhan. Untuk meningkatkan kinerja pengiriman dan memastikan stabilitas layanan, kami menyarankan Anda mengatur batch.size=16384 dan linger.ms=1000.

Selain itu, kami menyarankan Anda menggunakan klien versi 2.4 atau yang lebih baru untuk mengirim pesan. Versi ini mengaktifkan strategi partisi sticky secara default. Strategi ini secara signifikan mengurangi pengiriman terfragmentasi dan meningkatkan kinerja pengiriman keseluruhan. Untuk Proposal Perbaikan Kafka (KIP) dan laporan kinerja mengenai strategi partisi sticky, lihat KIP-480: Sticky Partitioner - Apache Kafka - Apache Software Foundation.

Strategi partisi sticky

Hanya pesan yang dikirim ke partisi yang sama yang ditempatkan dalam batch yang sama. Oleh karena itu, strategi partisi yang ditetapkan pada produsen ApsaraMQ for Kafka merupakan faktor yang menentukan bagaimana sebuah batch dibentuk. Produsen ApsaraMQ for Kafka memungkinkan Anda memilih partisi yang sesuai untuk bisnis Anda dengan mengatur kelas implementasi partitioner. Untuk pesan dengan kunci yang ditentukan, strategi default produsen ApsaraMQ for Kafka adalah melakukan hash terhadap kunci pesan, lalu memilih partisi berdasarkan hasil hash tersebut. Hal ini memastikan bahwa pesan dengan kunci yang sama dikirim ke partisi yang sama.

Untuk pesan tanpa kunci yang ditentukan, strategi default pada versi ApsaraMQ for Kafka sebelum 2.4 adalah melakukan rotasi melalui semua partisi topik. Pesan dikirim ke setiap partisi secara bergiliran. Namun, strategi default ini menghasilkan kinerja batching yang buruk. Dalam praktiknya, strategi ini dapat menghasilkan banyak batch kecil, yang meningkatkan latensi. Karena efisiensi partisi yang rendah untuk pesan tanpa kunci, ApsaraMQ for Kafka memperkenalkan strategi partisi sticky pada versi 2.4.

Strategi partisi sticky menyelesaikan masalah batch kecil yang disebabkan oleh pesan tanpa kunci tersebar di berbagai partisi. Strategi utamanya adalah memilih partisi lain secara acak setelah batch suatu partisi selesai. Pesan berikutnya kemudian menggunakan partisi baru tersebut sebanyak mungkin. Dalam jangka pendek, strategi ini mengirim pesan ke partisi yang sama. Dalam jangka panjang, pesan tetap didistribusikan secara merata ke semua partisi. Pendekatan ini mencegah ketimpangan partisi pesan, mengurangi latensi, dan meningkatkan kinerja layanan secara keseluruhan.

Jika Anda menggunakan klien produsen ApsaraMQ for Kafka versi 2.4 atau yang lebih baru, strategi partisi sticky digunakan secara default. Jika Anda menggunakan klien produsen versi sebelum 2.4, Anda dapat mengimplementasikan strategi partisi sendiri berdasarkan prinsip strategi partisi sticky. Kemudian, Anda dapat mengatur strategi yang diinginkan menggunakan parameter partitioner.class.

Untuk implementasi strategi partisi sticky, lihat kode Java berikut. Logika kode ini adalah mengganti partisi pada interval tertentu.

public class MyStickyPartitioner implements Partitioner {

    // Mencatat waktu pergantian partisi terakhir.
    private long lastPartitionChangeTimeMillis = 0L;
    // Mencatat partisi saat ini.
    private int currentPartition = -1;
    // Interval pergantian partisi. Atur interval sesuai kebutuhan.
    private long partitionChangeTimeGap = 100L;
    
    public void configure(Map<String, ?> configs) {}

    /**
     * Menghitung partisi untuk catatan yang diberikan.
     *
     * @param topic Nama topik
     * @param key Kunci untuk partisi (atau null jika tidak ada kunci)
     * @param keyBytes kunci serialisasi untuk partisi (atau null jika tidak ada kunci)
     * @param value Nilai untuk partisi atau null
     * @param valueBytes nilai serialisasi untuk partisi atau null
     * @param cluster Metadata kluster saat ini
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        // Mendapatkan semua informasi partisi.
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        if (keyBytes == null) {
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            int availablePartitionSize = availablePartitions.size();

            // Memeriksa partisi aktif saat ini.
            if (availablePartitionSize > 0) {
                handlePartitionChange(availablePartitionSize);
                return availablePartitions.get(currentPartition).partition();
            } else {
                handlePartitionChange(numPartitions);
                return currentPartition;
            }
        } else {
            // Untuk pesan dengan kunci, pilih partisi berdasarkan nilai hash kunci.
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private void handlePartitionChange(int partitionNum) {
        long currentTimeMillis = System.currentTimeMillis();

        // Jika waktu sejak pergantian terakhir melebihi interval pergantian, beralih ke partisi berikutnya. Jika tidak, gunakan partisi saat ini.
        if (currentTimeMillis - lastPartitionChangeTimeMillis >= partitionChangeTimeGap
            || currentPartition < 0 || currentPartition >= partitionNum) {
            lastPartitionChangeTimeMillis = currentTimeMillis;
            currentPartition = Utils.toPositive(ThreadLocalRandom.current().nextInt()) % partitionNum;
        }
    }

    public void close() {}

}

OOM

Berdasarkan desain batch ApsaraMQ for Kafka, ApsaraMQ for Kafka menyimpan pesan dalam cache dan mengirimkannya secara batch. Jika terlalu banyak pesan di-cache, kesalahan kehabisan memori (OOM) dapat terjadi.

  • buffer.memory: Ukuran pool memori untuk mengirim pesan. Jika pool memori terlalu kecil, permintaan memori dapat memakan waktu lama. Hal ini dapat memengaruhi kinerja pengiriman dan bahkan menyebabkan timeout pengiriman. Kami menyarankan Anda mengatur buffer.memory agar lebih besar dari atau sama dengan `batch.size × jumlah partisi × 2`. Satuannya adalah byte.

  • Nilai default buffer.memory adalah 32 MB. Nilai ini cukup untuk satu produsen guna memastikan kinerja yang memadai.

    Penting

    Jika Anda menjalankan beberapa produsen dalam Java Virtual Machine (JVM) yang sama, setiap produsen dapat menempati ruang cache sebesar 32 MB. Hal ini dapat memicu kesalahan OOM.

  • Dalam lingkungan produksi, biasanya tidak perlu menjalankan beberapa produsen. Jika keadaan khusus mengharuskannya, Anda harus mempertimbangkan ukuran buffer.memory untuk menghindari kesalahan OOM.

Urutan partisi

Dalam satu partisi tunggal, pesan disimpan dan dikonsumsi sesuai urutan pengirimannya. Oleh karena itu, pesan sebagian besar terurut.

Secara default, untuk meningkatkan ketersediaan, ApsaraMQ for Kafka tidak menjamin urutan ketat dalam satu partisi. Selama peningkatan atau gangguan, sejumlah kecil pesan dapat menjadi tidak terurut. Hal ini terjadi ketika suatu partisi gagal dan pesannya dialihkan ke partisi lain.

Jika bisnis Anda memerlukan pengurutan ketat dalam satu partisi, Anda dapat memilih penyimpanan lokal saat membuat topik.