Skenario
Pesan normal umumnya digunakan dalam skenario desentralisasi layanan mikro, integrasi data, dan event-driven. Skenario ini biasanya memiliki persyaratan tinggi pada keandalan transmisi, tetapi tidak terlalu ketat pada waktu dan urutan pemrosesan pesan.
Skenario 1: Desentralisasi asinkron layanan mikro

Gambar di atas mengilustrasikan skenario transaksi e-commerce online. Dalam skenario ini, sistem pesanan upstream mengenkapsulasi penempatan pesanan dan pembayaran sebagai pesan normal independen, lalu mengirimkannya ke broker ApsaraMQ for RocketMQ. Sistem downstream kemudian berlangganan pesan dari broker sesuai kebutuhan dan memproses tugas berdasarkan logika konsumsi lokal. Pesan-pesan tersebut bersifat independen dan tidak saling terkait.
Skenario 2: Integrasi dan transmisi data

Gambar di atas menggunakan pengumpulan log offline sebagai contoh. Komponen instrumentasi digunakan untuk mengumpulkan log operasi dari aplikasi frontend dan meneruskannya ke ApsaraMQ for RocketMQ. Setiap pesan merupakan sepotong data log yang tidak memerlukan pemrosesan oleh ApsaraMQ for RocketMQ. ApsaraMQ for RocketMQ hanya bertugas mengirimkan data log ke sistem penyimpanan downstream, sementara tugas selanjutnya diproses oleh aplikasi backend.
Mekanisme kerja
Apa itu pesan normal?
Pesan normal adalah pesan dengan fitur perpesanan dasar di ApsaraMQ for RocketMQ. Pesan ini mendukung komunikasi asinkron yang terdesentralisasi antara produser dan konsumen.

