全部产品
Search
文档中心

ApsaraMQ for RocketMQ:Langkah 3: Gunakan SDK untuk mengirim dan menerima pesan

更新时间:Jul 02, 2025

ApsaraMQ for RocketMQ menyediakan SDK dalam berbagai bahasa pemrograman untuk mengirim dan menerima pesan dari berbagai jenis. Topik ini menjelaskan cara menggunakan SDK Java untuk terhubung ke broker ApsaraMQ for RocketMQ guna mengirim dan menerima pesan normal.

Prasyarat

  • Sumber daya yang diperlukan telah dibuat di ApsaraMQ for RocketMQ. Untuk informasi lebih lanjut, lihat Langkah 2: Buat sumber daya.

  • IntelliJ IDEA telah diinstal. Untuk informasi lebih lanjut, lihat IntelliJ IDEA.

    Anda dapat menggunakan IntelliJ IDEA atau Eclipse. Dalam contoh ini, IntelliJ IDEA Ultimate digunakan.

  • JDK 1.8 atau versi lebih baru telah diinstal. Untuk informasi lebih lanjut, lihat Unduhan Java.

  • Maven 2.5 atau versi lebih baru telah diinstal. Untuk informasi lebih lanjut, lihat Mengunduh Apache Maven.

Instal pustaka dependensi Java

  1. Buat proyek Java di IntelliJ IDEA.

  2. Tambahkan dependensi berikut ke file pom.xml untuk mengimpor pustaka dependensi Java:

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client-java</artifactId>
        <version>5.0.7</version>
    </dependency>

Hasilkan pesan

Dalam proyek Java yang dibuat, buat program untuk mengirim pesan normal dan jalankan program tersebut. Contoh kode:

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;

public class ProducerExample {
    public static void main(String[] args) throws ClientException {
        /**
         * Titik akhir dari instance. Anda dapat melihat titik akhir pada tab Endpoints halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
         * Jika klien instance ApsaraMQ for RocketMQ ditempatkan pada instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ dalam jaringan internal, kami sarankan Anda menentukan titik akhir VPC.
         * Jika Anda mengakses instance melalui Internet atau dari pusat data, Anda dapat menentukan titik akhir publik. Jika Anda mengakses instance melalui Internet, Anda harus mengaktifkan fitur akses Internet untuk instance tersebut.
         */
        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
        // Nama topik tempat pesan dikirim. Sebelum Anda menggunakan topik untuk menerima pesan, Anda harus membuat topik tersebut di konsol ApsaraMQ for RocketMQ. Jika tidak, kesalahan akan dikembalikan.
        String topic = "Topik Anda";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                /**
                 * Jika instance ApsaraMQ for RocketMQ adalah instance serverless dan Anda menggunakan titik akhir publik untuk mengakses instance, Anda harus menentukan ID instance.
                 */
                //.setNamespace("InstanceId")
                /**
                 * Jika Anda menggunakan titik akhir publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus menentukan nama pengguna dan kata sandi instance. Anda dapat memperoleh nama pengguna dan kata sandi pada tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
                 * Jika klien instance ApsaraMQ for RocketMQ ditempatkan pada instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ dalam jaringan internal, Anda tidak perlu menentukan nama pengguna atau kata sandi. Broker secara otomatis mendapatkan nama pengguna dan kata sandi berdasarkan informasi VPC.
                 * Jika instance adalah instance ApsaraMQ for RocketMQ serverless, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur tanpa otentikasi di VPC untuk instance serverless dan mengakses instance dalam VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi.
                 */
                //.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"))
                .build();
        /**
         * Saat Anda menginisialisasi produser, Anda dapat menentukan topik yang ingin Anda gunakan untuk memeriksa apakah pengaturan topik valid dan mencegah topik tidak valid dimulai.
         * Anda tidak perlu menentukan topik untuk pesan non-transaksional. Broker secara dinamis memeriksa apakah topik valid.
         * Catatan: Untuk mencegah operasi API yang dipanggil untuk memeriksa pesan transaksional gagal, Anda harus menentukan topik untuk pesan transaksional sebelumnya.
         */
        Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(clientConfiguration)
                .build();
        // Kirim pesan normal.
        Message message = provider.newMessageBuilder()
                .setTopic(topic)
                // Kunci pesan. Anda dapat menggunakan kunci untuk mencari pesan.
                .setKeys("kunciPesan")
                // Tag pesan. Konsumen dapat menggunakan tag untuk memfilter pesan.
                .setTag("tagPesan")
                // Tubuh pesan.
                .setBody("isiPesan".getBytes())
                .build();
        try {
            // Kirim pesan. Perhatikan hasilnya dan tangkap pengecualian seperti kegagalan.
            SendReceipt sendReceipt = producer.send(message);
            System.out.println(sendReceipt.getMessageId());
        } catch (ClientException e) {
            e.printStackTrace();
        }
    }
}

