全部产品
Search
文档中心

ApsaraMQ for RocketMQ:Kode contoh

更新时间:Jul 02, 2025

ApsaraMQ for RocketMQ Instance 5.x kompatibel dengan klien yang menggunakan RocketMQ 1.x TCP client SDK for Java. Anda dapat menggunakan SDK ini untuk terhubung ke instance ApsaraMQ for RocketMQ 5.x guna mengirim dan menerima pesan. Topik ini menyediakan kode contoh untuk mengirim dan menerima pesan menggunakan RocketMQ 1.x TCP client SDK for Java.

Penting
  • Disarankan untuk menggunakan RocketMQ 5.x SDK terbaru. SDK ini sepenuhnya kompatibel dengan ApsaraMQ for RocketMQ broker 5.x dan menawarkan lebih banyak fungsi serta fitur yang ditingkatkan. Untuk informasi lebih lanjut, lihat Deskripsi Versi.

  • Alibaba Cloud hanya memelihara RocketMQ 3.x, 4.x, dan TCP client SDK. Disarankan untuk menggunakannya hanya pada bisnis yang sudah ada.

Deskripsi versi untuk mengakses instance serverless melalui Internet

Jika Anda mengakses instance ApsaraMQ for RocketMQ serverless melalui Internet untuk mengirim dan menerima pesan, pastikan bahwa versi RocketMQ 1.x TCP client SDK for Java adalah 1.9.0.Final atau yang lebih baru. Tambahkan informasi berikut dalam kode:

properties.setProperty(PropertyKeyConst.Namespace, "InstanceId");

Catatan

Ganti InstanceId dengan ID instance ApsaraMQ for RocketMQ Anda.

Kirim dan terima pesan normal

Kirim pesan normal dalam mode transmisi sinkron

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import java.util.Date;
import java.util.Properties;

public class ProducerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        /**
        * Jika Anda menggunakan endpoint publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus mengonfigurasi parameter AccessKey dan SecretKey. Nilai parameter AccessKey adalah nama pengguna instance, dan nilai parameter SecretKey adalah kata sandi instance. Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
        * Catatan: Jangan masukkan pasangan AccessKey akun Alibaba Cloud Anda.
        * Jika klien instance ApsaraMQ for RocketMQ ditempatkan pada instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ di jaringan internal, Anda tidak perlu menentukan nama pengguna atau kata sandi instance. Broker ApsaraMQ for RocketMQ secara otomatis mendapatkan nama pengguna dan kata sandi berdasarkan informasi VPC.
        * Jika instance tersebut adalah instance ApsaraMQ for RocketMQ serverless, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur autentikasi bebas di VPC untuk instance serverless dan mengakses instance di VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi.
        */
        // Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
        properties.put(PropertyKeyConst.AccessKey,"NAMA PENGGUNA INSTANCE");
        properties.put(PropertyKeyConst.SecretKey, "KATA SANDI INSTANCE");
        // Catatan: Jika Anda menggunakan TCP client SDK untuk mengakses instance ApsaraMQ for RocketMQ 5.x, Anda tidak perlu menentukan ID instance. Jika tidak, akses akan gagal.

        // Periode timeout untuk mengirim pesan. Unit: milidetik.
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
        // Endpoint yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Format endpoint serupa dengan rmq-cn-XXXX.rmq.aliyuncs.com:8080.
        // Catatan: Masukkan nama domain dan nomor port yang ditampilkan di konsol ApsaraMQ for RocketMQ. Jangan sertakan awalan http:// atau https:// atau gunakan alamat IP yang telah diselesaikan.
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "TITIK AKSES");
        Producer producer = ONSFactory.createProducer(properties);
        // Sebelum Anda mengirim pesan, panggil metode start() hanya sekali untuk memulai produser.
        producer.start();

        // Kirim pesan secara siklik.
        for (int i = 0; i < 100; i++){
            Message msg = new Message( 
                // Topik yang Anda buat di konsol ApsaraMQ for RocketMQ.
                // Topik tempat pesan normal termasuk. Topik yang digunakan untuk mengirim dan menerima pesan normal tidak dapat digunakan untuk mengirim atau menerima pesan jenis lain.
                "TopicTestMQ",
                // Tag pesan. Tag pesan mirip dengan tag Gmail dan dapat digunakan oleh konsumen untuk memfilter pesan di broker ApsaraMQ for RocketMQ.
                // Untuk informasi tentang format tag dan cara menentukan tag, lihat Penyaringan pesan.
                "TagA",
                // Tubuh pesan. Tubuh pesan adalah data dalam format biner. ApsaraMQ for RocketMQ tidak memproses tubuh pesan.
                // Produser dan konsumen harus setuju pada metode yang digunakan untuk serialisasi dan deserialisasi tubuh pesan.
                "Hello MQ".getBytes());
            // Kunci pesan. Kunci adalah atribut spesifik bisnis dari pesan dan harus unik secara global jika memungkinkan.
            // Jika Anda tidak dapat menerima pesan seperti yang diharapkan, Anda dapat menggunakan kunci untuk mencari pesan di konsol ApsaraMQ for RocketMQ.
            // Catatan: Pesan dapat dikirim dan diterima meskipun Anda tidak menentukan kunci pesan.
            msg.setKey("ORDERID_" + i);

            try {
                SendResult sendResult = producer.send(msg);
                // Kirim pesan dalam mode transmisi sinkron. Jika tidak ada pengecualian yang dilemparkan, pesan dikirim.
                if (sendResult != null) {
                    System.out.println(new Date() + " Pengiriman pesan mq berhasil. Topik adalah:" + msg.getTopic() + " msgId adalah: " + sendResult.getMessageId());
                }
            }
            catch (Exception e) {
                // Logika untuk mengirim ulang atau menyimpan pesan jika pesan gagal dikirim dan perlu dikirim ulang.
                System.out.println(new Date() + " Pengiriman pesan mq gagal. Topik adalah:" + msg.getTopic());
                e.printStackTrace();
            }
        }

        // Sebelum Anda keluar dari aplikasi, matikan produser.
        // Catatan: Jika Anda mematikan produser, memori dapat dihemat. Jika Anda perlu sering mengirim pesan, jangan matikan produser.
        producer.shutdown();
    }
}               

