全部产品
Search
文档中心

ApsaraMQ for RabbitMQ:Langkah 3: Gunakan SDK untuk mengirim dan menerima pesan

更新时间:Nov 10, 2025

Topik ini menjelaskan cara menggunakan kit pengembangan perangkat lunak (SDK) open source untuk terhubung ke broker ApsaraMQ for RabbitMQ serta mengirim dan menerima pesan, dengan SDK Java sebagai contoh.

Sebelum memulai

Dapatkan titik akhir instans

Sebelum mengirim dan menerima pesan, Anda harus mengonfigurasi titik akhir untuk produsen dan konsumen. Klien menggunakan titik akhir ini untuk mengakses instans ApsaraMQ for RabbitMQ.

  1. Masuk ke Konsol ApsaraMQ for RabbitMQ. Di panel navigasi sebelah kiri, klik Instances.

  2. Di bilah navigasi atas halaman Instances, pilih wilayah tempat instans yang ingin Anda kelola berada. Lalu, di daftar instans, klik nama instans tersebut.

  3. Di halaman Instance Details, buka tab Endpoint Information. Arahkan penunjuk tetikus ke titik akhir yang diinginkan, lalu klik ikon 复制 untuk menyalinnya.

    Jenis

    Deskripsi

    Contoh

    Titik akhir publik

    Memungkinkan Anda membaca dan menulis data dari Internet. Instans pay-as-you-go mendukung titik akhir publik secara default. Untuk menggunakan titik akhir publik pada instansi langganan, Anda harus mengaktifkan akses Internet saat membeli instans tersebut.

    XXX.net.mq.amqp.aliyuncs.com

    Titik akhir VPC

    Memungkinkan Anda membaca dan menulis data dalam VPC. Instans pay-as-you-go dan langganan mendukung titik akhir VPC secara default.

    XXX.vpc.mq.amqp.aliyuncs.com

Instal library dependensi Java

  1. Buat proyek Java di IntelliJ IDEA.

  2. Tambahkan dependensi berikut ke file pom.xml.

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.5.0</version> <!-- Semua versi open source didukung. -->
    </dependency>

Buat nama pengguna dan kata sandi

Saat menghubungkan klien RabbitMQ open source ke layanan cloud, Anda harus membuat nama pengguna dan kata sandi, lalu mengatur parameter userName dan passWord pada SDK klien. ApsaraMQ for RabbitMQ menggunakan kredensial ini untuk autentikasi izin.

Metode pembuatan nama pengguna dan kata sandi bergantung pada mode manajemen identitas dan izin instans:

  • Verifikasi identitas dan pengelolaan izin open source

    1. Masuk ke Konsol ApsaraMQ for RabbitMQ. Di panel navigasi sebelah kiri, klik Instances.

    2. Di bilah navigasi atas halaman Instances, pilih wilayah tempat instans yang ingin Anda kelola berada. Lalu, di daftar instans, klik nama instans tersebut.

    3. Di panel navigasi sebelah kiri, klik Users and Permissions.

    4. Di halaman Users and Permissions, klik Create Username/Password.

    5. Di panel Create Username/Password, lengkapi kolom Username, Password, dan Confirm Password, lalu klik OK.

    Catatan

    Setelah membuat nama pengguna dan kata sandi, Anda harus memberikan izin kepada pengguna tersebut. Untuk informasi selengkapnya, lihat Pengelolaan izin.

  • Alibaba Cloud Resource Access Management (RAM)

    1. Masuk ke Konsol ApsaraMQ for RabbitMQ. Di panel navigasi sebelah kiri, klik Instances.

    2. Di bilah navigasi atas halaman Instances, pilih wilayah tempat instans yang ingin Anda kelola berada. Lalu, di daftar instans, klik nama instans tersebut.

    3. Di panel navigasi sebelah kiri, klik Users and Permissions.

    4. Di halaman Users and Permissions, klik Create Username/Password.

    5. Di panel Create Username/Password, masukkan AccessKey ID dan AccessKey secret, lalu klik OK.

      Catatan

      Anda dapat memperoleh AccessKey ID dan AccessKey secret dari Konsol RAM Alibaba Cloud. Untuk informasi selengkapnya, lihat Buat Pasangan Kunci Akses.

      Pasangan nama pengguna dan kata sandi statis yang dibuat akan muncul di halaman Users and Permissions. Kata sandinya disembunyikan.username and password

    6. Di kolom Password untuk pasangan nama pengguna dan kata sandi statis yang telah dibuat, klik Display untuk melihat kata sandinya.

