全部产品
Search
文档中心

ApsaraMQ for RabbitMQ:Integrasi Spring

更新时间:Dec 03, 2025

ApsaraMQ for RabbitMQ menyediakan SDK untuk framework Spring. Topik ini menjelaskan cara mengintegrasikan SDK Spring untuk mengirim dan menerima pesan.

Prasyarat

Proyek demo

Klik SpringBootDemo.zip untuk mengunduh proyek demo.

Langkah 1: Konfigurasi parameter

Dalam file application.properties atau application.yml, atur parameter konfigurasi. Contoh berikut menggunakan file application.properties.

# Titik akhir. Dapatkan titik akhir di halaman Detail Instans di Konsol ApsaraMQ for RabbitMQ. 
spring.rabbitmq.host=XXXXXX.amqp.aliyuncs.com
# Port yang digunakan untuk terhubung ke ApsaraMQ for RabbitMQ.
spring.rabbitmq.port=5672
# Nama pengguna statis dari instans. Lihat nama pengguna di halaman Akun Statis di Konsol ApsaraMQ for RabbitMQ. 
spring.rabbitmq.username=******
# Kata sandi statis dari instans. Lihat kata sandi di halaman Akun Statis di Konsol ApsaraMQ for RabbitMQ. 
spring.rabbitmq.password=******
# Host virtual, yang menyediakan isolasi logis. Lihat host virtual di halaman Vhosts di Konsol ApsaraMQ for RabbitMQ. 
spring.rabbitmq.virtual-host=test_vhost
# Mode acknowledgment (Ack) pesan.
# 1. none: Setelah konsumen menerima pesan, server menganggap pesan telah berhasil diproses, terlepas dari apakah konsumsi berhasil atau tidak. Ini adalah mode autoAck di RabbitMQ.
# 2. auto: Klien secara otomatis mengirim ack setelah pesan berhasil dikonsumsi. Jika pemrosesan pesan gagal, klien mengirim nack atau melemparkan pengecualian. Anda tidak perlu memanggil Channel.basicAck() secara eksplisit.
# 3. manual: Mengirim Ack secara manual. Anda harus memanggil Channel.basicAck() secara eksplisit setelah pesan berhasil dikonsumsi.
spring.rabbitmq.listener.simple.acknowledge-mode=manual

# Atur mode cache ke CONNECTION. ApsaraMQ for RabbitMQ menggunakan arsitektur terdistribusi. Dalam mode CONNECTION, klien dapat terhubung ke beberapa node layanan dalam kluster secara lebih seimbang. Metode ini dapat secara efektif mencegah hot spot beban dan meningkatkan efisiensi pengiriman serta konsumsi pesan.
spring.rabbitmq.cache.connection.mode=connection
# Sesuaikan nilai sesuai kebutuhan.
spring.rabbitmq.cache.connection.size=50
# Sesuaikan nilai sesuai kebutuhan.
spring.rabbitmq.cache.channel.size=1

# Jumlah maksimum pesan yang belum diakui (Ack) yang dapat diproses oleh konsumen dalam satu waktu (QoS). Server ApsaraMQ for RabbitMQ menggunakan min{prefetch, 100} sebagai nilai QoS. Jika kemampuan pemrosesan konsumen rendah, kurangi nilai ini.
spring.rabbitmq.listener.simple.prefetch=100
# Jumlah minimum konsumen konkuren untuk pendengar RabbitMQ. Sesuaikan nilai sesuai kebutuhan.
spring.rabbitmq.listener.simple.concurrency=10
# Jumlah maksimum konsumen konkuren untuk pendengar RabbitMQ. Saat laju konsumsi cukup tinggi, klien akan memulai sejumlah konsumen max-concurrency untuk mengonsumsi pesan.
spring.rabbitmq.listener.simple.max-concurrency=20

Anda dapat menambahkan konfigurasi opsional berikut sesuai kebutuhan:

Konfigurasi opsional

Properti

Deskripsi

spring.rabbitmq.addresses

Alamat server yang dihubungkan oleh klien. Pisahkan beberapa alamat dengan koma (,).