Siklus hidup pesan normal
Inisialisasi
Pesan dibangun dan diinisialisasi oleh produser, lalu siap dikirim ke broker.
Menunggu konsumsi
Pesan dikirim ke broker dan tersedia untuk dikonsumsi oleh konsumen.
Sedang Dikonsumsi
Pesan diperoleh oleh konsumen dan diproses berdasarkan logika bisnis lokal konsumen.
Selama proses ini, broker menunggu konsumen mengembalikan hasil konsumsi. Jika tidak ada respons dalam periode waktu tertentu, ApsaraMQ for RocketMQ akan melakukan percobaan ulang pesan. Untuk informasi lebih lanjut, lihat Percobaan Ulang Konsumsi.
Komitmen Konsumsi
Konsumen menyelesaikan konsumsi dan mengirimkan hasil konsumsi ke broker. Broker kemudian menandai apakah pesan saat ini telah dikonsumsi.
Secara default, ApsaraMQ for RocketMQ menyimpan semua pesan. Saat hasil konsumsi dikomit, pesan ditandai sebagai dikonsumsi alih-alih langsung dihapus. Pesan hanya dihapus jika masa retensi berakhir atau sistem kehabisan ruang penyimpanan. Sebelum dihapus, konsumen dapat mengonsumsi ulang pesan tersebut.
Penghapusan Pesan
Jika masa retensi pesan berakhir atau ruang penyimpanan tidak mencukupi, ApsaraMQ for RocketMQ menghapus pesan paling awal disimpan dari file fisik secara bergulir. Untuk informasi lebih lanjut, lihat Penyimpanan dan Pembersihan Pesan.
Batasan
Pesan normal hanya mendukung topik yang nilai MessageType-nya diatur ke Normal.
Contoh
Anda dapat menentukan kunci pesan dan tag untuk memfilter atau mencari pesan normal. Kode berikut memberikan contoh cara mengirim dan menerima pesan normal dalam Java.
Untuk informasi tentang kode sampel lengkap, lihat Apache RocketMQ 5.x SDK (direkomendasikan).
Kode sampel
Kirim pesan normal
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
public class ProducerExample {
public static void main(String[] args) throws ClientException {
/**
* Titik akhir dari instance. Anda dapat melihat titik akhir di tab Endpoints halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
* Jika klien instance ApsaraMQ for RocketMQ ditempatkan di instance Elastic Compute Service (ECS) dan Anda ingin mengakses instance ApsaraMQ for RocketMQ di jaringan internal, kami sarankan Anda menentukan titik akhir virtual private cloud (VPC).
* Jika Anda mengakses instance melalui Internet atau dari pusat data, Anda dapat menentukan titik akhir publik. Jika Anda mengakses instance melalui Internet, Anda harus mengaktifkan fitur akses Internet untuk instance tersebut.
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
// Nama topik ke mana pesan dikirim. Sebelum Anda menggunakan topik untuk menerima pesan, Anda harus membuat topik di konsol ApsaraMQ for RocketMQ. Jika tidak, kesalahan akan dikembalikan.
String topic = "Topik Anda";
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
/**
* Jika instance ApsaraMQ for RocketMQ adalah instance tanpa server dan Anda menggunakan titik akhir publik untuk mengakses instance, Anda harus menentukan ID instance.
*/
//.setNamespace("InstanceId")
/**
* Jika Anda menggunakan titik akhir publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus menentukan nama pengguna dan kata sandi instance. Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
* Jika klien instance ApsaraMQ for RocketMQ ditempatkan di instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ di jaringan internal, Anda tidak perlu menentukan nama pengguna atau kata sandi. Broker secara otomatis memperoleh nama pengguna dan kata sandi berdasarkan informasi VPC.
* Jika instance adalah instance ApsaraMQ for RocketMQ tanpa server, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur bebas otentikasi di VPC untuk instance tanpa server dan mengakses instance di VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi.
*/
//.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"))
.build();
/**
* Saat Anda menginisialisasi produser, Anda dapat menentukan topik yang ingin Anda gunakan untuk memeriksa apakah pengaturan topik valid dan mencegah topik tidak valid dimulai.
* Anda tidak perlu menentukan topik untuk pesan non-transaksional. Broker secara dinamis memeriksa apakah topik valid.
* Catatan: Untuk mencegah operasi API yang dipanggil untuk memeriksa pesan transaksional gagal, Anda harus menentukan topik untuk pesan transaksional sebelumnya.
*/
Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(clientConfiguration)
.build();
// Kirim pesan normal.
Message message = provider.newMessageBuilder()
.setTopic(topic)
// Kunci pesan. Anda dapat menggunakan kunci untuk mencari pesan.
.setKeys("messageKey")
// Tag pesan. Konsumen dapat menggunakan tag untuk memfilter pesan.
.setTag("messageTag")
// Tubuh pesan.
.setBody("messageBody".getBytes())
.build();
try {
// Kirim pesan. Perhatikan hasilnya dan tangkap pengecualian seperti kegagalan.
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
}
}
Konsumsi pesan normal dalam mode push
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
public class PushConsumerExample {
private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class);
private PushConsumerExample() {
}
public static void main(String[] args) throws ClientException, IOException, InterruptedException {
/**
* Titik akhir dari instance. Anda dapat melihat titik akhir di tab Endpoints halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
* Jika klien instance ApsaraMQ for RocketMQ ditempatkan di instance Elastic Compute Service (ECS) dan Anda ingin mengakses instance ApsaraMQ for RocketMQ di jaringan internal, kami sarankan Anda menentukan titik akhir VPC.
* Jika Anda mengakses instance melalui Internet atau dari pusat data, Anda dapat menentukan titik akhir publik. Jika Anda mengakses instance melalui Internet, Anda harus mengaktifkan fitur akses Internet untuk instance tersebut.
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
// Topik yang ingin Anda langgani. Sebelum Anda menentukan topik, Anda harus membuat topik di konsol ApsaraMQ for RocketMQ terlebih dahulu. Jika tidak, kesalahan akan dikembalikan.
String topic = "Topik Anda";
// Grup konsumen tempat konsumen berada. Sebelum Anda menentukan grup konsumen, Anda harus membuat grup konsumen di konsol ApsaraMQ for RocketMQ terlebih dahulu. Jika tidak, kesalahan akan dikembalikan.
String consumerGroup = "Grup Konsumen Anda";
final ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
/**
* Jika instance ApsaraMQ for RocketMQ adalah instance tanpa server dan Anda menggunakan titik akhir publik untuk mengakses instance, Anda harus menentukan ID instance.
*/
//.setNamespace("InstanceId")
/**
* Jika Anda menggunakan titik akhir publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus menentukan nama pengguna dan kata sandi instance. Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
* Jika klien instance ApsaraMQ for RocketMQ ditempatkan di instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ di jaringan internal, Anda tidak perlu menentukan nama pengguna atau kata sandi. Broker secara otomatis memperoleh nama pengguna dan kata sandi berdasarkan informasi VPC.
* Jika instance adalah instance ApsaraMQ for RocketMQ tanpa server, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur bebas otentikasi di VPC untuk instance tanpa server dan mengakses instance di VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi.
*/
//.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"))
.build();
// Aturan yang digunakan untuk memfilter pesan. Dalam contoh berikut, semua pesan di topik dilanggan.
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// Inisialisasi konsumen push. Saat Anda menginisialisasi konsumen push, Anda harus menentukan grup konsumen, parameter komunikasi, dan langganan untuk konsumen.
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// Grup konsumen.
.setConsumerGroup(consumerGroup)
// Langganan.
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// Pendengar pesan.
.setMessageListener(messageView -> {
// Konsumsi pesan dan kembalikan hasil konsumsi.
// LOGGER.info("Konsumsi pesan={}", messageView);
System.out.println("Konsumsi Pesan: " + messageView);
return ConsumeResult.SUCCESS;
})
.build();
Thread.sleep(Long.MAX_VALUE);
// Jika Anda tidak lagi memerlukan konsumen push, matikan prosesnya.
//pushConsumer.close();
}
}
Konsumsi pesan normal dalam mode sederhana
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
public class SimpleConsumerExample {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class);
private SimpleConsumerExample() {
}
public static void main(String[] args) throws ClientException, IOException {
/**
* Titik akhir dari instance. Anda dapat melihat titik akhir di tab Endpoints halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
* Jika klien instance ApsaraMQ for RocketMQ ditempatkan di instance Elastic Compute Service (ECS) dan Anda ingin mengakses instance ApsaraMQ for RocketMQ di jaringan internal, kami sarankan Anda menentukan titik akhir VPC.
* Jika Anda mengakses instance melalui Internet atau dari pusat data, Anda dapat menentukan titik akhir publik. Jika Anda mengakses instance melalui Internet, Anda harus mengaktifkan fitur akses Internet untuk instance tersebut.
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
// Topik yang ingin Anda langgani. Sebelum Anda menentukan topik, Anda harus membuat topik di konsol ApsaraMQ for RocketMQ terlebih dahulu. Jika tidak, kesalahan akan dikembalikan.
String topic = "Topik Anda";
// Grup konsumen tempat konsumen berada. Sebelum Anda menentukan grup konsumen, Anda harus membuat grup konsumen di konsol ApsaraMQ for RocketMQ terlebih dahulu. Jika tidak, kesalahan akan dikembalikan.
String consumerGroup = "Grup Konsumen Anda";
final ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
/**
* Jika instance ApsaraMQ for RocketMQ adalah instance tanpa server dan Anda menggunakan titik akhir publik untuk mengakses instance, Anda harus menentukan ID instance.
*/
//.setNamespace("InstanceId")
/**
* Jika Anda menggunakan titik akhir publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus menentukan nama pengguna dan kata sandi instance. Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
* Jika klien instance ApsaraMQ for RocketMQ ditempatkan di instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ di jaringan internal, Anda tidak perlu menentukan nama pengguna atau kata sandi. Broker secara otomatis memperoleh nama pengguna dan kata sandi berdasarkan informasi VPC.
* Jika instance adalah instance ApsaraMQ for RocketMQ tanpa server, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur bebas otentikasi di VPC untuk instance tanpa server dan mengakses instance di VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi.
*/
//.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"))
.build();
Duration awaitDuration = Duration.ofSeconds(10);
// Aturan yang digunakan untuk memfilter pesan. Dalam contoh berikut, semua pesan di topik dilanggan.
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// Inisialisasi konsumen sederhana. Saat Anda menginisialisasi konsumen sederhana, Anda harus menentukan grup konsumen, parameter komunikasi, dan langganan untuk konsumen.
SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// Grup konsumen.
.setConsumerGroup(consumerGroup)
// Durasi timeout untuk permintaan polling panjang.
.setAwaitDuration(awaitDuration)
// Langganan.
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.build();
// Jumlah maksimum pesan yang akan ditarik.
int maxMessageNum = 16;
// Waktu tak terlihat pesan.
Duration invisibleDuration = Duration.ofSeconds(10);
// Jika Anda menggunakan konsumen sederhana untuk mengonsumsi pesan, klien harus memperoleh dan mengonsumsi pesan dalam loop.
// Untuk mengonsumsi pesan secara real-time, kami sarankan Anda menggunakan beberapa thread untuk menarik pesan secara bersamaan.
while (true) {
final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
messages.forEach(messageView -> {
// LOGGER.info("Pesan diterima: {}", messageView);
System.out.println("Pesan diterima: " + messageView);
});
for (MessageView message : messages) {
final MessageId messageId = message.getMessageId();
try {
// Setelah konsumsi selesai, konsumen harus memanggil metode ACK untuk mengirimkan hasil konsumsi ke broker.
consumer.ack(message);
System.out.println("Pesan berhasil diakui, messageId= " + messageId);
//LOGGER.info("Pesan berhasil diakui, messageId={}", messageId);
} catch (Throwable t) {
t.printStackTrace();
//LOGGER.error("Pesan gagal diakui, messageId={}", messageId, t);
}
}
}
// Jika Anda tidak lagi memerlukan konsumen sederhana, matikan prosesnya.
// consumer.close();
}
}
Catatan penggunaan
Tetapkan kunci unik global untuk setiap pesan untuk mempermudah pemecahan masalah
ApsaraMQ for RocketMQ memungkinkan Anda menentukan kunci pesan kustom. Anda dapat menggunakan kunci pesan untuk menanyakan pesan dan jejaknya secara efisien.
Oleh karena itu, saat mengirim pesan, kami sarankan Anda menggunakan informasi unik terkait bisnis Anda, seperti ID pesanan atau ID pengguna, sebagai kunci. Ini membantu Anda dengan cepat menemukan pesan dalam kueri berikutnya.