Kirim pesan normal dalam mode transmisi asinkron

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.OnExceptionContext;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendCallback;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class ProducerTest {
    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        /**
        * Jika Anda menggunakan endpoint publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus mengonfigurasi parameter AccessKey dan SecretKey. Nilai parameter AccessKey adalah nama pengguna instance, dan nilai parameter SecretKey adalah kata sandi instance. Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
        * Catatan: Jangan masukkan pasangan AccessKey akun Alibaba Cloud Anda.
        * Jika klien instance ApsaraMQ for RocketMQ ditempatkan pada instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ di jaringan internal, Anda tidak perlu menentukan nama pengguna atau kata sandi instance. Broker ApsaraMQ for RocketMQ secara otomatis mendapatkan nama pengguna dan kata sandi berdasarkan informasi VPC.
        * Jika instance tersebut adalah instance ApsaraMQ for RocketMQ serverless, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur autentikasi bebas di VPC untuk instance serverless dan mengakses instance di VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi.
        */
        // Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
        properties.put(PropertyKeyConst.AccessKey,"NAMA PENGGUNA INSTANCE");
        properties.put(PropertyKeyConst.SecretKey, "KATA SANDI INSTANCE");
        // Catatan: Jika Anda menggunakan TCP client SDK untuk mengakses instance ApsaraMQ for RocketMQ 5.x, Anda tidak perlu menentukan ID instance. Jika tidak, akses akan gagal.

        // Periode timeout untuk mengirim pesan. Unit: milidetik.
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
        // Endpoint yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Format endpoint serupa dengan rmq-cn-XXXX.rmq.aliyuncs.com:8080.
        // Catatan: Masukkan nama domain dan nomor port yang ditampilkan di konsol ApsaraMQ for RocketMQ. Jangan sertakan awalan http:// atau https:// atau gunakan alamat IP yang telah diselesaikan.
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "TITIK AKSES");

        Producer producer = ONSFactory.createProducer(properties);
        // Sebelum Anda mengirim pesan, panggil metode start() hanya sekali untuk memulai produser.
        producer.start();

        Message msg = new Message(
                // Topik yang Anda buat di konsol ApsaraMQ for RocketMQ.
                // Topik tempat pesan normal termasuk. Topik yang digunakan untuk mengirim dan menerima pesan normal tidak dapat digunakan untuk mengirim atau menerima pesan jenis lain.
                "TopicTestMQ",
                // Tag pesan. Tag pesan mirip dengan tag Gmail dan dapat digunakan oleh konsumen untuk memfilter pesan di broker ApsaraMQ for RocketMQ.
                "TagA",
                // Tubuh pesan. Tubuh pesan adalah data dalam format biner. ApsaraMQ for RocketMQ tidak memproses tubuh pesan. Produser dan konsumen harus setuju pada metode serialisasi dan deserialisasi.
                "Hello MQ".getBytes());

        // Kunci pesan. Kunci adalah atribut spesifik bisnis dari pesan dan harus unik secara global jika memungkinkan. Jika Anda tidak dapat menerima pesan seperti yang diharapkan, Anda dapat menggunakan kunci untuk mencari pesan di konsol ApsaraMQ for RocketMQ dan mengirim pesan lagi.
        // Catatan: Pesan dapat dikirim dan diterima meskipun Anda tidak menentukan kunci pesan.
        msg.setKey("ORDERID_100");

        // Kirim pesan dalam mode transmisi asinkron. Hasilnya dikembalikan ke produser setelah produser memanggil operasi SendCallback.
        producer.sendAsync(msg, new SendCallback() {
            @Override
            public void onSuccess(final SendResult sendResult) {
                // Pesan dikirim.
                System.out.println("Pengiriman pesan berhasil. topik=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId());
            }

            @Override
            public void onException(OnExceptionContext context) {
                // Logika untuk mengirim ulang atau menyimpan pesan jika pesan gagal dikirim dan perlu dikirim ulang.
                System.out.println("Pengiriman pesan gagal. topik=" + context.getTopic() + ", msgId=" + context.getMessageId());
            }
        });

        // Blokir thread saat ini selama 3 detik dan tunggu hasil asinkron kembali.
        TimeUnit.SECONDS.sleep(3);

        // Sebelum Anda keluar dari aplikasi, matikan produser.
        // Catatan: Jika Anda mematikan produser, memori dapat dihemat. Jika Anda perlu sering mengirim pesan, jangan matikan produser.
        producer.shutdown();
    }
}                

Kirim pesan normal dalam mode transmisi satu arah

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import java.util.Properties;

public class ProducerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        /**
        * Jika Anda menggunakan endpoint publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus mengonfigurasi parameter AccessKey dan SecretKey. Nilai parameter AccessKey adalah nama pengguna instance, dan nilai parameter SecretKey adalah kata sandi instance. Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
        * Catatan: Jangan masukkan pasangan AccessKey akun Alibaba Cloud Anda.
        * Jika klien instance ApsaraMQ for RocketMQ ditempatkan pada instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ di jaringan internal, Anda tidak perlu menentukan nama pengguna atau kata sandi instance. Broker ApsaraMQ for RocketMQ secara otomatis mendapatkan nama pengguna dan kata sandi berdasarkan informasi VPC.
        * Jika instance tersebut adalah instance ApsaraMQ for RocketMQ serverless, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur autentikasi bebas di VPC untuk instance serverless dan mengakses instance di VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi.
        */
        // Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
        properties.put(PropertyKeyConst.AccessKey,"NAMA PENGGUNA INSTANCE");
        properties.put(PropertyKeyConst.SecretKey, "KATA SANDI INSTANCE");
        // Catatan: Jika Anda menggunakan TCP client SDK untuk mengakses instance ApsaraMQ for RocketMQ 5.x, Anda tidak perlu menentukan ID instance. Jika tidak, akses akan gagal.

        // Periode timeout untuk mengirim pesan. Unit: milidetik.
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
        // Endpoint yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Format endpoint serupa dengan rmq-cn-XXXX.rmq.aliyuncs.com:8080.
        // Catatan: Masukkan nama domain dan nomor port yang ditampilkan di konsol ApsaraMQ for RocketMQ. Jangan sertakan awalan http:// atau https:// atau gunakan alamat IP yang telah diselesaikan.
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "TITIK AKSES");

        Producer producer = ONSFactory.createProducer(properties);
        // Sebelum Anda mengirim pesan, panggil metode start() hanya sekali untuk memulai produser.
        producer.start();
        // Kirim pesan secara siklik.
        for (int i = 0; i < 100; i++){
            Message msg = new Message(
                    // Topik yang Anda buat di konsol ApsaraMQ for RocketMQ.
                    // Topik tempat pesan normal termasuk. Topik yang digunakan untuk mengirim dan menerima pesan normal tidak dapat digunakan untuk mengirim atau menerima pesan jenis lain.
                    "TopicTestMQ",
                    // Tag pesan,
                    // Tag pesan. Tag pesan mirip dengan tag Gmail dan dapat digunakan oleh konsumen untuk memfilter pesan di broker ApsaraMQ for RocketMQ.
                    "TagA",
                    // Tubuh pesan
                    // Tubuh pesan. Tubuh pesan adalah data dalam format biner. ApsaraMQ for RocketMQ tidak memproses tubuh pesan. Produser dan konsumen harus setuju pada metode serialisasi dan deserialisasi.
                    "Hello MQ".getBytes());

            // Kunci pesan. Kunci adalah atribut spesifik bisnis dari pesan dan harus unik secara global jika memungkinkan.
            // Jika Anda tidak dapat menerima pesan seperti yang diharapkan, Anda dapat menggunakan kunci untuk mencari pesan di konsol ApsaraMQ for RocketMQ.
            // Catatan: Pesan dapat dikirim dan diterima meskipun Anda tidak menentukan kunci pesan.
            msg.setKey("ORDERID_" + i);

            // Dalam mode transmisi satu arah, produser tidak menunggu respons dari broker ApsaraMQ for RocketMQ. Oleh karena itu, kehilangan data terjadi jika pesan yang gagal dikirim tidak dicoba ulang. Jika kehilangan data tidak dapat diterima, kami merekomendasikan agar Anda menggunakan mode transmisi sinkron atau asinkron yang andal.
            producer.sendOneway(msg);
        }

        // Sebelum Anda keluar dari aplikasi, matikan produser.
        // Catatan: Jika Anda mematikan produser, memori dapat dihemat. Jika Anda perlu sering mengirim pesan, jangan matikan produser.
        producer.shutdown();
    }
}

