全部产品
Search
文档中心

ApsaraMQ for RocketMQ:Contoh kode

更新时间:Jun 28, 2025

ApsaraMQ for RocketMQ Instance 5.x kompatibel dengan klien yang menggunakan SDK RocketMQ 3.x atau 4.x. Anda dapat menggunakan SDK ini untuk terhubung ke instance ApsaraMQ for RocketMQ 5.x guna mengirim dan menerima pesan. Topik ini menyediakan contoh kode untuk mengirim dan menerima pesan menggunakan SDK RocketMQ 3.x atau 4.x untuk Java.

Penting
  • Kami merekomendasikan penggunaan SDK RocketMQ 5.x terbaru. SDK ini sepenuhnya kompatibel dengan ApsaraMQ for RocketMQ broker 5.x dan menawarkan lebih banyak fitur serta peningkatan. Untuk informasi lebih lanjut, lihat Deskripsi Versi.

  • Alibaba Cloud hanya memelihara SDK RocketMQ 3.x, 4.x, dan TCP client. Kami merekomendasikan penggunaannya hanya untuk bisnis yang sudah ada.

Mengirim dan menerima pesan normal

Mengirim pesan normal dalam mode transmisi sinkron

import java.util.Date;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQProducer {
    /**
     * Jika Anda menggunakan titik akhir publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus mengonfigurasi RPCHook dan memasukkan nama pengguna dan kata sandi instance. Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
     * Catatan: Jangan masukkan pasangan AccessKey dari akun Alibaba Cloud Anda.
     * Jika klien instance ApsaraMQ for RocketMQ ditempatkan pada instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ di jaringan internal, Anda tidak perlu menentukan nama pengguna atau kata sandi instance. Broker ApsaraMQ for RocketMQ secara otomatis mendapatkan nama pengguna dan kata sandi berdasarkan informasi VPC.
     * Jika instance adalah instance serverless ApsaraMQ for RocketMQ, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur bebas otentikasi di VPC untuk instance serverless dan mengakses instance di VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi.
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // Jika Anda menggunakan titik akhir publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus mengonfigurasi RPCHook.
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        // Jika Anda menggunakan titik akhir VPC untuk mengakses instance ApsaraMQ for RocketMQ, Anda tidak perlu mengonfigurasi RPCHook.
        // Jika instance adalah instance serverless ApsaraMQ for RocketMQ, Anda harus mengonfigurasi RPCHook.
        // DefaultMQProducer producer = new DefaultMQProducer();

        // ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
        producer.setProducerGroup("YOUR GROUP ID");

        // Atur parameter AccessChannel ke Alibaba Cloud. Jika Anda ingin mengaktifkan fitur jejak pesan, Anda harus mengonfigurasi parameter ini. Jika Anda ingin menonaktifkan fitur jejak pesan, biarkan parameter ini kosong.
        producer.setAccessChannel(AccessChannel.CLOUD);

        // Titik akhir yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Format titik akhir serupa dengan rmq-cn-XXXX.rmq.aliyuncs.com:8080.
        // Catatan: Masukkan nama domain dan nomor port yang ditampilkan di konsol ApsaraMQ for RocketMQ. Jangan sertakan awalan http:// atau https:// atau gunakan alamat IP yang telah diselesaikan.
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message("YOUR TOPIC",
                        "YOUR MESSAGE TAG",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                // Logika untuk mengirim ulang atau menyimpan pesan jika pesan gagal dikirim dan perlu dikirim ulang.
                System.out.println(new Date() + " Kirim pesan mq gagal.");
                e.printStackTrace();
            }
        }

        // Sebelum keluar dari aplikasi, matikan produsen.
        // Catatan: Jika Anda mematikan produsen, memori dapat dihemat. Jika Anda perlu sering mengirim pesan, jangan matikan produsen.
        producer.shutdown();
    }
}

Mengirim pesan normal dalam mode transmisi asinkron

import java.util.Date;
import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQAsyncProducer {
    /**
     * Jika Anda menggunakan titik akhir publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus mengonfigurasi RPCHook dan memasukkan nama pengguna dan kata sandi instance. Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
     * Catatan: Jangan masukkan pasangan AccessKey dari akun Alibaba Cloud Anda.
     * Jika klien instance ApsaraMQ for RocketMQ ditempatkan pada instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ di jaringan internal, Anda tidak perlu menentukan nama pengguna atau kata sandi instance. Broker ApsaraMQ for RocketMQ secara otomatis mendapatkan nama pengguna dan kata sandi berdasarkan informasi VPC.
     * Jika instance adalah instance serverless ApsaraMQ for RocketMQ, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur bebas otentikasi di VPC untuk instance serverless dan mengakses instance di VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi.
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException, InterruptedException {
        // Jika Anda menggunakan titik akhir publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus mengonfigurasi RPCHook.
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        // Jika Anda menggunakan titik akhir VPC untuk mengakses instance ApsaraMQ for RocketMQ, Anda tidak perlu mengonfigurasi RPCHook.
        // Jika instance adalah instance serverless ApsaraMQ for RocketMQ, Anda harus mengonfigurasi RPCHook.
        // DefaultMQProducer producer = new DefaultMQProducer();

        // ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
        producer.setProducerGroup("YOUR GROUP ID");

        // Atur parameter AccessChannel ke Alibaba Cloud. Jika Anda ingin mengaktifkan fitur jejak pesan, Anda harus mengonfigurasi parameter ini. Jika Anda ingin menonaktifkan fitur jejak pesan, biarkan parameter ini kosong.
        producer.setAccessChannel(AccessChannel.CLOUD);

        // Titik akhir yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Format titik akhir serupa dengan rmq-cn-XXXX.rmq.aliyuncs.com:8080.
        // Catatan: Masukkan nama domain dan nomor port yang ditampilkan di konsol ApsaraMQ for RocketMQ. Jangan sertakan awalan http:// atau https:// atau gunakan alamat IP yang telah diselesaikan.
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message("YOUR TOPIC",
                        "YOUR MESSAGE TAG",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult result) {
                        // Pesan berhasil dikirim.
                        System.out.println("pesan berhasil dikirim. msgId= " + result.getMsgId());
                    }

                    @Override
                    public void onException(Throwable throwable) {
                        // Logika untuk mengirim ulang atau menyimpan pesan jika pesan gagal dikirim dan perlu dikirim ulang.
                        System.out.println("gagal mengirim pesan.");
                        throwable.printStackTrace();
                    }
                });
            } catch (Exception e) {
                // Logika untuk mengirim ulang atau menyimpan pesan jika pesan gagal dikirim dan perlu dikirim ulang.
                System.out.println(new Date() + " Kirim pesan mq gagal.");
                e.printStackTrace();
            }
        }
        // Blokir thread saat ini selama 3 detik dan tunggu hasilnya kembali.
        TimeUnit.SECONDS.sleep(3);

        // Sebelum keluar dari aplikasi, matikan produsen.
        // Catatan: Jika Anda mematikan produsen, memori dapat dihemat. Jika Anda perlu sering mengirim pesan, jangan matikan produsen.
        producer.shutdown();
    }
}

Mengirim pesan normal dalam mode transmisi satu arah

import java.util.Date;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQOnewayProducer {
    /**
     * Jika Anda menggunakan titik akhir publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus mengonfigurasi RPCHook dan memasukkan nama pengguna dan kata sandi instance. Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
     * Catatan: Jangan masukkan pasangan AccessKey dari akun Alibaba Cloud Anda.
     * Jika klien instance ApsaraMQ for RocketMQ ditempatkan pada instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ di jaringan internal, Anda tidak perlu menentukan nama pengguna atau kata sandi instance. Broker ApsaraMQ for RocketMQ secara otomatis mendapatkan nama pengguna dan kata sandi berdasarkan informasi VPC.
     * Jika instance adalah instance serverless ApsaraMQ for RocketMQ, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur bebas otentikasi di VPC untuk instance serverless dan mengakses instance di VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi.
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // Jika Anda menggunakan titik akhir publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus mengonfigurasi RPCHook.
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        // Jika Anda menggunakan titik akhir VPC untuk mengakses instance ApsaraMQ for RocketMQ, Anda tidak perlu mengonfigurasi RPCHook.
        // Jika instance adalah instance serverless ApsaraMQ for RocketMQ, Anda harus mengonfigurasi RPCHook.
        // DefaultMQProducer producer = new DefaultMQProducer();

        // ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
        producer.setProducerGroup("YOUR GROUP ID");

        // Atur parameter AccessChannel ke Alibaba Cloud. Jika Anda ingin mengaktifkan fitur jejak pesan, Anda harus mengonfigurasi parameter ini. Jika Anda ingin menonaktifkan fitur jejak pesan, biarkan parameter ini kosong.
        producer.setAccessChannel(AccessChannel.CLOUD);

        // Titik akhir yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Format titik akhir serupa dengan rmq-cn-XXXX.rmq.aliyuncs.com:8080.
        // Catatan: Masukkan nama domain dan nomor port yang ditampilkan di konsol ApsaraMQ for RocketMQ. Jangan sertakan awalan http:// atau https:// atau gunakan alamat IP yang telah diselesaikan.
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message("YOUR TOPIC",
                        "YOUR MESSAGE TAG",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.sendOneway(msg);
            } catch (Exception e) {
                // Logika untuk mengirim ulang atau menyimpan pesan jika pesan gagal dikirim dan perlu dikirim ulang.
                System.out.println(new Date() + " Kirim pesan mq gagal.");
                e.printStackTrace();
            }
        }

        // Sebelum keluar dari aplikasi, matikan produsen.
        // Catatan: Jika Anda mematikan produsen, memori dapat dihemat. Jika Anda perlu sering mengirim pesan, jangan matikan produsen.
        producer.shutdown();
    }
}

Berlangganan pesan normal

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.List;

public class RocketMQPushConsumer {
    /**
     * Jika Anda menggunakan titik akhir publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus mengonfigurasi RPCHook dan memasukkan nama pengguna dan kata sandi instance. Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
     * Catatan: Jangan masukkan pasangan AccessKey dari akun Alibaba Cloud Anda.
     * Jika klien instance ApsaraMQ for RocketMQ ditempatkan pada instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ di jaringan internal, Anda tidak perlu menentukan nama pengguna atau kata sandi instance. Broker ApsaraMQ for RocketMQ secara otomatis mendapatkan nama pengguna dan kata sandi berdasarkan informasi VPC.
     * Jika instance adalah instance serverless ApsaraMQ for RocketMQ, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur bebas otentikasi di VPC untuk instance serverless dan mengakses instance di VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi.
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // Jika Anda menggunakan titik akhir publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus mengonfigurasi RPCHook.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(getAclRPCHook());
        // Jika Anda menggunakan titik akhir VPC untuk mengakses instance ApsaraMQ for RocketMQ, Anda tidak perlu mengonfigurasi RPCHook.
        // Jika instance adalah instance serverless ApsaraMQ for RocketMQ, Anda harus mengonfigurasi RPCHook.
        // DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();

        // ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
        consumer.setConsumerGroup("YOUR GROUP ID");

        // Atur parameter AccessChannel ke Alibaba Cloud. Jika Anda ingin mengaktifkan fitur jejak pesan, Anda harus mengonfigurasi parameter ini. Jika Anda ingin menonaktifkan fitur jejak pesan, biarkan parameter ini kosong.
        consumer.setAccessChannel(AccessChannel.CLOUD);

        // Titik akhir yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Format titik akhir serupa dengan rmq-cn-XXXX.rmq.aliyuncs.com:8080.
        // Catatan: Masukkan nama domain dan nomor port yang ditampilkan di konsol ApsaraMQ for RocketMQ. Jangan sertakan awalan http:// atau https:// atau gunakan alamat IP yang telah diselesaikan.
        consumer.setNamesrvAddr("YOUR ACCESS POINT");
        // Topik yang Anda buat di konsol ApsaraMQ for RocketMQ.
        consumer.subscribe("YOUR TOPIC", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("Menerima Pesan Baru: %s %n", msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

Mengirim dan menerima pesan terurut

Mengirim pesan terurut

import java.util.List;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQOrderProducer {
    /**
     * Jika Anda menggunakan titik akhir publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus mengonfigurasi RPCHook dan memasukkan nama pengguna dan kata sandi instance. Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
     * Catatan: Jangan masukkan pasangan AccessKey dari akun Alibaba Cloud Anda.
     * Jika klien instance ApsaraMQ for RocketMQ ditempatkan pada instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ di jaringan internal, Anda tidak perlu menentukan nama pengguna atau kata sandi instance. Broker ApsaraMQ for RocketMQ secara otomatis mendapatkan nama pengguna dan kata sandi berdasarkan informasi VPC.
     * Jika instance adalah instance serverless ApsaraMQ for RocketMQ, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur bebas otentikasi di VPC untuk instance serverless dan mengakses instance di VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi.
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // Jika Anda menggunakan titik akhir publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus mengonfigurasi RPCHook.
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        // Jika Anda menggunakan titik akhir VPC untuk mengakses instance ApsaraMQ for RocketMQ, Anda tidak perlu mengonfigurasi RPCHook.
        // Jika instance adalah instance serverless ApsaraMQ for RocketMQ, Anda harus mengonfigurasi RPCHook.
        // DefaultMQProducer producer = new DefaultMQProducer();

        // ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
        producer.setProducerGroup("YOUR GROUP ID");

        // Atur parameter AccessChannel ke Alibaba Cloud. Jika Anda ingin mengaktifkan fitur jejak pesan, Anda harus mengonfigurasi parameter ini. Jika Anda ingin menonaktifkan fitur jejak pesan, biarkan parameter ini kosong.
        producer.setAccessChannel(AccessChannel.CLOUD);

        // Titik akhir yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Format titik akhir serupa dengan rmq-cn-XXXX.rmq.aliyuncs.com:8080.
        // Catatan: Masukkan nama domain dan nomor port yang ditampilkan di konsol ApsaraMQ for RocketMQ. Jangan sertakan awalan http:// atau https:// atau gunakan alamat IP yang telah diselesaikan.
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                int orderId = i % 10;
                Message msg = new Message("YOUR ORDER TOPIC",
                        "YOUR MESSAGE TAG",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // Catatan: Parameter ini diperlukan. Pesan terurut dapat didistribusikan secara merata ke setiap antrian hanya setelah parameter ini dikonfigurasi.
                // Jika Anda menggunakan instance ApsaraMQ for RocketMQ 5.x, Anda dapat mengganti baris berikut dengan msg.putUserProperty(MessageConst.PROPERTY_SHARDING_KEY, orderId + "");
                msg.putUserProperty("__SHARDINGKEY", orderId + "");
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        // Pilih algoritma pemilihan partisi yang sesuai untuk bisnis Anda. Algoritma tersebut dapat digunakan untuk memastikan bahwa hasil dari parameter yang sama adalah sama.
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);

                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        producer.shutdown();
    }
}

Berlangganan pesan terurut

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.List;

public class RocketMQOrderConsumer {
    private static RPCHook getAclRPCHook() {
        /**
         * Jika Anda menggunakan titik akhir publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus mengonfigurasi RPCHook dan memasukkan nama pengguna dan kata sandi instance. Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
         * Catatan: Jangan masukkan pasangan AccessKey dari akun Alibaba Cloud Anda.
         * Jika klien instance ApsaraMQ for RocketMQ ditempatkan pada instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ di jaringan internal, Anda tidak perlu menentukan nama pengguna atau kata sandi instance. Broker ApsaraMQ for RocketMQ secara otomatis mendapatkan nama pengguna dan kata sandi berdasarkan informasi VPC.
         * Jika instance adalah instance serverless ApsaraMQ for RocketMQ, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur bebas otentikasi di VPC untuk instance serverless dan mengakses instance di VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi.
         */
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // Jika Anda menggunakan titik akhir publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus mengonfigurasi RPCHook.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(getAclRPCHook());
        // Jika Anda menggunakan titik akhir VPC untuk mengakses instance ApsaraMQ for RocketMQ, Anda tidak perlu mengonfigurasi RPCHook.
        // Jika instance adalah instance serverless ApsaraMQ for RocketMQ, Anda harus mengonfigurasi RPCHook.
        // DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();

        // ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
        consumer.setConsumerGroup("YOUR GROUP ID");

        // Atur parameter AccessChannel ke Alibaba Cloud. Jika Anda ingin mengaktifkan fitur jejak pesan, Anda harus mengonfigurasi parameter ini. Jika Anda ingin menonaktifkan fitur jejak pesan, biarkan parameter ini kosong.
        consumer.setAccessChannel(AccessChannel.CLOUD);

        // Titik akhir yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Format titik akhir serupa dengan rmq-cn-XXXX.rmq.aliyuncs.com:8080.
        // Catatan: Masukkan nama domain dan nomor port yang ditampilkan di konsol ApsaraMQ for RocketMQ. Jangan sertakan awalan http:// atau https:// atau gunakan alamat IP yang telah diselesaikan.
        consumer.setNamesrvAddr("YOUR ACCESS POINT");
        consumer.subscribe("YOUR ORDER TOPIC", "*");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.registerMessageListener(new MessageListenerOrderly() {

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                System.out.printf("%s Menerima Pesan Baru: %s %n", Thread.currentThread().getName(), msgs);
                // Jika konsumsi gagal, permintaan ditangguhkan dan dicoba ulang. Nilai berikut dikembalikan: ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT.
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Konsumen Dimulai.%n");
    }
}

Mengirim dan menerima pesan terjadwal atau tertunda

Mengirim pesan terjadwal atau tertunda

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQDelayProducer {
    /**
     * Jika Anda menggunakan titik akhir publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus mengonfigurasi RPCHook dan memasukkan nama pengguna dan kata sandi instance. Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
     * Catatan: Jangan masukkan pasangan AccessKey dari akun Alibaba Cloud Anda.
     * Jika klien instance ApsaraMQ for RocketMQ ditempatkan pada instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ di jaringan internal, Anda tidak perlu menentukan nama pengguna atau kata sandi instance. Broker ApsaraMQ for RocketMQ secara otomatis mendapatkan nama pengguna dan kata sandi berdasarkan informasi VPC.
     * Jika instance adalah instance serverless ApsaraMQ for RocketMQ, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur bebas otentikasi di VPC untuk instance serverless dan mengakses instance di VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi.
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // Jika Anda menggunakan titik akhir publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus mengonfigurasi RPCHook.
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        // Jika Anda menggunakan titik akhir VPC untuk mengakses instance ApsaraMQ for RocketMQ, Anda tidak perlu mengonfigurasi RPCHook.
        // Jika instance adalah instance serverless ApsaraMQ for RocketMQ, Anda harus mengonfigurasi RPCHook.
        // DefaultMQProducer producer = new DefaultMQProducer();

        // ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
        producer.setProducerGroup("YOUR GROUP ID");

        // Atur parameter AccessChannel ke Alibaba Cloud. Jika Anda ingin mengaktifkan fitur jejak pesan, Anda harus mengonfigurasi parameter ini. Jika Anda ingin menonaktifkan fitur jejak pesan, biarkan parameter ini kosong.
        producer.setAccessChannel(AccessChannel.CLOUD);

        // Titik akhir yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Format titik akhir serupa dengan rmq-cn-XXXX.rmq.aliyuncs.com:8080.
        // Catatan: Masukkan nama domain dan nomor port yang ditampilkan di konsol ApsaraMQ for RocketMQ. Jangan sertakan awalan http:// atau https:// atau gunakan alamat IP yang telah diselesaikan.
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                // Topik yang Anda buat di konsol ApsaraMQ for RocketMQ.
                Message msg = new Message("YOUR TOPIC",
                        // Tag pesan.
                        "YOUR MESSAGE TAG",
                        // Isi pesan.
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // Waktu tunda untuk mengirim pesan tertunda. Unit: milidetik. Setelah Anda mengonfigurasi parameter ini, pesan akan dikirim setelah periode waktu yang ditentukan. Sebagai contoh, jika Anda mengatur parameter ini menjadi 3000, pesan akan dikirim setelah 3 detik.
                long delayTime = System.currentTimeMillis() + 3000;
                msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(delayTime));

                // Waktu terjadwal untuk mengirim pesan terjadwal. Setelah Anda mengonfigurasi parameter ini, pesan akan dikirim pada titik waktu yang ditentukan. Sebagai contoh, jika Anda mengatur parameter ini menjadi 2021-08-10 18:45:00, pesan akan dikirim pada pukul 18:45:00 tanggal 10 Agustus 2021.
                // Tentukan waktu dalam format yyyy-MM-dd HH:mm:ss. Jika Anda menentukan waktu yang lebih awal dari waktu saat ini, pesan akan segera dikirim ke konsumen.
                // longtimeStamp=newSimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-08-10 18:45:00").getTime();
                // msg.putUserProperty("__STARTDELIVERTIME",String.valueOf(timeStamp));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                // Logika untuk mengirim ulang atau menyimpan pesan jika pesan gagal dikirim dan perlu dikirim ulang.
                System.out.println(new Date() + " Kirim pesan mq gagal.");
                e.printStackTrace();
            }
        }

        // Sebelum keluar dari aplikasi, matikan produsen.
        // Catatan: Jika Anda mematikan produsen, memori dapat dihemat. Jika Anda perlu sering mengirim pesan, jangan matikan produsen.
        producer.shutdown();
    }
}

Berlangganan pesan terjadwal atau tertunda

Contoh kode untuk berlangganan pesan terjadwal atau tertunda sama dengan contoh kode untuk berlangganan pesan normal. Untuk informasi lebih lanjut, lihat Berlangganan Pesan Normal.

Mengirim dan menerima pesan transaksional

Mengirim pesan transaksional

  1. Contoh kode berikut menunjukkan cara mengirim pesan setengah dan mengeksekusi transaksi lokal:

    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
    import org.apache.rocketmq.client.producer.LocalTransactionState;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.client.producer.TransactionMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    public class RocketMQTransactionProducer {
    
        private static RPCHook getAclRPCHook() {
            /**
             * Jika Anda menggunakan titik akhir publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus mengonfigurasi RPCHook dan memasukkan nama pengguna dan kata sandi instance. Anda dapat memperoleh nama pengguna dan kata sandi di tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
             * Catatan: Jangan masukkan pasangan AccessKey dari akun Alibaba Cloud Anda.
             * Jika klien instance ApsaraMQ for RocketMQ ditempatkan pada instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ di jaringan internal, Anda tidak perlu menentukan nama pengguna atau kata sandi instance. Broker ApsaraMQ for RocketMQ secara otomatis mendapatkan nama pengguna dan kata sandi berdasarkan informasi VPC.
             * Jika instance adalah instance serverless ApsaraMQ for RocketMQ, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur bebas otentikasi di VPC untuk instance serverless dan mengakses instance di VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi.
             */
            return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
        }
    
        public static void main(String[] args) throws MQClientException {
            // Jika Anda menggunakan titik akhir publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus mengonfigurasi RPCHook.
            // ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ. Catatan: Pesan transaksional tidak dapat berbagi grup dengan pesan jenis lain.
            TransactionMQProducer transactionMQProducer = new TransactionMQProducer("YOUR TRANSACTION GROUP ID", getAclRPCHook());
            // Jika Anda menggunakan titik akhir VPC untuk mengakses instance ApsaraMQ for RocketMQ, Anda tidak perlu mengonfigurasi RPCHook.
            // Jika instance adalah instance serverless ApsaraMQ for RocketMQ, Anda harus mengonfigurasi RPCHook.
            // TransactionMQProducer transactionMQProducer = new TransactionMQProducer("YOUR TRANSACTION GROUP ID");
    
            // Atur parameter AccessChannel ke Alibaba Cloud. Jika Anda ingin mengaktifkan fitur jejak pesan, Anda harus mengonfigurasi parameter ini. Jika Anda ingin menonaktifkan fitur jejak pesan, biarkan parameter ini kosong.
            transactionMQProducer.setAccessChannel(AccessChannel.CLOUD);
    
            // Titik akhir yang Anda peroleh di konsol ApsaraMQ for RocketMQ. Format titik akhir serupa dengan rmq-cn-XXXX.rmq.aliyuncs.com:8080.
            // Catatan: Masukkan nama domain dan nomor port yang ditampilkan di konsol ApsaraMQ for RocketMQ. Jangan sertakan awalan http:// atau https:// atau gunakan alamat IP yang telah diselesaikan.
            transactionMQProducer.setNamesrvAddr("YOUR ACCESS POINT");
            transactionMQProducer.setTransactionCheckListener(new LocalTransactionCheckerImpl());
            transactionMQProducer.start();
    
            for (int i = 0; i < 10; i++) {
                try {
                    Message message = new Message("YOUR TRANSACTION TOPIC",
                            "YOUR MESSAGE TAG",
                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = transactionMQProducer.sendMessageInTransaction(message, new LocalTransactionExecuter() {
                        @Override
                        public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
                            System.out.println("Mulai mengeksekusi transaksi lokal: " + msg);
                            return LocalTransactionState.UNKNOW;
                        }
                    }, null);
                    assert sendResult != null;
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
  2. Contoh kode berikut menunjukkan cara mengirim status pesan transaksional:

    import org.apache.rocketmq.client.producer.LocalTransactionState;
    import org.apache.rocketmq.client.producer.TransactionCheckListener;
    import org.apache.rocketmq.common.message.MessageExt;
    
    // Kelas yang digunakan untuk memeriksa status transaksi lokal yang diimplementasikan dengan mengirim pesan transaksional ApsaraMQ for RocketMQ.
    public class LocalTransactionCheckerImpl implements TransactionCheckListener {
        // Pemeriksa transaksi lokal. Untuk informasi lebih lanjut, lihat Pesan transaksional.
        @Override public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
            System.out.println("Permintaan untuk memeriksa status transaksi pesan diterima. MsgId: " + msg.getMsgId());
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    
    }

Berlangganan pesan transaksional

Contoh kode untuk berlangganan pesan transaksional sama dengan contoh kode untuk berlangganan pesan normal. Untuk informasi lebih lanjut, lihat Mengirim dan Menerima Pesan Normal.