全部产品
Search
文档中心

ApsaraMQ for RocketMQ:Mengirim pesan menggunakan banyak thread

更新时间:Jun 28, 2025

Objek konsumen dan produsen ApsaraMQ for RocketMQ bersifat aman terhadap thread dan dapat dibagikan di antara beberapa thread.

Anda dapat menerapkan beberapa instans produsen dan konsumen pada satu atau lebih broker. Selain itu, Anda dapat menjalankan beberapa thread untuk mengirim atau menerima pesan dalam sebuah instans produsen atau konsumen, yang meningkatkan transaksi per detik (TPS) untuk pengiriman atau penerimaan pesan.

Penting
  • Jangan membuat instans produsen atau instans konsumen untuk setiap thread.

  • Kami menyarankan agar Anda tidak menggunakan beberapa thread untuk mengirim pesan terurut.

    Broker ApsaraMQ for RocketMQ menentukan urutan produksi pesan berdasarkan urutan penggunaan pengirim dengan satu produsen atau thread untuk mengirim pesan. Jika pengirim menggunakan beberapa produsen atau thread untuk mengirim pesan secara bersamaan, urutan pesan ditentukan oleh urutan diterimanya pesan oleh broker ApsaraMQ for RocketMQ. Urutan ini mungkin berbeda dari urutan pengiriman di sisi bisnis.

Berikut adalah contoh kode yang menunjukkan cara membagikan produsen di antara beberapa thread:

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import com.aliyun.openservices.ons.api.SendResult;
import java.util.Properties;

public class SharedProducer {
    public static void main(String[] args) {
        // Inisialisasi konfigurasi produsen.
        Properties properties = new Properties();
        // ID grup konsumen yang Anda buat di konsol ApsaraMQ for RocketMQ.
        properties.put(PropertyKeyConst.GROUP_ID,"XXX");
        // Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi.
        // ID AccessKey yang digunakan untuk autentikasi.
        properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
        // Rahasia AccessKey yang digunakan untuk autentikasi.
        properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        // Periode waktu habis untuk mengirim pesan. Satuan: milidetik.
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis,"3000");
        // Titik akhir TCP. Anda dapat memperoleh titik akhir di bagian titik akhir TCP pada halaman Detail Instance di konsol ApsaraMQ for RocketMQ.
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");
        final Producer producer = ONSFactory.createProducer(properties);
        // Sebelum Anda mengirim pesan, panggil fungsi start() hanya sekali untuk memulai produsen.
        producer.start();

        // Objek produsen dan konsumen yang dibuat bersifat aman terhadap thread dan dapat dibagikan di antara beberapa thread. Jangan membuat instans produsen atau instans konsumen untuk setiap thread.

        // Dua thread berbagi objek produsen dan mengirim pesan secara bersamaan ke ApsaraMQ for RocketMQ.
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Message msg = new Message(
                    // Topik tempat pesan normal diproduksi. Topik yang digunakan untuk mengirim dan menerima pesan normal tidak dapat digunakan untuk mengirim atau menerima pesan jenis lain.
                    "TopicTestMQ",
                    // Tag pesan. Tag pesan mirip dengan tag Gmail dan digunakan oleh konsumen untuk menyaring pesan di broker ApsaraMQ for RocketMQ.
                    "TagA",
                    // Tubuh pesan. Tubuh pesan adalah data dalam format biner. ApsaraMQ for RocketMQ tidak memproses tubuh pesan.
                    // Produsen dan konsumen harus sepakat tentang metode untuk serialisasi dan deserialisasi tubuh pesan.
                    "Hello MQ".getBytes());
                    SendResult sendResult = producer.send(msg);
                    // Kirim pesan dalam mode transmisi sinkron. Jika tidak ada pengecualian yang dilemparkan, pesan terkirim.
                    if (sendResult != null) {
                        System.out.println(new Date() + " Pengiriman pesan mq berhasil. Topik adalah:" + MqConfig.TOPIC + " msgId adalah: " + sendResult.getMessageId());
                    }
                } catch (Exception e) {
                    // Logika yang ingin Anda gunakan untuk mengirim ulang atau menyimpan pesan jika pesan gagal dikirim dan perlu dikirim ulang.
                    System.out.println(new Date() + " Pengiriman pesan mq gagal. Topik adalah:" + MqConfig.TOPIC);
                    e.printStackTrace();
                }
            }
        });
        thread.start();


        Thread anotherThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Message msg = new Message("TopicTestMQ", "TagA", "Hello MQ".getBytes());
                    SendResult sendResult = producer.send(msg);
                    // Kirim pesan dalam mode transmisi sinkron. Jika tidak ada pengecualian yang dilemparkan, pesan terkirim.
                    if (sendResult != null) {
                        System.out.println(new Date() + " Pengiriman pesan mq berhasil. Topik adalah:" + MqConfig.TOPIC + " msgId adalah: " + sendResult.getMessageId());
                    }
                } catch (Exception e) {
                    // Logika yang ingin Anda gunakan untuk mengirim ulang atau menyimpan pesan jika pesan gagal dikirim dan perlu dikirim ulang.
                    System.out.println(new Date() + " Pengiriman pesan mq gagal. Topik adalah:" + MqConfig.TOPIC);
                    e.printStackTrace();
                }
            }
        });
        anotherThread.start();


        // Opsional. Jika instans produsen tidak lagi digunakan, hentikan produsen dan lepaskan sumber daya yang dialokasikan.
        // producer.shutdown();
    }
}