ApsaraMQ for RabbitMQ menyediakan SDK untuk framework Spring. Topik ini menjelaskan cara mengintegrasikan SDK Spring untuk mengirim dan menerima pesan.
Prasyarat
Anda telah membuat sumber daya seperti instans, vhost, exchange, dan antrian di Konsol ApsaraMQ for RabbitMQ. Untuk informasi selengkapnya, lihat Langkah 2: Membuat sumber daya.
Spring Boot 2.1.2.RELEASE atau versi yang lebih baru telah diinstal.
Proyek demo
Klik SpringBootDemo.zip untuk mengunduh proyek demo.
Langkah 1: Konfigurasi parameter
Dalam file application.properties atau application.yml, atur parameter konfigurasi. Contoh berikut menggunakan file application.properties.
# Titik akhir. Dapatkan titik akhir di halaman Detail Instans di Konsol ApsaraMQ for RabbitMQ.
spring.rabbitmq.host=XXXXXX.amqp.aliyuncs.com
# Port yang digunakan untuk terhubung ke ApsaraMQ for RabbitMQ.
spring.rabbitmq.port=5672
# Nama pengguna statis dari instans. Lihat nama pengguna di halaman Akun Statis di Konsol ApsaraMQ for RabbitMQ.
spring.rabbitmq.username=******
# Kata sandi statis dari instans. Lihat kata sandi di halaman Akun Statis di Konsol ApsaraMQ for RabbitMQ.
spring.rabbitmq.password=******
# Host virtual, yang menyediakan isolasi logis. Lihat host virtual di halaman Vhosts di Konsol ApsaraMQ for RabbitMQ.
spring.rabbitmq.virtual-host=test_vhost
# Mode acknowledgment (Ack) pesan.
# 1. none: Setelah konsumen menerima pesan, server menganggap pesan telah berhasil diproses, terlepas dari apakah konsumsi berhasil atau tidak. Ini adalah mode autoAck di RabbitMQ.
# 2. auto: Klien secara otomatis mengirim ack setelah pesan berhasil dikonsumsi. Jika pemrosesan pesan gagal, klien mengirim nack atau melemparkan pengecualian. Anda tidak perlu memanggil Channel.basicAck() secara eksplisit.
# 3. manual: Mengirim Ack secara manual. Anda harus memanggil Channel.basicAck() secara eksplisit setelah pesan berhasil dikonsumsi.
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# Atur mode cache ke CONNECTION. ApsaraMQ for RabbitMQ menggunakan arsitektur terdistribusi. Dalam mode CONNECTION, klien dapat terhubung ke beberapa node layanan dalam kluster secara lebih seimbang. Metode ini dapat secara efektif mencegah hot spot beban dan meningkatkan efisiensi pengiriman serta konsumsi pesan.
spring.rabbitmq.cache.connection.mode=connection
# Sesuaikan nilai sesuai kebutuhan.
spring.rabbitmq.cache.connection.size=50
# Sesuaikan nilai sesuai kebutuhan.
spring.rabbitmq.cache.channel.size=1
# Jumlah maksimum pesan yang belum diakui (Ack) yang dapat diproses oleh konsumen dalam satu waktu (QoS). Server ApsaraMQ for RabbitMQ menggunakan min{prefetch, 100} sebagai nilai QoS. Jika kemampuan pemrosesan konsumen rendah, kurangi nilai ini.
spring.rabbitmq.listener.simple.prefetch=100
# Jumlah minimum konsumen konkuren untuk pendengar RabbitMQ. Sesuaikan nilai sesuai kebutuhan.
spring.rabbitmq.listener.simple.concurrency=10
# Jumlah maksimum konsumen konkuren untuk pendengar RabbitMQ. Saat laju konsumsi cukup tinggi, klien akan memulai sejumlah konsumen max-concurrency untuk mengonsumsi pesan.
spring.rabbitmq.listener.simple.max-concurrency=20Anda dapat menambahkan konfigurasi opsional berikut sesuai kebutuhan:
Langkah 2: Gunakan SDK untuk mengirim dan menerima pesan
Hasilkan pesan
Dalam RabbitMQService, peroleh RabbitTemplate melalui dependency injection dan panggil metode send-nya untuk mengirim pesan.
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
@Service
public class RabbitMQService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String exchange, String routingKey, String content) {
// Atur MessageId.
String msgId = UUID.randomUUID().toString();
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(msgId);
// Buat Message.
Message message = new Message(content.getBytes(StandardCharsets.UTF_8), messageProperties);
/*
* Panggil antarmuka send() untuk mengirim pesan.
* exchange: nama exchange.
* routingKey: kunci routing.
* message: isi pesan.
* correlationData digunakan untuk publisher confirms.
*/
rabbitTemplate.send(exchange, routingKey, message, null);
}
}Konsumsi pesan
Gunakan anotasi @RabbitListener untuk mengonsumsi pesan:
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Arrays;
@Component
public class MessageListener {
/**
* Menerima pesan.
* @param message Pesan.
* @param channel Channel.
* @throws IOException
* Ganti queues dengan nama antrian yang telah Anda buat.
*/
@RabbitListener(queues = "myQueue")
public void receiveFromMyQueue(Message message, Channel channel) throws IOException {
// Masukkan logika bisnis untuk konsumsi pesan.
...
// Anda harus mengembalikan Ack dalam periode validitas Ack (waktu tunggu konsumsi). Jika tidak, konfirmasi tidak valid dan pesan akan dikirim ulang.
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}Berikut adalah konfigurasi opsional umum untuk RabbitListener: