ApsaraMQ for RocketMQ menyediakan SDK dalam berbagai bahasa pemrograman untuk mengirim dan menerima pesan dari berbagai jenis. Topik ini menjelaskan cara menggunakan SDK Java untuk terhubung ke broker ApsaraMQ for RocketMQ guna mengirim dan menerima pesan normal.
Prasyarat
Sumber daya yang diperlukan telah dibuat di ApsaraMQ for RocketMQ. Untuk informasi lebih lanjut, lihat Langkah 2: Buat sumber daya.
IntelliJ IDEA telah diinstal. Untuk informasi lebih lanjut, lihat IntelliJ IDEA.
Anda dapat menggunakan IntelliJ IDEA atau Eclipse. Dalam contoh ini, IntelliJ IDEA Ultimate digunakan.
JDK 1.8 atau versi lebih baru telah diinstal. Untuk informasi lebih lanjut, lihat Unduhan Java.
Maven 2.5 atau versi lebih baru telah diinstal. Untuk informasi lebih lanjut, lihat Mengunduh Apache Maven.
Instal pustaka dependensi Java
Buat proyek Java di IntelliJ IDEA.
Tambahkan dependensi berikut ke file pom.xml untuk mengimpor pustaka dependensi Java:
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client-java</artifactId> <version>5.0.7</version> </dependency>
Hasilkan pesan
Dalam proyek Java yang dibuat, buat program untuk mengirim pesan normal dan jalankan program tersebut. Contoh kode:
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
public class ProducerExample {
public static void main(String[] args) throws ClientException {
/**
* Titik akhir dari instance. Anda dapat melihat titik akhir pada tab Endpoints halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
* Jika klien instance ApsaraMQ for RocketMQ ditempatkan pada instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ dalam jaringan internal, kami sarankan Anda menentukan titik akhir VPC.
* Jika Anda mengakses instance melalui Internet atau dari pusat data, Anda dapat menentukan titik akhir publik. Jika Anda mengakses instance melalui Internet, Anda harus mengaktifkan fitur akses Internet untuk instance tersebut.
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
// Nama topik tempat pesan dikirim. Sebelum Anda menggunakan topik untuk menerima pesan, Anda harus membuat topik tersebut di konsol ApsaraMQ for RocketMQ. Jika tidak, kesalahan akan dikembalikan.
String topic = "Topik Anda";
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
/**
* Jika instance ApsaraMQ for RocketMQ adalah instance serverless dan Anda menggunakan titik akhir publik untuk mengakses instance, Anda harus menentukan ID instance.
*/
//.setNamespace("InstanceId")
/**
* Jika Anda menggunakan titik akhir publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus menentukan nama pengguna dan kata sandi instance. Anda dapat memperoleh nama pengguna dan kata sandi pada tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
* Jika klien instance ApsaraMQ for RocketMQ ditempatkan pada instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ dalam jaringan internal, Anda tidak perlu menentukan nama pengguna atau kata sandi. Broker secara otomatis mendapatkan nama pengguna dan kata sandi berdasarkan informasi VPC.
* Jika instance adalah instance ApsaraMQ for RocketMQ serverless, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur tanpa otentikasi di VPC untuk instance serverless dan mengakses instance dalam VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi.
*/
//.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"))
.build();
/**
* Saat Anda menginisialisasi produser, Anda dapat menentukan topik yang ingin Anda gunakan untuk memeriksa apakah pengaturan topik valid dan mencegah topik tidak valid dimulai.
* Anda tidak perlu menentukan topik untuk pesan non-transaksional. Broker secara dinamis memeriksa apakah topik valid.
* Catatan: Untuk mencegah operasi API yang dipanggil untuk memeriksa pesan transaksional gagal, Anda harus menentukan topik untuk pesan transaksional sebelumnya.
*/
Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(clientConfiguration)
.build();
// Kirim pesan normal.
Message message = provider.newMessageBuilder()
.setTopic(topic)
// Kunci pesan. Anda dapat menggunakan kunci untuk mencari pesan.
.setKeys("kunciPesan")
// Tag pesan. Konsumen dapat menggunakan tag untuk memfilter pesan.
.setTag("tagPesan")
// Tubuh pesan.
.setBody("isiPesan".getBytes())
.build();
try {
// Kirim pesan. Perhatikan hasilnya dan tangkap pengecualian seperti kegagalan.
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
}
}Konsumsi pesan
Dalam proyek Java yang dibuat, buat program untuk berlangganan pesan normal dan jalankan program tersebut. ApsaraMQ for RocketMQ memungkinkan Anda mengonsumsi pesan dalam mode sederhana dan mode dorong. Untuk informasi lebih lanjut, lihat SimpleConsumer dan PushConsumer. Anda dapat memilih salah satu mode untuk berlangganan pesan. Tabel berikut menjelaskan perbedaan antara konsumen sederhana dan konsumen dorong.
Item | PushConsumer | SimpleConsumer |
Pemanggilan operasi API | Operasi callback dipanggil untuk mengembalikan hasil konsumsi menggunakan pendengar pesan. Konsumen hanya dapat memproses logika konsumsi dalam ruang lingkup pendengar pesan. | Aplikasi bisnis mengimplementasikan pemrosesan pesan dan memanggil operasi yang sesuai untuk mengembalikan hasil konsumsi. |
Manajemen konkurensi konsumsi | SDK ApsaraMQ for RocketMQ digunakan untuk mengelola jumlah thread konkuren untuk konsumsi pesan. | Jumlah thread konkuren yang digunakan untuk konsumsi pesan didasarkan pada logika konsumsi aplikasi bisnis individu. |
Fleksibilitas API | Operasi API dienkapsulasi dan memberikan fleksibilitas yang buruk. | Operasi atomik memberikan fleksibilitas yang besar. |
Skenario | Tipe konsumen ini cocok untuk skenario pengembangan yang tidak memerlukan proses kustom. | Tipe konsumen ini cocok untuk skenario pengembangan yang memerlukan proses kustom. |
PushConsumer
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
public class PushConsumerExample {
private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class);
private PushConsumerExample() {
}
public static void main(String[] args) throws ClientException, IOException, InterruptedException {
/**
* Titik akhir dari instance. Anda dapat melihat titik akhir pada tab Endpoints halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
* Jika klien instance ApsaraMQ for RocketMQ ditempatkan pada instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ dalam jaringan internal, kami sarankan Anda menentukan titik akhir VPC.
* Jika Anda mengakses instance melalui Internet atau dari pusat data, Anda dapat menentukan titik akhir publik. Jika Anda mengakses instance melalui Internet, Anda harus mengaktifkan fitur akses Internet untuk instance tersebut.
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
// Topik yang ingin Anda langgani. Sebelum Anda menentukan topik, Anda harus membuat topik tersebut di konsol ApsaraMQ for RocketMQ terlebih dahulu. Jika tidak, kesalahan akan dikembalikan.
String topic = "Topik Anda";
// Grup konsumen tempat konsumen berada. Sebelum Anda menentukan grup konsumen, Anda harus membuat grup konsumen tersebut di konsol ApsaraMQ for RocketMQ terlebih dahulu. Jika tidak, kesalahan akan dikembalikan.
String consumerGroup = "GrupKonsumen Anda";
final ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
/**
* Jika instance ApsaraMQ for RocketMQ adalah instance serverless dan Anda menggunakan titik akhir publik untuk mengakses instance, Anda harus menentukan ID instance.
*/
//.setNamespace("InstanceId")
/**
* Jika Anda menggunakan titik akhir publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus menentukan nama pengguna dan kata sandi instance. Anda dapat memperoleh nama pengguna dan kata sandi pada tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
* Jika klien instance ApsaraMQ for RocketMQ ditempatkan pada instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ dalam jaringan internal, Anda tidak perlu menentukan nama pengguna atau kata sandi. Broker secara otomatis mendapatkan nama pengguna dan kata sandi berdasarkan informasi VPC.
* Jika instance adalah instance ApsaraMQ for RocketMQ serverless, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur tanpa otentikasi di VPC untuk instance serverless dan mengakses instance dalam VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi.
*/
//.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"))
.build();
// Aturan yang digunakan untuk memfilter pesan. Dalam contoh berikut, semua pesan dalam topik dilanggan.
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// Inisialisasi konsumen dorong. Saat Anda menginisialisasi konsumen dorong, Anda harus menentukan grup konsumen, parameter komunikasi, dan langganan untuk konsumen.
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// Grup konsumen.
.setConsumerGroup(consumerGroup)
// Langganan.
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// Pendengar pesan.
.setMessageListener(messageView -> {
// Konsumsi pesan dan kembalikan hasil konsumsi.
// LOGGER.info("Consume message={}", messageView);
System.out.println("Konsumsi Pesan: " + messageView);
return ConsumeResult.SUCCESS;
})
.build();
Thread.sleep(Long.MAX_VALUE);
// Jika Anda tidak lagi memerlukan konsumen dorong, matikan prosesnya.
//pushConsumer.close();
}
} SimpleConsumer
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
public class SimpleConsumerExample {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class);
private SimpleConsumerExample() {
}
public static void main(String[] args) throws ClientException, IOException {
/**
* Titik akhir dari instance. Anda dapat melihat titik akhir pada tab Endpoints halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
* Jika klien instance ApsaraMQ for RocketMQ ditempatkan pada instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ dalam jaringan internal, kami sarankan Anda menentukan titik akhir VPC.
* Jika Anda mengakses instance melalui Internet atau dari pusat data, Anda dapat menentukan titik akhir publik. Jika Anda mengakses instance melalui Internet, Anda harus mengaktifkan fitur akses Internet untuk instance tersebut.
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
// Topik yang ingin Anda langgani. Sebelum Anda menentukan topik, Anda harus membuat topik tersebut di konsol ApsaraMQ for RocketMQ terlebih dahulu. Jika tidak, kesalahan akan dikembalikan.
String topic = "Topik Anda";
// Grup konsumen tempat konsumen berada. Sebelum Anda menentukan grup konsumen, Anda harus membuat grup konsumen tersebut di konsol ApsaraMQ for RocketMQ terlebih dahulu. Jika tidak, kesalahan akan dikembalikan.
String consumerGroup = "GrupKonsumen Anda";
final ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
/**
* Jika instance ApsaraMQ for RocketMQ adalah instance serverless dan Anda menggunakan titik akhir publik untuk mengakses instance, Anda harus menentukan ID instance.
*/
//.setNamespace("InstanceId")
/**
* Jika Anda menggunakan titik akhir publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus menentukan nama pengguna dan kata sandi instance. Anda dapat memperoleh nama pengguna dan kata sandi pada tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
* Jika klien instance ApsaraMQ for RocketMQ ditempatkan pada instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ dalam jaringan internal, Anda tidak perlu menentukan nama pengguna atau kata sandi. Broker secara otomatis mendapatkan nama pengguna dan kata sandi berdasarkan informasi VPC.
* Jika instance adalah instance ApsaraMQ for RocketMQ serverless, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur tanpa otentikasi di VPC untuk instance serverless dan mengakses instance dalam VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi.
*/
//.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"))
.build();
Duration awaitDuration = Duration.ofSeconds(10);
// Aturan yang digunakan untuk memfilter pesan. Dalam contoh berikut, semua pesan dalam topik dilanggan.
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// Inisialisasi konsumen sederhana. Saat Anda menginisialisasi konsumen sederhana, Anda harus menentukan grup konsumen, parameter komunikasi, dan langganan untuk konsumen.
SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// Grup konsumen.
.setConsumerGroup(consumerGroup)
// Durasi timeout untuk permintaan polling panjang.
.setAwaitDuration(awaitDuration)
// Langganan.
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.build();
// Jumlah maksimum pesan yang akan ditarik.
int maxMessageNum = 16;
// Waktu tak terlihat pesan.
Duration invisibleDuration = Duration.ofSeconds(10);
// Jika Anda menggunakan konsumen sederhana untuk mengonsumsi pesan, klien harus mendapatkan dan mengonsumsi pesan dalam loop.
// Untuk mengonsumsi pesan secara real-time, kami sarankan Anda menggunakan beberapa thread untuk menarik pesan secara konkuren.
while (true) {
final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
messages.forEach(messageView -> {
// LOGGER.info("Received message: {}", messageView);
System.out.println("Pesan diterima: " + messageView);
});
for (MessageView message : messages) {
final MessageId messageId = message.getMessageId();
try {
// Setelah konsumsi selesai, konsumen harus memanggil metode ACK untuk mengirimkan hasil konsumsi ke broker.
consumer.ack(message);
System.out.println("Pesan diakui berhasil, messageId= " + messageId);
//LOGGER.info("Message is acknowledged successfully, messageId={}", messageId);
} catch (Throwable t) {
t.printStackTrace();
//LOGGER.error("Message is failed to be acknowledged, messageId={}", messageId, t);
}
}
}
// Jika Anda tidak lagi memerlukan konsumen sederhana, matikan prosesnya.
// consumer.close();
}
} Deskripsi versi untuk mengakses instance serverless melalui Internet
Hanya SDK versi tertentu yang dapat mengakses instance ApsaraMQ for RocketMQ serverless melalui Internet.
SDK untuk Java 5.x
Jika Anda mengakses instance ApsaraMQ for RocketMQ serverless melalui Internet untuk mengirim dan menerima pesan, pastikan versi SDK untuk Java memenuhi kondisi berikut dan tambahkan informasi berikut dalam kode:
Ganti InstanceId dengan ID instance ApsaraMQ for RocketMQ Anda.
rocketmq-client: versi 5.2.0 atau lebih baru
Tambahkan informasi berikut dalam kode saat Anda mengirim pesan:
producer.setNamespaceV2("InstanceId");Tambahkan informasi berikut dalam kode saat Anda menerima pesan:
consumer.setNamespaceV2("InstanceId");rocketmq-client-java: versi 5.0.6 atau lebih baru
Tambahkan informasi berikut dalam kode saat Anda mengirim dan menerima pesan:
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) .setNamespace("InstanceId") .setCredentialProvider(sessionCredentialsProvider) .build();
SDK klien TCP untuk Java 1.x
Jika Anda mengakses instance ApsaraMQ for RocketMQ serverless melalui Internet untuk mengirim dan menerima pesan, pastikan bahwa versi RocketMQ 1.x TCP client SDK untuk Java adalah 1.9.0.Final atau lebih baru dan tambahkan informasi berikut dalam kode:
properties.setProperty(PropertyKeyConst.Namespace, "InstanceId");
Ganti InstanceId dengan ID instance ApsaraMQ for RocketMQ Anda.
Parameter SDK
Parameter | Contoh | Deskripsi |
endpoints | rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080 | Titik akhir instance ApsaraMQ for RocketMQ. Untuk informasi tentang cara memperoleh titik akhir, lihat Dapatkan titik akhir instance.
|
InstanceId | rmq-cn-xxx | ID instance ApsaraMQ for RocketMQ. |
topic | normal_test | Topik tempat pesan dikirim atau dari mana pesan dikonsumsi dalam instance ApsaraMQ for RocketMQ. Anda harus membuat topik tersebut pada instance ApsaraMQ for RocketMQ terlebih dahulu. Untuk informasi lebih lanjut, lihat Buat topik. |
group | GID_test | Grup konsumen yang digunakan oleh konsumen untuk mengonsumsi pesan dalam instance ApsaraMQ for RocketMQ. Anda harus membuat grup konsumen tersebut pada instance ApsaraMQ for RocketMQ terlebih dahulu. Untuk informasi lebih lanjut, lihat Buat grup konsumen. |
Instance UserName | 1XVg0hzgKm****** | Nama pengguna instance ApsaraMQ for RocketMQ. Jika Anda mengakses instance melalui Internet, Anda harus menentukan nama pengguna. Jika Anda mengakses instance dalam VPC, Anda hanya perlu menentukan nama pengguna jika instance adalah instance serverless dan fitur tanpa otentikasi di VPC dinonaktifkan untuk instance tersebut. Untuk informasi tentang cara memperoleh nama pengguna, lihat Dapatkan nama pengguna dan kata sandi instance. |
Instance Password | ijSt8rEc45****** | Kata sandi untuk instance ApsaraMQ for RocketMQ. Jika Anda mengakses instance melalui Internet, Anda harus menentukan kata sandi. Jika Anda mengakses instance di dalam VPC, Anda hanya perlu menentukan kata sandi jika instance tersebut merupakan instance tanpa server dan fitur autentikasi gratis di VPC dinonaktifkan untuk instance tersebut. Untuk informasi tentang cara mendapatkan kata sandi, lihat Memperoleh nama pengguna dan kata sandi sebuah instance. |
Verifikasi konsumsi pesan
Setelah Anda mengonsumsi pesan, Anda dapat memeriksa status konsumsi pesan tersebut di konsol ApsaraMQ for RocketMQ.
Masuk ke konsol ApsaraMQ for RocketMQ. Pada halaman Instances, klik nama instance yang ingin Anda kelola.
Di panel navigasi sisi kiri halaman yang muncul, klik Message Tracing.
Referensi SDK
Untuk informasi tentang cara menggunakan SDK untuk bahasa pemrograman lainnya untuk mengirim dan menerima pesan dari jenis lain, lihat Ikhtisar.