Jika Anda mengatur kedua parameter spring.rabbitmq.host dan spring.rabbitmq.addresses, parameter spring.rabbitmq.addresses akan didahulukan.

spring.rabbitmq.dynamic

Menentukan apakah akan membuat bean AmqpAdmin. Nilai default-nya adalah true.

spring.rabbitmq.connection-timeout

Waktu tunggu koneksi. Satuan: milidetik. Nilai 0 berarti tanpa batas waktu.

spring.rabbitmq.requested-heartbeat

Waktu tunggu heartbeat. Satuan: detik. Nilai default-nya adalah 60.

spring.rabbitmq.publisher-confirms

Menentukan apakah akan mengaktifkan mekanisme konfirmasi penerbit.

spring.rabbitmq.publisher-returns

Menentukan apakah akan mengaktifkan mekanisme pengembalian penerbit.

spring.rabbitmq.ssl.enabled

Menentukan apakah akan mengaktifkan autentikasi Sertifikat SSL.

spring.rabbitmq.ssl.key-store

Jalur ke key store yang menyimpan Sertifikat SSL.

spring.rabbitmq.ssl.key-store-password

Kata sandi untuk mengakses key store.

spring.rabbitmq.ssl.trust-store

Alamat tepercaya dengan Sertifikat SSL.

spring.rabbitmq.ssl.trust-store-password

Kata sandi untuk mengakses trust store.

spring.rabbitmq.ssl.algorithm

Algoritma yang digunakan oleh SSL, seperti TLSv1.2.

spring.rabbitmq.ssl.validate-server-certificate

Menentukan apakah akan mengaktifkan validasi sertifikat server.

spring.rabbitmq.ssl.verify-hostname

Menentukan apakah akan mengaktifkan verifikasi host.

spring.rabbitmq.cache.channel.size

Jumlah channel yang disimpan dalam cache.

spring.rabbitmq.cache.channel.checkout-timeout

Waktu tunggu untuk mendapatkan channel dari cache ketika ukuran cache telah tercapai.

Satuan: milidetik. Nilai 0 berarti channel baru selalu dibuat.

spring.rabbitmq.cache.connection.size

Jumlah koneksi yang di-cache. Parameter ini hanya berlaku dalam mode CONNECTION.

spring.rabbitmq.cache.connection.mode

Mode cache koneksi. Nilai yang valid adalah:

  • CHANNEL

  • CONNECTION

Disarankan menggunakan mode CONNECTION.

spring.rabbitmq.listener.type

Jenis kontainer pendengar. Nilai yang valid adalah:

  • simple

  • direct

Nilai default-nya adalah simple.

spring.rabbitmq.listener.simple.auto-startup

Menentukan apakah kontainer akan dimulai secara otomatis saat aplikasi dimulai. Nilai default-nya adalah true.

spring.rabbitmq.listener.simple.acknowledge-mode

Mode acknowledgment pesan. Nilai yang valid adalah:

  • none: Setelah konsumen menerima pesan, server menganggap pesan telah berhasil diproses, terlepas dari apakah konsumsi berhasil atau tidak. Ini adalah mode autoAck di RabbitMQ.

  • manual: Mengirim Ack secara manual. Anda harus memanggil Basic.ack secara eksplisit setelah pesan berhasil dikonsumsi.

  • auto: Klien secara otomatis mengirim ack setelah pesan berhasil dikonsumsi. Jika pemrosesan pesan gagal, klien mengirim nack atau melemparkan pengecualian. Anda tidak perlu memanggil Basic.ack secara eksplisit.

Nilai default-nya adalah auto.

spring.rabbitmq.listener.simple.concurrency

Jumlah minimum konsumen.

spring.rabbitmq.listener.simple.max-concurrency

Jumlah maksimum konsumen.

spring.rabbitmq.listener.simple.prefetch

Jumlah maksimum pesan yang belum diakui (Ack) yang dapat diproses oleh konsumen dalam satu waktu. Ini setara dengan mengatur nilai QoS dengan memanggil metode Basic.qos. Jika transaksi digunakan, nilai ini harus lebih besar dari atau sama dengan ukuran transaksi.

