Topik ini menyajikan contoh kode untuk mengirim dan menerima pesan terjadwal serta pesan tertunda menggunakan SDK Klien HTTP untuk Java.
Informasi latar belakang
Pesan tertunda adalah pesan yang dikirim oleh broker ApsaraMQ for RocketMQ ke konsumen setelah periode waktu tertentu.
Pesan terjadwal adalah pesan yang dikirim oleh broker ApsaraMQ for RocketMQ ke konsumen pada titik waktu tertentu.
Konfigurasi kode untuk pesan terjadwal sama dengan konfigurasi untuk pesan tertunda melalui HTTP. Kedua jenis pesan tersebut dikirim ke konsumen setelah periode waktu tertentu berdasarkan atribut pesan.
Untuk informasi lebih lanjut, lihat Pesan Terjadwal dan Pesan Tertunda.
Prasyarat
Sebelum memulai, pastikan langkah-langkah berikut telah dilakukan:
Instal SDK untuk Java. Untuk informasi lebih lanjut, lihat Persiapkan Lingkungan.
Buat sumber daya yang ingin Anda tentukan dalam kode di konsol ApsaraMQ for RocketMQ. Sumber daya tersebut mencakup instance, topik, dan grup konsumen. Untuk informasi lebih lanjut, lihat Buat Sumber Daya.
Peroleh pasangan AccessKey dari akun Alibaba Cloud Anda. Untuk informasi lebih lanjut, lihat Buat Pasangan AccessKey.
Kirim pesan terjadwal dan pesan tertunda
Contoh kode berikut menunjukkan cara mengirim pesan terjadwal dan pesan tertunda menggunakan SDK Klien HTTP untuk Java:
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQProducer;
import com.aliyun.mq.http.model.TopicMessage;
import java.util.Date;
public class Producer {
public static void main(String[] args) {
MQClient mqClient = new MQClient(
// Titik akhir HTTP. Anda dapat memperoleh titik akhir di bagian HTTP Endpoint halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
"${HTTP_ENDPOINT}",
// Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi.
// ID AccessKey yang digunakan untuk autentikasi.
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
// Rahasia AccessKey yang digunakan untuk autentikasi.
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
);
// Topik tempat pesan diproduksi. Anda harus membuat topik di konsol ApsaraMQ for RocketMQ.
// Setiap topik dapat digunakan untuk mengirim dan menerima pesan dari satu jenis tertentu. Misalnya, topik yang digunakan untuk mengirim dan menerima pesan normal tidak dapat digunakan untuk mengirim atau menerima pesan jenis lain.
final String topic = "${TOPIC}";
// ID instance tempat topik tersebut berada. Anda harus membuat instance di konsol ApsaraMQ for RocketMQ.
// Jika instance memiliki namespace, tentukan ID instance. Jika instance tidak memiliki namespace, atur parameter instanceID ke null atau string kosong. Anda dapat memperoleh namespace instance di halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
final String instanceId = "${INSTANCE_ID}";
// Dapatkan produsen yang mengirim pesan ke topik.
MQProducer producer;
if (instanceId != null && instanceId != "") {
producer = mqClient.getProducer(instanceId, topic);
} else {
producer = mqClient.getProducer(topic);
}
try {
// Kirim empat pesan secara siklik.
for (int i = 0; i < 4; i++) {
TopicMessage pubMsg;
pubMsg = new TopicMessage(
// Konten pesan.
"hello mq!".getBytes(),
// Tag pesan.
"A"
);
// Atribut kustom pesan.
pubMsg.getProperties().put("a", String.valueOf(i));
// Kunci pesan.
pubMsg.setMessageKey("MessageKey");
// Periode waktu setelah mana broker mengirimkan pesan ke konsumen. Dalam contoh ini, broker mengirimkan pesan ke konsumen setelah penundaan 10 detik. Atur parameter ini ke timestamp dalam milidetik.
// Jika produsen mengirim pesan terjadwal, atur parameter ini ke selisih waktu antara titik waktu terjadwal dan titik waktu saat ini.
pubMsg.setStartDeliverTime(System.currentTimeMillis() + 10 * 1000);
// Kirim pesan dalam mode transmisi sinkron. Jika tidak ada pengecualian yang dilemparkan, pesan terkirim.
TopicMessage pubResultMsg = producer.publishMessage(pubMsg);
// Kirim pesan dalam mode transmisi sinkron. Jika tidak ada pengecualian yang dilemparkan, pesan terkirim.
System.out.println(new Date() + " Kirim pesan mq berhasil. Topik adalah:" + topic + ", msgId adalah: " + pubResultMsg.getMessageId()
+ ", bodyMD5 adalah: " + pubResultMsg.getMessageBodyMD5());
}
} catch (Throwable e) {
// Logika yang ingin Anda gunakan untuk mengirim ulang atau menyimpan pesan jika pengiriman pesan gagal dan perlu dikirim ulang.
System.out.println(new Date() + " Kirim pesan mq gagal. Topik adalah:" + topic);
e.printStackTrace();
}
mqClient.close();
}
}Berlangganan pesan terjadwal dan pesan tertunda
Contoh kode berikut menunjukkan cara berlangganan pesan terjadwal atau pesan tertunda menggunakan SDK Klien HTTP untuk Java:
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQConsumer;
import com.aliyun.mq.http.common.AckMessageException;
import com.aliyun.mq.http.model.Message;
import java.util.ArrayList;
import java.util.List;
public class Consumer {
public static void main(String[] args) {
MQClient mqClient = new MQClient(
// Titik akhir HTTP. Anda dapat memperoleh titik akhir di bagian HTTP Endpoint halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
"${HTTP_ENDPOINT}",
// Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi.
// ID AccessKey yang digunakan untuk autentikasi.
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
// Rahasia AccessKey yang digunakan untuk autentikasi.
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
);
// Topik tempat pesan diproduksi. Anda harus membuat topik di konsol ApsaraMQ for RocketMQ.
// Setiap topik dapat digunakan untuk mengirim dan menerima pesan dari jenis tertentu. Misalnya, topik yang digunakan untuk mengirim dan menerima pesan normal tidak dapat digunakan untuk mengirim atau menerima pesan jenis lain.
final String topic = "${TOPIC}";
// ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
final String groupId = "${GROUP_ID}";
// ID instance tempat topik tersebut berada. Anda harus membuat instance di konsol ApsaraMQ for RocketMQ.
// Jika instance memiliki namespace, tentukan ID instance. Jika instance tidak memiliki namespace, atur parameter instanceID ke null atau string kosong. Anda dapat memperoleh namespace instance di halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
final String instanceId = "${INSTANCE_ID}";
final MQConsumer consumer;
if (instanceId != null && instanceId != "") {
consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
} else {
consumer = mqClient.getConsumer(topic, groupId);
}
// Konsumsi pesan secara siklik di thread saat ini. Kami merekomendasikan Anda menggunakan beberapa thread untuk mengonsumsi pesan secara bersamaan.
do {
List<Message> messages = null;
try {
// Konsumsi pesan dalam mode polling panjang.
// Dalam mode polling panjang, jika tidak ada pesan di topik yang tersedia untuk dikonsumsi, permintaan ditangguhkan di broker selama periode waktu yang ditentukan. Jika pesan menjadi tersedia untuk dikonsumsi dalam periode waktu yang ditentukan, broker segera mengirim respons ke konsumen. Dalam contoh ini, nilainya ditentukan sebagai 3 detik.
messages = consumer.consumeMessage(
3,// Tentukan jumlah maksimum pesan yang dapat dikonsumsi sekaligus. Dalam contoh ini, nilainya ditentukan sebagai 3. Nilai maksimum yang dapat Anda tentukan adalah 16.
3// Durasi siklus polling panjang. Unit: detik. Dalam contoh ini, nilainya ditentukan sebagai 3. Nilai maksimum yang dapat Anda tentukan adalah 30.
);
} catch (Throwable e) {
e.printStackTrace();
try {
Thread.sleep(2000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
// Tidak ada pesan di topik yang tersedia untuk dikonsumsi.
if (messages == null || messages.isEmpty()) {
System.out.println(Thread.currentThread().getName() + ": tidak ada pesan baru, lanjutkan!");
continue;
}
// Logika konsumsi pesan.
for (Message message : messages) {
System.out.println("Terima pesan: " + message);
}
// Jika broker tidak menerima pengakuan (ACK) untuk pesan dari konsumen sebelum interval pengulangan pengiriman berakhir, broker mengirim pesan untuk dikonsumsi lagi.
// Timestamp unik ditentukan untuk handle pesan setiap kali pesan dikonsumsi.
{
List<String> handles = new ArrayList<String>();
for (Message message : messages) {
handles.add(message.getReceiptHandle());
}
try {
consumer.ackMessage(handles);
} catch (Throwable e) {
// Jika handle pesan habis waktu, broker tidak dapat menerima ACK untuk pesan dari konsumen.
if (e instanceof AckMessageException) {
AckMessageException errors = (AckMessageException) e;
System.out.println("Pengakuan pesan gagal, requestId adalah:" + errors.getRequestId() + ", handle gagal:");
if (errors.getErrorMessages() != null) {
for (String errorHandle :errors.getErrorMessages().keySet()) {
System.out.println("Handle:" + errorHandle + ", KodeKesalahan:" + errors.getErrorMessages().get(errorHandle).getErrorCode()
+ ", PesanKesalahan:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());
}
}
continue;
}
e.printStackTrace();
}
}
} while (true);
}
}