全部产品
Search
文档中心

ApsaraMQ for RocketMQ:Metode dan parameter

更新时间:Jul 02, 2025

SDK ApsaraMQ for RocketMQ untuk Java digunakan untuk mengirim dan berlangganan pesan. Dalam ApsaraMQ for RocketMQ, pengguna dapat memperoleh pesan dalam mode push atau pull. Bagian ini menjelaskan metode dan parameter yang digunakan untuk mengirim dan berlangganan pesan.

Informasi latar belakang

ApsaraMQ for RocketMQ mendukung mode berikut untuk memperoleh pesan:

  • Push: ApsaraMQ for RocketMQ mendorong pesan ke konsumen. Dalam mode push, ApsaraMQ for RocketMQ dapat mendorong beberapa pesan ke konsumen sekaligus. Untuk informasi lebih lanjut, lihat Konsumsi Batch.

  • Pull: Konsumen menarik pesan dari ApsaraMQ for RocketMQ.

Mode pull memberikan lebih banyak opsi untuk menerima pesan serta kontrol yang lebih besar atas penarikan pesan dibandingkan dengan mode push.

Penting

Untuk menggunakan mode pull, pastikan bahwa instance ApsaraMQ for RocketMQ Anda adalah Enterprise Platinum Edition.

Parameter umum

Parameter

Deskripsi

NAMESRV_ADDR

TCP endpoint. Anda dapat mengklik Instance Details di konsol ApsaraMQ for RocketMQ untuk memperoleh TCP endpoint.

AccessKey

ID AccessKey digunakan sebagai pengenal unik untuk otentikasi. Untuk informasi tentang cara memperoleh ID AccessKey, lihat Buat pasangan AccessKey.

SecretKey

Rahasia AccessKey digunakan sebagai kata sandi untuk otentikasi. Untuk informasi tentang cara memperoleh rahasia AccessKey, lihat Buat pasangan AccessKey.

OnsChannel

Saluran pengguna. Nilai default: ALIYUN. Jika Anda adalah pengguna CloudTmall, atur nilainya menjadi CLOUD.

Metode untuk mengirim pesan

messagesendinterface

Parameter untuk mengirim pesan

Parameter

Deskripsi

SendMsgTimeoutMillis

Jangka waktu timeout untuk mengirim pesan.

Satuan: milidetik.

CheckImmunityTimeInSeconds

Jangka waktu terpendek yang harus ditunggu sistem sebelum memeriksa status pesan transaksional untuk pertama kalinya. Satuan: detik.

shardingKey

Kunci partisi yang digunakan untuk menentukan partisi tempat pesan terurut didistribusikan.

Metode untuk berlangganan pesan

Gambar 1. Metode dalam Mode Pushconsumeinterface

Gambar 2. Metode dalam Mode Pullpull_consumer

Berikut adalah deskripsi metode yang dapat dipanggil dalam mode pull:

public interface PullConsumer extends Admin {

    /**
     * Query the partition information of a topic. This method returns all partitions of the topic. You can call this method only after your pull consumers start to run. 
     */
    Set<TopicPartition> topicPartitions(String topic);

    /**
     * Specify a partition from which you want to pull messages. This method does not implement rebalancing. You must make sure that messages in all partitions can be consumed. If this method is called multiple times, the system replaces the partitions to which subscribers have subscribed instead of increasing the number of partitions to which subscribers subscribe. 
     */
    void assign(Collection<TopicPartition> topicPartitions);

    /**
     * Pull messages. The maxBatchMessageCount parameter specifies the maximum number of messages that can be pulled at a time. You can specify a timeout period in milliseconds. 
     */
    List<Message> poll(long timeout);

    /**
     * Reset the consumer offset of a specified partition to a specified position. The specified position must be between the minimum offset and the maximum offset of the partition. You can call this method only after your pull consumers start to run. A subscriber must subscribe to the specified partition. 
     */
    void seek(TopicPartition topicPartition, long offset);

    /**
     * Reset the consumer offset of a specified partition to the minimum offset of the partition. You can call this method only after your pull consumers start to run. A subscriber must subscribe to the specified partition. 
     */
    void seekToBeginning(TopicPartition topicPartition);

    /**
     * Reset the consumer offset of a specified partition to the maximum offset of the partition. You can call this method only after your pull consumers start to run. A subscriber must subscribe to the specified partition. 
     */
    void seekToEnd(TopicPartition topicPartition);

    /**
     * Suspend message consumption in a specified partition.
     */
    void pause(Collection<TopicPartition> topicPartitions);

    /**
     * Resume message consumption in a specified partition.
     */
    void resume(Collection<TopicPartition> topicPartitions);

    /**
     * Query an offset based on a timestamp in a specified partition. The timestamp indicates when a message is stored to a broker. The offset corresponds to the first timestamp that is greater than or equal to the specified timestamp. 
     */
    Long offsetForTimestamp(TopicPartition topicPartition, Long timestamp);

    /**
     * Query the latest consumer offset of a specified partition.
     */
    Long committed(TopicPartition topicPartition);

    /**
     * Manually commit a consumer offset. The consumer offset is synchronized to your on-premises client and then committed to your broker by using a thread in an asynchronous environment. 
     */
    void commitSync();
    
