SDK ApsaraMQ for RocketMQ untuk Java digunakan untuk mengirim dan berlangganan pesan. Dalam ApsaraMQ for RocketMQ, pengguna dapat memperoleh pesan dalam mode push atau pull. Bagian ini menjelaskan metode dan parameter yang digunakan untuk mengirim dan berlangganan pesan.
Informasi latar belakang
ApsaraMQ for RocketMQ mendukung mode berikut untuk memperoleh pesan:
Push: ApsaraMQ for RocketMQ mendorong pesan ke konsumen. Dalam mode push, ApsaraMQ for RocketMQ dapat mendorong beberapa pesan ke konsumen sekaligus. Untuk informasi lebih lanjut, lihat Konsumsi Batch.
Pull: Konsumen menarik pesan dari ApsaraMQ for RocketMQ.
Mode pull memberikan lebih banyak opsi untuk menerima pesan serta kontrol yang lebih besar atas penarikan pesan dibandingkan dengan mode push.
Untuk menggunakan mode pull, pastikan bahwa instance ApsaraMQ for RocketMQ Anda adalah Enterprise Platinum Edition.
Parameter umum
Parameter | Deskripsi |
NAMESRV_ADDR | TCP endpoint. Anda dapat mengklik Instance Details di konsol ApsaraMQ for RocketMQ untuk memperoleh TCP endpoint. |
AccessKey | ID AccessKey digunakan sebagai pengenal unik untuk otentikasi. Untuk informasi tentang cara memperoleh ID AccessKey, lihat Buat pasangan AccessKey. |
SecretKey | Rahasia AccessKey digunakan sebagai kata sandi untuk otentikasi. Untuk informasi tentang cara memperoleh rahasia AccessKey, lihat Buat pasangan AccessKey. |
OnsChannel | Saluran pengguna. Nilai default: ALIYUN. Jika Anda adalah pengguna CloudTmall, atur nilainya menjadi CLOUD. |
Metode untuk mengirim pesan