Buat koneksi klien

Buat factory koneksi ConnectionFactory.java untuk membuat koneksi antara klien RabbitMQ open source dan broker ApsaraMQ for RabbitMQ.


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeoutException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

public class ConnectionFactory {
    private final String hostName;
    private final int port;
    private final String userName;
    private final String password;
    private final String virtualHost;
    private final boolean enableSSL;

    public ConnectionFactory(String hostName, int port, String userName,
                            String password, String virtualHost, boolean enableSSL) {
        this.hostName = hostName;
        this.port = port;
        this.userName = userName;
        this.password = password;
        this.virtualHost = virtualHost;
        this.enableSSL = enableSSL;
    }

    public Channel createChannel() throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        //buat koneksi baru
        Connection con = createCon();

        //buat channel baru
        return con.createChannel();
    }

    private Connection createCon() throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();

        factory.setHost(hostName);
        factory.setUsername(userName);
        factory.setPassword(password);

        // Menentukan apakah pemulihan koneksi otomatis diaktifkan. Jika Anda mengatur parameter ini ke true, pemulihan koneksi otomatis diaktifkan. Jika Anda mengatur parameter ini ke false, pemulihan koneksi otomatis dinonaktifkan.
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        factory.setVirtualHost(virtualHost);
        // Port default.
        factory.setPort(port);

        if (enableSSL) {
            setSSL(factory);
        }

        // Periode waktu habis. Konfigurasikan parameter ini berdasarkan lingkungan jaringan.
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);

        return factory.newConnection();
    }

    private void setSSL(com.rabbitmq.client.ConnectionFactory factory) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
        TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        trustManagerFactory.init((KeyStore) null);
        sslContext.init(null, trustManagerFactory.getTrustManagers(), null);
        factory.useSslProtocol(sslContext);
    }

    public void closeCon(Channel channel) {
        if (channel != null && channel.getConnection() != null) {
            try {
                channel.getConnection().close();
            } catch (Throwable t) {
            }
        }
    }
}

Produksi pesan

Di proyek Java Anda, buat program produsen bernama Producer.java. Konfigurasikan parameter yang relevan seperti yang dijelaskan dalam Deskripsi parameter SDK, lalu jalankan program tersebut. Untuk informasi selengkapnya mengenai hal-hal yang perlu diperhatikan saat mengirim pesan, lihat Apa yang perlu saya ketahui saat memproduksi pesan?.

Kode contoh:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;

public class Producer {
    // Titik akhir instans ApsaraMQ for RabbitMQ.
    public static final String hostName = "1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com";
    // Nama pengguna statis instans ApsaraMQ for RabbitMQ.
    public static final String userName = "MjoxODgwNzcwODY5MD****";
    // Kata sandi statis instans ApsaraMQ for RabbitMQ.
    public static final String password = "NDAxREVDQzI2MjA0OT****";
    // Nama vhost instans ApsaraMQ for RabbitMQ.
    public static final String virtualHost = "vhost_test";

    // Jika Anda menggunakan port 5671, atur parameter enableSSL ke true.
    public static final int port = 5672;
    public static final boolean enableSSL = false;

    private Channel channel;
    private final ConcurrentNavigableMap<Long/*deliveryTag*/, String/*msgId*/> outstandingConfirms;
    private final ConnectionFactory factory;
    private final String exchangeName;
    private final String queueName;
    private final String routingKey;

    public Producer(ConnectionFactory factory, String exchangeName, String queueName, String routingKey) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        this.factory = factory;
        this.outstandingConfirms = new ConcurrentSkipListMap<>();
        this.channel = factory.createChannel();
        this.exchangeName = exchangeName;
        this.queueName = queueName;
        this.routingKey = routingKey;
    }

    public static void main(String[] args) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        // Buat factory koneksi.
        ConnectionFactory factory = new ConnectionFactory(hostName, port, userName, password, virtualHost, enableSSL);

        // Inisialisasi produsen.
        Producer producer = new Producer(factory, "ExchangeTest", "QueueTest", "RoutingKeyTest");

        // Deklarasikan produsen.
        producer.declare();

        producer.initChannel();

        // Kirim pesan.
        producer.doSend("hello,amqp");
    }

    private void initChannel() throws IOException {
        channel.confirmSelect();

        ConfirmCallback cleanOutstandingConfirms = (deliveryTag, multiple) -> {
            if (multiple) {
                ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag, true);

                for (Long tag : confirmed.keySet()) {
                    String msgId = confirmed.get(tag);
                    System.out.format("Pesan dengan msgId %s telah di-ack. deliveryTag: %d, multiple: %b%n", msgId, tag, true);
                }

                confirmed.clear();
            } else {
                String msgId = outstandingConfirms.remove(deliveryTag);
                System.out.format("Pesan dengan msgId %s telah di-ack. deliveryTag: %d, multiple: %b%n", msgId, deliveryTag, false);
            }
        };
        channel.addConfirmListener(cleanOutstandingConfirms, (deliveryTag, multiple) -> {
            String msgId = outstandingConfirms.get(deliveryTag);
            System.err.format("Pesan dengan msgId %s telah di-nack. deliveryTag: %d, multiple: %b%n", msgId, deliveryTag, multiple);
            // pengiriman pesan gagal, kirim ulang
        });


        channel.addReturnListener(returnMessage -> System.out.println("return msgId=" + returnMessage.getProperties().getMessageId()));
    }

    private void declare() throws IOException {
        channel.exchangeDeclare(exchangeName, "direct", true);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
    }
    

    private void doSend(String content) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        try {
            String msgId = UUID.randomUUID().toString();
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(msgId).build();

            channel.basicPublish(exchangeName, routingKey, true, props, content.getBytes(StandardCharsets.UTF_8));

            outstandingConfirms.put(channel.getNextPublishSeqNo(), msgId);
        } catch (AlreadyClosedException e) {
            //perlu menyambung ulang jika channel ditutup.
            String message = e.getMessage();

            System.out.println(message);

            if (channelClosedByServer(message)) {
                factory.closeCon(channel);
                channel = factory.createChannel();
                this.initChannel();
                doSend(content);
            } else {
                throw e;
            }
        }
    }

    private boolean channelClosedByServer(String errorMsg) {
        if (errorMsg != null
            && errorMsg.contains("channel.close")
            && errorMsg.contains("reply-code=541")
            && errorMsg.contains("reply-text=InternalError")) {
            return true;
        } else {
            return false;
        }
    }
}
Catatan

Instans ApsaraMQ for RabbitMQ dapat memicu throttling berdasarkan transaksi per detik (TPS) puncaknya. Untuk informasi selengkapnya, lihat Praktik terbaik untuk throttling instans.

Berlangganan pesan

Di proyek Java Anda, buat program konsumen bernama Consumer.java. Konfigurasikan parameter yang relevan seperti yang dijelaskan dalam Deskripsi parameter SDK, lalu jalankan program tersebut.

Kode contoh:


import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;

public class Consumer {
    // Titik akhir instans ApsaraMQ for RabbitMQ.
    public static final String hostName = "1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com";
    // Nama pengguna statis instans ApsaraMQ for RabbitMQ.
    public static final String userName = "MjoxODgwNzcwODY5MD****";
    // Kata sandi statis instans ApsaraMQ for RabbitMQ.
    public static final String password = "NDAxREVDQzI2MjA0OT****";
    // Nama vhost instans ApsaraMQ for RabbitMQ.
    public static final String virtualHost = "vhost_test";
    
    // Jika Anda menggunakan port 5671, atur parameter enableSSL ke true.
    public static final int port = 5672;
    public static final boolean enableSSL = false;

    private final Channel channel;
    private final String queue;

    public Consumer(Channel channel, String queue) {
        this.channel = channel;
        this.queue = queue;
    }

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        ConnectionFactory factory = new ConnectionFactory(hostName, port, userName, password, virtualHost, enableSSL);
        Channel channel = factory.createChannel();
        channel.basicQos(50);
         
