All Products
Search
Document Center

ApsaraMQ for RocketMQ:Kirim pesan normal dalam salah satu dari tiga mode transmisi

Last Updated:Jul 02, 2025

ApsaraMQ for RocketMQ memungkinkan Anda mengirim pesan normal dalam mode sinkron, asinkron, dan satu arah. Topik ini menjelaskan prinsip dan skenario ketiga mode transmisi tersebut serta menyediakan contoh kode. Topik ini juga membandingkan ketiga mode transmisi.

Prasyarat

  • SDK untuk Java telah diinstal. Untuk informasi lebih lanjut, lihat Persiapkan Lingkungan.

  • Sumber daya yang ingin Anda tentukan dalam kode telah dibuat di konsol ApsaraMQ for RocketMQ. Sumber daya tersebut mencakup instance, topik, dan grup konsumen. Untuk informasi lebih lanjut, lihat Buat Sumber Daya.

  • Pasangan AccessKey akun Alibaba Cloud Anda telah diperoleh. Untuk informasi lebih lanjut, lihat Buat Pasangan AccessKey.

  • Opsional. Pengaturan logging telah dikonfigurasi. Untuk informasi lebih lanjut, lihat Pengaturan Logging.

Transmisi Sinkron

  • Cara Kerja Transmisi Sinkron

    Dalam mode transmisi sinkron, pengirim hanya mengirim pesan setelah menerima respons untuk pesan sebelumnya dari broker ApsaraMQ for RocketMQ.同步发送

  • Skema Penggunaan

    Mode ini cocok digunakan dalam skenario seperti pengiriman notifikasi penting ke email, pesan pendaftaran, atau pesan promosi.

  • Contoh Kode:

        import com.aliyun.openservices.ons.api.Message;
        import com.aliyun.openservices.ons.api.Producer;
        import com.aliyun.openservices.ons.api.SendResult;
        import com.aliyun.openservices.ons.api.ONSFactory;
        import com.aliyun.openservices.ons.api.PropertyKeyConst;
    
        import java.util.Properties;
    
        public class ProducerTest {
            public static void main(String[] args) {
                Properties properties = new Properties();
                // Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET dikonfigurasi.
                // ID AccessKey yang digunakan untuk autentikasi.
                properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
                // Rahasia AccessKey yang digunakan untuk autentikasi.
                properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
                // Periode timeout untuk mengirim pesan. Unit: milidetik.
                properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
                // Titik akhir TCP. Anda bisa mendapatkan titik akhir di bagian TCP Endpoint halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
                properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");
                Producer producer = ONSFactory.createProducer(properties);
                // Sebelum Anda mengirim pesan, panggil metode start() sekali saja untuk memulai produser.
                producer.start();
    
                // Kirim pesan secara siklik.
                for (int i = 0; i < 100; i++){
                    Message msg = new Message( 
                        // Topik tempat pesan normal diproduksi. Topik yang digunakan untuk mengirim dan menerima pesan normal tidak dapat digunakan untuk mengirim atau menerima pesan jenis lain.
                        "TopicTestMQ",
                        // Tag pesan. Tag pesan mirip dengan tag Gmail dan digunakan oleh konsumen untuk menyortir dan menyaring pesan di broker ApsaraMQ for RocketMQ.
                        // Untuk informasi tentang format dan konfigurasi tag, lihat praktik terbaik topik dan tag.
                        "TagA",
                        // Tubuh pesan. Tubuh pesan adalah data dalam format biner. ApsaraMQ for RocketMQ tidak memproses tubuh pesan.
                        // Produser dan konsumen harus menyetujui metode yang digunakan untuk serialisasi dan deserialisasi tubuh pesan.
                        "Hello MQ".getBytes());
                    // Kunci pesan. Kunci adalah atribut spesifik bisnis dari sebuah pesan dan harus unik secara global jika memungkinkan.
                    // Jika Anda tidak dapat menerima pesan seperti yang diharapkan, Anda dapat menggunakan kunci untuk menanyakan pesan di konsol ApsaraMQ for RocketMQ dan mengirim ulang pesan tersebut.
                    // Catatan: Anda dapat mengirim dan menerima pesan bahkan jika Anda tidak menentukan kunci.
                    msg.setKey("ORDERID_" + i);
    
                    try {
                        SendResult sendResult = producer.send(msg);
                        // Kirim pesan dalam mode transmisi sinkron. Jika tidak ada pengecualian yang dilemparkan, pesan berhasil dikirim.
                        if (sendResult != null) {
                            System.out.println(new Date() + " Kirim pesan mq berhasil. Topik adalah:" + msg.getTopic() + " msgId adalah: " + sendResult.getMessageId());
                        }
                    }
                    catch (Exception e) {
                        // Logika yang ingin Anda gunakan untuk mengirim ulang atau menyimpan pesan jika pesan gagal dikirim dan perlu dikirim ulang.
                        System.out.println(new Date() + " Kirim pesan mq gagal. Topik adalah:" + msg.getTopic());
                        e.printStackTrace();
                    }
                }
    
                // Sebelum keluar dari aplikasi, hancurkan produser.
                // Catatan: Jika Anda menghancurkan produser, memori dapat dihemat. Jika Anda perlu sering mengirim pesan, jangan hancurkan produser.
                producer.shutdown();
            }
        }               