Parameter untuk mengirim pesan
Parameter | Deskripsi |
SendMsgTimeoutMillis | Jangka waktu timeout untuk mengirim pesan. Satuan: milidetik. |
CheckImmunityTimeInSeconds | Jangka waktu terpendek yang harus ditunggu sistem sebelum memeriksa status pesan transaksional untuk pertama kalinya. Satuan: detik. |
shardingKey | Kunci partisi yang digunakan untuk menentukan partisi tempat pesan terurut didistribusikan. |
Metode untuk berlangganan pesan
Gambar 1. Metode dalam Mode Push
Gambar 2. Metode dalam Mode Pull
Berikut adalah deskripsi metode yang dapat dipanggil dalam mode pull:
public interface PullConsumer extends Admin {
/**
* Query the partition information of a topic. This method returns all partitions of the topic. You can call this method only after your pull consumers start to run.
*/
Set<TopicPartition> topicPartitions(String topic);
/**
* Specify a partition from which you want to pull messages. This method does not implement rebalancing. You must make sure that messages in all partitions can be consumed. If this method is called multiple times, the system replaces the partitions to which subscribers have subscribed instead of increasing the number of partitions to which subscribers subscribe.
*/
void assign(Collection<TopicPartition> topicPartitions);
/**
* Pull messages. The maxBatchMessageCount parameter specifies the maximum number of messages that can be pulled at a time. You can specify a timeout period in milliseconds.
*/
List<Message> poll(long timeout);
/**
* Reset the consumer offset of a specified partition to a specified position. The specified position must be between the minimum offset and the maximum offset of the partition. You can call this method only after your pull consumers start to run. A subscriber must subscribe to the specified partition.
*/
void seek(TopicPartition topicPartition, long offset);
/**
* Reset the consumer offset of a specified partition to the minimum offset of the partition. You can call this method only after your pull consumers start to run. A subscriber must subscribe to the specified partition.
*/
void seekToBeginning(TopicPartition topicPartition);
/**
* Reset the consumer offset of a specified partition to the maximum offset of the partition. You can call this method only after your pull consumers start to run. A subscriber must subscribe to the specified partition.
*/
void seekToEnd(TopicPartition topicPartition);
/**
* Suspend message consumption in a specified partition.
*/
void pause(Collection<TopicPartition> topicPartitions);
/**
* Resume message consumption in a specified partition.
*/
void resume(Collection<TopicPartition> topicPartitions);
/**
* Query an offset based on a timestamp in a specified partition. The timestamp indicates when a message is stored to a broker. The offset corresponds to the first timestamp that is greater than or equal to the specified timestamp.
*/
Long offsetForTimestamp(TopicPartition topicPartition, Long timestamp);
/**
* Query the latest consumer offset of a specified partition.
*/
Long committed(TopicPartition topicPartition);
/**
* Manually commit a consumer offset. The consumer offset is synchronized to your on-premises client and then committed to your broker by using a thread in an asynchronous environment.
*/
void commitSync();
interface TopicPartitionChangeListener {
/**
* This method is called when the partitions of a topic change, for example, when the number of partitions of a topic changes due to broker scaling.
*/
void onChanged(Set<TopicPartition> topicPartitions);
}
/**
* Register TopicPartitionChangeListener that listens for changes in the partitions of a topic. The onChanged method can be called back for the registered listener when the number of partitions of a topic changes due to broker scaling. By default, the maximum delay for a callback is five seconds.
*/
void registerTopicPartitionChangedListener(String topic, TopicPartitionChangeListener callback);
}Parameter
Parameter | Deskripsi |
GROUP_ID | ID grup yang telah Anda buat di konsol ApsaraMQ for RocketMQ. Untuk informasi lebih lanjut, lihat Istilah. |
MessageModel | Mode konsumsi untuk instance konsumen. Nilai valid:
|
ConsumeThreadNums | Jumlah utas konsumsi untuk instance konsumen. Nilai default: 20. |
MaxReconsumeTimes | Jumlah maksimum percobaan ulang saat konsumsi gagal. Nilai default: 16. |
ConsumeTimeout | Jangka waktu timeout untuk mengonsumsi setiap pesan. Jika waktu untuk mengonsumsi pesan melebihi jangka waktu timeout yang ditentukan, pesan gagal dikonsumsi dan akan dikirim ulang setelah interval percobaan ulang. Atur nilai yang sesuai untuk setiap bisnis. Nilai default: 15. Satuan: menit. |
suspendTimeMillis | Interval percobaan ulang untuk pesan terurut yang gagal dikonsumsi. |
maxCachedMessageAmount | Jumlah maksimum pesan yang dapat disimpan sementara di klien konsumen lokal Anda. Nilai valid: 100 hingga 50000. Nilai default: 5000. Parameter ini berlaku pada klien. Kuota dibagi secara merata ke topik-topik yang telah dilanggan oleh pelanggan. Misalnya, jika Anda mengatur nilai parameter menjadi 1000 untuk klien konsumen yang melanggan 2 topik, maksimal 500 pesan dapat disimpan sementara untuk setiap topik. Jika klien konsumen menarik beberapa pesan sekaligus, jumlah aktual pesan yang perlu disimpan sementara bisa lebih besar daripada nilai yang Anda tetapkan untuk maxCachedMessageAmount. Kami merekomendasikan agar Anda mengatur nilai parameter menjadi dua kali jumlah pesan yang dapat dikonsumsi klien konsumen Anda per detik. Penting Tetapkan nilai yang tepat. Nilai yang terlalu besar dapat menyebabkan kesalahan Out-of-Memory (OOM) pada klien Anda. |
maxCachedMessageSizeInMiB | Ukuran maksimum pesan yang dapat disimpan sementara di klien lokal Anda. Nilai valid: 16 MB hingga 2048 MB. Nilai default: 512 MB. |
Parameter | Deskripsi |
ConsumeMessageBatchMaxSize | Jumlah maksimum pesan yang tersimpan sementara yang dapat didorong ke konsumen sekaligus. Jika jumlah pesan yang tersimpan mencapai nilai parameter yang Anda tentukan, ApsaraMQ for RocketMQ mendorong pesan-pesan yang tersimpan ke konsumen sekaligus. Nilai default: 32. Nilai valid: 1 hingga 1024. |
BatchConsumeMaxAwaitDurationInSeconds | Jangka waktu tunggu sebelum beberapa pesan yang tersimpan didorong ke konsumen sekaligus. ApsaraMQ for RocketMQ mendorong beberapa pesan yang tersimpan ke konsumen sekaligus setelah waktu yang ditentukan oleh parameter ini. Nilai default: 0. Nilai valid: 0 hingga 450. Satuan: detik. |
Parameter | Deskripsi |
maxCachedMessageSizeInMiB | Ukuran maksimum pesan yang dapat disimpan sementara oleh konsumen di klien untuk satu partisi. Nilai default: 100 MiB. Nilai valid: 16 MiB hingga 2048 MiB. Penting Tetapkan nilai yang tepat. Nilai yang terlalu besar dapat menyebabkan kesalahan OOM pada klien Anda. |
autoCommit | Menentukan apakah akan secara otomatis melakukan commit offset konsumen. Nilai default: true. |
autoCommitIntervalMillis | Interval antara operasi melakukan commit offset konsumen secara otomatis. Nilai default: 5. Satuan: detik. |
pollTimeoutMillis | Jangka waktu timeout untuk menarik pesan setiap kali. Nilai default: 5. Satuan: detik. |
Untuk informasi lebih lanjut tentang partisi dan offset, lihat Istilah.
Referensi
Untuk informasi lebih lanjut tentang kode contoh untuk mengirim dan berlangganan pesan, lihat topik-topik berikut: