Topik ini menjelaskan cara menggunakan kit pengembangan perangkat lunak (SDK) open source untuk terhubung ke broker ApsaraMQ for RabbitMQ serta mengirim dan menerima pesan, dengan SDK Java sebagai contoh.
Sebelum memulai
Anda dapat menggunakan IntelliJ IDEA atau Eclipse. Topik ini menggunakan IntelliJ IDEA Ultimate sebagai contoh.
Dapatkan titik akhir instans
Sebelum mengirim dan menerima pesan, Anda harus mengonfigurasi titik akhir untuk produsen dan konsumen. Klien menggunakan titik akhir ini untuk mengakses instans ApsaraMQ for RabbitMQ.
Masuk ke Konsol ApsaraMQ for RabbitMQ. Di panel navigasi sebelah kiri, klik Instances.
Di bilah navigasi atas halaman Instances, pilih wilayah tempat instans yang ingin Anda kelola berada. Lalu, di daftar instans, klik nama instans tersebut.
Di halaman Instance Details, buka tab Endpoint Information. Arahkan penunjuk tetikus ke titik akhir yang diinginkan, lalu klik ikon
untuk menyalinnya.Jenis
Deskripsi
Contoh
Titik akhir publik
Memungkinkan Anda membaca dan menulis data dari Internet. Instans pay-as-you-go mendukung titik akhir publik secara default. Untuk menggunakan titik akhir publik pada instansi langganan, Anda harus mengaktifkan akses Internet saat membeli instans tersebut.
XXX.net.mq.amqp.aliyuncs.com
Titik akhir VPC
Memungkinkan Anda membaca dan menulis data dalam VPC. Instans pay-as-you-go dan langganan mendukung titik akhir VPC secara default.
XXX.vpc.mq.amqp.aliyuncs.com
Instal library dependensi Java
Buat proyek Java di IntelliJ IDEA.
Tambahkan dependensi berikut ke file pom.xml.
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.5.0</version> <!-- Semua versi open source didukung. --> </dependency>
Buat nama pengguna dan kata sandi
Saat menghubungkan klien RabbitMQ open source ke layanan cloud, Anda harus membuat nama pengguna dan kata sandi, lalu mengatur parameter userName dan passWord pada SDK klien. ApsaraMQ for RabbitMQ menggunakan kredensial ini untuk autentikasi izin.
Metode pembuatan nama pengguna dan kata sandi bergantung pada mode manajemen identitas dan izin instans:
Verifikasi identitas dan pengelolaan izin open source
Masuk ke Konsol ApsaraMQ for RabbitMQ. Di panel navigasi sebelah kiri, klik Instances.
Di bilah navigasi atas halaman Instances, pilih wilayah tempat instans yang ingin Anda kelola berada. Lalu, di daftar instans, klik nama instans tersebut.
Di panel navigasi sebelah kiri, klik Users and Permissions.
Di halaman Users and Permissions, klik Create Username/Password.
Di panel Create Username/Password, lengkapi kolom Username, Password, dan Confirm Password, lalu klik OK.
CatatanSetelah membuat nama pengguna dan kata sandi, Anda harus memberikan izin kepada pengguna tersebut. Untuk informasi selengkapnya, lihat Pengelolaan izin.
Alibaba Cloud Resource Access Management (RAM)
Masuk ke Konsol ApsaraMQ for RabbitMQ. Di panel navigasi sebelah kiri, klik Instances.
Di bilah navigasi atas halaman Instances, pilih wilayah tempat instans yang ingin Anda kelola berada. Lalu, di daftar instans, klik nama instans tersebut.
Di panel navigasi sebelah kiri, klik Users and Permissions.
Di halaman Users and Permissions, klik Create Username/Password.
Di panel Create Username/Password, masukkan AccessKey ID dan AccessKey secret, lalu klik OK.
CatatanAnda dapat memperoleh AccessKey ID dan AccessKey secret dari Konsol RAM Alibaba Cloud. Untuk informasi selengkapnya, lihat Buat Pasangan Kunci Akses.
Pasangan nama pengguna dan kata sandi statis yang dibuat akan muncul di halaman Users and Permissions. Kata sandinya disembunyikan.

Di kolom Password untuk pasangan nama pengguna dan kata sandi statis yang telah dibuat, klik Display untuk melihat kata sandinya.
Buat koneksi klien
Buat factory koneksi ConnectionFactory.java untuk membuat koneksi antara klien RabbitMQ open source dan broker ApsaraMQ for RabbitMQ.
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeoutException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
public class ConnectionFactory {
private final String hostName;
private final int port;
private final String userName;
private final String password;
private final String virtualHost;
private final boolean enableSSL;
public ConnectionFactory(String hostName, int port, String userName,
String password, String virtualHost, boolean enableSSL) {
this.hostName = hostName;
this.port = port;
this.userName = userName;
this.password = password;
this.virtualHost = virtualHost;
this.enableSSL = enableSSL;
}
public Channel createChannel() throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
//buat koneksi baru
Connection con = createCon();
//buat channel baru
return con.createChannel();
}
private Connection createCon() throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();
factory.setHost(hostName);
factory.setUsername(userName);
factory.setPassword(password);
// Menentukan apakah pemulihan koneksi otomatis diaktifkan. Jika Anda mengatur parameter ini ke true, pemulihan koneksi otomatis diaktifkan. Jika Anda mengatur parameter ini ke false, pemulihan koneksi otomatis dinonaktifkan.
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
factory.setVirtualHost(virtualHost);
// Port default.
factory.setPort(port);
if (enableSSL) {
setSSL(factory);
}
// Periode waktu habis. Konfigurasikan parameter ini berdasarkan lingkungan jaringan.
factory.setConnectionTimeout(30 * 1000);
factory.setHandshakeTimeout(30 * 1000);
factory.setShutdownTimeout(0);
return factory.newConnection();
}
private void setSSL(com.rabbitmq.client.ConnectionFactory factory) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
trustManagerFactory.init((KeyStore) null);
sslContext.init(null, trustManagerFactory.getTrustManagers(), null);
factory.useSslProtocol(sslContext);
}
public void closeCon(Channel channel) {
if (channel != null && channel.getConnection() != null) {
try {
channel.getConnection().close();
} catch (Throwable t) {
}
}
}
}Produksi pesan
Di proyek Java Anda, buat program produsen bernama Producer.java. Konfigurasikan parameter yang relevan seperti yang dijelaskan dalam Deskripsi parameter SDK, lalu jalankan program tersebut. Untuk informasi selengkapnya mengenai hal-hal yang perlu diperhatikan saat mengirim pesan, lihat Apa yang perlu saya ketahui saat memproduksi pesan?.
Kode contoh:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;
public class Producer {
// Titik akhir instans ApsaraMQ for RabbitMQ.
public static final String hostName = "1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com";
// Nama pengguna statis instans ApsaraMQ for RabbitMQ.
public static final String userName = "MjoxODgwNzcwODY5MD****";
// Kata sandi statis instans ApsaraMQ for RabbitMQ.
public static final String password = "NDAxREVDQzI2MjA0OT****";
// Nama vhost instans ApsaraMQ for RabbitMQ.
public static final String virtualHost = "vhost_test";
// Jika Anda menggunakan port 5671, atur parameter enableSSL ke true.
public static final int port = 5672;
public static final boolean enableSSL = false;
private Channel channel;
private final ConcurrentNavigableMap<Long/*deliveryTag*/, String/*msgId*/> outstandingConfirms;
private final ConnectionFactory factory;
private final String exchangeName;
private final String queueName;
private final String routingKey;
public Producer(ConnectionFactory factory, String exchangeName, String queueName, String routingKey) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
this.factory = factory;
this.outstandingConfirms = new ConcurrentSkipListMap<>();
this.channel = factory.createChannel();
this.exchangeName = exchangeName;
this.queueName = queueName;
this.routingKey = routingKey;
}
public static void main(String[] args) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
// Buat factory koneksi.
ConnectionFactory factory = new ConnectionFactory(hostName, port, userName, password, virtualHost, enableSSL);
// Inisialisasi produsen.
Producer producer = new Producer(factory, "ExchangeTest", "QueueTest", "RoutingKeyTest");
// Deklarasikan produsen.
producer.declare();
producer.initChannel();
// Kirim pesan.
producer.doSend("hello,amqp");
}
private void initChannel() throws IOException {
channel.confirmSelect();
ConfirmCallback cleanOutstandingConfirms = (deliveryTag, multiple) -> {
if (multiple) {
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag, true);
for (Long tag : confirmed.keySet()) {
String msgId = confirmed.get(tag);
System.out.format("Pesan dengan msgId %s telah di-ack. deliveryTag: %d, multiple: %b%n", msgId, tag, true);
}
confirmed.clear();
} else {
String msgId = outstandingConfirms.remove(deliveryTag);
System.out.format("Pesan dengan msgId %s telah di-ack. deliveryTag: %d, multiple: %b%n", msgId, deliveryTag, false);
}
};
channel.addConfirmListener(cleanOutstandingConfirms, (deliveryTag, multiple) -> {
String msgId = outstandingConfirms.get(deliveryTag);
System.err.format("Pesan dengan msgId %s telah di-nack. deliveryTag: %d, multiple: %b%n", msgId, deliveryTag, multiple);
// pengiriman pesan gagal, kirim ulang
});
channel.addReturnListener(returnMessage -> System.out.println("return msgId=" + returnMessage.getProperties().getMessageId()));
}
private void declare() throws IOException {
channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
}
private void doSend(String content) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
try {
String msgId = UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(msgId).build();
channel.basicPublish(exchangeName, routingKey, true, props, content.getBytes(StandardCharsets.UTF_8));
outstandingConfirms.put(channel.getNextPublishSeqNo(), msgId);
} catch (AlreadyClosedException e) {
//perlu menyambung ulang jika channel ditutup.
String message = e.getMessage();
System.out.println(message);
if (channelClosedByServer(message)) {
factory.closeCon(channel);
channel = factory.createChannel();
this.initChannel();
doSend(content);
} else {
throw e;
}
}
}
private boolean channelClosedByServer(String errorMsg) {
if (errorMsg != null
&& errorMsg.contains("channel.close")
&& errorMsg.contains("reply-code=541")
&& errorMsg.contains("reply-text=InternalError")) {
return true;
} else {
return false;
}
}
}Instans ApsaraMQ for RabbitMQ dapat memicu throttling berdasarkan transaksi per detik (TPS) puncaknya. Untuk informasi selengkapnya, lihat Praktik terbaik untuk throttling instans.
Berlangganan pesan
Di proyek Java Anda, buat program konsumen bernama Consumer.java. Konfigurasikan parameter yang relevan seperti yang dijelaskan dalam Deskripsi parameter SDK, lalu jalankan program tersebut.
Kode contoh:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
public class Consumer {
// Titik akhir instans ApsaraMQ for RabbitMQ.
public static final String hostName = "1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com";
// Nama pengguna statis instans ApsaraMQ for RabbitMQ.
public static final String userName = "MjoxODgwNzcwODY5MD****";
// Kata sandi statis instans ApsaraMQ for RabbitMQ.
public static final String password = "NDAxREVDQzI2MjA0OT****";
// Nama vhost instans ApsaraMQ for RabbitMQ.
public static final String virtualHost = "vhost_test";
// Jika Anda menggunakan port 5671, atur parameter enableSSL ke true.
public static final int port = 5672;
public static final boolean enableSSL = false;
private final Channel channel;
private final String queue;
public Consumer(Channel channel, String queue) {
this.channel = channel;
this.queue = queue;
}
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
ConnectionFactory factory = new ConnectionFactory(hostName, port, userName, password, virtualHost, enableSSL);
Channel channel = factory.createChannel();
channel.basicQos(50);
// Nama antrian di instans ApsaraMQ for RabbitMQ. Nama antrian harus sama dengan nama antrian yang Anda tentukan untuk produsen.
Consumer consumer = new Consumer(channel, "queue-1");
consumer.consume();
}
public void consume() throws IOException, InterruptedException {
channel.basicConsume(queue, false, new DefaultConsumer(channel) {
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
// Proses pesan.
System.out.println("menerima: msgId=" + properties.getMessageId());
// Konsumen harus melakukan commit acknowledgment (ack) dalam periode validitas. Jika tidak, pesan akan dikirim ulang. Sebuah pesan dapat dikirim ulang hingga 16 kali.
// Jika pengiriman pesan gagal setelah 16 kali percobaan, pesan tersebut akan dibuang atau dikirim ke dead-letter exchange.
// Periode validitas adalah 1 menit untuk instans Edisi Profesional, 5 menit untuk instans Edisi Enterprise dan Serverless, serta 30 menit untuk instans Edisi Platinum.
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
channel.getConnection().close();
} catch (IOException e) {
System.out.println("kesalahan menutup koneksi." + e);
}
latch.countDown();
}));
latch.await();
}
}Deskripsi parameter SDK
Parameter | Contoh | Deskripsi |
hostName | XXX.net.mq.amqp.aliyuncs.com | Titik akhir instans ApsaraMQ for RabbitMQ. Untuk informasi selengkapnya tentang cara mendapatkan titik akhir, lihat Dapatkan titik akhir instans. |
Port | 5672 | Port default. Gunakan port 5672 untuk koneksi tanpa enkripsi dan port 5671 untuk koneksi terenkripsi. |
userName | MjoxODgwNzcwODY5MD**** | Nama pengguna statis yang digunakan untuk autentikasi izin saat Anda menghubungkan klien ke broker ApsaraMQ for RabbitMQ. Anda harus membuat nama pengguna statis di Konsol ApsaraMQ for RabbitMQ terlebih dahulu. Untuk informasi selengkapnya, lihat Buat nama pengguna dan kata sandi. |
passWord | NDAxREVDQzI2MjA0OT**** | Kata sandi statis yang digunakan untuk autentikasi izin saat Anda menghubungkan klien ke broker ApsaraMQ for RabbitMQ. Anda harus membuat kata sandi statis di Konsol ApsaraMQ for RabbitMQ terlebih dahulu. Untuk informasi selengkapnya, lihat Buat nama pengguna dan kata sandi. |
virtualHost | amqp_vhost | vhost yang Anda buat di instans ApsaraMQ for RabbitMQ. Anda harus membuat vhost di Konsol ApsaraMQ for RabbitMQ terlebih dahulu. Untuk informasi selengkapnya, lihat Langkah 2: Buat sumber daya. |
exchangeName | ExchangeTest | Exchange yang Anda buat di instans ApsaraMQ for RabbitMQ. Anda harus membuat exchange di Konsol ApsaraMQ for RabbitMQ terlebih dahulu. Untuk informasi selengkapnya, lihat Langkah 2: Buat sumber daya. |
queueName | QueueTest | Antrian yang Anda buat di instans ApsaraMQ for RabbitMQ. Anda harus membuat antrian di Konsol ApsaraMQ for RabbitMQ terlebih dahulu. Untuk informasi selengkapnya, lihat Langkah 2: Buat sumber daya. |
routingKey | RoutingKeyTest | Kunci routing yang digunakan untuk mengikat exchange ke antrian di ApsaraMQ for RabbitMQ. Anda harus membuat binding di Konsol ApsaraMQ for RabbitMQ terlebih dahulu. Untuk informasi selengkapnya, lihat Langkah 2: Buat sumber daya. |
exchangeType | topic | Jenis exchange. ApsaraMQ for RabbitMQ mendukung jenis-jenis exchange berikut. Untuk informasi selengkapnya, lihat Exchange.
Penting Pastikan jenis exchange yang Anda tentukan sama dengan jenis exchange yang Anda pilih saat membuat exchange. |
Referensi
ApsaraMQ for RabbitMQ sepenuhnya kompatibel dengan RabbitMQ open source dan mendukung SDK untuk berbagai bahasa pemrograman. Untuk informasi selengkapnya mengenai SDK untuk bahasa pemrograman lain, lihat SDK RabbitMQ open source untuk berbagai bahasa pemrograman dan framework yang mendukung AMQP. Untuk informasi selengkapnya mengenai parameter lainnya, lihat dokumentasi klien RabbitMQ open source.
Jika terjadi kesalahan saat klien berjalan, lihat Kode kesalahan untuk mengidentifikasi penyebab dan menemukan solusinya.
Anda dapat mencari pesan atau jejaknya di Konsol ApsaraMQ for RabbitMQ untuk memverifikasi status pengiriman dan penerimaan pesan. Untuk informasi selengkapnya, lihat Cari pesan dan Jejak pesan.