All Products
Search
Document Center

ApsaraMQ for RocketMQ:Kirim dan terima pesan terurut

Last Updated:Jul 02, 2025

Pesan terurut adalah jenis pesan yang disediakan oleh ApsaraMQ for RocketMQ. Pesan ini diterbitkan dan dikonsumsi dalam urutan first-in-first-out (FIFO) yang ketat. Topik ini memberikan contoh kode untuk mengirim dan menerima pesan terurut menggunakan HTTP client SDK untuk Java.

Informasi latar belakang

Pesan terurut dibagi menjadi jenis-jenis berikut:

  • Pesan terurut global: Jika pesan dalam topik adalah jenis ini, pesan tersebut diterbitkan dan dikonsumsi dalam urutan FIFO.

  • Pesan terurut berpartisi: Jika pesan dalam topik adalah jenis ini, pesan tersebut didistribusikan ke partisi yang berbeda menggunakan kunci sharding. Pesan dalam setiap partisi dikonsumsi dalam urutan FIFO. Kunci sharding adalah bidang kunci yang digunakan untuk pesan terurut guna mengidentifikasi partisi. Kunci sharding berbeda dari kunci pesan.

Untuk informasi lebih lanjut, lihat Pesan Terurut.

Prasyarat

Sebelum memulai, pastikan operasi berikut telah dilakukan:

  • Instal SDK untuk Java. Untuk informasi lebih lanjut, lihat Persiapkan Lingkungan.

  • Buat sumber daya yang ingin Anda tentukan dalam kode di konsol ApsaraMQ for RocketMQ. Sumber daya tersebut mencakup instance, topik, dan grup konsumen. Untuk informasi lebih lanjut, lihat Buat Sumber Daya.

  • Peroleh pasangan AccessKey dari akun Alibaba Cloud Anda. Untuk informasi lebih lanjut, lihat Buat Pasangan AccessKey.

Kirim pesan terurut

Penting

Broker ApsaraMQ for RocketMQ menentukan urutan pembuatan pesan berdasarkan urutan pengiriman menggunakan satu produser atau thread untuk mengirim pesan. Jika pengirim menggunakan beberapa produser atau thread untuk mengirim pesan secara bersamaan, urutan pesan ditentukan oleh urutan penerimaan pesan oleh broker ApsaraMQ for RocketMQ. Urutan ini mungkin berbeda dari urutan pengiriman di sisi bisnis.

Kode sampel berikut memberikan contoh cara mengirim pesan terurut menggunakan HTTP client SDK untuk Java:

import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQProducer;
import com.aliyun.mq.http.model.TopicMessage;

import java.util.Date;

public class OrderProducer {

    public static void main(String[] args) {
        MQClient mqClient = new MQClient(
                // Titik akhir HTTP. Anda dapat memperoleh titik akhir di bagian HTTP Endpoint halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
                "${HTTP_ENDPOINT}",
                // Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET dikonfigurasi.
                // ID AccessKey yang digunakan untuk otentikasi.
	              System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
	              // Rahasia AccessKey yang digunakan untuk otentikasi.
	              System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
        );

        // Topik tempat pesan diproduksi. Anda harus membuat topik di konsol ApsaraMQ for RocketMQ.
        // Setiap topik dapat digunakan untuk mengirim dan menerima pesan dari tipe tertentu. Misalnya, topik yang digunakan untuk mengirim dan menerima pesan normal tidak dapat digunakan untuk mengirim atau menerima pesan dari tipe lain.
        final String topic = "${TOPIC}";
        // ID instance tempat topik tersebut milik. Anda harus membuat instance di konsol ApsaraMQ for RocketMQ.
        // Jika instance memiliki namespace, tentukan ID instance. Jika instance tidak memiliki namespace, atur parameter instanceID ke null atau string kosong. Anda dapat memperoleh namespace instance di halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
        final String instanceId = "${INSTANCE_ID}";

        // Dapatkan produser yang mengirim pesan ke topik.
        MQProducer producer;
        if (instanceId != null && instanceId != "") {
            producer = mqClient.getProducer(instanceId, topic);
        } else {
            producer = mqClient.getProducer(topic);
        }

        try {
            // Kirim delapan pesan secara siklik.
            for (int i = 0; i < 8; i++) {
                TopicMessage pubMsg = new TopicMessage(
                        // Konten pesan.
                        "hello mq!".getBytes(),
                        // Tag pesan.
                        "A"
                );
                // Kunci sharding yang digunakan untuk mendistribusikan pesan terurut ke partisi tertentu. Kunci sharding dapat digunakan untuk mengidentifikasi partisi. Kunci sharding berbeda dari kunci pesan.
                pubMsg.setShardingKey(String.valueOf(i % 2));
                pubMsg.getProperties().put("a", String.valueOf(i));
                // Kirim pesan dalam mode transmisi sinkron. Jika tidak ada pengecualian yang dilemparkan, pesan terkirim.
                TopicMessage pubResultMsg = producer.publishMessage(pubMsg);

                // Kirim pesan dalam mode transmisi sinkron. Jika tidak ada pengecualian yang dilemparkan, pesan terkirim.
                System.out.println(new Date() + " Kirim pesan mq berhasil. Topik adalah:" + topic + ", msgId adalah: " + pubResultMsg.getMessageId()
                        + ", bodyMD5 adalah: " + pubResultMsg.getMessageBodyMD5());
            }
        } catch (Throwable e) {
            // Logika yang ingin Anda gunakan untuk mengirim ulang atau menyimpan pesan jika pesan gagal dikirim dan perlu dikirim lagi.
            System.out.println(new Date() + " Kirim pesan mq gagal. Topik adalah:" + topic);
            e.printStackTrace();
        }

        mqClient.close();
    }

}

