Jika konsumsi pesan duplikat memengaruhi logika bisnis Anda, pastikan pesan tersebut bersifat idempoten. Topik ini menjelaskan konsep idempotensi pesan, skenario yang memerlukannya, serta metode penerapannya.
Apa itu idempotensi pesan
Dalam matematika dan ilmu komputer, operasi idempoten adalah operasi yang menghasilkan hasil yang sama, baik dijalankan satu kali maupun beberapa kali. Dalam sistem messaging, idempotensi berarti bahwa meskipun sebuah consumer mengonsumsi pesan secara berulang, hasil akhirnya tetap sama seperti saat mengonsumsinya hanya sekali, tanpa menimbulkan dampak negatif pada sistem bisnis.
Sebagai contoh, dalam skenario pembayaran, konsumen memproses Paket untuk memotong pembayaran suatu Pesanan sebesar 100 USD. Jika Paket potongan dikirimkan berulang kali karena faktor seperti ketidakstabilan jaringan, konsumen dapat memproses Paket tersebut beberapa kali. Namun, hasil bisnis akhirnya tetap berupa pemotongan sebesar 100 USD hanya sekali. Riwayat transaksi pengguna untuk Pesanan tersebut hanya mencatat satu Catatan potongan, dan biaya tidak dikenakan lebih dari sekali. Dalam kasus ini, operasi pemotongan berjalan sesuai harapan, dan seluruh prosedur pemrosesan Paket bersifat idempoten.
Skenario
Dalam aplikasi Internet, Anda mungkin menerima pesan duplikat dari ApsaraMQ for RabbitMQ, terutama saat jaringan tidak stabil. Jika duplikasi ini memengaruhi logika bisnis Anda, pastikan pesan tersebut bersifat idempoten. Pesan dapat menjadi duplikat karena alasan berikut:
-
Pesan duplikat saat pengiriman
Setelah pesan dikirim ke server dan dipersistensikan, koneksi sementara terputus atau client mengalami gangguan, sehingga server tidak dapat mengirimkan acknowledgement ke produsen. Jika produsen mengasumsikan pengiriman gagal dan mengirim ulang pesan tersebut, consumer akan menerima dua pesan identik dengan konten dan message ID yang sama.
-
Pesan duplikat saat pengiriman ke consumer
Sebuah pesan dikirimkan ke consumer dan logika bisnis diproses. Namun, koneksi sementara terputus saat client mengirimkan acknowledgement ke server. Untuk memastikan pesan dikonsumsi setidaknya sekali, server ApsaraMQ for RabbitMQ mengirim ulang pesan tersebut setelah jaringan pulih. Akibatnya, consumer menerima pesan duplikat dengan konten dan message ID yang sama.
-
Pesan duplikat saat rebalancing
Ketika server atau client ApsaraMQ for RabbitMQ melakukan restart, scale out, atau scale in, proses rebalancing dipicu. Proses ini dapat menyebabkan consumer menerima pesan duplikat. Rebalancing juga dapat terjadi akibat event seperti fluktuasi jaringan, restart di sisi server, atau restart aplikasi consumer.
Metode
Ikuti langkah-langkah berikut untuk menerapkan idempotensi pesan menggunakan message ID sebagai kunci idempotensi:
-
Buat tabel di database Anda yang menggunakan message ID unik sebagai kunci unik.
-
Tetapkan message ID unik untuk setiap pesan di client produsen.
Kode contoh berikut menunjukkan cara menetapkan message ID unik:
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build(); channel.basicPublish("${ExchangeName}", "RoutingKey", true, props, ("Message Body" + i).getBytes(StandardCharsets.UTF_8));Untuk informasi lebih lanjut tentang message ID, lihat Cara menetapkan message ID.
-
Di client consumer, proses pesan untuk idempotensi berdasarkan message ID uniknya.
Kode contoh berikut menunjukkan cara memproses pesan untuk idempotensi berdasarkan message ID uniknya:
channel.basicConsume(Producer.QueueName, false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 1. Ambil data untuk indeks bisnis unik. try{ String messageId = properties.getMessageId(); // Message ID atau informasi lain yang dapat digunakan sebagai kunci unik. // 2. Mulai transaksi database. idempTable.insert(messageId); // 3. Proses logika bisnis untuk pesan yang diterima. // 4. Commit atau rollback transaksi. Kirim acknowledgement (ACK) hanya jika pemrosesan berhasil. channel.basicAck(envelope.getDeliveryTag(), false); } catch (DatabasePrimaryKeyConflictException e){ // Ini adalah pesan duplikat. Langsung kirim acknowledgement. channel.basicAck(envelope.getDeliveryTag(), false); } } } );