Konsumsi pesan

Dalam proyek Java yang dibuat, buat program untuk berlangganan pesan normal dan jalankan program tersebut. ApsaraMQ for RocketMQ memungkinkan Anda mengonsumsi pesan dalam mode sederhana dan mode dorong. Untuk informasi lebih lanjut, lihat SimpleConsumer dan PushConsumer. Anda dapat memilih salah satu mode untuk berlangganan pesan. Tabel berikut menjelaskan perbedaan antara konsumen sederhana dan konsumen dorong.

Item

PushConsumer

SimpleConsumer

Pemanggilan operasi API

Operasi callback dipanggil untuk mengembalikan hasil konsumsi menggunakan pendengar pesan. Konsumen hanya dapat memproses logika konsumsi dalam ruang lingkup pendengar pesan.

Aplikasi bisnis mengimplementasikan pemrosesan pesan dan memanggil operasi yang sesuai untuk mengembalikan hasil konsumsi.

Manajemen konkurensi konsumsi

SDK ApsaraMQ for RocketMQ digunakan untuk mengelola jumlah thread konkuren untuk konsumsi pesan.

Jumlah thread konkuren yang digunakan untuk konsumsi pesan didasarkan pada logika konsumsi aplikasi bisnis individu.

Fleksibilitas API

Operasi API dienkapsulasi dan memberikan fleksibilitas yang buruk.

Operasi atomik memberikan fleksibilitas yang besar.

Skenario

Tipe konsumen ini cocok untuk skenario pengembangan yang tidak memerlukan proses kustom.

Tipe konsumen ini cocok untuk skenario pengembangan yang memerlukan proses kustom.

PushConsumer

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;

public class PushConsumerExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class);

    private PushConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
        /**
         * Titik akhir dari instance. Anda dapat melihat titik akhir pada tab Endpoints halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
         * Jika klien instance ApsaraMQ for RocketMQ ditempatkan pada instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ dalam jaringan internal, kami sarankan Anda menentukan titik akhir VPC.
         * Jika Anda mengakses instance melalui Internet atau dari pusat data, Anda dapat menentukan titik akhir publik. Jika Anda mengakses instance melalui Internet, Anda harus mengaktifkan fitur akses Internet untuk instance tersebut.
         */
        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
        // Topik yang ingin Anda langgani. Sebelum Anda menentukan topik, Anda harus membuat topik tersebut di konsol ApsaraMQ for RocketMQ terlebih dahulu. Jika tidak, kesalahan akan dikembalikan.
        String topic = "Topik Anda";
        // Grup konsumen tempat konsumen berada. Sebelum Anda menentukan grup konsumen, Anda harus membuat grup konsumen tersebut di konsol ApsaraMQ for RocketMQ terlebih dahulu. Jika tidak, kesalahan akan dikembalikan.
        String consumerGroup = "GrupKonsumen Anda";
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                /**
                 * Jika instance ApsaraMQ for RocketMQ adalah instance serverless dan Anda menggunakan titik akhir publik untuk mengakses instance, Anda harus menentukan ID instance.
                 */
                //.setNamespace("InstanceId")
                /**
                 * Jika Anda menggunakan titik akhir publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus menentukan nama pengguna dan kata sandi instance. Anda dapat memperoleh nama pengguna dan kata sandi pada tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
                 * Jika klien instance ApsaraMQ for RocketMQ ditempatkan pada instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ dalam jaringan internal, Anda tidak perlu menentukan nama pengguna atau kata sandi. Broker secara otomatis mendapatkan nama pengguna dan kata sandi berdasarkan informasi VPC.
                 * Jika instance adalah instance ApsaraMQ for RocketMQ serverless, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur tanpa otentikasi di VPC untuk instance serverless dan mengakses instance dalam VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi.
                 */
                //.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"))
                .build();
        // Aturan yang digunakan untuk memfilter pesan. Dalam contoh berikut, semua pesan dalam topik dilanggan.
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        // Inisialisasi konsumen dorong. Saat Anda menginisialisasi konsumen dorong, Anda harus menentukan grup konsumen, parameter komunikasi, dan langganan untuk konsumen.
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                // Grup konsumen.
                .setConsumerGroup(consumerGroup)
                // Langganan.
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                // Pendengar pesan.
                .setMessageListener(messageView -> {
                    // Konsumsi pesan dan kembalikan hasil konsumsi.
                    // LOGGER.info("Consume message={}", messageView);
                    System.out.println("Konsumsi Pesan: " + messageView);
                    return ConsumeResult.SUCCESS;
                })
                .build();
        Thread.sleep(Long.MAX_VALUE);
        // Jika Anda tidak lagi memerlukan konsumen dorong, matikan prosesnya.
        //pushConsumer.close();
    }
}   

SimpleConsumer

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;

public class SimpleConsumerExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class);

    private SimpleConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException {
        /**
         * Titik akhir dari instance. Anda dapat melihat titik akhir pada tab Endpoints halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
         * Jika klien instance ApsaraMQ for RocketMQ ditempatkan pada instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ dalam jaringan internal, kami sarankan Anda menentukan titik akhir VPC.
         * Jika Anda mengakses instance melalui Internet atau dari pusat data, Anda dapat menentukan titik akhir publik. Jika Anda mengakses instance melalui Internet, Anda harus mengaktifkan fitur akses Internet untuk instance tersebut.
         */
        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
        // Topik yang ingin Anda langgani. Sebelum Anda menentukan topik, Anda harus membuat topik tersebut di konsol ApsaraMQ for RocketMQ terlebih dahulu. Jika tidak, kesalahan akan dikembalikan.
        String topic = "Topik Anda";
        // Grup konsumen tempat konsumen berada. Sebelum Anda menentukan grup konsumen, Anda harus membuat grup konsumen tersebut di konsol ApsaraMQ for RocketMQ terlebih dahulu. Jika tidak, kesalahan akan dikembalikan.
        String consumerGroup = "GrupKonsumen Anda";
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                /**
                 * Jika instance ApsaraMQ for RocketMQ adalah instance serverless dan Anda menggunakan titik akhir publik untuk mengakses instance, Anda harus menentukan ID instance.
                 */
                //.setNamespace("InstanceId")
                /**
                 * Jika Anda menggunakan titik akhir publik untuk mengakses instance ApsaraMQ for RocketMQ, Anda harus menentukan nama pengguna dan kata sandi instance. Anda dapat memperoleh nama pengguna dan kata sandi pada tab Otentikasi Cerdas halaman Kontrol Akses yang sesuai dengan instance di konsol ApsaraMQ for RocketMQ.
                 * Jika klien instance ApsaraMQ for RocketMQ ditempatkan pada instance ECS dan Anda ingin mengakses instance ApsaraMQ for RocketMQ dalam jaringan internal, Anda tidak perlu menentukan nama pengguna atau kata sandi. Broker secara otomatis mendapatkan nama pengguna dan kata sandi berdasarkan informasi VPC.
                 * Jika instance adalah instance ApsaraMQ for RocketMQ serverless, Anda harus menentukan nama pengguna dan kata sandi untuk mengakses instance melalui Internet. Jika Anda mengaktifkan fitur tanpa otentikasi di VPC untuk instance serverless dan mengakses instance dalam VPC, Anda tidak perlu menentukan nama pengguna atau kata sandi.
                 */
                //.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"))
                .build();
        Duration awaitDuration = Duration.ofSeconds(10);
        // Aturan yang digunakan untuk memfilter pesan. Dalam contoh berikut, semua pesan dalam topik dilanggan.
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        // Inisialisasi konsumen sederhana. Saat Anda menginisialisasi konsumen sederhana, Anda harus menentukan grup konsumen, parameter komunikasi, dan langganan untuk konsumen.
        SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                // Grup konsumen.
                .setConsumerGroup(consumerGroup)
                // Durasi timeout untuk permintaan polling panjang.
                .setAwaitDuration(awaitDuration)
                // Langganan.
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                .build();
        // Jumlah maksimum pesan yang akan ditarik.
        int maxMessageNum = 16;
        // Waktu tak terlihat pesan.
        Duration invisibleDuration = Duration.ofSeconds(10);
        // Jika Anda menggunakan konsumen sederhana untuk mengonsumsi pesan, klien harus mendapatkan dan mengonsumsi pesan dalam loop.
        // Untuk mengonsumsi pesan secara real-time, kami sarankan Anda menggunakan beberapa thread untuk menarik pesan secara konkuren.
        while (true) {
            final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
            messages.forEach(messageView -> {
                // LOGGER.info("Received message: {}", messageView);
                System.out.println("Pesan diterima: " + messageView);
            });
            for (MessageView message : messages) {
                final MessageId messageId = message.getMessageId();
                try {
                    // Setelah konsumsi selesai, konsumen harus memanggil metode ACK untuk mengirimkan hasil konsumsi ke broker.
                    consumer.ack(message);
                    System.out.println("Pesan diakui berhasil, messageId= " + messageId);
                    //LOGGER.info("Message is acknowledged successfully, messageId={}", messageId);
                } catch (Throwable t) {
                    t.printStackTrace();
                    //LOGGER.error("Message is failed to be acknowledged, messageId={}", messageId, t);
                }
            }
        }
        // Jika Anda tidak lagi memerlukan konsumen sederhana, matikan prosesnya.
        // consumer.close();
    }
}   

