全部产品
Search
文档中心

Realtime Compute for Apache Flink:ApsaraMQ for RocketMQ

更新时间:Nov 10, 2025

Topik ini menjelaskan ApsaraMQ for RocketMQ.

Penting

Instans Edisi Standar ApsaraMQ for RocketMQ versi 4.x memiliki batas panggilan API elastis bersama sebesar 5.000 per detik. Jika Anda menggunakan middleware messaging versi ini untuk terhubung ke Realtime Compute for Apache Flink dan melebihi batas tersebut, mekanisme throttling akan dipicu sehingga pekerjaan Flink Anda menjadi tidak stabil. Oleh karena itu, kami menyarankan agar Anda mengevaluasi dampak penggunaan ApsaraMQ for RocketMQ 4.x Edisi Standar. Jika skenario bisnis Anda memungkinkan, pertimbangkan alternatif lain seperti Kafka, Simple Log Service (SLS), atau DataHub. Namun, jika Anda harus menggunakan ApsaraMQ for RocketMQ 4.x Edisi Standar untuk memproses pesan dalam skala besar, Anda dapat membuat tiket untuk menghubungi tim produk ApsaraMQ for RocketMQ dan meminta peningkatan batas throttling.

Informasi latar belakang

ApsaraMQ for RocketMQ adalah layanan middleware terdistribusi yang dikembangkan oleh Alibaba Cloud berdasarkan Apache RocketMQ. Layanan ini menyediakan latensi rendah, konkurensi tinggi, ketersediaan tinggi (HA), serta keandalan tinggi. Layanan ini mendukung penguraian keterkaitan asinkron dan penggeseran beban puncak untuk aplikasi terdistribusi, serta fitur-fitur penting bagi aplikasi internet seperti akumulasi pesan dalam jumlah besar, throughput tinggi, dan retry yang andal.

Konektor RocketMQ mendukung hal-hal berikut.

Kategori

Rincian

Jenis yang didukung

Tabel sumber dan tabel sink

Mode jalankan

Hanya mode stream yang didukung.

Format data

Format CSV dan biner

Metrik pemantauan spesifik

Metrik pemantauan

  • Tabel sumber

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerScond

    • currentEmitEventTimeLag

    • currentFetchEventTimeLag

    • sourceIdleTime

  • Tabel sink

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

Catatan

Untuk informasi lebih lanjut tentang metrik tersebut, lihat Deskripsi metrik.

Jenis API

Datastream (hanya untuk RocketMQ 4.x) dan SQL

Mendukung pembaruan atau penghapusan data di tabel sink

Tidak mendukung pembaruan atau penghapusan data di tabel sink. Hanya mendukung penyisipan data.

Fitur

Tabel sumber dan tabel sink ApsaraMQ for RocketMQ mendukung bidang properti.

  • Bidang properti tabel sumber

    Nama bidang

    Jenis bidang

    Deskripsi

    topic

    VARCHAR METADATA VIRTUAL

    Topik pesan.

    queue-id

    INT METADATA VIRTUAL

    ID antrian pesan.

    queue-offset

    BIGINT METADATA VIRTUAL

    Offset konsumen dari antrian pesan.

    msg-id

    VARCHAR METADATA VIRTUAL

    ID pesan.

    store-timestamp

    TIMESTAMP(3) METADATA VIRTUAL

    Waktu saat pesan disimpan.

    born-timestamp

    TIMESTAMP(3) METADATA VIRTUAL

    Waktu saat pesan dihasilkan.

    keys

    VARCHAR METADATA VIRTUAL

    Kunci pesan.

    tags

    VARCHAR METADATA VIRTUAL

    Tag pesan.

  • Bidang properti tabel sink

    Nama bidang

    Jenis bidang

    Deskripsi

    keys

    VARCHAR METADATA

    Kunci pesan.

    tags

    VARCHAR METADATA

    Tag pesan.

Prasyarat

Sumber daya ApsaraMQ for RocketMQ telah dibuat. Untuk informasi selengkapnya, lihat Buat sumber daya.

Batasan

  • Hanya Ververica Runtime (VVR) 8.0.3 atau yang lebih baru dari Realtime Compute for Apache Flink yang mendukung ApsaraMQ for RocketMQ 5.x.

  • Konektor ApsaraMQ for RocketMQ menggunakan konsumen pull untuk mengonsumsi pesan. Semua subtugas berbagi beban konsumsi.