Langganan pesan terurut

Kode sampel berikut memberikan contoh cara mengonsumsi pesan terurut menggunakan HTTP client SDK untuk Java:

import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQConsumer;
import com.aliyun.mq.http.common.AckMessageException;
import com.aliyun.mq.http.model.Message;

import java.util.ArrayList;
import java.util.List;

public class OrderConsumer {

    public static void main(String[] args) {
        MQClient mqClient = new MQClient(
                // Titik akhir HTTP. Anda dapat memperoleh titik akhir di bagian HTTP Endpoint halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
                "${HTTP_ENDPOINT}",
                // Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET dikonfigurasi.
                // ID AccessKey yang digunakan untuk otentikasi.
	              System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
	              // Rahasia AccessKey yang digunakan untuk otentikasi.
	              System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
        );

        // Topik tempat pesan diproduksi. Anda harus membuat topik di konsol ApsaraMQ for RocketMQ.
        // Setiap topik dapat digunakan untuk mengirim dan menerima pesan dari tipe tertentu. Misalnya, topik yang digunakan untuk mengirim dan menerima pesan normal tidak dapat digunakan untuk mengirim atau menerima pesan dari tipe lain.
        final String topic = "${TOPIC}";
        // ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
        final String groupId = "${GROUP_ID}";
        // ID instance tempat topik tersebut milik. Anda harus membuat instance di konsol ApsaraMQ for RocketMQ.
        // Jika instance memiliki namespace, tentukan ID instance. Jika instance tidak memiliki namespace, atur parameter instanceID ke null atau string kosong. Anda dapat memperoleh namespace instance di halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
        final String instanceId = "${INSTANCE_ID}";

        final MQConsumer consumer;
        if (instanceId != null && instanceId != "") {
            consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
        } else {
            consumer = mqClient.getConsumer(topic, groupId);
        }

        // Konsumsi pesan secara siklik di thread saat ini. Kami merekomendasikan Anda menggunakan beberapa thread untuk mengonsumsi pesan secara bersamaan.
        do {
            List<Message> messages = null;

            try {
                // Konsumsi pesan dalam mode polling panjang. Konsumen mungkin menarik pesan terurut berpartisi dari beberapa partisi. Konsumen mengonsumsi pesan dari partisi yang sama dalam urutan pengiriman pesan.
                // Anggaplah konsumen menarik pesan terurut berpartisi dari satu partisi. Jika broker gagal menerima Pengakuan (ACK) untuk pesan dari konsumen, broker akan mengirimkan pesan kepada konsumen lagi.
                // Konsumen hanya dapat mengonsumsi batch pesan berikutnya dari partisi setelah semua pesan yang ditarik dari partisi dalam batch sebelumnya diakui sebagai dikonsumsi.
                // Dalam mode polling panjang, jika tidak ada pesan dalam topik yang tersedia untuk dikonsumsi, permintaan ditangguhkan pada broker selama periode waktu yang ditentukan. Jika pesan tersedia untuk dikonsumsi dalam periode waktu yang ditentukan, broker segera mengirimkan respons ke konsumen. Dalam contoh ini, nilainya ditentukan sebagai 3 detik.
                messages = consumer.consumeMessageOrderly(
                        3,  // Jumlah maksimum pesan yang dapat dikonsumsi pada satu waktu. Dalam contoh ini, nilainya ditentukan sebagai 3. Nilai maksimum yang dapat Anda tentukan adalah 16.
                        3 // Durasi periode polling panjang. Unit: detik. Dalam contoh ini, nilainya ditentukan sebagai 3. Nilai maksimum yang dapat Anda tentukan adalah 30.
                );
            } catch (Throwable e) {
                e.printStackTrace();
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
            // Tidak ada pesan dalam topik yang tersedia untuk dikonsumsi.
            if (messages == null || messages.isEmpty()) {
                System.out.println(Thread.currentThread().getName() + ": tidak ada pesan baru, lanjutkan!");
                continue;
            }

            // Logika konsumsi pesan.
            System.out.println("Terima " + messages.size() + " pesan:");
            for (Message message : messages) {
                System.out.println(message);
                System.out.println("ShardingKey: " + message.getShardingKey() + ", a:" + message.getProperties().get("a"));
            }

            // Jika broker gagal menerima ACK untuk pesan dari konsumen sebelum periode timeout untuk pengulangan pesan berakhir, broker akan mengirimkan pesan kepada konsumen lagi.
            // Timestamp unik ditentukan untuk handle pesan setiap kali pesan dikonsumsi.
            {
                List<String> handles = new ArrayList<String>();
                for (Message message : messages) {
                    handles.add(message.getReceiptHandle());
                }

                try {
                    consumer.ackMessage(handles);
                } catch (Throwable e) {
                    // Jika handle pesan habis waktu, broker tidak dapat menerima ACK untuk pesan dari konsumen.
                    if (e instanceof AckMessageException) {
                        AckMessageException errors = (AckMessageException) e;
                        System.out.println("Pengakuan pesan gagal, requestId adalah:" + errors.getRequestId() + ", handle gagal:");
                        if (errors.getErrorMessages() != null) {
                            for (String errorHandle :errors.getErrorMessages().keySet()) {
                                System.out.println("Handle:" + errorHandle + ", KodeKesalahan:" + errors.getErrorMessages().get(errorHandle).getErrorCode()
                                        + ", PesanKesalahan:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());
                            }
                        }
                        continue;
                    }
                    e.printStackTrace();
                }
            }
        } while (true);
    }
}