Kirim pesan normal menggunakan beberapa utas

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import com.aliyun.openservices.ons.api.SendResult;

import java.util.Date;
import java.util.Properties;

public class SharedProducer {
    public static void main(String[] args) {
        // Inisialisasi konfigurasi produser.
        Properties properties = new Properties();
        // ID grup yang Anda buat di konsol ApsaraMQ for RocketMQ.
        properties.put(PropertyKeyConst.GROUP_ID, "XXX");
        /**
         * Jika Anda menggunakan endpoint publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus mengonfigurasi parameter AccessKey dan SecretKey. Nilai parameter AccessKey adalah nama pengguna instance, dan nilai parameter SecretKey adalah kata sandi instance. Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
         * Catatan: Jangan masukkan pasangan AccessKey akun Alibaba Cloud Anda.
         * Jika klien instance ApsaraMQ for RocketMQ ditempatkan pada instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ di jaringan internal, Anda tidak perlu menentukan nama pengguna atau kata sandi instance. Broker ApsaraMQ for RocketMQ secara otomatis mendapatkan nama pengguna dan kata sandi berdasarkan informasi VPC.
         * Jika instance tersebut adalah instance ApsaraMQ for RocketMQ serverless, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur autentikasi bebas di VPC untuk instance serverless dan mengakses instance di VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi.
         */
        // Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
        properties.put(PropertyKeyConst.AccessKey, "NAMA PENGGUNA INSTANCE");
        properties.put(PropertyKeyConst.SecretKey, "KATA SANDI INSTANCE");
        // Catatan: Jika Anda menggunakan TCP client SDK untuk mengakses instance ApsaraMQ for RocketMQ 5.x, Anda tidak perlu menentukan ID instance. Jika tidak, akses akan gagal.

        // Periode timeout untuk mengirim pesan. Unit: milidetik.
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
        // Endpoint yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Format endpoint serupa dengan rmq-cn-XXXX.rmq.aliyuncs.com:8080.
        // Catatan: Masukkan nama domain dan nomor port yang ditampilkan di konsol ApsaraMQ for RocketMQ. Jangan sertakan awalan http:// atau https:// atau gunakan alamat IP yang telah diselesaikan.
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "TITIK AKSES");
        // Sebelum Anda mengirim pesan, panggil metode start() hanya sekali untuk memulai produser.
        Producer producer = ONSFactory.createProducer(properties);
        producer.start();

        // Produser dan konsumen yang dibuat bersifat aman utas dan dapat dibagikan di antara utas. Jangan buat instance produser atau instance konsumen untuk setiap utas.

        // Dua utas berbagi produser dan mengirim pesan secara bersamaan ke ApsaraMQ for RocketMQ.
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                Message msg = new Message(
                        // Topik yang Anda buat di konsol ApsaraMQ for RocketMQ.
                        // Topik tempat pesan normal termasuk. Topik yang digunakan untuk mengirim dan menerima pesan normal tidak dapat digunakan untuk mengirim atau menerima pesan jenis lain.
                        "TopicTestMQ",
                        // Tag pesan. Tag pesan mirip dengan tag Gmail dan dapat digunakan oleh konsumen untuk memfilter pesan di broker ApsaraMQ for RocketMQ.
                        "TagA",
                        // Tubuh pesan. Tubuh pesan adalah data dalam format biner. ApsaraMQ for RocketMQ tidak memproses tubuh pesan.
                        // Produser dan konsumen harus setuju pada metode serialisasi dan deserialisasi.
                        "Hello MQ".getBytes());
                try {
                    SendResult sendResult = producer.send(msg);
                    // Kirim pesan dalam mode transmisi sinkron. Jika tidak ada pengecualian yang dilemparkan, pesan dikirim.
                    if (sendResult != null) {
                        System.out.println(new Date() + " Pengiriman pesan mq berhasil. Topik adalah:" + msg.getTopic() + " msgId adalah: " + sendResult.getMessageId());
                    }
                } catch (Exception e) {
                    // Logika untuk mengirim ulang atau menyimpan pesan jika pesan gagal dikirim dan perlu dikirim ulang.
                    System.out.println(new Date() + " Pengiriman pesan mq gagal. Topik adalah:" + msg.getTopic());
                    e.printStackTrace();
                }
            }
        });
        thread.start();


        Thread anotherThread = new Thread(new Runnable() {
            @Override
            public void run() {
                Message msg = new Message("TopicTestMQ", "TagA", "Hello MQ".getBytes());
                try {
                    SendResult sendResult = producer.send(msg);
                    // Kirim pesan dalam mode transmisi sinkron. Jika tidak ada pengecualian yang dilemparkan, pesan dikirim.
                    if (sendResult != null) {
                        System.out.println(new Date() + " Pengiriman pesan mq berhasil. Topik adalah:" + msg.getTopic() + " msgId adalah: " + sendResult.getMessageId());
                    }
                } catch (Exception e) {
                    // Logika untuk mengirim ulang atau menyimpan pesan jika pesan gagal dikirim dan perlu dikirim ulang.
                    System.out.println(new Date() + " Pengiriman pesan mq gagal. Topik adalah:" + msg.getTopic());
                    e.printStackTrace();
                }
            }
        });
        anotherThread.start();


        // (Opsional) Jika Anda tidak lagi memerlukan instance produser, matikan produser dan lepaskan sumber daya yang dialokasikan.
        // producer.shutdown();
    }
}

