Konsumsi batch mengirimkan beberapa pesan ke thread konsumen dalam satu kali pengiriman, bukan satu per satu. Pendekatan ini mengurangi beban overhead remote procedure call (RPC) ke sistem downstream dan meningkatkan throughput pesan.
Cara kerja
Konsumen push menangani konsumsi batch dalam dua tahap:
Pull and cache — Thread pull mengambil pesan dari ApsaraMQ for RocketMQ menggunakan long polling dan menyimpannya di cache secara lokal.
Dispatch — Ketika jumlah pesan yang di-cache mencapai ambang batas ukuran batch atau ambang batas waktu tunggu (mana yang lebih dulu tercapai), konsumen push mengirimkan batch tersebut ke thread konsumen untuk diproses.

Kasus penggunaan
Konsumsi batch paling efektif ketika sistem downstream memperoleh manfaat dari operasi batch. Jika tujuan Anda hanya meningkatkan paralelisme, pertimbangkan alternatif yang lebih sederhana terlebih dahulu, seperti menambahkan instans konsumen atau menyesuaikan ukuran kolam thread.
Bulk indexing — Sistem pesanan upstream mempublikasikan pesan log yang diindeks oleh kluster Elasticsearch downstream. Setiap pesan memicu satu permintaan RPC (~10 ms). Memproses 10 pesan secara individual membutuhkan waktu 100 ms; dengan menggabungkannya menjadi satu panggilan bulk-index, total waktu berkurang menjadi ~10 ms.
Bulk database inserts — Sebuah aplikasi memasukkan catatan ke database satu per satu dalam frekuensi pembaruan tinggi, sehingga menciptakan beban berat. Dengan mengelompokkan 10 catatan per operasi insert dan melakukan flush setiap 5 detik, beban overhead koneksi dan amplifikasi tulis berkurang.
Batasan
Konsumsi batch hanya didukung melalui TCP. Gunakan edisi komersial SDK client TCP untuk Java versi 1.8.7.3.Final atau lebih baru. Untuk catatan rilis dan instruksi unduhan, lihat Release notes.
Ukuran batch maksimum: 1.024 pesan.
Waktu tunggu maksimum antar batch: 450 detik.
Parameter
Dua parameter mengatur kapan sebuah batch dikirimkan. Pengiriman terjadi ketika salah satu kondisi terpenuhi, mana yang lebih dulu.
| Parameter | Tipe | Default | Rentang valid | Deskripsi |
|---|---|---|---|---|
ConsumeMessageBatchMaxSize | String | 32 | 1–1.024 | Jumlah maksimum pesan per batch. Saat jumlah pesan yang di-cache mencapai nilai ini, SDK segera mengirimkan batch ke thread konsumen. |
BatchConsumeMaxAwaitDurationInSeconds | String | 0 | 0–450 | Waktu tunggu maksimum dalam detik. Saat interval ini berakhir, SDK mengirimkan semua pesan yang telah terkumpul, meskipun ambang batas ukuran batch belum tercapai. |
Kode contoh
Konfigurasikan konsumsi batch melalui Properties yang diteruskan ke ONSFactory.createBatchConsumer(). Callback BatchMessageListener menerima List<Message> yang berisi hingga ConsumeMessageBatchMaxSize pesan.
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.batch.BatchConsumer;
import com.aliyun.openservices.ons.api.batch.BatchMessageListener;
import java.util.List;
import java.util.Properties;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.tcp.example.MqConfig;
public class SimpleBatchConsumer {
public static void main(String[] args) {
Properties consumerProperties = new Properties();
consumerProperties.setProperty(PropertyKeyConst.GROUP_ID, MqConfig.GROUP_ID);
consumerProperties.setProperty(PropertyKeyConst.AccessKey, MqConfig.ACCESS_KEY);
consumerProperties.setProperty(PropertyKeyConst.SecretKey, MqConfig.SECRET_KEY);
consumerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MqConfig.NAMESRV_ADDR);
// Setel jumlah maksimum pesan per batch.
// Default: 32. Nilai valid: 1 hingga 1024.
consumerProperties.setProperty(PropertyKeyConst.ConsumeMessageBatchMaxSize, String.valueOf(128));
// Setel waktu tunggu maksimum antar batch, dalam detik.
// Default: 0. Nilai valid: 0 hingga 450.
consumerProperties.setProperty(PropertyKeyConst.BatchConsumeMaxAwaitDurationInSeconds, String.valueOf(10));
BatchConsumer batchConsumer = ONSFactory.createBatchConsumer(consumerProperties);
batchConsumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new BatchMessageListener() {
@Override
public Action consume(final List<Message> messages, ConsumeContext context) {
System.out.printf("Batch-size: %d\n", messages.size());
// Proses pesan dalam batch.
return Action.CommitMessage;
}
});
// Mulai BatchConsumer.
batchConsumer.start();
System.out.println("Consumer start success.") ;
// Tunggu selama periode tetap agar proses tidak keluar.
try {
Thread.sleep(200000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}- Untuk kode sumber lengkap, lihat pustaka kode di GitHub.
- Untuk referensi parameter lengkap, lihat Methods and parameters.
Praktik terbaik
Sesuaikan ukuran batch dan waktu tunggu secara bersamaan
Pengiriman dipicu ketika salah satu dari ambang batas ukuran batch atau waktu tunggu tercapai. Atur kedua parameter agar sesuai dengan workload Anda:
Skenario throughput tinggi — Atur
ConsumeMessageBatchMaxSizeke nilai besar (misalnya, 128 atau 256) danBatchConsumeMaxAwaitDurationInSecondske interval pendek (misalnya, 1–5 detik). Ini memungkinkan pengiriman batch secara sering tanpa menunggu batch penuh.Skenario throughput rendah — Atur ukuran batch moderat (misalnya, 32) dengan waktu tunggu lebih lama (misalnya, 10–30 detik) untuk menghindari pengiriman batch yang sangat kecil.
Contoh: Dengan ConsumeMessageBatchMaxSize diatur ke 128 dan BatchConsumeMaxAwaitDurationInSeconds diatur ke 1, batch dikirimkan setelah 1 detik meskipun jumlah pesan yang terkumpul kurang dari 128. Dalam kasus ini, messages.size() pada callback mengembalikan nilai kurang dari 128.
Implementasikan idempotensi konsumsi
Untuk mencapai konsumsi batch yang andal, implementasikan idempotensi pesan pada client konsumen Anda guna memastikan bahwa sebuah pesan hanya diproses sekali. Untuk informasi selengkapnya, lihat Consumption idempotence.