spring.rabbitmq.listener.simple.transaction-size

Jumlah pesan yang diproses dalam satu transaksi.

spring.rabbitmq.listener.simple.default-requeue-rejected

Menentukan apakah pesan yang ditolak akan dimasukkan kembali ke antrian. Nilai default-nya adalah true.

spring.rabbitmq.listener.simple.missing-queues-fatal

Menentukan apakah pendengar gagal jika antrian yang dideklarasikan tidak tersedia di broker, atau apakah kontainer berhenti jika satu atau beberapa antrian dihapus saat runtime. Nilai default-nya adalah true.

spring.rabbitmq.listener.simple.idle-event-interval

Interval untuk memublikasikan event kontainer idle. Satuan: milidetik.

spring.rabbitmq.template.mandatory

Menentukan apakah akan mengaktifkan pengiriman pesan wajib. Nilai default-nya adalah false.

spring.rabbitmq.template.receive-timeout

Waktu tunggu untuk operasi receive().

spring.rabbitmq.template.reply-timeout

Waktu tunggu untuk operasi sendAndReceive().

Langkah 2: Gunakan SDK untuk mengirim dan menerima pesan

Hasilkan pesan

Dalam RabbitMQService, peroleh RabbitTemplate melalui dependency injection dan panggil metode send-nya untuk mengirim pesan.

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
import java.util.UUID;

@Service
public class RabbitMQService {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String exchange, String routingKey, String content) {
        // Atur MessageId.
        String msgId = UUID.randomUUID().toString();
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setMessageId(msgId);
        // Buat Message.
        Message message = new Message(content.getBytes(StandardCharsets.UTF_8), messageProperties);
        /*
         * Panggil antarmuka send() untuk mengirim pesan.
         * exchange: nama exchange.
         * routingKey: kunci routing.
         * message: isi pesan.
         * correlationData digunakan untuk publisher confirms.
         */
        rabbitTemplate.send(exchange, routingKey, message, null);
    }
}

Konsumsi pesan

Gunakan anotasi @RabbitListener untuk mengonsumsi pesan:

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Arrays;

@Component
public class MessageListener {

     /**
     * Menerima pesan.
     * @param message Pesan.
     * @param channel Channel.
     * @throws IOException
     * Ganti queues dengan nama antrian yang telah Anda buat.
     */
    @RabbitListener(queues = "myQueue")
    public void receiveFromMyQueue(Message message, Channel channel) throws IOException {
        // Masukkan logika bisnis untuk konsumsi pesan.
        ...
        // Anda harus mengembalikan Ack dalam periode validitas Ack (waktu tunggu konsumsi). Jika tidak, konfirmasi tidak valid dan pesan akan dikirim ulang.
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

Berikut adalah konfigurasi opsional umum untuk RabbitListener:

Konfigurasi opsional

Properti

Deskripsi

ackMode

Mode acknowledgment pesan kustom. Ini menimpa pengaturan spring.rabbitmq.listener.simple.acknowledge-mode.

admin

Referensi ke AmqpAdmin untuk manajemen sumber daya AMQP.

autoStartup

Menentukan apakah kontainer akan dimulai secara otomatis saat aplikasi dimulai. Ini menimpa pengaturan spring.rabbitmq.listener.simple.auto-startup.

bindings

Array binding antara antrian dan exchange, yang berisi informasi binding.

concurrency

Mengatur jumlah thread konkuren untuk kontainer pendengar.

errorHandler

Mengonfigurasi penanganan kesalahan untuk pengecualian yang dilemparkan oleh metode pendengar.

exclusive

Mengaktifkan mode eksklusif untuk antrian. Artinya, konsumen memiliki akses eksklusif ke antrian tersebut, dan tidak ada konsumen lain yang dapat menerima pesan darinya. Fitur ini saat ini tidak didukung.

queues

Mendeklarasikan antrian yang didengarkan oleh pendengar ini.

queuesToDeclare

Menentukan antrian yang akan dideklarasikan secara eksplisit.