Sintaksis

CREATE TABLE mq_source(
  x varchar,
  y varchar,
  z varchar
) WITH (
  'connector' = 'mq5',
  'topic' = '<yourTopicName>',
  'endpoint' = '<yourEndpoint>',
  'consumerGroup' = '<yourConsumerGroup>'
);

Parameter WITH

Umum

Parameter

Deskripsi

Tipe data

Wajib

Nilai default

Catatan

connector

Jenis konektor.

String

Ya

-

  • Untuk ApsaraMQ for RocketMQ 4.x, atur parameter ini ke mq.

  • Untuk ApsaraMQ for RocketMQ 5.x, atur parameter ini ke mq5.

endPoint

Alamat titik akhir.

String

Ya

-

Titik akhir ApsaraMQ for RocketMQ dapat berupa salah satu jenis berikut:

  • Titik akhir untuk layanan ApsaraMQ for RocketMQ internal (jaringan klasik Alibaba Cloud atau VPC): Di konsol ApsaraMQ for RocketMQ, buka halaman detail instans tujuan. Pilih Endpoint > TCP Protocol Client Access Point > Internal Network Access untuk mendapatkan titik akhir.

  • Titik akhir untuk layanan ApsaraMQ for RocketMQ publik: Di konsol ApsaraMQ for RocketMQ, buka halaman detail instans tujuan. Pilih Endpoint > TCP Protocol > Client Access Point > Public Network Access untuk mendapatkan titik akhir.

Penting

Karena perubahan dinamis pada kebijakan Keamanan Jaringan Alibaba Cloud, masalah konektivitas jaringan dapat terjadi ketika Realtime Compute for Apache Flink terhubung ke layanan ApsaraMQ for RocketMQ publik. Kami menyarankan agar Anda menggunakan layanan ApsaraMQ for RocketMQ internal.

  • Layanan internal tidak mendukung akses lintas domain. Misalnya, jika layanan Realtime Compute for Apache Flink Anda berada di wilayah Tiongkok (Hangzhou) tetapi layanan ApsaraMQ for RocketMQ Anda berada di wilayah Tiongkok (Shanghai), akses akan ditolak.

  • Untuk mengakses ApsaraMQ for RocketMQ melalui Internet, Anda harus mengaktifkan fitur akses jaringan publik. Untuk informasi selengkapnya, lihat Pilih jenis koneksi jaringan.

topic

Nama topik.

String

Ya

Tidak ada

Tidak ada.

accessId

  • 4.x: ID AccessKey akun Alibaba Cloud Anda.

  • 5.x:

    5.x: Nama pengguna instans ApsaraMQ for RocketMQ.

String

  • ApsaraMQ for RocketMQ 4.x: Ya

  • ApsaraMQ for RocketMQ 5.x: Tidak

Tidak ada

Penting

Untuk mencegah informasi AccessKey Anda bocor, gunakan variabel untuk menentukan pasangan Kunci Akses. Untuk informasi selengkapnya, lihat Variabel proyek.

  • ApsaraMQ for RocketMQ 5.x: Jika Anda mengakses instans dari titik akhir untuk akses jaringan publik, Anda harus mengatur parameter ini ke nama pengguna instans ApsaraMQ for RocketMQ di konsol. Jika Anda mengakses instans dari instans ECS Alibaba Cloud melalui jaringan internal, Anda tidak perlu mengonfigurasi parameter ini.

accessKey

  • 4.x: Rahasia AccessKey akun Alibaba Cloud Anda.

  • 5.x: Kata sandi instans.

String

  • ApsaraMQ for RocketMQ 4.x: Ya

  • ApsaraMQ for RocketMQ 5.x: Tidak

-

tag

Tag yang akan berlangganan atau ditulis.

String

Tidak

-

  • Saat ApsaraMQ for RocketMQ digunakan sebagai tabel sumber, Anda hanya dapat membaca satu tag saja.

  • Saat ApsaraMQ for RocketMQ digunakan sebagai tabel sink, Anda dapat mengatur beberapa tag. Pisahkan tag dengan koma (,).

Catatan

Saat digunakan sebagai tabel sink, parameter ini hanya didukung untuk ApsaraMQ for RocketMQ 4.x. Untuk ApsaraMQ for RocketMQ 5.x, gunakan bidang properti tabel sink untuk menentukan tag pesan keluaran.