Deskripsi versi untuk mengakses instance serverless melalui Internet

Hanya SDK versi tertentu yang dapat mengakses instance ApsaraMQ for RocketMQ serverless melalui Internet.

SDK untuk Java 5.x

Jika Anda mengakses instance ApsaraMQ for RocketMQ serverless melalui Internet untuk mengirim dan menerima pesan, pastikan versi SDK untuk Java memenuhi kondisi berikut dan tambahkan informasi berikut dalam kode:

Catatan

Ganti InstanceId dengan ID instance ApsaraMQ for RocketMQ Anda.

  • rocketmq-client: versi 5.2.0 atau lebih baru

    Tambahkan informasi berikut dalam kode saat Anda mengirim pesan: producer.setNamespaceV2("InstanceId");

    Tambahkan informasi berikut dalam kode saat Anda menerima pesan: consumer.setNamespaceV2("InstanceId");

  • rocketmq-client-java: versi 5.0.6 atau lebih baru

    Tambahkan informasi berikut dalam kode saat Anda mengirim dan menerima pesan:

    ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
    .setEndpoints(endpoints)
    .setNamespace("InstanceId")
    .setCredentialProvider(sessionCredentialsProvider)
    .build();

SDK klien TCP untuk Java 1.x

Jika Anda mengakses instance ApsaraMQ for RocketMQ serverless melalui Internet untuk mengirim dan menerima pesan, pastikan bahwa versi RocketMQ 1.x TCP client SDK untuk Java adalah 1.9.0.Final atau lebih baru dan tambahkan informasi berikut dalam kode:

properties.setProperty(PropertyKeyConst.Namespace, "InstanceId");

Catatan

Ganti InstanceId dengan ID instance ApsaraMQ for RocketMQ Anda.

Parameter SDK

Parameter

Contoh

Deskripsi

endpoints

rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080

Titik akhir instance ApsaraMQ for RocketMQ. Untuk informasi tentang cara memperoleh titik akhir, lihat Dapatkan titik akhir instance.

  • Jika Anda mengakses instance melalui Internet, tentukan titik akhir publik.

  • Jika Anda mengakses instance dalam VPC, tentukan titik akhir VPC.

InstanceId

rmq-cn-xxx

ID instance ApsaraMQ for RocketMQ.

topic

normal_test

Topik tempat pesan dikirim atau dari mana pesan dikonsumsi dalam instance ApsaraMQ for RocketMQ.

Anda harus membuat topik tersebut pada instance ApsaraMQ for RocketMQ terlebih dahulu. Untuk informasi lebih lanjut, lihat Buat topik.

group

GID_test

Grup konsumen yang digunakan oleh konsumen untuk mengonsumsi pesan dalam instance ApsaraMQ for RocketMQ.

Anda harus membuat grup konsumen tersebut pada instance ApsaraMQ for RocketMQ terlebih dahulu. Untuk informasi lebih lanjut, lihat Buat grup konsumen.

Instance UserName

1XVg0hzgKm******

Nama pengguna instance ApsaraMQ for RocketMQ. Jika Anda mengakses instance melalui Internet, Anda harus menentukan nama pengguna. Jika Anda mengakses instance dalam VPC, Anda hanya perlu menentukan nama pengguna jika instance adalah instance serverless dan fitur tanpa otentikasi di VPC dinonaktifkan untuk instance tersebut.

Untuk informasi tentang cara memperoleh nama pengguna, lihat Dapatkan nama pengguna dan kata sandi instance.

Instance Password

ijSt8rEc45******

Kata sandi untuk instance ApsaraMQ for RocketMQ. Jika Anda mengakses instance melalui Internet, Anda harus menentukan kata sandi. Jika Anda mengakses instance di dalam VPC, Anda hanya perlu menentukan kata sandi jika instance tersebut merupakan instance tanpa server dan fitur autentikasi gratis di VPC dinonaktifkan untuk instance tersebut.

Untuk informasi tentang cara mendapatkan kata sandi, lihat Memperoleh nama pengguna dan kata sandi sebuah instance.

Verifikasi konsumsi pesan

Setelah Anda mengonsumsi pesan, Anda dapat memeriksa status konsumsi pesan tersebut di konsol ApsaraMQ for RocketMQ.

  1. Masuk ke konsol ApsaraMQ for RocketMQ. Pada halaman Instances, klik nama instance yang ingin Anda kelola.

  2. Di panel navigasi sisi kiri halaman yang muncul, klik Message Tracing.

Referensi SDK

Untuk informasi tentang cara menggunakan SDK untuk bahasa pemrograman lainnya untuk mengirim dan menerima pesan dari jenis lain, lihat Ikhtisar.