Topik ini menyediakan contoh kode untuk mengirim dan menerima pesan tertunda menggunakan SDK klien TCP untuk Java.
Prasyarat
Sebelum memulai, pastikan langkah-langkah berikut telah dilakukan:
SDK untuk Java telah diinstal. Untuk informasi lebih lanjut, lihat Persiapkan Lingkungan.
Sumber daya yang ingin Anda tentukan dalam kode telah dibuat di konsol ApsaraMQ for RocketMQ. Sumber daya tersebut mencakup instance, topik, dan grup konsumen. Untuk informasi lebih lanjut, lihat Buat Sumber Daya.
Pasangan AccessKey dari akun Alibaba Cloud Anda telah diperoleh. Untuk informasi lebih lanjut, lihat Buat Pasangan AccessKey.
Opsional. Pengaturan logging telah dikonfigurasi. Untuk informasi lebih lanjut, lihat Pengaturan Logging.
Informasi latar belakang
Pesan tertunda dikirim oleh broker ApsaraMQ for RocketMQ ke klien untuk dikonsumsi setelah periode waktu tertentu, seperti 3 detik. Pesan tertunda mirip dengan antrian tertunda dan dapat digunakan dalam skenario yang memerlukan jendela waktu antara produksi dan konsumsi pesan, atau skenario di mana tugas tertunda dipicu oleh pesan.
Untuk informasi tentang istilah yang digunakan untuk pesan tertunda dan tindakan pencegahan yang harus diambil saat menggunakannya, lihat Pesan Terjadwal dan Pesan Tertunda.
Jika Anda adalah pengguna baru ApsaraMQ for RocketMQ, kami sarankan merujuk pada Proyek Demo untuk membuat proyek sebelum menggunakan ApsaraMQ for RocketMQ untuk mengirim dan menerima pesan.
Kirim pesan tertunda
Untuk informasi tentang contoh kode rinci, lihat ApsaraMQ for RocketMQ Repositori Kode.
Contoh kode berikut menunjukkan cara mengirim pesan tertunda menggunakan SDK klien TCP untuk Java:
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import java.util.Properties;
public class ProducerDelayTest {
public static void main(String[] args) {
Properties properties = new Properties();
// Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi.
// ID AccessKey yang digunakan untuk autentikasi.
properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
// Rahasia AccessKey yang digunakan untuk autentikasi.
properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
// Titik akhir TCP. Anda bisa mendapatkan titik akhir di bagian TCP Endpoint halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
properties.put(PropertyKeyConst.NAMESRV_ADDR,
"XXX");
Producer producer = ONSFactory.createProducer(properties);
// Sebelum mengirim pesan, panggil metode start() hanya sekali untuk memulai produser.
producer.start();
Message msg = new Message(
// Topik yang Anda buat di konsol ApsaraMQ for RocketMQ.
"Topic",
// Tag pesan. Tag pesan mirip dengan tag Gmail dan dapat digunakan oleh konsumen untuk memfilter pesan di broker ApsaraMQ for RocketMQ.
"tag",
// Tubuh pesan. Tubuh pesan adalah data dalam format biner. ApsaraMQ for RocketMQ tidak memproses tubuh pesan. Produser dan konsumen harus menyetujui metode untuk meng-serialisasi dan men-deserialize tubuh pesan.
"Hello MQ".getBytes());
// Kunci pesan. Kunci adalah atribut spesifik bisnis dari pesan dan harus unik secara global jika memungkinkan.
// Jika Anda tidak dapat menerima pesan sesuai harapan, Anda dapat menggunakan kunci untuk menanyakan dan mengirim ulang pesan di konsol ApsaraMQ for RocketMQ.
// Catatan: Anda dapat mengirim dan menerima pesan bahkan jika Anda tidak menentukan kunci.
msg.setKey("ORDERID_100");
try {
// Waktu tunda sebelum pesan dikirim. Nilainya harus lebih dari waktu saat ini. Unit: milidetik. Nilai maksimum yang dapat Anda tentukan sama dengan 40 hari.
// Dalam contoh berikut, pesan dikirim setelah penundaan 3 detik.
long delayTime = System.currentTimeMillis() + 3000;
// Titik waktu ketika broker mulai mengirim pesan.
msg.setStartDeliverTime(delayTime);
SendResult sendResult = producer.send(msg);
// Kirim pesan dalam mode transmisi sinkron. Jika tidak ada pengecualian yang dilemparkan, pesan terkirim.
if (sendResult != null) {
System.out.println(new Date() + " Kirim pesan mq berhasil. Topik adalah:" + msg.getTopic() + " msgId adalah: " + sendResult.getMessageId());
}
} catch (Exception e) {
// Logika yang ingin Anda gunakan untuk mengirim ulang atau mempertahankan pesan jika pesan gagal dikirim dan perlu dikirim lagi.
System.out.println(new Date() + " Kirim pesan mq gagal. Topik adalah:" + msg.getTopic());
e.printStackTrace();
}
// Sebelum keluar dari aplikasi, hancurkan produser.
// Catatan: Jika Anda menghancurkan produser, memori dapat dihemat. Jika Anda perlu sering mengirim pesan, jangan hancurkan produser.
producer.shutdown();
}
} Berlangganan pesan tertunda
Contoh kode untuk berlangganan pesan tertunda sama dengan berlangganan pesan normal. Untuk informasi lebih lanjut, lihat Berlangganan Pesan.