Berlangganan pesan normal dalam mode dorong

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import java.util.Properties;

public class ConsumerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // ID grup yang Anda buat di konsol ApsaraMQ for RocketMQ.
        properties.put(PropertyKeyConst.GROUP_ID, "XXX");
        /**
         * Jika Anda menggunakan endpoint publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus mengonfigurasi parameter AccessKey dan SecretKey. Nilai parameter AccessKey adalah nama pengguna instance, dan nilai parameter SecretKey adalah kata sandi instance. Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
         * Catatan: Jangan masukkan pasangan AccessKey akun Alibaba Cloud Anda.
         * Jika klien instance ApsaraMQ for RocketMQ ditempatkan pada instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ di jaringan internal, Anda tidak perlu menentukan nama pengguna atau kata sandi instance. Broker ApsaraMQ for RocketMQ secara otomatis mendapatkan nama pengguna dan kata sandi berdasarkan informasi VPC.
         * Jika instance tersebut adalah instance ApsaraMQ for RocketMQ serverless, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur autentikasi bebas di VPC untuk instance serverless dan mengakses instance di VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi.
         */
        // Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
        properties.put(PropertyKeyConst.AccessKey, "NAMA PENGGUNA INSTANCE");
        properties.put(PropertyKeyConst.SecretKey, "KATA SANDI INSTANCE");
        // Catatan: Jika Anda menggunakan TCP client SDK untuk mengakses instance ApsaraMQ for RocketMQ 5.x, Anda tidak perlu menentukan ID instance. Jika tidak, akses akan gagal.

        // Endpoint yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Format endpoint serupa dengan rmq-cn-XXXX.rmq.aliyuncs.com:8080.
        // Catatan: Masukkan nama domain dan nomor port yang ditampilkan di konsol ApsaraMQ for RocketMQ. Jangan sertakan awalan http:// atau https:// atau gunakan alamat IP yang telah diselesaikan.
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "TITIK AKSES");
        // Mode konsumsi klustering. Ini adalah mode default.
        // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
        // Mode konsumsi siaran.
        // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);

        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() { // Berlangganan beberapa tag.
            public Action consume(Message message, ConsumeContext context) {
                System.out.println("Terima: " + message);
                return Action.CommitMessage;
            }
        });

        // Berlangganan topik lain. Untuk berhenti berlangganan dari topik, hapus kode untuk langganan dan mulai ulang konsumen.
        consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() { // Berlangganan semua tag.
            public Action consume(Message message, ConsumeContext context) {
                System.out.println("Terima: " + message);
                return Action.CommitMessage;
            }
        });

        consumer.start();
        System.out.println("Konsumen Dimulai");
    }
} 

Berlangganan pesan normal secara batch dalam mode dorong

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.batch.BatchConsumer;
import com.aliyun.openservices.ons.api.batch.BatchMessageListener;
import java.util.List;
import java.util.Properties;

import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

public class SimpleBatchConsumer {

    public static void main(String[] args) {
        Properties consumerProperties = new Properties();
        // ID grup yang Anda buat di konsol ApsaraMQ for RocketMQ.
        consumerProperties.put(PropertyKeyConst.GROUP_ID, "XXX");
        /**
        * Jika Anda menggunakan endpoint publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus mengonfigurasi parameter AccessKey dan SecretKey. Nilai parameter AccessKey adalah nama pengguna instance, dan nilai parameter SecretKey adalah kata sandi instance. Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
        * Catatan: Jangan masukkan pasangan AccessKey akun Alibaba Cloud Anda.
        * Jika klien instance ApsaraMQ for RocketMQ ditempatkan pada instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ di jaringan internal, Anda tidak perlu menentukan nama pengguna atau kata sandi instance. Broker ApsaraMQ for RocketMQ secara otomatis mendapatkan nama pengguna dan kata sandi berdasarkan informasi VPC.
        * Jika instance tersebut adalah instance ApsaraMQ for RocketMQ serverless, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur autentikasi bebas di VPC untuk instance serverless dan mengakses instance di VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi.
        */
        // Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
        consumerProperties.put(PropertyKeyConst.AccessKey,"NAMA PENGGUNA INSTANCE");
        consumerProperties.put(PropertyKeyConst.SecretKey, "KATA SANDI INSTANCE");
        // Catatan: Jika Anda menggunakan TCP client SDK untuk mengakses instance ApsaraMQ for RocketMQ 5.x, Anda tidak perlu menentukan ID instance. Jika tidak, akses akan gagal.
        
        // Endpoint yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Format endpoint serupa dengan rmq-cn-XXXX.rmq.aliyuncs.com:8080.
        // Catatan: Masukkan nama domain dan nomor port yang ditampilkan di konsol ApsaraMQ for RocketMQ. Jangan sertakan awalan http:// atau https:// atau gunakan alamat IP yang telah diselesaikan.
        consumerProperties.put(PropertyKeyConst.NAMESRV_ADDR, "TITIK AKSES");

        // Jumlah maksimum pesan yang dikonsumsi sekaligus. Dalam contoh ini, nilainya ditentukan sebagai 128. Jika jumlah pesan yang disimpan dalam topik yang ditentukan mencapai nilai ini, SDK segera memanggil metode callback. Dengan cara ini, konsumen dapat mengonsumsi pesan. Nilai valid: 1 hingga 1024. Nilai default: 32.
        consumerProperties.setProperty(PropertyKeyConst.ConsumeMessageBatchMaxSize, String.valueOf(128));
        // Waktu tunggu maksimum antara dua batch berturut-turut. Dalam contoh ini, nilainya ditentukan sebagai 10 detik. Jika waktu tunggu yang ditentukan tercapai, SDK segera memanggil metode callback. Dengan cara ini, konsumen dapat mengonsumsi pesan. Nilai valid: 0 hingga 450. Nilai default: 0. Unit: detik.
        consumerProperties.setProperty(PropertyKeyConst.BatchConsumeMaxAwaitDurationInSeconds, String.valueOf(10));

        BatchConsumer batchConsumer = ONSFactory.createBatchConsumer(consumerProperties);
        batchConsumer.subscribe("TopicTestMQ", "TagA", new BatchMessageListener() {

             @Override
            public Action consume(final List<Message> messages, ConsumeContext context) {
                System.out.printf("Ukuran Batch: %d\n", messages.size());
           // Proses beberapa pesan sekaligus.
                return Action.CommitMessage;
            }
        });
        // Mulai konsumen batch.
        batchConsumer.start();
        System.out.println("Konsumen berhasil dimulai.");

        // Tunggu selama periode waktu tertentu untuk mencegah proses keluar.
        try {
            Thread.sleep(200000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}  

Berlangganan pesan normal dalam mode tarik

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.PullConsumer;
import com.aliyun.openservices.ons.api.TopicPartition;
import java.util.List;
import java.util.Properties;
import java.util.Set;

public class PullConsumerClient {
    public static void main(String[] args){
        Properties properties = new Properties();
        // ID grup yang Anda buat di konsol ApsaraMQ for RocketMQ.
        properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-xxxxx");
        /**
        * Jika Anda menggunakan endpoint publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus mengonfigurasi parameter AccessKey dan SecretKey. Nilai parameter AccessKey adalah nama pengguna instance, dan nilai parameter SecretKey adalah kata sandi instance. Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
        * Catatan: Jangan masukkan pasangan AccessKey akun Alibaba Cloud Anda.
        * Jika klien instance ApsaraMQ for RocketMQ ditempatkan pada instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ di jaringan internal, Anda tidak perlu menentukan nama pengguna atau kata sandi instance. Broker ApsaraMQ for RocketMQ secara otomatis mendapatkan nama pengguna dan kata sandi berdasarkan informasi VPC.
        * Jika instance tersebut adalah instance ApsaraMQ for RocketMQ serverless, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur autentikasi bebas di VPC untuk instance serverless dan mengakses instance di VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi.
        */
        // Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
        properties.put(PropertyKeyConst.AccessKey,"NAMA PENGGUNA INSTANCE");
        properties.put(PropertyKeyConst.SecretKey, "KATA SANDI INSTANCE");
        // Catatan: Jika Anda menggunakan TCP client SDK untuk mengakses instance ApsaraMQ for RocketMQ 5.x, Anda tidak perlu menentukan ID instance. Jika tidak, akses akan gagal.

        // Endpoint yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Format endpoint serupa dengan rmq-cn-XXXX.rmq.aliyuncs.com:8080.
        // Catatan: Masukkan nama domain dan nomor port yang ditampilkan di konsol ApsaraMQ for RocketMQ. Jangan sertakan awalan http:// atau https:// atau gunakan alamat IP yang telah diselesaikan.
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "TITIK AKSES");
        PullConsumer consumer = ONSFactory.createPullConsumer(properties);
        // Mulai konsumen.
        consumer.start();
        // Kueri semua partisi dalam topic-xxx.
        Set<TopicPartition> topicPartitions = consumer.topicPartitions("topic-xxx");
        // Partisi dari mana Anda ingin menarik pesan.
        consumer.assign(topicPartitions);

        while (true) {
            // Periode timeout untuk menarik pesan. Dalam contoh ini, nilainya ditentukan sebagai 3000 milidetik.
            List<Message> messages = consumer.poll(3000);
            System.out.printf("Pesan diterima: %s %n", messages);
        }
    }
}

Kirim dan terima pesan terurut

Kirim pesan terurut

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.order.OrderProducer;

import java.util.Date;
import java.util.Properties;

public class ProducerClient {

    public static void main(String[] args) {
        Properties properties = new Properties();
        // ID grup yang Anda buat di konsol ApsaraMQ for RocketMQ.
        properties.put(PropertyKeyConst.GROUP_ID,"XXX");
        /**
        * Jika Anda menggunakan endpoint publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus mengonfigurasi parameter AccessKey dan SecretKey. Nilai parameter AccessKey adalah nama pengguna instance, dan nilai parameter SecretKey adalah kata sandi instance. Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
        * Catatan: Jangan masukkan pasangan AccessKey akun Alibaba Cloud Anda.
        * Jika klien instance ApsaraMQ for RocketMQ ditempatkan pada instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ di jaringan internal, Anda tidak perlu menentukan nama pengguna atau kata sandi instance. Broker ApsaraMQ for RocketMQ secara otomatis mendapatkan nama pengguna dan kata sandi berdasarkan informasi VPC.
        * Jika instance tersebut adalah instance ApsaraMQ for RocketMQ serverless, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur autentikasi bebas di VPC untuk instance serverless dan mengakses instance di VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi.
        */
        // Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
        properties.put(PropertyKeyConst.AccessKey,"NAMA PENGGUNA INSTANCE");
        properties.put(PropertyKeyConst.SecretKey, "KATA SANDI INSTANCE");
        // Catatan: Jika Anda menggunakan TCP client SDK untuk mengakses instance ApsaraMQ for RocketMQ 5.x, Anda tidak perlu menentukan ID instance. Jika tidak, akses akan gagal.

        // Endpoint yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Format endpoint serupa dengan rmq-cn-XXXX.rmq.aliyuncs.com:8080.
        // Catatan: Masukkan nama domain dan nomor port yang ditampilkan di konsol ApsaraMQ for RocketMQ. Jangan sertakan awalan http:// atau https:// atau gunakan alamat IP yang telah diselesaikan.
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "TITIK AKSES");
        OrderProducer producer = ONSFactory.createOrderProducer(properties);
        // Sebelum Anda mengirim pesan, panggil metode start() hanya sekali untuk memulai produser.
        producer.start();
        for (int i = 0; i < 1000; i++) {
            String orderId = "biz_" + i % 10;
            Message msg = new Message(
                    // Topik yang Anda buat di konsol ApsaraMQ for RocketMQ.
                    "Order_global_topic",
                    // Tag pesan. Tag pesan mirip dengan tag Gmail dan dapat digunakan oleh konsumen untuk memfilter pesan di broker ApsaraMQ for RocketMQ.
                    "TagA",
                    // Tubuh pesan. Tubuh pesan adalah data dalam format biner. ApsaraMQ for RocketMQ tidak memproses tubuh pesan. Produser dan konsumen harus setuju pada metode serialisasi dan deserialisasi.
                    "send order global msg".getBytes()
            );
            // Kunci pesan. Kunci adalah atribut spesifik bisnis dari pesan dan harus unik secara global jika memungkinkan.
            // Jika Anda tidak dapat menerima pesan seperti yang diharapkan, Anda dapat menggunakan kunci untuk mencari pesan di konsol ApsaraMQ for RocketMQ.
            // Catatan: Pesan dapat dikirim dan diterima meskipun Anda tidak menentukan kunci pesan.
            msg.setKey(orderId);
            // Bidang kunci yang digunakan dalam pesan terurut untuk mengidentifikasi partisi. Kunci sharding berbeda dari kunci pesan normal.
            // Bidang ini dapat diatur ke string non-kosong untuk pesan terurut global.
            String shardingKey = String.valueOf(orderId);
            try {
                SendResult sendResult = producer.send(msg, shardingKey);
                // Kirim pesan. Jika tidak ada pengecualian yang dilemparkan, pesan dikirim.
                if (sendResult != null) {
                    System.out.println(new Date() + " Pengiriman pesan mq berhasil. Topik adalah:" + msg.getTopic() + " msgId adalah: " + sendResult.getMessageId());
                }
            }
            catch (Exception e) {
                // Logika untuk mengirim ulang atau menyimpan pesan jika pesan gagal dikirim dan perlu dikirim ulang.
                System.out.println(new Date() + " Pengiriman pesan mq gagal. Topik adalah:" + msg.getTopic());
                e.printStackTrace();
            }
        }
        // Sebelum Anda keluar dari aplikasi, matikan produser.
        // Catatan: Jika Anda mematikan produser, memori dapat dihemat. Jika Anda perlu sering mengirim pesan, jangan matikan produser.
        producer.shutdown();
    }

}

Berlangganan pesan terurut

package com.aliyun.openservices.ons.example.order;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.order.ConsumeOrderContext;
import com.aliyun.openservices.ons.api.order.MessageOrderListener;
import com.aliyun.openservices.ons.api.order.OrderAction;
import com.aliyun.openservices.ons.api.order.OrderConsumer;

import java.util.Properties;


public class ConsumerClient {

    public static void main(String[] args) {
        Properties properties = new Properties();
        // ID grup yang Anda buat di konsol ApsaraMQ for RocketMQ.
        properties.put(PropertyKeyConst.GROUP_ID,"XXX");
        /**
        * Jika Anda menggunakan endpoint publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus mengonfigurasi parameter AccessKey dan SecretKey. Nilai parameter AccessKey adalah nama pengguna instance, dan nilai parameter SecretKey adalah kata sandi instance. Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
        * Catatan: Jangan masukkan pasangan AccessKey akun Alibaba Cloud Anda.
        * Jika klien instance ApsaraMQ for RocketMQ ditempatkan pada instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ di jaringan internal, Anda tidak perlu menentukan nama pengguna atau kata sandi instance. Broker ApsaraMQ for RocketMQ secara otomatis mendapatkan nama pengguna dan kata sandi berdasarkan informasi VPC.
        * Jika instance tersebut adalah instance ApsaraMQ for RocketMQ serverless, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur autentikasi bebas di VPC untuk instance serverless dan mengakses instance di VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi.
        */
        // Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
        properties.put(PropertyKeyConst.AccessKey,"NAMA PENGGUNA INSTANCE");
        properties.put(PropertyKeyConst.SecretKey, "KATA SANDI INSTANCE");
        // Catatan: Jika Anda menggunakan TCP client SDK untuk mengakses instance ApsaraMQ for RocketMQ 5.x, Anda tidak perlu menentukan ID instance. Jika tidak, akses akan gagal.

        // Endpoint yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Format endpoint serupa dengan rmq-cn-XXXX.rmq.aliyuncs.com:8080.
        // Catatan: Masukkan nama domain dan nomor port yang ditampilkan di konsol ApsaraMQ for RocketMQ. Jangan sertakan awalan http:// atau https:// atau gunakan alamat IP yang telah diselesaikan.
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "TITIK AKSES");
        // Waktu tunggu dalam milidetik sebelum percobaan ulang dilakukan pada pesan terurut jika pesan gagal dikonsumsi. Nilai valid: 10 hingga 30000.
        properties.put(PropertyKeyConst.SuspendTimeMillis,"100");
        // Jumlah maksimum percobaan ulang yang dapat dilakukan pada pesan jika pesan gagal dikonsumsi.
        properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");

        // Sebelum Anda mengirim pesan, panggil metode start() hanya sekali untuk memulai produser.
        OrderConsumer consumer = ONSFactory.createOrderedConsumer(properties);

        consumer.subscribe(
                // Topik yang Anda buat di konsol ApsaraMQ for RocketMQ.
                "Order_global_topic",
                // Berlangganan pesan yang berisi tag yang ditentukan dalam topik yang ditentukan.
                // 1. Asterisk (*) menentukan bahwa konsumen berlangganan semua pesan.
                // 2. TagA || TagB || TagC menentukan bahwa konsumen berlangganan pesan yang berisi Tag A, Tag B, atau Tag C.
                "*",
                new MessageOrderListener() {
                    /**
                     * 1 Jika pesan gagal dikonsumsi atau terjadi pengecualian selama pemrosesan pesan, OrderAction.Suspend dikembalikan.
                     * 2. Jika pesan diproses, OrderAction.Success dikembalikan.
                     */
                    @Override
                    public OrderAction consume(Message message, ConsumeOrderContext context) {
                        System.out.println(message);
                        return OrderAction.Success;
                    }
                });

        consumer.start();
    }
}

Kirim dan terima pesan terjadwal

Kirim pesan terjadwal

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import com.aliyun.openservices.ons.api.SendResult;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

public class ProducerDelayTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        /**
        * Jika Anda menggunakan endpoint publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus mengonfigurasi parameter AccessKey dan SecretKey. Nilai parameter AccessKey adalah nama pengguna instance, dan nilai parameter SecretKey adalah kata sandi instance. Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
        * Catatan: Jangan masukkan pasangan AccessKey akun Alibaba Cloud Anda.
        * Jika klien instance ApsaraMQ for RocketMQ ditempatkan pada instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ di jaringan internal, Anda tidak perlu menentukan nama pengguna atau kata sandi instance. Broker ApsaraMQ for RocketMQ secara otomatis mendapatkan nama pengguna dan kata sandi berdasarkan informasi VPC.
        * Jika instance tersebut adalah instance ApsaraMQ for RocketMQ serverless, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur autentikasi bebas di VPC untuk instance serverless dan mengakses instance di VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi.
        */
        // Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
        properties.put(PropertyKeyConst.AccessKey,"NAMA PENGGUNA INSTANCE");
        properties.put(PropertyKeyConst.SecretKey, "KATA SANDI INSTANCE");
        // Catatan: Jika Anda menggunakan TCP client SDK untuk mengakses instance ApsaraMQ for RocketMQ 5.x, Anda tidak perlu menentukan ID instance. Jika tidak, akses akan gagal.

        // Endpoint yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Format endpoint serupa dengan rmq-cn-XXXX.rmq.aliyuncs.com:8080.
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "TITIK AKSES");
        // Catatan: Masukkan nama domain dan nomor port yang ditampilkan di konsol ApsaraMQ for RocketMQ. Jangan sertakan awalan http:// atau https:// atau gunakan alamat IP yang telah diselesaikan.
        Producer producer = ONSFactory.createProducer(properties);
        // Sebelum Anda mengirim pesan, panggil metode start() hanya sekali untuk memulai produser.
        producer.start();
        Message msg = new Message(
                // Topik yang Anda buat di konsol ApsaraMQ for RocketMQ.
                "Topic",
                // Tag pesan. Tag pesan mirip dengan tag Gmail dan dapat digunakan oleh konsumen untuk memfilter pesan di broker ApsaraMQ for RocketMQ.
                "tag",
                // Tubuh pesan. Tubuh pesan adalah data dalam format biner. ApsaraMQ for RocketMQ tidak memproses tubuh pesan. Produser dan konsumen harus setuju pada metode serialisasi dan deserialisasi.
                "Hello MQ".getBytes());
        // Kunci pesan. Kunci adalah atribut spesifik bisnis dari pesan dan harus unik secara global jika memungkinkan.
        // Jika Anda tidak dapat menerima pesan seperti yang diharapkan, Anda dapat menggunakan kunci untuk mencari pesan di konsol ApsaraMQ for RocketMQ.
        // Catatan: Pesan dapat dikirim dan diterima meskipun Anda tidak menentukan kunci pesan.
        msg.setKey("ORDERID_100");

        try {
            // Timestamp yang menunjukkan kapan broker ApsaraMQ for RocketMQ mengirimkan pesan ke konsumen. Unit: milidetik. Misalnya, jika Anda menyetel parameter ini ke 2016-03-07 16:21:00, broker mengirimkan pesan pada pukul 16:21:00 tanggal 7 Maret 2016. Nilai ini harus lebih dari waktu saat ini. Jika Anda menyetel parameter ini ke waktu yang lebih awal dari waktu saat ini, pesan segera dikirimkan ke konsumen.
            long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-03-07 16:21:00").getTime();

            msg.setStartDeliverTime(timeStamp);
            // Kirim pesan. Jika tidak ada pengecualian yang dilemparkan, pesan dikirim.
            SendResult sendResult = producer.send(msg);
            System.out.println("Message Id:" + sendResult.getMessageId());
        }
        catch (Exception e) {
            // Logika untuk mengirim ulang atau menyimpan pesan jika pesan gagal dikirim dan perlu dikirim ulang.
            System.out.println(new Date() + " Pengiriman pesan mq gagal. Topik adalah:" + msg.getTopic());
            e.printStackTrace();
        }

        // Sebelum Anda keluar dari aplikasi, matikan produser.
        // Catatan: Jika Anda mematikan produser, memori dapat dihemat. Jika Anda perlu sering mengirim pesan, jangan matikan produser.
        producer.shutdown();
    }
}       

Berlangganan pesan terjadwal

Kode contoh untuk berlangganan pesan terjadwal sama dengan kode contoh untuk berlangganan pesan normal. Untuk informasi lebih lanjut, lihat Kirim dan Terima Pesan Normal.

Kirim dan terima pesan tertunda

Kirim pesan tertunda

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;

import java.util.Date;
import java.util.Properties;

public class ProducerDelayTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        /**
        * Jika Anda menggunakan endpoint publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus mengonfigurasi parameter AccessKey dan SecretKey. Nilai parameter AccessKey adalah nama pengguna instance, dan nilai parameter SecretKey adalah kata sandi instance. Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
        * Catatan: Jangan masukkan pasangan AccessKey akun Alibaba Cloud Anda.
        * Jika klien instance ApsaraMQ for RocketMQ ditempatkan pada instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ di jaringan internal, Anda tidak perlu menentukan nama pengguna atau kata sandi instance. Broker ApsaraMQ for RocketMQ secara otomatis mendapatkan nama pengguna dan kata sandi berdasarkan informasi VPC.
        * Jika instance tersebut adalah instance ApsaraMQ for RocketMQ serverless, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur autentikasi bebas di VPC untuk instance serverless dan mengakses instance di VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi.
        */
        // Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
        properties.put(PropertyKeyConst.AccessKey,"NAMA PENGGUNA INSTANCE");
        properties.put(PropertyKeyConst.SecretKey, "KATA SANDI INSTANCE");
        // Catatan: Jika Anda menggunakan TCP client SDK untuk mengakses instance ApsaraMQ for RocketMQ 5.x, Anda tidak perlu menentukan ID instance. Jika tidak, akses akan gagal.

        // Endpoint yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Format endpoint serupa dengan rmq-cn-XXXX.rmq.aliyuncs.com:8080.
        // Catatan: Masukkan nama domain dan nomor port yang ditampilkan di konsol ApsaraMQ for RocketMQ. Jangan sertakan awalan http:// atau https:// atau gunakan alamat IP yang telah diselesaikan.
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "TITIK AKSES");

        Producer producer = ONSFactory.createProducer(properties);
        // Sebelum Anda mengirim pesan, panggil metode start() hanya sekali untuk memulai produser.
        producer.start();
        Message msg = new Message( 
                // Topik yang Anda buat di konsol ApsaraMQ for RocketMQ.
                "Topic",
                // Tag pesan. Tag pesan mirip dengan tag Gmail dan dapat digunakan oleh konsumen untuk memfilter pesan di broker ApsaraMQ for RocketMQ.
                "tag",
                // Tubuh pesan. Tubuh pesan adalah data dalam format biner. ApsaraMQ for RocketMQ tidak memproses tubuh pesan. Produser dan konsumen harus setuju pada metode serialisasi dan deserialisasi.
                "Hello MQ".getBytes());
        // Kunci pesan. Kunci adalah atribut spesifik bisnis dari pesan dan harus unik secara global jika memungkinkan.
        // Jika Anda tidak dapat menerima pesan seperti yang diharapkan, Anda dapat menggunakan kunci untuk mencari dan mengirim ulang pesan di konsol ApsaraMQ for RocketMQ.
        // Catatan: Pesan dapat dikirim dan diterima meskipun Anda tidak menentukan kunci pesan.
        msg.setKey("ORDERID_100");
        try {
            // Waktu tunda sebelum pesan dikirim. Nilai ini harus lebih dari waktu saat ini. Nilai maksimum: 3456000000 (40 hari). Unit: milidetik.
            // Dalam contoh berikut, pesan dikirim setelah penundaan 3 detik.
            long delayTime = System.currentTimeMillis() + 3000;

            // Titik waktu ketika broker ApsaraMQ for RocketMQ mulai mengirimkan pesan tertunda.
            msg.setStartDeliverTime(delayTime);

            SendResult sendResult = producer.send(msg);
            // Kirim pesan dalam mode transmisi sinkron. Jika tidak ada pengecualian yang dilemparkan, pesan dikirim.
            if (sendResult != null) {
            System.out.println(new Date() + " Pengiriman pesan mq berhasil. Topik adalah:" + msg.getTopic() + " msgId adalah: " + sendResult.getMessageId());
            }
            } catch (Exception e) {
            // Logika untuk mengirim ulang atau menyimpan pesan jika pesan gagal dikirim dan perlu dikirim ulang.
            System.out.println(new Date() + " Pengiriman pesan mq gagal. Topik adalah:" + msg.getTopic());
            e.printStackTrace();
        }
        // Sebelum Anda keluar dari aplikasi, matikan produser.
        // Catatan: Jika Anda mematikan produser, memori dapat dihemat. Jika Anda perlu sering mengirim pesan, jangan matikan produser.
        producer.shutdown();
    }
}           

