All Products
Search
Document Center

ApsaraMQ for RabbitMQ:Catatan Penggunaan

Last Updated:Jul 02, 2025

Topik ini menjelaskan catatan penggunaan untuk menggunakan SDK RabbitMQ open source dalam menghubungkan ke broker ApsaraMQ for RabbitMQ.

Haruskah saya mengonfigurasi fitur pemulihan koneksi otomatis saat menggunakan klien untuk terhubung ke broker ApsaraMQ for RabbitMQ?

Saat mengonfigurasi com.rabbitmq.client.ConnectionFactory, Anda harus mengaktifkan pemulihan koneksi otomatis untuk memastikan klien dapat tersambung kembali ke broker selama peningkatan broker. Jika tidak, pembacaan dan penulisan pesan akan terganggu.

// Tentukan apakah akan mengaktifkan pemulihan koneksi otomatis. Jika Anda menetapkan parameter ini ke true, pemulihan koneksi otomatis diaktifkan. Jika Anda menetapkan parameter ini ke false, pemulihan koneksi otomatis dinonaktifkan.
factory.setAutomaticRecoveryEnabled(true);
// Tentukan interval antara dua pemulihan otomatis berturut-turut. Unit: milidetik.
factory.setNetworkRecoveryInterval(5000);

Tindakan pencegahan apa yang perlu saya ambil selama produksi pesan?

  • Selama produksi atau konsumsi pesan, hindari sering mengaktifkan atau menonaktifkan koneksi. Kami merekomendasikan agar Anda menggunakan koneksi persisten. Dengan cara ini, klien tidak perlu membuat koneksi baru setiap kali digunakan untuk mengirim atau menerima pesan. Pembuatan koneksi yang berulang menghabiskan banyak sumber daya jaringan dan broker, bahkan dapat memicu perlindungan terhadap serangan SYN Flood pada broker. Untuk informasi lebih lanjut, lihat Koneksi.

  • Sebelum mengirim pesan, tentukan apakah akan mengaktifkan konfirmasi penerbit. Setelah diaktifkan, broker memanggil metode lokal untuk mengonfirmasi penerimaan pesan. Perhatikan bahwa penerimaan pesan hanya diakui jika klien menerima publishAck yang dikembalikan oleh broker. Jika klien gagal mengirim pesan atau menerima publishAck, implementasikan ulang pesan di klien untuk mencegah hilangnya pesan.

    Catatan

    Setelah mengaktifkan konfirmasi penerbit, mode pengiriman pesan berubah dari sinkron menjadi asinkron, yang dapat meningkatkan latensi pengiriman pesan. Dalam hal ini, broker hanya mengembalikan respons sukses untuk pesan setelah memastikan bahwa pesan ditulis ke disk.

    Contoh berikut menunjukkan cara mengimplementasikan konfirmasi penerbit dalam klien Java:

    1. Aktifkan konfirmasi penerbit saat membuat saluran.

      // Buat koneksi dan saluran.
      Connection connection = createConnection(hostName, userName, passWord, virtualHost);
      Channel channel = connection.createChannel();
      // Aktifkan konfirmasi penerbit untuk saluran.
      channel.confirmSelect();
    2. Setelah memanggil metode basicPublish, tunggu respons dari broker lalu jalankan logika bisnis berdasarkan respons tersebut.

      // Kirim pesan. Kami merekomendasikan agar Anda menggunakan kembali saluran yang ada dalam proses ini.
      channel.basicPublish(exchangeName, bindingKey, true, props,
                      ("contoh body").getBytes(StandardCharsets.UTF_8));
      
      // Periode timeout untuk konfirmasi. Unit: milidetik. Dalam contoh ini, nilainya diatur ke 3000, yang menentukan bahwa periode timeout untuk konfirmasi adalah 3 detik. 
      if (channel.waitForConfirms(3000)) {
          // Logika untuk menulis pesan ke disk.
      } else {
          // Logika untuk mengirim ulang pesan.
      }
  • Jika parameter mandatory disetel ke true tetapi pesan tidak mencapai antrian karena masalah routing, broker memanggil operasi ReturnListener yang ditambahkan oleh klien.

    // Tambahkan callback untuk kegagalan routing ke saluran.
    channel.addReturnListener(returnMessage -> System.out.println("return msgId=" + returnMessage.getProperties().getMessageId()));
    // Kirim pesan. Atur parameter ketiga ke true.
    channel.basicPublish(exchangeName, routingKey, true, props, content.getBytes(StandardCharsets.UTF_8));
  • Saat mengirim pesan, kami sangat merekomendasikan agar Anda menentukan ID kustom untuk setiap pesan. ID merupakan pengenal unik pesan. Anda dapat menggunakan ID untuk melacak pesan dan jejaknya, serta membantu menyelesaikan masalah. Untuk informasi lebih lanjut, lihat Bagaimana cara menentukan ID pesan?

  • Selama pengiriman pesan, Anda harus menentukan apakah akan melempar pengecualian berdasarkan jenis kesalahan yang dikembalikan oleh operasi basicPublish.

    • Jika kesalahan yang menunjukkan pengecualian bisnis dikembalikan, seperti kesalahan ExchangeNotExist, pengecualian harus dilempar.

    • Jika throttling terjadi selama pengiriman pesan, kami merekomendasikan agar Anda menutup koneksi yang ada dan membuat serta menginisialisasi saluran baru untuk memastikan kontinuitas layanan.

    Kode sampel berikut hanya menunjukkan sebagian logika. Untuk kode sampel lengkap, lihat Hasilkan pesan.

    private void doSend(String content) throws Exception {
        try {
            // Tentukan ID pesan.
            String msgId = UUID.randomUUID().toString();
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(msgId).build();
    
            channel.basicPublish(exchangeName, routingKey, true, props, content.getBytes(StandardCharsets.UTF_8));
            // Periode timeout untuk konfirmasi. Unit: milidetik. Dalam contoh ini, nilainya diatur ke 3000, yang menentukan bahwa periode timeout untuk konfirmasi adalah 3 detik.
            if (channel.waitForConfirms(3000)) {
                // Logika untuk menulis pesan ke disk.
            } else {
                // Logika untuk mengirim ulang pesan.
            }
    
        } catch (Exception e) {
            // Dapatkan pesan kesalahan.
            String message = e.getMessage();
            System.out.println(message);
    
            // Periksa apakah saluran ditutup.
            if (channelClosedByServer(message)) {
                // Tutup saluran yang ada dan buat yang baru.
                factory.closeCon(channel);
                channel = factory.createChannel();
                // Inisialisasi saluran baru, seperti mengaktifkan konfirmasi penerbit untuk itu.
                this.initChannel();
                // Kirim ulang pesan.
                doSend(content);
            } else {
                // Jika kesalahan tidak disebabkan oleh broker, lempar pengecualian atau tentukan logika pemrosesan selanjutnya.
                throw e;
            }
        }
    }
    
    private boolean channelClosedByServer(String errorMsg) {
        if (errorMsg != null
            && errorMsg.contains("channel.close")) {
            return true;
        } else {
            return false;
        }
    }

Tindakan pencegahan apa yang perlu saya ambil selama konsumsi pesan?

  • Saat mengonsumsi pesan, Anda harus mencegah bias konsumsi. Untuk informasi lebih lanjut, lihat bagian "Catatan Penggunaan" dari topik Koneksi dan Saluran.

  • Kami merekomendasikan agar Anda menggunakan tag konsumen unik global yang dihasilkan oleh broker alih-alih menentukan tag konsumen. Jika Anda ingin menentukan tag konsumen, pastikan bahwa tag tersebut unik.

  • Saat mengonsumsi pesan, Anda dapat memanggil metode basic.basicQos untuk menentukan nilai kualitas layanan (QoS) guna membatasi jumlah pesan yang belum diakui yang dapat disimpan di klien. Saat nilai QoS yang ditentukan tercapai, broker tidak lagi mendorong pesan ke klien. Setelah klien mengirim ACK, broker mendorong jumlah pesan yang sama dengan jumlah pesan yang diakui ke klien. Dengan cara ini, jumlah maksimum pesan yang belum diakui yang disimpan di broker lebih kecil dari atau sama dengan nilai QoS. Anda juga dapat menentukan nilai QoS untuk saluran atau konsumen. channel.basicQos(100, true) menunjukkan bahwa semua konsumen yang dibuat di saluran dapat menyimpan total 100 pesan yang belum diakui di broker. channel.basicQos(100, false) atau channel.basicQos(100) menunjukkan bahwa setiap konsumen yang dibuat di saluran dapat menyimpan 100 pesan yang belum diakui di broker. Jika Anda tidak menentukan nilai QoS di klien, nilai QoS default di broker digunakan. Secara default, broker membatasi jumlah pesan yang belum diakui yang dapat disimpan setiap konsumen di broker hingga 100. Pengaturan ini setara dengan channel.basicQos(100, false). Nilai QoS kustom yang Anda tentukan tidak boleh melebihi 100. Jika tidak, pengaturan tidak akan berlaku dan nilai default digunakan. Jika kemampuan konsumsi Anda tidak tinggi, kami merekomendasikan agar Anda menentukan nilai QoS yang kecil. Jika jumlah pesan yang terakumulasi di broker mencapai nilai QoS yang ditentukan, broker berhenti mendorong pesan ke klien. Dalam hal ini, Anda dapat melihat broker mendorong pesan secara berkala ke klien. Jika Anda memulai ulang konsumen, broker memulihkan dorongan pesan. Untuk menyelesaikan masalah ini, kami merekomendasikan agar Anda meningkatkan kemampuan konsumsi konsumen. Jika Anda mengonsumsi pesan dalam mode autoACK, basicQos tidak berlaku.

  • Jika konsumen tidak mengirim ACK untuk pesan dalam periode validitas, percobaan konsumsi ulang dipicu dan pesan dikirim ulang. Setiap pesan yang gagal dapat dicoba hingga 16 kali. Jika pesan masih gagal dikonsumsi setelah dicoba 16 kali, pesan tersebut dibuang atau dikirim ke pertukaran surat mati. Tabel berikut menjelaskan periode timeout konsumsi untuk edisi instans yang berbeda.

    Parameter kebijakan pengulangan instans

    Jenis Instans

    Instans Serverless

    Instans Berlangganan

    Berbagi

    Eksklusif

    Edisi Enterprise

    Edisi Platinum Enterprise

    Bayar berdasarkan kapasitas-provisioned-dan-lalu lintas-elastis/bayar berdasarkan permintaan-pesan

    Bayar berdasarkan kapasitas-provisioned-dan-lalu lintas-elastis

    Periode timeout konsumsi

    Nilai maksimum: 3 jam

    Nilai default: 5 menit

    Nilai maksimum: 12 jam

    Nilai default: 30 menit

    Nilai maksimum: 3 jam

    Nilai default: 5 menit

    Nilai maksimum: 12 jam

    Nilai default: 30 menit

    Maksimum upaya pengiriman

    Nilai maksimum: 16

    Nilai default: 16

    Nilai maksimum: 16

    Nilai default: 16

    Nilai maksimum: 16

    Nilai default: 16

    Nilai maksimum: 64

    Nilai default: 16

  • Dibandingkan dengan metode basicConsume, metode basicGet memiliki efisiensi penarikan pesan dan batas transaksi per detik (TPS) yang lebih rendah. Di lingkungan produksi, kami merekomendasikan agar Anda menggunakan metode basicConsume untuk mengonsumsi pesan jika jumlah pesan besar.

  • Ambang batas throttling diberlakukan pada operasi metadata seperti queueDeclare dan exchangeDeclare. Kami merekomendasikan agar Anda membuat antrian dan pertukaran di konsol ApsaraMQ for RabbitMQ alih-alih dengan memanggil operasi selama pengiriman pesan. Jika tidak, throttling dapat dipicu dan koneksi dapat ditutup. Untuk informasi lebih lanjut, lihat Batasan.