Transmisi Asinkron

  • Cara Kerja Transmisi Asinkron

    Dalam mode transmisi asinkron, pengirim mengirim pesan tanpa menunggu respons untuk pesan sebelumnya dari broker ApsaraMQ for RocketMQ. Jika menggunakan mode ini di ApsaraMQ for RocketMQ, Anda harus menulis logika implementasi operasi SendCallback. Pengirim langsung mengirim pesan berikutnya tanpa menunggu respons dari broker. Pengirim memanggil operasi SendCallback untuk menerima dan memproses respons dari broker.

    异步发送

  • Skema Penggunaan

    Mode ini digunakan dalam proses yang memerlukan waktu lama pada skenario bisnis sensitif terhadap waktu respons. Contohnya, setelah mengunggah video, callback digunakan untuk memicu transkoding. Setelah video ditranskode, hasil transkoding didorong melalui callback.

  • Contoh Kode:

        import com.aliyun.openservices.ons.api.Message;
        import com.aliyun.openservices.ons.api.OnExceptionContext;
        import com.aliyun.openservices.ons.api.Producer;
        import com.aliyun.openservices.ons.api.SendCallback;
        import com.aliyun.openservices.ons.api.SendResult;
        import com.aliyun.openservices.ons.api.ONSFactory;
        import com.aliyun.openservices.ons.api.PropertyKeyConst;
    
        import java.util.Properties;
        import java.util.concurrent.TimeUnit;
    
        public class ProducerTest {
            public static void main(String[] args) throws InterruptedException {
                Properties properties = new Properties();
                // Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET dikonfigurasi.
                // ID AccessKey yang digunakan untuk autentikasi.
                properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
                // Rahasia AccessKey yang digunakan untuk autentikasi.
                properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
                // Periode timeout untuk mengirim pesan. Unit: milidetik.
                properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
                // Titik akhir TCP. Anda bisa mendapatkan titik akhir di bagian TCP Endpoint halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
                properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");
    
                Producer producer = ONSFactory.createProducer(properties);
                // Sebelum Anda mengirim pesan, panggil metode start() sekali saja untuk memulai produser.
                producer.start();
    
                Message msg = new Message(
                        // Topik tempat pesan normal diproduksi. Topik yang digunakan untuk mengirim dan menerima pesan normal tidak dapat digunakan untuk mengirim atau menerima pesan jenis lain.
                        "TopicTestMQ",
                        // Tag pesan. Tag pesan mirip dengan tag Gmail dan dapat digunakan oleh konsumen untuk menyaring pesan di broker ApsaraMQ for RocketMQ.
                        "TagA",
                        // Tubuh pesan. Tubuh pesan adalah data dalam format biner. ApsaraMQ for RocketMQ tidak memproses tubuh pesan. Produser dan konsumen harus menyetujui metode yang digunakan untuk serialisasi dan deserialisasi tubuh pesan.
                        "Hello MQ".getBytes());
    
                // Kunci pesan. Kunci adalah atribut spesifik bisnis dari sebuah pesan dan harus unik secara global jika memungkinkan. Jika Anda tidak dapat menerima pesan seperti yang diharapkan, Anda dapat menggunakan kunci untuk menanyakan pesan di konsol ApsaraMQ for RocketMQ dan mengirim ulang pesan tersebut.
                // Catatan: Anda dapat mengirim dan menerima pesan bahkan jika Anda tidak menentukan kunci.
                msg.setKey("ORDERID_100");
    
                // Kirim pesan dalam mode asinkron. Hasilnya dikembalikan ke produser setelah produser memanggil operasi SendCallback.
                producer.sendAsync(msg, new SendCallback() {
                    @Override
                    public void onSuccess(final SendResult sendResult) {
                        // Pesan berhasil dikirim.
                        System.out.println("Kirim pesan berhasil. topik=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId());
                    }
    
                    @Override
                    public void onException(OnExceptionContext context) {
                        // Logika yang ingin Anda gunakan untuk mengirim ulang atau menyimpan pesan jika pesan gagal dikirim dan perlu dikirim ulang.
                        System.out.println("Kirim pesan gagal. topik=" + context.getTopic() + ", msgId=" + context.getMessageId());
                    }
                });
    
                // Blokir thread saat ini selama 3 detik dan tunggu hasilnya kembali.
                TimeUnit.SECONDS.sleep(3);
    
                // Sebelum keluar dari aplikasi, hancurkan produser.
                // Catatan: Jika Anda menghancurkan produser, memori dapat dihemat. Jika Anda perlu sering mengirim pesan, jangan hancurkan produser.
                producer.shutdown();
            }
        }                

Transmisi Satu Arah

  • Cara Kerja Transmisi Satu Arah

    Dalam mode transmisi satu arah, produser hanya mengirim pesan tanpa menunggu respons dari broker ApsaraMQ for RocketMQ atau memicu fungsi callback. Dalam mode ini, pesan dapat dikirim dalam mikrodetik.

    单向发送

  • Skema Penggunaan

    Mode ini cocok untuk skenario di mana pesan ditransmisikan dalam waktu singkat dengan persyaratan keandalan rendah, seperti pengumpulan log.

  • Contoh Kode:

        import com.aliyun.openservices.ons.api.Message;
        import com.aliyun.openservices.ons.api.Producer;
        import com.aliyun.openservices.ons.api.ONSFactory;
        import com.aliyun.openservices.ons.api.PropertyKeyConst;
    
        import java.util.Properties;
    
        public class ProducerTest {
            public static void main(String[] args) {
                Properties properties = new Properties();
                // Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET dikonfigurasi.
                // ID AccessKey yang digunakan untuk autentikasi.
                properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
                // Rahasia AccessKey yang digunakan untuk autentikasi.
                properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
                // Periode timeout untuk mengirim pesan. Unit: milidetik.
                properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
                // Titik akhir TCP. Anda bisa mendapatkan titik akhir di bagian TCP Endpoint halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
                properties.put(PropertyKeyConst.NAMESRV_ADDR,
                  "XXX");
    
                Producer producer = ONSFactory.createProducer(properties);
                // Sebelum Anda mengirim pesan, panggil metode start() sekali saja untuk memulai produser.
                producer.start();
                // Kirim pesan secara siklik.
                for (int i = 0; i < 100; i++){
                    Message msg = new Message(
                            // Topik tempat pesan normal diproduksi. Topik yang digunakan untuk mengirim dan menerima pesan normal tidak dapat digunakan untuk mengirim atau menerima pesan jenis lain.
                            "TopicTestMQ",
                            // Tag Pesan,
                            // Tag pesan. Tag pesan mirip dengan tag Gmail dan digunakan oleh konsumen untuk menyortir dan menyaring pesan di broker ApsaraMQ for RocketMQ.
                            "TagA",
                            // Tubuh Pesan
                            // Tubuh pesan. Tubuh pesan adalah data dalam format biner. ApsaraMQ for RocketMQ tidak memproses tubuh pesan. Produser dan konsumen harus menyetujui metode serialisasi dan deserialisasi.
                            "Hello MQ".getBytes());
    
                    // Kunci pesan. Kunci adalah atribut spesifik bisnis dari sebuah pesan dan harus unik secara global jika memungkinkan.
                    // Jika Anda tidak dapat menerima pesan seperti yang diharapkan, Anda dapat menggunakan kunci untuk menanyakan pesan di konsol ApsaraMQ for RocketMQ dan mengirim ulang pesan tersebut.
                    // Catatan: Anda dapat mengirim dan menerima pesan bahkan jika Anda tidak menentukan kunci.
                    msg.setKey("ORDERID_" + i);
    
                    // Dalam mode transmisi satu arah, produser tidak menunggu respons dari broker ApsaraMQ for RocketMQ. Oleh karena itu, kehilangan data terjadi jika pesan yang gagal dikirim tidak dicoba ulang. Jika kehilangan data tidak dapat diterima, kami sarankan Anda menggunakan mode transmisi sinkron atau asinkron yang andal.
                    producer.sendOneway(msg);
                }
    
                // Sebelum keluar dari aplikasi, hancurkan produser.
                // Catatan: Jika Anda menghancurkan produser, memori dapat dihemat. Jika Anda perlu sering mengirim pesan, jangan hancurkan produser.
                producer.shutdown();
            }
        }

Perbandingan antara ketiga mode transmisi

Mode transmisi

TPS

Respons

Keandalan

Sinkron

Tinggi

Ya

Tidak ada kehilangan pesan

Asinkron

Tinggi

Ya

Tidak ada kehilangan pesan

Satu arah

Tertinggi

Tidak

Mungkin ada kehilangan pesan

Berlangganan pesan normal

Untuk informasi tentang contoh kode berlangganan pesan normal, lihat Berlangganan Pesan.