        // Nama antrian di instans ApsaraMQ for RabbitMQ. Nama antrian harus sama dengan nama antrian yang Anda tentukan untuk produsen.
        Consumer consumer = new Consumer(channel, "queue-1");

        consumer.consume();
    }

    public void consume() throws IOException, InterruptedException {
        channel.basicConsume(queue, false, new DefaultConsumer(channel) {
            @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {

                // Proses pesan.
                System.out.println("menerima: msgId=" + properties.getMessageId());

                // Konsumen harus melakukan commit acknowledgment (ack) dalam periode validitas. Jika tidak, pesan akan dikirim ulang. Sebuah pesan dapat dikirim ulang hingga 16 kali.
                // Jika pengiriman pesan gagal setelah 16 kali percobaan, pesan tersebut akan dibuang atau dikirim ke dead-letter exchange.
                // Periode validitas adalah 1 menit untuk instans Edisi Profesional, 5 menit untuk instans Edisi Enterprise dan Serverless, serta 30 menit untuk instans Edisi Platinum.
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });

        CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                channel.getConnection().close();
            } catch (IOException e) {
                System.out.println("kesalahan menutup koneksi." + e);
            }
            latch.countDown();
        }));
        latch.await();
    }
}

Deskripsi parameter SDK

Parameter

Contoh

Deskripsi

hostName

XXX.net.mq.amqp.aliyuncs.com

Titik akhir instans ApsaraMQ for RabbitMQ. Untuk informasi selengkapnya tentang cara mendapatkan titik akhir, lihat Dapatkan titik akhir instans.

Port

5672

Port default. Gunakan port 5672 untuk koneksi tanpa enkripsi dan port 5671 untuk koneksi terenkripsi.

userName

MjoxODgwNzcwODY5MD****

Nama pengguna statis yang digunakan untuk autentikasi izin saat Anda menghubungkan klien ke broker ApsaraMQ for RabbitMQ.

Anda harus membuat nama pengguna statis di Konsol ApsaraMQ for RabbitMQ terlebih dahulu.

Untuk informasi selengkapnya, lihat Buat nama pengguna dan kata sandi.

passWord

NDAxREVDQzI2MjA0OT****

Kata sandi statis yang digunakan untuk autentikasi izin saat Anda menghubungkan klien ke broker ApsaraMQ for RabbitMQ.

Anda harus membuat kata sandi statis di Konsol ApsaraMQ for RabbitMQ terlebih dahulu.

Untuk informasi selengkapnya, lihat Buat nama pengguna dan kata sandi.

virtualHost

amqp_vhost

vhost yang Anda buat di instans ApsaraMQ for RabbitMQ. Anda harus membuat vhost di Konsol ApsaraMQ for RabbitMQ terlebih dahulu.

Untuk informasi selengkapnya, lihat Langkah 2: Buat sumber daya.

exchangeName

ExchangeTest

Exchange yang Anda buat di instans ApsaraMQ for RabbitMQ.

Anda harus membuat exchange di Konsol ApsaraMQ for RabbitMQ terlebih dahulu.

Untuk informasi selengkapnya, lihat Langkah 2: Buat sumber daya.

queueName

QueueTest

Antrian yang Anda buat di instans ApsaraMQ for RabbitMQ.

Anda harus membuat antrian di Konsol ApsaraMQ for RabbitMQ terlebih dahulu.

Untuk informasi selengkapnya, lihat Langkah 2: Buat sumber daya.

routingKey

RoutingKeyTest

Kunci routing yang digunakan untuk mengikat exchange ke antrian di ApsaraMQ for RabbitMQ.

Anda harus membuat binding di Konsol ApsaraMQ for RabbitMQ terlebih dahulu.

Untuk informasi selengkapnya, lihat Langkah 2: Buat sumber daya.

exchangeType

topic

Jenis exchange. ApsaraMQ for RabbitMQ mendukung jenis-jenis exchange berikut. Untuk informasi selengkapnya, lihat Exchange.

  • direct

  • topic

  • fanout

  • headers

  • x-delayed-message

  • x-consistent-hash

Penting

Pastikan jenis exchange yang Anda tentukan sama dengan jenis exchange yang Anda pilih saat membuat exchange.

Referensi