Berlangganan pesan tertunda

Kode contoh untuk berlangganan pesan tertunda sama dengan kode contoh untuk berlangganan pesan normal. Untuk informasi lebih lanjut, lihat Kirim dan Terima Pesan Normal.

Kirim dan terima pesan transaksional

Kirim pesan transaksional

package com.aliyun.openservices.tcp.example.producer;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;

import java.util.Date;
import java.util.Properties;

public class SimpleTransactionProducer {

    public static void main(String[] args) {

        Properties properties = new Properties();
        // ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ. Catatan: Pesan transaksional tidak dapat berbagi grup dengan pesan jenis lain.
        properties.put(PropertyKeyConst.GROUP_ID,"XXX");
        
        /**
        * Jika Anda menggunakan endpoint publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus mengonfigurasi parameter AccessKey dan SecretKey. Nilai parameter AccessKey adalah nama pengguna instance, dan nilai parameter SecretKey adalah kata sandi instance. Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
        * Catatan: Jangan masukkan pasangan AccessKey akun Alibaba Cloud Anda.
        * Jika klien instance ApsaraMQ for RocketMQ ditempatkan pada instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ di jaringan internal, Anda tidak perlu menentukan nama pengguna atau kata sandi instance. Broker ApsaraMQ for RocketMQ secara otomatis mendapatkan nama pengguna dan kata sandi berdasarkan informasi VPC.
        * Jika instance tersebut adalah instance ApsaraMQ for RocketMQ serverless, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur autentikasi bebas di VPC untuk instance serverless dan mengakses instance di VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi.
        */
        // Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
        properties.put(PropertyKeyConst.AccessKey,"NAMA PENGGUNA INSTANCE");
        properties.put(PropertyKeyConst.SecretKey, "KATA SANDI INSTANCE");
        // Catatan: Jika Anda menggunakan TCP client SDK untuk mengakses instance ApsaraMQ for RocketMQ 5.x, Anda tidak perlu menentukan ID instance. Jika tidak, akses akan gagal.

        // Endpoint yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Format endpoint serupa dengan rmq-cn-XXXX.rmq.aliyuncs.com:8080.
        // Catatan: Masukkan nama domain dan nomor port yang ditampilkan di konsol ApsaraMQ for RocketMQ. Jangan sertakan awalan http:// atau https:// atau gunakan alamat IP yang telah diselesaikan.
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "TITIK AKSES");

        // Sebelum Anda menginisialisasi produser, Anda harus mendaftarkan pemeriksa untuk memeriksa status transaksi lokal.
        LocalTransactionCheckerImpl localTransactionChecker = new LocalTransactionCheckerImpl();
        TransactionProducer transactionProducer = ONSFactory.createTransactionProducer(properties, localTransactionChecker);
        transactionProducer.start();

        Message msg = new Message("XXX","TagA","Hello MQ transaction===".getBytes());

        for (int i = 0; i < 3; i++) {
            try{
                SendResult sendResult = transactionProducer.send(msg, new LocalTransactionExecuter() {
                    @Override
                    public TransactionStatus execute(Message msg, Object arg) {
                        System.out.println("Menjalankan transaksi lokal dan mengirimkan status transaksi.");
                        return TransactionStatus.CommitTransaction;
                    }
                }, null);
                assert sendResult != null;
            }catch (ONSClientException e){
                // Logika untuk mengirim ulang atau menyimpan pesan jika pesan gagal dikirim dan perlu dikirim ulang.
                System.out.println(new Date() + " Pengiriman pesan mq gagal! Topik adalah:" + msg.getTopic());
                e.printStackTrace();
            }
        }

        System.out.println("Pengiriman pesan transaksi berhasil.");
    }
}
// Pemeriksa transaksi lokal.
class LocalTransactionCheckerImpl implements LocalTransactionChecker {
   
    @Override
    public TransactionStatus check(Message msg) {
        System.out.println("Permintaan untuk memeriksa status transaksi pesan diterima. MsgId: " + msg.getMsgID());
        return TransactionStatus.CommitTransaction;
    }
}

Berlangganan pesan transaksional

Kode contoh untuk berlangganan pesan transaksional sama dengan kode contoh untuk berlangganan pesan normal. Untuk informasi lebih lanjut, lihat Kirim dan Terima Pesan Normal.