    interface TopicPartitionChangeListener {
        /**
         * This method is called when the partitions of a topic change, for example, when the number of partitions of a topic changes due to broker scaling. 
         */
        void onChanged(Set<TopicPartition> topicPartitions);
    }
    
    /**
     * Register TopicPartitionChangeListener that listens for changes in the partitions of a topic. The onChanged method can be called back for the registered listener when the number of partitions of a topic changes due to broker scaling. By default, the maximum delay for a callback is five seconds. 
     */
    void registerTopicPartitionChangedListener(String topic, TopicPartitionChangeListener callback);
}

Parameter

Tabel 1. Parameter Umum

Parameter

Deskripsi

GROUP_ID

ID grup yang telah Anda buat di konsol ApsaraMQ for RocketMQ. Untuk informasi lebih lanjut, lihat Istilah.

MessageModel

Mode konsumsi untuk instance konsumen. Nilai valid:

  • CLUSTERING: konsumsi klustering. Nilai ini adalah nilai default.

  • BROADCASTING: konsumsi siaran.

ConsumeThreadNums

Jumlah utas konsumsi untuk instance konsumen. Nilai default: 20.

MaxReconsumeTimes

Jumlah maksimum percobaan ulang saat konsumsi gagal. Nilai default: 16.

ConsumeTimeout

Jangka waktu timeout untuk mengonsumsi setiap pesan. Jika waktu untuk mengonsumsi pesan melebihi jangka waktu timeout yang ditentukan, pesan gagal dikonsumsi dan akan dikirim ulang setelah interval percobaan ulang. Atur nilai yang sesuai untuk setiap bisnis. Nilai default: 15. Satuan: menit.

suspendTimeMillis

Interval percobaan ulang untuk pesan terurut yang gagal dikonsumsi.

maxCachedMessageAmount

Jumlah maksimum pesan yang dapat disimpan sementara di klien konsumen lokal Anda. Nilai valid: 100 hingga 50000. Nilai default: 5000.

Parameter ini berlaku pada klien. Kuota dibagi secara merata ke topik-topik yang telah dilanggan oleh pelanggan. Misalnya, jika Anda mengatur nilai parameter menjadi 1000 untuk klien konsumen yang melanggan 2 topik, maksimal 500 pesan dapat disimpan sementara untuk setiap topik.

Jika klien konsumen menarik beberapa pesan sekaligus, jumlah aktual pesan yang perlu disimpan sementara bisa lebih besar daripada nilai yang Anda tetapkan untuk maxCachedMessageAmount.

Kami merekomendasikan agar Anda mengatur nilai parameter menjadi dua kali jumlah pesan yang dapat dikonsumsi klien konsumen Anda per detik.

Penting

Tetapkan nilai yang tepat. Nilai yang terlalu besar dapat menyebabkan kesalahan Out-of-Memory (OOM) pada klien Anda.

maxCachedMessageSizeInMiB

Ukuran maksimum pesan yang dapat disimpan sementara di klien lokal Anda. Nilai valid: 16 MB hingga 2048 MB. Nilai default: 512 MB.

Tabel 2. Parameter yang Diperlukan Saat Message Queue for Apache RocketMQ Mendorong Beberapa Pesan ke Konsumen Sekaligus

Parameter

Deskripsi

ConsumeMessageBatchMaxSize

Jumlah maksimum pesan yang tersimpan sementara yang dapat didorong ke konsumen sekaligus. Jika jumlah pesan yang tersimpan mencapai nilai parameter yang Anda tentukan, ApsaraMQ for RocketMQ mendorong pesan-pesan yang tersimpan ke konsumen sekaligus. Nilai default: 32. Nilai valid: 1 hingga 1024.

BatchConsumeMaxAwaitDurationInSeconds

Jangka waktu tunggu sebelum beberapa pesan yang tersimpan didorong ke konsumen sekaligus. ApsaraMQ for RocketMQ mendorong beberapa pesan yang tersimpan ke konsumen sekaligus setelah waktu yang ditentukan oleh parameter ini. Nilai default: 0. Nilai valid: 0 hingga 450. Satuan: detik.

Tabel 3. Parameter Spesifik untuk Mode Pull

Parameter

Deskripsi

maxCachedMessageSizeInMiB

Ukuran maksimum pesan yang dapat disimpan sementara oleh konsumen di klien untuk satu partisi. Nilai default: 100 MiB. Nilai valid: 16 MiB hingga 2048 MiB.

Penting

Tetapkan nilai yang tepat. Nilai yang terlalu besar dapat menyebabkan kesalahan OOM pada klien Anda.

autoCommit

Menentukan apakah akan secara otomatis melakukan commit offset konsumen. Nilai default: true.

autoCommitIntervalMillis

Interval antara operasi melakukan commit offset konsumen secara otomatis. Nilai default: 5. Satuan: detik.

pollTimeoutMillis

Jangka waktu timeout untuk menarik pesan setiap kali. Nilai default: 5. Satuan: detik.

Untuk informasi lebih lanjut tentang partisi dan offset, lihat Istilah.

Referensi

Untuk informasi lebih lanjut tentang kode contoh untuk mengirim dan berlangganan pesan, lihat topik-topik berikut: