All Products
Search
Document Center

ApsaraMQ for RocketMQ:Penyaringan Pesan

Last Updated:Jun 28, 2025

Setelah konsumen berlangganan topik, broker ApsaraMQ for RocketMQ mengirimkan semua pesan dalam topik ke konsumen. Jika konsumen ingin menerima hanya pesan tertentu dari topik tersebut, Anda dapat mengonfigurasi atribut pesan dan kondisi filter. Dengan cara ini, broker ApsaraMQ for RocketMQ hanya mengirimkan pesan yang atributnya sesuai dengan kondisi filter kepada konsumen. Topik ini menjelaskan mekanisme kerja, skenario penggunaan, dan batasan fitur penyaringan pesan, serta cara mengonfigurasi penyaringan pesan dan menyediakan contoh kode untuk implementasinya.

Deskripsi Fitur

Fitur penyaringan pesan dapat diimplementasikan dengan mengonfigurasi atribut pesan dan kondisi filter. Atribut pesan dikonfigurasi untuk mengklasifikasikan pesan yang dikirim oleh produsen ke topik, sedangkan kondisi filter digunakan untuk memfilter pesan dengan atribut tertentu dalam topik. Broker ApsaraMQ for RocketMQ kemudian memfilter pesan dari produsen dan hanya mengirimkan pesan yang memenuhi kondisi filter tertentu kepada konsumen.

Jika konsumen berlangganan ke topik tanpa menentukan kondisi filter, konsumen akan menerima semua pesan dalam topik, terlepas dari apakah atribut telah dikonfigurasikan untuk pesan yang dikirim oleh produsen.

Tabel berikut menjelaskan metode penyaringan yang didukung oleh ApsaraMQ for RocketMQ.

Metode

Deskripsi

Skenario

Batasan Instansi

Batasan Protokol

Penyaringan Berbasis Tag (default)

  • Anda dapat menambahkan tag ke pesan yang dikirim oleh produsen.

  • Anda dapat menentukan tag dari pesan yang akan diterima oleh konsumen.

Jika tag dari pesan yang dikirim oleh produsen cocok dengan tag yang ditentukan dari pesan yang akan diterima oleh konsumen, broker mengirimkan pesan tersebut ke konsumen.

Metode ini cocok untuk skenario penyaringan sederhana.

Anda hanya dapat menambahkan satu tag ke pesan. Metode ini memungkinkan Anda mengklasifikasikan dan memfilter pesan berdasarkan satu atribut.

Tidak ada.

Tidak ada.

Penyaringan Berbasis SQL Atribut

  • Anda dapat mengonfigurasi atribut kustom untuk pesan yang dikirim oleh produsen.

  • Anda dapat menggunakan ekspresi SQL untuk menentukan kondisi untuk memfilter pesan yang akan diterima oleh konsumen.

Pesan yang memenuhi kondisi filter dikirimkan ke konsumen.

Metode ini cocok untuk skenario penyaringan kompleks.

Anda dapat mengonfigurasi beberapa atribut kustom untuk sebuah pesan. Metode ini memungkinkan Anda menggunakan ekspresi SQL kustom untuk memfilter pesan berdasarkan beberapa atribut.

Hanya instansi Enterprise Platinum Edition yang mendukung metode ini.

Hanya SDK klien TCP ApsaraMQ for RocketMQ yang mendukung metode ini.

Penyaringan Berbasis Tag

Tag adalah label yang mengklasifikasikan pesan dalam sebuah topik. Sebelum produsen ApsaraMQ for RocketMQ mengirimkan pesan, Anda dapat menambahkan tag ke pesan tersebut. Kemudian, konsumen dapat berlangganan pesan berdasarkan tag yang ditambahkan.

Skema Contoh

Dalam skenario transaksi e-commerce, pesan-pesan berikut dihasilkan:

  • Pesan pesanan

  • Pesan pembayaran

  • Pesan logistik

Pesan-pesan tersebut dikirim ke topik bernama Trade_Topic yang disubskripsikan oleh sistem hilir berikut:

  • Sistem pembayaran: hanya berlangganan pesan pembayaran.

  • Sistem logistik: hanya berlangganan pesan logistik.

  • Sistem analisis tingkat keberhasilan transaksi: berlangganan pesan pesanan dan pembayaran.

  • Sistem komputasi waktu nyata: berlangganan semua pesan.

Gambar berikut menunjukkan proses penyaringan.filtermessage

Metode Konfigurasi

ApsaraMQ for RocketMQ memungkinkan Anda mendefinisikan kode dalam SDK klien untuk memfilter pesan menggunakan tag. Sebelum produsen mengirimkan pesan, Anda harus menambahkan tag ke pesan dan menentukan tag dari pesan yang ingin diterima oleh konsumen. Untuk informasi tentang SDK, lihat Ikhtisar. Bagian berikut menjelaskan cara mendefinisikan kode untuk produsen dan konsumen:

  • Mengirim pesan

    Sebelum Anda mengirim pesan, tentukan tag untuk setiap pesan.

        Message msg = new Message("MQ_TOPIC","TagA","Hello MQ".getBytes());                
  • Berlangganan semua pesan

    Jika konsumen ingin berlangganan semua pesan dalam topik, gunakan tanda bintang (*) untuk menentukan semua tag.

        consumer.subscribe("MQ_TOPIC", "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                
  • Berlangganan jenis pesan tertentu

    Jika konsumen ingin berlangganan jenis pesan tertentu dalam topik, tentukan tag yang sesuai.

        consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                
  • Berlangganan beberapa jenis pesan

    Jika konsumen ingin berlangganan beberapa jenis pesan dalam topik, tentukan tag yang sesuai. Pisahkan beberapa tag dengan dua garis vertikal (||).

        consumer.subscribe("MQ_TOPIC", "TagA||TagB", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                
  • Contoh salah

    Jika konsumen memiliki beberapa langganan ke topik, dan setiap langganan memiliki tag yang berbeda, konsumen hanya akan menerima pesan yang tag-nya cocok dengan tag yang ditentukan dalam langganan terbaru.

        // Dalam kode berikut, konsumen dapat menerima pesan dengan TagB dalam MQ_TOPIC tetapi tidak dapat menerima pesan dengan TagA.
        consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });
        consumer.subscribe("MQ_TOPIC", "TagB", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                

Penyaringan Berbasis SQL Atribut

Untuk menggunakan metode penyaringan berbasis SQL atribut, lakukan langkah-langkah berikut: konfigurasikan atribut kustom untuk pesan sebelum produsen mengirimkan pesan dan definisikan ekspresi filter menggunakan sintaksis SQL untuk menentukan atribut pesan yang ingin diterima oleh konsumen. ApsaraMQ for RocketMQ memfilter pesan yang atribut kustomnya cocok dengan hasil perhitungan ekspresi filter dan mengirimkan pesan tersebut ke konsumen.

Catatan

Tag adalah jenis atribut pesan khusus. Metode penyaringan berbasis SQL atribut kompatibel dengan metode penyaringan berbasis tag. Anda dapat menggunakan ekspresi SQL untuk menentukan tag yang digunakan untuk penyaringan pesan. Dalam sintaksis SQL, atribut tag diwakili oleh TAGS.

Batasan

Ketika Anda menggunakan metode penyaringan berbasis SQL atribut untuk memfilter pesan, perhatikan batasan berikut:

  • Hanya instansi Enterprise Platinum Edition yang mendukung metode ini.

  • Hanya klien TCP yang mendukung metode ini.

  • Jika broker tidak mendukung metode penyaringan berbasis SQL atribut dan Anda mendefinisikan ekspresi filter untuk konsumen, kesalahan dilaporkan saat konsumen dimulai atau konsumen mungkin tidak menerima pesan.

Skema Contoh

Item berikut menggambarkan pesan yang dihasilkan dalam skenario transaksi e-commerce. Pesan diklasifikasikan menjadi pesan pesanan dan pesan logistik. Atribut wilayah ditentukan untuk pesan logistik. Nilai atribut wilayah adalah Hangzhou dan Shanghai.

  • Pesan pesanan

  • Pesan logistik

    • Pesan logistik yang nilai atribut wilayahnya adalah Hangzhou

    • Pesan logistik yang nilai atribut wilayahnya adalah Shanghai

Pesan-pesan tersebut dikirim ke topik bernama Trade_Topic yang disubskripsikan oleh sistem hilir berikut:

  • Sistem logistik 1: hanya berlangganan pesan logistik yang nilai atribut wilayahnya adalah Hangzhou.

  • Sistem Logistik 2: berlangganan semua pesan logistik.

  • Sistem pelacakan pesanan: hanya berlangganan pesan pesanan.

  • Sistem komputasi waktu nyata: berlangganan semua pesan.

Gambar berikut menunjukkan proses penyaringan.sql过滤

Metode Konfigurasi

ApsaraMQ for RocketMQ memungkinkan Anda mendefinisikan kode dalam SDK klien untuk memfilter pesan menggunakan ekspresi SQL. Anda harus mengonfigurasi atribut pesan kustom dalam kode produsen untuk mengirim pesan dan mendefinisikan ekspresi filter menggunakan sintaksis SQL dalam kode konsumen untuk berlangganan pesan.

Batasan berikut diberlakukan pada atribut pesan:

  • Sebelum produsen mengirimkan pesan, produsen dapat menentukan atribut kustom untuk setiap pesan. Setiap atribut adalah pasangan kunci-nilai kustom.

    • Kunci dalam atribut dapat berisi huruf, angka, dan garis bawah (_).

    • Kunci dalam atribut harus dimulai dengan huruf atau garis bawah (_).

  • Anda dapat menentukan beberapa atribut untuk setiap pesan.

Untuk informasi tentang SDK, lihat Ikhtisar. Bagian berikut menjelaskan cara mendefinisikan kode untuk produsen dan konsumen:

  • Produsen

    Konfigurasikan atribut pesan kustom.

    Message msg = new Message("topic", "tagA", "Hello MQ".getBytes());
    // Konfigurasikan Atribut A dan atur nilai atribut menjadi 1.
    msg.putUserProperties("A", "1");
  • Konsumen

    Definisikan ekspresi filter menggunakan sintaksis SQL untuk memfilter pesan berdasarkan atribut kustom.

    Penting

    Untuk memfilter pesan berdasarkan atribut kustom, Anda harus mendefinisikan logika dalam ekspresi filter untuk memeriksa apakah atribut pesan ada. Jika atribut tidak ada, hasil perhitungan ekspresi filter adalah NULL dan pesan tidak dikirimkan ke konsumen.

    // Berlangganan pesan yang memiliki Atribut A dan nilai atributnya adalah 1.
    consumer.subscribe("topic", MessageSelector.bySql("A IS NOT NULL AND TAGS IS NOT NULL AND A = '1'"), new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.printf("Menerima Pesan Baru: %s %n", message);
            return Action.CommitMessage;
        }
    });

