全部产品
Search
文档中心

ApsaraMQ for RocketMQ:Konsumsi batch

更新时间:Jul 02, 2025

ApsaraMQ for RocketMQ menyediakan fitur konsumsi batch yang memungkinkan Anda memproses pesan secara efisien atau mengurangi jumlah panggilan API oleh sumber daya hilir. Topik ini menjelaskan definisi, manfaat, skenario, batasan, serta contoh kode dari fitur konsumsi batch.

Apa itu konsumsi batch?

  • Definisi

    ApsaraMQ for RocketMQ menyediakan fitur konsumsi batch yang memungkinkan konsumen push mengirimkan pesan dalam batch ke thread konsumen untuk diproses secara bersamaan.

    Catatan ApsaraMQ for RocketMQ mendukung konsumen push dan pull berdasarkan mode pengambilan pesan (push atau pull). Untuk informasi lebih lanjut, lihat Istilah.
  • Cara Kerjanya
    Proses konsumsi batch terdiri dari dua tahap berikut:
    1. Produsen menerbitkan pesan ke ApsaraMQ for RocketMQ. Thread penarikan pesan dari konsumen push kemudian menarik pesan menggunakan kebijakan polling panjang dan menyimpannya di backend ApsaraMQ for RocketMQ.
    2. Konsumen push menentukan apakah akan mengirimkan pesan ke thread konsumen untuk konsumsi batch berdasarkan kondisi tertentu.
    Gambar berikut mengilustrasikan proses konsumsi batch. batch_consume

Batasan

  • Konsumsi batch hanya didukung melalui TCP. Pastikan Anda menggunakan edisi komersial dari TCP client SDK for Java versi 1.8.7.3.Final atau yang lebih baru. Untuk informasi tentang catatan rilis SDK dan cara mendapatkannya, lihat Catatan Rilis.
  • Maksimal 1.024 pesan dapat dikirim dalam satu batch, dengan waktu tunggu maksimum antar batch sebesar 450 detik.

Manfaat dan skenario

Berikut adalah manfaat dan skenario penggunaan fitur konsumsi batch:

  • Manfaat 1: Tingkatkan Throughput Pesan dan Efisiensi Pemrosesan

    Skenario: ApsaraMQ for RocketMQ memisahkan sistem pesanan hulu dari sistem Elasticsearch hilir. Sistem Elasticsearch mengonsumsi 10 pesan log dari sistem pesanan hulu. Setiap pesan setara dengan permintaan remote procedure call (RPC). Jika satu RPC membutuhkan waktu 10 milidetik, total waktu untuk mengonsumsi 10 pesan tanpa konsumsi batch adalah 100 milidetik. Dengan konsumsi batch, 10 pesan tersebut diproses dalam satu batch, sehingga waktu konsumsi berkurang menjadi 10 milidetik. Hal ini meningkatkan efisiensi pemrosesan pesan.

  • Manfaat 2: Kurangi Jumlah Panggilan API oleh Sumber Daya Hilir

    Skenario: Misalkan Anda ingin memasukkan data ke dalam database. Jika Anda melakukan operasi insert untuk setiap data baru dan sering memperbarui data, database mungkin mengalami tekanan tinggi. Anda dapat mengatur parameter untuk menyisipkan 10 data dalam satu batch dan melakukan operasi insert setiap 5 detik, sehingga mengurangi beban sistem.

Contoh kode

Berikut adalah contoh implementasi konsumsi batch:

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.batch.BatchConsumer;
import com.aliyun.openservices.ons.api.batch.BatchMessageListener;
import java.util.List;
import java.util.Properties;

import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.tcp.example.MqConfig;

public class SimpleBatchConsumer {