encoding

Format penyandian.

String

Tidak

UTF-8

Tidak ada.

instanceID

ID instans ApsaraMQ for RocketMQ.

String

Tidak

-

  • Jika instans ApsaraMQ for RocketMQ tidak memiliki namespace independen, Anda tidak dapat menggunakan parameter instanceID.

  • Jika instans ApsaraMQ for RocketMQ memiliki namespace independen, parameter instanceID wajib diisi.

Catatan

Parameter ini hanya didukung untuk ApsaraMQ for RocketMQ 4.x.

Khusus tabel sumber

Parameter

Deskripsi

Tipe data

Wajib

Nilai default

Catatan

consumerGroup

Nama grup konsumen.

String

Ya

-

Tidak ada.

pullIntervalMs

Durasi hibernasi untuk sumber saat tidak ada data yang tersedia untuk dikonsumsi dari hulu.

Int

Ya

Tidak ada

Satuan: milidetik.

Tidak tersedia mekanisme throttling. Anda tidak dapat mengatur laju pembacaan data dari ApsaraMQ for RocketMQ.

Catatan

Parameter ini hanya didukung untuk ApsaraMQ for RocketMQ 4.x.

timeZone

Zona waktu.

String

Tidak

-

Contoh: Asia/Shanghai.

startTimeMs

Waktu mulai.

Long

Tidak

Tidak ada

Stempel waktu UNIX. Unit: milidetik.

startMessageOffset

Offset awal pesan.

Int

Tidak

-

Jika parameter ini diatur, pemuatan data diprioritaskan dimulai dari offset startMessageOffset.

lineDelimiter

Pemisah baris yang digunakan untuk mengurai blok.

String

Tidak

\n

Tidak ada.

fieldDelimiter

Pemisah bidang.

String

Tidak

\u0001

Pemisah bervariasi berdasarkan mode klien ApsaraMQ for RocketMQ:

  • Dalam mode read-only (mode default), pemisahnya adalah \u0001. Dalam mode ini, pemisah tidak terlihat.

  • Dalam mode edit, pemisahnya adalah ^A.

lengthCheck

Kebijakan untuk memeriksa jumlah bidang dalam satu baris.

Int

Tidak

NONE

Nilai valid:

  • NONE: Ini adalah nilai default.

    • Jika jumlah bidang yang diurai lebih besar daripada jumlah bidang yang didefinisikan, jumlah bidang yang didefinisikan diambil dari kiri ke kanan.

    • Jika jumlah bidang yang diurai lebih kecil daripada jumlah bidang yang didefinisikan, baris data dilewati.

  • SKIP: Jika jumlah bidang yang diurai berbeda dari jumlah bidang yang didefinisikan, data dilewati.

  • EXCEPTION: Jika jumlah bidang yang diurai berbeda dari jumlah bidang yang didefinisikan, pengecualian dikembalikan.

  • PAD: Bidang diisi dari kiri ke kanan.

    • Jika jumlah bidang yang diurai lebih besar daripada jumlah bidang yang didefinisikan, jumlah bidang yang didefinisikan diambil dari kiri ke kanan.

    • Jika jumlah bidang yang diurai lebih kecil daripada jumlah bidang yang didefinisikan, bidang yang hilang di akhir baris diisi dengan nilai null.

columnErrorDebug

Menentukan apakah debugging diaktifkan.

Boolean

Tidak

false

Jika Anda mengatur parameter ini ke true, log pengecualian parsing dicetak.

pullBatchSize

Jumlah maksimum pesan yang ditarik sekaligus.

Int

Tidak

64

Parameter ini hanya didukung di VVR 8.0.7 atau yang lebih baru dari Realtime Compute for Apache Flink.

Khusus tabel sink

Parameter

Deskripsi

Tipe data

Wajib

Nilai default

Catatan

producerGroup

Kelompok tempat menulis.

String

Ya

-

Tidak ada.

retryTimes

Jumlah percobaan ulang untuk operasi tulis.

Int

Tidak

10

Tidak ada.

sleepTimeMs

Interval percobaan ulang.

Long

Tidak

5000

Tidak ada.

partitionField

Menentukan nama bidang yang akan digunakan sebagai kolom kunci partisi.

String

Tidak

Tidak ada

Jika mode diatur ke partition, parameter ini wajib diisi.

Catatan

Parameter ini hanya didukung di VVR 8.0.5 atau yang lebih baru dari Realtime Compute for Apache Flink.

deliveryTimestampMode

Menentukan mode untuk pesan tertunda. Parameter ini, bersama dengan parameter deliveryTimestampValue, menentukan waktu pengiriman pesan tertunda.

String

Tidak

-

Nilai valid:

  • fixed: Mode stempel waktu tetap.

  • relative: Mode waktu tunda relatif.

  • field: Mode di mana bidang tertentu digunakan sebagai waktu pengiriman.

Catatan

Parameter ini hanya didukung di VVR 11.1 atau yang lebih baru dari Realtime Compute for Apache Flink.

deliveryTimestampType

Menentukan jenis dasar waktu untuk pesan tertunda.

String

Tidak

processing_time

Nilai valid:

  • event_time: Waktu event.

  • processing_time: Waktu pemrosesan.

Catatan

Parameter ini hanya didukung di VVR 11.1 atau yang lebih baru dari Realtime Compute for Apache Flink.

deliveryTimestampValue

Waktu pengiriman untuk pesan tertunda.

Long

Tidak

Tidak ada

Makna parameter ini bervariasi berdasarkan nilai deliveryTimestampMode:

  • deliveryTimestampMode=fixed: Pesan ditunda hingga stempel waktu tertentu dalam milidetik. Jika waktu saat ini lebih lambat dari stempel waktu yang ditentukan, pesan dikirim segera.

  • deliveryTimestampMode=relative: Waktu tunda berdasarkan jenis waktu yang ditentukan oleh deliveryTimestampType. Satuan default adalah milidetik.

  • deliveryTimestampMode=field: Parameter ini tidak berlaku. Waktu tunda ditentukan oleh nilai bidang yang ditentukan oleh deliveryTimestampField.

Catatan

Parameter ini hanya didukung di VVR 11.1 atau yang lebih baru dari Realtime Compute for Apache Flink.

deliveryTimestampField

Menentukan bidang yang akan digunakan untuk waktu pengiriman pesan tertunda. Jenis bidang harus BIGINT.

String

Tidak

-

Parameter ini hanya berlaku ketika deliveryTimestampMode diatur ke field.

Catatan

Parameter ini hanya didukung di VVR 11.1 atau yang lebih baru dari Realtime Compute for Apache Flink.

Pemetaan jenis

Jenis bidang Flink

Jenis bidang ApsaraMQ for RocketMQ

BOOLEAN

STRING

VARBINARY

VARCHAR

TINYINT

INTEGER

BIGINT

FLOAT

DOUBLE

DECIMAL

Contoh kode

Contoh tabel sumber

  • Format CSV

    Asumsikan Anda memiliki catatan pesan berikut dalam format CSV.

    1,name,male 
    2,name,female
    Catatan

    Pesan RocketMQ dapat berisi nol atau lebih catatan data, dipisahkan oleh \n.

    Pernyataan Data Definition Language (DDL) berikut menunjukkan cara mendeklarasikan tabel sumber ApsaraMQ for RocketMQ dalam pekerjaan Flink.

    • ApsaraMQ for RocketMQ 5.x

    CREATE TABLE mq_source(
      id varchar,
      name varchar,
      gender varchar,
      topic varchar metadata virtual
    ) WITH (
      'connector' = 'mq5',
      'topic' = 'mq-test',
      'endpoint' = '<yourEndpoint>',
      'consumerGroup' = 'mq-group',
      'fieldDelimiter' = ','
    );
    • RocketMQ 4.x

    CREATE TABLE mq_source(
      id varchar,
      name varchar,
      gender varchar,
      topic varchar metadata virtual
    ) WITH (
      'connector' = 'mq',
      'topic' = 'mq-test',
      'endpoint' = '<yourEndpoint>',
      'pullIntervalMs' = '1000',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'consumerGroup' = 'mq-group',
      'fieldDelimiter' = ','
    );
  • Format Biner

    • RocketMQ 5.x

      CREATE TEMPORARY TABLE source_table (
        mess varbinary
      ) WITH (
        'connector' = 'mq5',
        'endpoint' = '<yourEndpoint>',
        'topic' = 'mq-test',
        'consumerGroup' = 'mq-group'
      );
      
      CREATE TEMPORARY TABLE out_table (
        commodity varchar
      ) WITH (
        'connector' = 'print'
      );
      
      INSERT INTO out_table
      select 
        cast(mess as varchar)
      FROM source_table;
    • ApsaraMQ for RocketMQ 4.x

      CREATE TEMPORARY TABLE source_table (
        mess varbinary
      ) WITH (
        'connector' = 'mq',
        'endpoint' = '<yourEndpoint>',
        'pullIntervalMs' = '500',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'topic' = 'mq-test',
        'consumerGroup' = 'mq-group'
      );
      
      CREATE TEMPORARY TABLE out_table (
        commodity varchar
      ) WITH (
        'connector' = 'print'
      );
      
      INSERT INTO out_table
      select 
        cast(mess as varchar)
      FROM source_table;

Contoh tabel sink

  • Buat Tabel Sink

    • ApsaraMQ for RocketMQ 5.x

      CREATE TABLE mq_sink (
        id INTEGER,
        len BIGINT,
        content VARCHAR
      ) WITH (
        'connector'='mq5',
        'endpoint'='<yourEndpoint>',
        'topic'='<yourTopicName>',
        'producerGroup'='<yourGroupName>'
      );
    • ApsaraMQ for RocketMQ 4.x

      CREATE TABLE mq_sink (
        id INTEGER,
        len BIGINT,
        content VARCHAR
      ) WITH (
        'connector'='mq',
        'endpoint'='<yourEndpoint>',
        'accessId'='${secret_values.ak_id}',
        'accessKey'='${secret_values.ak_secret}',
        'topic'='<yourTopicName>',
        'producerGroup'='<yourGroupName>'
      );
      Catatan

      Jika pesan ApsaraMQ for RocketMQ Anda dalam format biner, Anda hanya dapat mendefinisikan satu bidang dalam pernyataan DDL, dan jenis bidang tersebut harus VARBINARY.

  • Buat tabel sink yang menentukan bidang keys dan tags sebagai kunci dan tag untuk pesan RocketMQ

    • ApsaraMQ for RocketMQ 5.x

      CREATE TABLE mq_sink (
        id INTEGER,
        len BIGINT,
        content VARCHAR,
        keys VARCHAR METADATA,
        tags VARCHAR METADATA
      ) WITH (
        'connector'='mq5',
        'endpoint'='<yourEndpoint>',
        'topic'='<yourTopicName>',
        'producerGroup'='<yourGroupName>'
      );
    • RocketMQ 4.x

      CREATE TABLE mq_sink (
        id INTEGER,
        len BIGINT,
        content VARCHAR,
        keys VARCHAR METADATA,
        tags VARCHAR METADATA
      ) WITH (
        'connector'='mq',
        'endpoint'='<yourEndpoint>',
        'accessId'='${secret_values.ak_id}',
        'accessKey'='${secret_values.ak_secret}',
        'topic'='<yourTopicName>',
        'producerGroup'='<yourGroupName>'
      );

API DataStream

Penting

Saat membaca dan menulis data menggunakan API DataStream, Anda harus menggunakan konektor DataStream yang sesuai untuk terhubung ke Flink yang sepenuhnya dikelola. Untuk informasi selengkapnya tentang cara mengonfigurasi konektor DataStream, lihat Gunakan konektor DataStream.

Ververica Runtime (VVR) menyediakan MetaQSource untuk membaca data dari ApsaraMQ for RocketMQ. VVR juga menyediakan MetaQOutputFormat, yang merupakan implementasi dari kelas OutputFormat, untuk menulis data ke ApsaraMQ for RocketMQ. Kode berikut memberikan contoh cara membaca dan menulis data ke ApsaraMQ for RocketMQ:

ApsaraMQ for RocketMQ 5.x

Catatan

Pada ApsaraMQ for RocketMQ 5.x, pasangan AccessKey sesuai dengan nama pengguna dan kata sandi yang dikonfigurasi untuk instans. Jika Anda mengakses instans ApsaraMQ for RocketMQ melalui jaringan internal dan autentikasi Access Control List (ACL) tidak diaktifkan untuk instans tersebut, Anda tidak perlu mengonfigurasi parameter pasangan AccessKey.

import com.alibaba.ververica.connectors.common.sink.OutputFormatSinkFunction;
import com.alibaba.ververica.connectors.mq5.shaded.org.apache.rocketmq.common.message.MessageExt;
import com.alibaba.ververica.connectors.mq5.sink.RocketMQOutputFormat;
import com.alibaba.ververica.connectors.mq5.source.RocketMQSource;
import com.alibaba.ververica.connectors.mq5.source.reader.deserializer.RocketMQRecordDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Collections;
import java.util.List;
/**
 * Demo yang menggambarkan cara mengonsumsi pesan dari RocketMQ, mengonversi
 * pesan, lalu menghasilkan pesan ke RocketMQ.
 */
public class RocketMQ5DataStreamDemo {

    public static final String ENDPOINT = "<yourEndpoint>";
    public static final String ACCESS_ID = "<accessID>";
    public static final String ACCESS_KEY = "<accessKey>";
    public static final String SOURCE_TOPIC = "<sourceTopicName>";
    public static final String CONSUMER_GROUP = "<consumerGroup>";
    public static final String SINK_TOPIC = "<sinkTopicName>";
    public static final String PRODUCER_GROUP = "<producerGroup>";

    public static void main(String[] args) throws Exception {
        // Menyiapkan lingkungan eksekusi streaming
        Configuration conf = new Configuration();

        // Dua konfigurasi berikut hanya untuk debugging lokal. Hapus sebelum Anda mengemas pekerjaan dan mengunggahnya ke Realtime Compute for Apache Flink.
        conf.setString("pipeline.classpaths", "file://" + "jalur mutlak file JAR uber");
        conf.setString(
                "classloader.parent-first-patterns.additional",
                "com.alibaba.ververica.connectors.mq5.source.reader.deserializer.RocketMQRecordDeserializationSchema;com.alibaba.ververica.connectors.mq5.shaded.");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        final DataStreamSource<String> ds =
                env.fromSource(
                        RocketMQSource.<String>builder()
                                .setEndpoint(ENDPOINT)
                                .setAccessId(ACCESS_ID)
                                .setAccessKey(ACCESS_KEY)
                                .setTopic(SOURCE_TOPIC)
                                .setConsumerGroup(CONSUMER_GROUP)
                                .setDeserializationSchema(new MyDeserializer())
                                .setStartOffset(1)
                                .build(),
                        WatermarkStrategy.noWatermarks(),
                        "source");

        ds.map(new ToMessage())
                .addSink(
                        new OutputFormatSinkFunction<>(
                                new RocketMQOutputFormat.Builder()
                                        .setEndpoint(ENDPOINT)
                                        .setAccessId(ACCESS_ID)
                                        .setAccessKey(ACCESS_KEY)
                                        .setTopicName(SINK_TOPIC)
                                        .setProducerGroup(PRODUCER_GROUP)
                                        .build()));

        env.execute();
    }

    private static class MyDeserializer implements RocketMQRecordDeserializationSchema<String> {
        @Override
        public void deserialize(List<MessageExt> record, Collector<String> out) {
            for (MessageExt messageExt : record) {
                out.collect(new String(messageExt.getBody()));
            }
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return Types.STRING;
        }
    }

    private static class ToMessage implements MapFunction<String, List<MessageExt>> {

        public ToMessage() {
        }

        @Override
        public List<MessageExt> map(String s) {
            final MessageExt message = new MessageExt();
            message.setBody(s.getBytes());
            message.setWaitStoreMsgOK(true);
            return Collections.singletonList(message);
        }
    }
}

ApsaraMQ for RocketMQ 4.x

import com.alibaba.ververica.connector.mq.shaded.com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.ververica.connectors.common.sink.OutputFormatSinkFunction;
import com.alibaba.ververica.connectors.metaq.sink.MetaQOutputFormat;
import com.alibaba.ververica.connectors.metaq.source.MetaQSource;
import com.alibaba.ververica.connectors.metaq.source.reader.deserializer.MetaQRecordDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.io.IOException;
import java.util.List;
import java.util.Properties;
import static com.alibaba.ververica.connector.mq.shaded.com.taobao.metaq.client.ExternConst.*;
/**
 * Demo yang menggambarkan cara mengonsumsi pesan dari RocketMQ, mengonversi
 * pesan, lalu menghasilkan pesan ke RocketMQ.
 */
public class RocketMQDataStreamDemo {

    public static final String ENDPOINT = "<yourEndpoint>";
    public static final String ACCESS_ID = "<accessID>";
    public static final String ACCESS_KEY = "<accessKey>";
    public static final String INSTANCE_ID = "<instanceID>";
    public static final String SOURCE_TOPIC = "<sourceTopicName>";
    public static final String CONSUMER_GROUP = "<consumerGroup>";
    public static final String SINK_TOPIC = "<sinkTopicName>";
    public static final String PRODUCER_GROUP = "<producerGroup>";

    public static void main(String[] args) throws Exception {
        // Menyiapkan lingkungan eksekusi streaming
        Configuration conf = new Configuration();

        // Dua konfigurasi berikut hanya untuk debugging lokal. Hapus sebelum Anda mengemas pekerjaan dan mengunggahnya ke Realtime Compute for Apache Flink.
        conf.setString("pipeline.classpaths", "file://" + "jalur mutlak file JAR uber");
        conf.setString("classloader.parent-first-patterns.additional",
                "com.alibaba.ververica.connectors.metaq.source.reader.deserializer.MetaQRecordDeserializationSchema;com.alibaba.ververica.connector.mq.shaded.");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // Membuat dan menambahkan sumber RocketMQ.
        env.fromSource(createRocketMQSource(), WatermarkStrategy.noWatermarks(), "source")
                // Mengonversi isi pesan menjadi huruf kapital.
                .map(RocketMQDataStreamDemo2::convertMessages)
                // Membuat dan menambahkan sink RocketMQ.
                .addSink(new OutputFormatSinkFunction<>(createRocketMQOutputFormat()))
                .name(RocketMQDataStreamDemo2.class.getSimpleName());
        // Mengompilasi dan mengirimkan pekerjaan.
        env.execute("Demo end-to-end konektor RocketMQ DataStream");
    }

    private static MetaQSource<MessageExt> createRocketMQSource() {
        Properties mqProperties = createMQProperties();

        return new MetaQSource<>(SOURCE_TOPIC,
                CONSUMER_GROUP,
                null, // selalu null
                null, // tag pesan yang akan dikonsumsi
                Long.MAX_VALUE, // stempel waktu berhenti dalam milidetik
                -1, // Stempel waktu mulai dalam milidetik. Atur ke -1 untuk menonaktifkan mulai dari offset.
                0, // Offset awal.
                300_000, // Interval penemuan partisi.
                mqProperties,
                Boundedness.CONTINUOUS_UNBOUNDED,
                new MyDeserializationSchema());
    }

    private static MetaQOutputFormat createRocketMQOutputFormat() {
        return new MetaQOutputFormat.Builder()
                .setTopicName(SINK_TOPIC)
                .setProducerGroup(PRODUCER_GROUP)
                .setMqProperties(createMQProperties())
                .build();
    }

    private static Properties createMQProperties() {
        Properties properties = new Properties();
        properties.put(PROPERTY_ONS_CHANNEL, "ALIYUN");
        properties.put(NAMESRV_ADDR, ENDPOINT);
        properties.put(PROPERTY_ACCESSKEY, ACCESS_ID);
        properties.put(PROPERTY_SECRETKEY, ACCESS_KEY);
        properties.put(PROPERTY_ROCKET_AUTH_ENABLED, true);
        properties.put(PROPERTY_INSTANCE_ID, INSTANCE_ID);
        return properties;
    }

    private static List<MessageExt> convertMessages(MessageExt messages) {
        return Collections.singletonList(messages);
    }

    public static class MyDeserializationSchema implements MetaQRecordDeserializationSchema<MessageExt> {
        @Override
        public void deserialize(List<MessageExt> list, Collector<MessageExt> collector) {
            for (MessageExt messageExt : list) {
                collector.collect(messageExt);
            }
        }

        @Override
        public TypeInformation<MessageExt> getProducedType() {
            return TypeInformation.of(MessageExt.class);
        }
    }
}
    }
}

XML

<!--MQ 5.x-->
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mq5</artifactId>
    <version>${vvr-version}</version>
    <scope>provided</scope>
</dependency>

<!--MQ 4.x-->
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mq</artifactId>
    <version>${vvr-version}</version>
</dependency>
Catatan

Untuk informasi selengkapnya tentang cara mengonfigurasi titik akhir untuk ApsaraMQ for RocketMQ, lihat Pengumuman tentang pengaturan titik akhir internal TCP.

FAQ

Bagaimana ApsaraMQ for RocketMQ mendeteksi perubahan jumlah partisi topik saat topik diskalakan?