Tabel berikut menjelaskan berbagai jenis sintaksis SQL yang dapat digunakan untuk mendefinisikan ekspresi filter.

Sintaksis

Deskripsi

Contoh

IS NULL

Menentukan bahwa atribut tidak ada.

a IS NULL: Atribut a tidak ada.

IS NOT NULL

Menentukan bahwa atribut ada.

a IS NOT NULL: Atribut a ada.

  • >

  • >=

  • <

  • <=

Membandingkan nilai numerik. Anda tidak dapat menggunakan sintaksis ini untuk membandingkan string. Jika tidak, kesalahan dilaporkan saat konsumen dimulai.

Catatan

String yang dapat dikonversi menjadi nilai numerik dianggap sebagai nilai numerik.

  • a IS NOT NULL AND a > 100: Atribut a ada dan nilai Atribut a lebih besar dari 100.

  • a IS NOT NULL AND a > 'abc': Contoh kesalahan. Anda tidak dapat membandingkan nilai Atribut a dengan abc karena abc adalah string.

BETWEEN xxx AND xxx

Membandingkan nilai numerik. Anda tidak dapat menggunakan sintaksis ini untuk membandingkan string. Jika tidak, kesalahan dilaporkan saat konsumen dimulai. Sintaksis ini setara dengan >= xxx AND <= xxx, yang menentukan bahwa nilai atribut berada di antara dua nilai numerik atau sama dengan salah satu dari dua nilai numerik tersebut.

a IS NOT NULL AND (a BETWEEN 10 AND 100): Atribut a ada dan nilai Atribut a lebih besar dari atau sama dengan 10 dan kurang dari atau sama dengan 100.

NOT BETWEEN xxx AND xxx

Membandingkan nilai numerik. Anda tidak dapat menggunakan sintaksis ini untuk membandingkan string. Jika tidak, kesalahan dilaporkan saat konsumen dimulai. Sintaksis ini setara dengan < xxx OR > xxx, yang menentukan bahwa nilai atribut kurang dari nilai numerik di sisi kiri atau lebih besar dari nilai numerik di sisi kanan.

a IS NOT NULL AND (a NOT BETWEEN 10 AND 100): Atribut a ada dan nilai Atribut a kurang dari 10 atau lebih besar dari 100.

IN (xxx, xxx)

Menentukan bahwa nilai atribut termasuk dalam satu set. Elemen dalam set hanya bisa berupa string.

a IS NOT NULL AND (a IN ('abc', 'def')): Atribut a ada dan nilai Atribut a adalah abc atau def.

  • =

  • <>

Operator sama dengan dan operator tidak sama dengan. Anda dapat menggunakan operator ini untuk membandingkan nilai numerik dan string.

a IS NOT NULL AND (a = 'abc' OR a<>'def'): Atribut a ada dan nilai Atribut a adalah abc atau bukan def.

  • AND

  • OR

Operator logika AND dan operator logika OR. Anda dapat menggunakan operator ini untuk menggabungkan fungsi logika sederhana. Setiap fungsi logika harus diapit tanda kurung.

a IS NOT NULL AND (a > 100) OR (b IS NULL): Atribut a ada dan nilai Atribut a lebih besar dari 100 atau Atribut b tidak ada.

Anda dapat mengimplementasikan penyaringan berbasis SQL atribut dengan mengonfigurasi atribut pesan kustom dan mendefinisikan ekspresi filter SQL. Ekspresi filter mungkin tidak menghasilkan hasil yang valid. Broker ApsaraMQ for RocketMQ memproses pesan berdasarkan logika berikut:

  • Jika terjadi kesalahan selama perhitungan ekspresi filter, broker secara otomatis memfilter pesan yang diterima dan tidak mengirimkan pesan tersebut ke konsumen. Misalnya, pengecualian terjadi ketika nilai numerik dan non-numerik dibandingkan.

  • Jika hasil perhitungan ekspresi filter adalah NULL atau nilainya bukan nilai Boolean, broker secara otomatis memfilter pesan yang diterima dan tidak mengirimkan pesan tersebut ke konsumen. Nilai Boolean mewakili nilai kebenaran, yang bisa benar atau salah. Misalnya, ketika konsumen berlangganan pesan, konsumen menggunakan atribut yang tidak ditentukan oleh produsen sebagai kondisi filter. Dalam hal ini, hasil perhitungan ekspresi filter adalah NULL.

  • Jika nilai atribut pesan kustom adalah bilangan floating-point tetapi nilai atribut yang digunakan dalam ekspresi filter adalah bilangan bulat, broker secara otomatis memfilter pesan yang diterima dan tidak mengirimkan pesan tersebut ke konsumen.

Kode Contoh

  • Mengirim pesan

    Konfigurasikan tag dan atribut kustom untuk pesan.

    Producer producer = ONSFactory.createProducer(properties);
    // Atur nilai Tag menjadi tagA.
    Message msg = new Message("topicA", "tagA", "Hello MQ".getBytes());
    // Atur nilai atribut kustom region menjadi hangzhou.
    msg.putUserProperties("region", "hangzhou");
    // Atur nilai atribut kustom price menjadi 50.
    msg.putUserProperties("price", "50");
    SendResult sendResult = producer.send(msg);
  • Berlangganan pesan berdasarkan atribut kustom.

    Consumer consumer = ONSFactory.createConsumer(properties);
    // Berlangganan hanya pesan yang nilai atribut kustom region-nya adalah hangzhou. Jika Anda tidak mengonfigurasi atribut ini untuk pesan atau nilai atribut pesan bukan hangzhou, pesan tidak dikirimkan ke konsumen.
    consumer.subscribe("topicA", MessageSelector.bySql("region IS NOT NULL AND region = 'hangzhou'"), new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.printf("Menerima Pesan Baru: %s %n", message);
            return Action.CommitMessage;
        }
    });

    Hasil yang diharapkan: Pesan yang dikirim dalam contoh memiliki atribut kustom region dan nilai atributnya adalah hangzhou. Pesan memenuhi kondisi filter dan dikirimkan ke konsumen.

  • Berlangganan pesan berdasarkan tag dan atribut kustom.

    Consumer consumer = ONSFactory.createConsumer(properties);
    // Berlangganan hanya pesan yang memiliki tagA dan nilai atribut kustom price lebih besar dari 30.
    consumer.subscribe("topicA", MessageSelector.bySql("TAGS IS NOT NULL AND price IS NOT NULL AND TAGS = 'tagA' AND price > 30 "), new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.printf("Menerima Pesan Baru: %s %n", message);
            return Action.CommitMessage;
        }
    });

    Hasil yang diharapkan: Pesan yang dikirim dalam contoh memiliki tagA dan atribut kustom price, dan nilai atribut kustom price lebih besar dari 30. Pesan memenuhi kondisi filter dan dikirimkan ke konsumen.

  • Berlangganan pesan berdasarkan beberapa atribut kustom.

    Consumer consumer = ONSFactory.createConsumer(properties);
    // Berlangganan hanya pesan yang nilai atribut kustom region-nya adalah hangzhou dan nilai atribut kustom price kurang dari 20.
    consumer.subscribe("topicA", MessageSelector.bySql("region IS NOT NULL AND price IS NOT NULL AND region = 'hangzhou' AND price < 20"), new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.printf("Menerima Pesan Baru: %s %n", message);
            return Action.CommitMessage;
        }
    });

    Hasil yang diharapkan: Pesan tidak dikirimkan ke konsumen karena pesan tidak memenuhi kondisi filter. Konsumen berlangganan pesan yang nilai atribut kustom price-nya kurang dari 20, tetapi nilai atribut kustom price yang ditentukan untuk pesan dalam produsen adalah 50.

  • Berlangganan semua pesan dalam topik.

    Consumer consumer = ONSFactory.createConsumer(properties);
    // Untuk berlangganan semua pesan dalam topik, atur nilai ekspresi SQL menjadi TRUE.
    consumer.subscribe("topicA", MessageSelector.bySql("TRUE"), new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.printf("Menerima Pesan Baru: %s %n", message);
            return Action.CommitMessage;
        }
    });

    Hasil yang diharapkan: Semua pesan dalam topik dikirimkan ke konsumen.

  • Contoh salah

    Sebelum produsen mengirimkan pesan, atribut kustom tidak dikonfigurasikan untuk pesan, dan tidak ada logika yang didefinisikan dalam ekspresi filter untuk memeriksa apakah atribut kustom ada. Atribut kustom langsung digunakan sebagai kondisi filter dalam ekspresi. Dalam hal ini, pesan tidak dikirimkan ke konsumen.

    Consumer consumer = ONSFactory.createConsumer(properties);
    // Atribut pesan produk tidak dikonfigurasikan selama pengiriman pesan. Penyaringan gagal dan pesan tidak dikirimkan ke konsumen.
    consumer.subscribe("topicA", MessageSelector.bySql("product = 'MQ'"), new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.printf("Menerima Pesan Baru: %s %n", message);
            return Action.CommitMessage;
        }
    });               

Referensi

  • Instansi konsumen yang menggunakan ID grup yang sama harus berlangganan topik yang sama. Untuk informasi lebih lanjut, lihat Konsistensi langganan.

  • Anda dapat menggunakan topik dan tag untuk mengklasifikasikan pesan untuk layanan yang berbeda. Untuk informasi lebih lanjut, lihat Praktik terbaik untuk topik dan tag.