    public static void main(String[] args) {
        Properties consumerProperties = new Properties();
        // ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
        consumerProperties.setProperty(PropertyKeyConst.GROUP_ID, MqConfig.GROUP_ID);
        // Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi.
        // AccessKey ID yang digunakan untuk otentikasi.
        properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
        // Rahasia AccessKey yang digunakan untuk otentikasi.
        properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        // Titik akhir TCP. Anda dapat memperoleh titik akhir di bagian TCP Endpoint halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
        consumerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MqConfig.NAMESRV_ADDR);

        // Jumlah maksimum pesan yang dikonsumsi pada satu waktu. Dalam contoh ini, nilainya diatur menjadi 128. Jika jumlah pesan yang disimpan dalam topik yang ditentukan mencapai nilai ini, SDK segera memanggil metode callback untuk konsumen agar mengonsumsi pesan. Nilai valid: 1 hingga 1024. Nilai default: 32.
        consumerProperties.setProperty(PropertyKeyConst.ConsumeMessageBatchMaxSize, String.valueOf(128));
        // Waktu tunggu maksimum antara dua batch berturut-turut. Dalam contoh ini, nilainya ditentukan sebagai 10 detik. Jika waktu tunggu yang ditentukan tercapai, SDK segera memanggil metode callback untuk konsumen agar mengonsumsi pesan. Nilai valid: 0 hingga 450. Nilai default: 0. Satuan: detik.
        consumerProperties.setProperty(PropertyKeyConst.BatchConsumeMaxAwaitDurationInSeconds, String.valueOf(10));

        BatchConsumer batchConsumer = ONSFactory.createBatchConsumer(consumerProperties);
        batchConsumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new BatchMessageListener() {

             @Override
            public Action consume(final List<Message> messages, ConsumeContext context) {
                System.out.printf("Ukuran-Batch: %d\n", messages.size());
                // Proses pesan dalam batch.
                return Action.CommitMessage;
            }
        });
        // Mulai konsumen untuk konsumsi batch.
        batchConsumer.start();
        System.out.println("Konsumen berhasil dimulai.");

        // Tunggu selama periode waktu tertentu untuk mencegah proses keluar.
        try {
            Thread.sleep(200000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}         

Tabel berikut menjelaskan parameter yang digunakan.

ParameterTipeDiperlukanDeskripsi
ConsumeMessageBatchMaxSizeStringTidak
Catatan Jika tidak ada nilai parameter yang ditentukan, nilai default akan digunakan.
Jumlah maksimum pesan yang dikonsumsi dalam satu batch. Jika jumlah pesan yang disimpan mencapai nilai yang ditentukan dari parameter ini, SDK dari klien konsumen push mengirimkan pesan ke thread konsumen sekaligus untuk konsumsi batch. Nilai valid: 1 hingga 1024. Nilai default: 32.
BatchConsumeMaxAwaitDurationInSecondsStringWaktu tunggu maksimum antar batch. Jika waktu tunggu yang ditentukan oleh parameter ini tercapai, ApsaraMQ for RocketMQ mendorong pesan ke konsumen dalam satu batch. Nilai valid: 1 hingga 450. Nilai default: 0. Satuan: detik.
Catatan

Praktik terbaik

Atur nilai parameter ConsumeMessageBatchMaxSize dan BatchConsumeMaxAwaitDurationInSeconds sesuai kebutuhan Anda. Konsumsi batch dipicu jika salah satu kondisi pemicu terpenuhi. Misalnya, jika parameter ConsumeMessageBatchMaxSize diatur ke 128 dan parameter BatchConsumeMaxAwaitDurationInSeconds diatur ke 1, konsumsi batch tetap dipicu meskipun kurang dari 128 pesan disimpan dalam waktu 1 detik. Dalam hal ini, nilai kurang dari 128 akan dikembalikan untuk parameter Ukuran-Batch.

Untuk hasil terbaik, kami sarankan Anda menerapkan idempotensi pesan pada klien konsumen untuk memastikan bahwa sebuah pesan diproses hanya sekali. Untuk informasi lebih lanjut tentang idempotensi pesan, lihat Idempotensi Pesan.

Referensi

Edisi Komersial dari TCP Client SDK for Java