Realtime Compute for Apache Flink:ApsaraMQ for RocketMQ

Last Updated:Oct 22, 2025

This topic describes the ApsaraMQ for RocketMQ connector.

Important

ApsaraMQ for RocketMQ 4.x Standard Edition instances are subject to a shared elastic API call limit of 5,000 calls per second. If a Flink job connected to such an instance exceeds this limit, throttling is triggered, which can make the job unstable. Evaluate this impact before you use ApsaraMQ for RocketMQ 4.x Standard Edition with Realtime Compute for Apache Flink. If your business scenario allows, consider alternative middleware such as Kafka, Simple Log Service (SLS), or DataHub. If you must use ApsaraMQ for RocketMQ 4.x Standard Edition to process large volumes of messages, submit a ticket to the ApsaraMQ for RocketMQ product team to request a higher throttling limit.

Background information

ApsaraMQ for RocketMQ is a distributed middleware service developed by Alibaba Cloud based on Apache RocketMQ. It provides low latency, high concurrency, high availability (HA), and high reliability. It offers asynchronous decoupling and peak-load shifting for distributed applications. It also supports features required by internet applications, such as massive message accumulation, high throughput, and reliable retries.

The RocketMQ connector supports the following.

Category

Details

Supported type

Source table and sink table

Run mode

Only stream mode is supported.

Data format

CSV and binary format

Specific monitoring metrics

Monitoring metrics

  • Source table

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerSecond

    • currentEmitEventTimeLag

    • currentFetchEventTimeLag

    • sourceIdleTime

  • Sink table

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

Note

For more information about the metrics, see Metric description.

API type

DataStream (only for RocketMQ 4.x) and SQL

Supports updating or deleting data in sink tables

Does not support updating or deleting data in sink tables. Only supports inserting data.

Features

ApsaraMQ for RocketMQ source tables and sink tables support property fields.

  • Source table property fields

    Format: field name (field type): description.

    • topic (VARCHAR METADATA VIRTUAL): The message topic.

    • queue-id (INT METADATA VIRTUAL): The message queue ID.

    • queue-offset (BIGINT METADATA VIRTUAL): The consumer offset of the message queue.

    • msg-id (VARCHAR METADATA VIRTUAL): The message ID.

    • store-timestamp (TIMESTAMP(3) METADATA VIRTUAL): The time when the message was stored.

    • born-timestamp (TIMESTAMP(3) METADATA VIRTUAL): The time when the message was generated.

    • keys (VARCHAR METADATA VIRTUAL): The message keys.

    • tags (VARCHAR METADATA VIRTUAL): The message tags.

  • Sink table property fields

    Format: field name (field type): description.

    • keys (VARCHAR METADATA): The message keys.

    • tags (VARCHAR METADATA): The message tags.

Prerequisites

An ApsaraMQ for RocketMQ resource has been created. For more information, see Create a resource.

Limits

  • Only Ververica Runtime (VVR) 8.0.3 or later of Realtime Compute for Apache Flink supports ApsaraMQ for RocketMQ 5.x.

  • The ApsaraMQ for RocketMQ connector uses a pull consumer to consume messages. All subtasks share the consumption load.

Syntax

CREATE TABLE mq_source(
  x varchar,
  y varchar,
  z varchar
) WITH (
  'connector' = 'mq5',
  'topic' = '<yourTopicName>',
  'endpoint' = '<yourEndpoint>',
  'consumerGroup' = '<yourConsumerGroup>'
);

WITH parameters

General

Parameter

Description

Data type

Required

Default value

Remarks

connector

The connector type.

String

Yes

-

  • For ApsaraMQ for RocketMQ 4.x, set this parameter to mq.

  • For ApsaraMQ for RocketMQ 5.x, set this parameter to mq5.

endPoint

The endpoint address.

String

Yes

-

The ApsaraMQ for RocketMQ endpoint can be one of the following types:

  • Endpoint for an internal ApsaraMQ for RocketMQ service (Alibaba Cloud classic network or VPC): In the ApsaraMQ for RocketMQ console, go to the details page of the destination instance. Choose Endpoint > TCP Protocol Client Access Point > Internal Network Access to obtain the endpoint.

  • Endpoint for a public ApsaraMQ for RocketMQ service: In the ApsaraMQ for RocketMQ console, go to the details page of the destination instance. Choose Endpoint > TCP Protocol > Client Access Point > Public Network Access to obtain the endpoint.

Important

Because Alibaba Cloud network security policies change dynamically, network connectivity issues may occur when Realtime Compute for Apache Flink connects to a public ApsaraMQ for RocketMQ service. We recommend that you use an internal ApsaraMQ for RocketMQ service.

  • Internal services do not support cross-region access. For example, if your Realtime Compute for Apache Flink service is in the China (Hangzhou) region but your ApsaraMQ for RocketMQ service is in the China (Shanghai) region, access is denied.

  • To access ApsaraMQ for RocketMQ over the Internet, you must enable the public network access feature. For more information, see Select a network connection type.

topic

The topic name.

String

Yes

None

None.

accessId

  • 4.x: The AccessKey ID of your Alibaba Cloud account.

  • 5.x: The username of the ApsaraMQ for RocketMQ instance.

String

  • ApsaraMQ for RocketMQ 4.x: Yes

  • ApsaraMQ for RocketMQ 5.x: No

None

Important

To prevent your AccessKey information from being leaked, use variables to specify the AccessKey pair. For more information, see Project variables.

  • ApsaraMQ for RocketMQ 5.x: If you access the instance from an endpoint for public network access, you must set this parameter to the username of the ApsaraMQ for RocketMQ instance in the console. If you access the instance from an Alibaba Cloud ECS instance over an internal network, you do not need to configure this parameter.

accessKey

  • 4.x: The AccessKey secret of your Alibaba Cloud account.

  • 5.x: The password of the instance.

String

  • ApsaraMQ for RocketMQ 4.x: Yes

  • ApsaraMQ for RocketMQ 5.x: No

-

tag

The tag to subscribe to or write.

String

No

-

  • When ApsaraMQ for RocketMQ is used as a source table, you can read only a single tag.

  • When ApsaraMQ for RocketMQ is used as a sink table, you can set multiple tags. Separate the tags with commas (,).

Note

When used as a sink table, this parameter is supported only for ApsaraMQ for RocketMQ 4.x. For ApsaraMQ for RocketMQ 5.x, use the sink table property fields to specify the tag of the output message.

encoding

The encoding format.

String

No

UTF-8

None.

instanceID

The ApsaraMQ for RocketMQ instance ID.

String

No

-

  • If the ApsaraMQ for RocketMQ instance does not have an independent namespace, you cannot use the instanceID parameter.

  • If the ApsaraMQ for RocketMQ instance has an independent namespace, the instanceID parameter is required.

Note

This parameter is supported only for ApsaraMQ for RocketMQ 4.x.

Source table specific

Parameter

Description

Data type

Required

Default value

Remarks

consumerGroup

The name of the consumer group.

String

Yes

-

None.

pullIntervalMs

The time that the source sleeps when no data is available for consumption from upstream.

Int

Yes

None

Unit: milliseconds.

No throttling mechanism is available. You cannot set a rate for reading data from ApsaraMQ for RocketMQ.

Note

This parameter is supported only for ApsaraMQ for RocketMQ 4.x.

timeZone

The time zone.

String

No

-

Example: Asia/Shanghai.

startTimeMs

The start time.

Long

No

None

A UNIX timestamp. Unit: milliseconds.

startMessageOffset

The starting offset of messages.

Int

No

-

If this parameter is set, data loading preferentially starts from the offset of startMessageOffset.

lineDelimiter

The row delimiter used to parse a block.

String

No

\n

None.

fieldDelimiter

The field separator.

String

No

\u0001

The delimiter is displayed differently depending on the mode of the ApsaraMQ for RocketMQ client:

  • In read-only mode (the default mode), the delimiter is \u0001 and is not visible.

  • In edit mode, the delimiter is displayed as ^A.

lengthCheck

The policy to check the number of fields in a single row.

Int

No

NONE

Valid values:

  • NONE: This is the default value.

    • If the number of parsed fields is greater than the number of defined fields, the defined number of fields are taken from left to right.

    • If the number of parsed fields is less than the number of defined fields, the row of data is skipped.

  • SKIP: If the number of parsed fields is different from the number of defined fields, the data is skipped.

  • EXCEPTION: If the number of parsed fields is different from the number of defined fields, an exception is returned.

  • PAD: Fields are filled from left to right.

    • If the number of parsed fields is greater than the number of defined fields, the defined number of fields are taken from left to right.

    • If the number of parsed fields is less than the number of defined fields, the missing fields at the end of the row are filled with null values.

columnErrorDebug

Specifies whether to enable debugging.

Boolean

No

false

If you set this parameter to true, logs of parsing exceptions are printed.

pullBatchSize

The maximum number of messages to pull at a time.

Int

No

64

This parameter is supported only in VVR 8.0.7 or later of Realtime Compute for Apache Flink.
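
The lengthCheck policies described in the table above can be made concrete with a small stand-alone sketch. The following plain-Java code is not the connector's implementation. The class LengthCheckSketch, the Policy enum, and the apply method are hypothetical names that only illustrate how each documented policy treats a row whose parsed field count differs from the defined field count. A null return value stands for a skipped row.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the documented lengthCheck policies.
final class LengthCheckSketch {

    enum Policy { NONE, SKIP, EXCEPTION, PAD }

    /**
     * Returns the fields that would be emitted for one row,
     * or null if the row would be skipped.
     */
    static List<String> apply(Policy policy, List<String> parsed, int defined) {
        if (parsed.size() == defined) {
            // Field counts match: every policy keeps the row unchanged.
            return new ArrayList<>(parsed);
        }
        boolean tooMany = parsed.size() > defined;
        switch (policy) {
            case NONE:
                // Extra fields are cut off from the right; short rows are skipped.
                return tooMany ? new ArrayList<>(parsed.subList(0, defined)) : null;
            case SKIP:
                // Any mismatch skips the row.
                return null;
            case EXCEPTION:
                // Any mismatch raises an error.
                throw new IllegalStateException(
                        "Expected " + defined + " fields but parsed " + parsed.size());
            case PAD:
                if (tooMany) {
                    return new ArrayList<>(parsed.subList(0, defined));
                }
                // Missing trailing fields are filled with null values.
                List<String> padded = new ArrayList<>(parsed);
                while (padded.size() < defined) {
                    padded.add(null);
                }
                return padded;
            default:
                throw new IllegalArgumentException("Unknown policy: " + policy);
        }
    }
}
```

In this sketch, with three defined fields, a row that parses into the two fields 1 and name is dropped under NONE and SKIP, raises an exception under EXCEPTION, and becomes 1, name, null under PAD.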

Sink table specific

Parameter

Description

Data type

Required

Default value

Remarks

producerGroup

The group to write to.

String

Yes

-

None.

retryTimes

The number of retries for a write operation.

Int

No

10

None.

sleepTimeMs

The retry interval.

Long

No

5000

None.

partitionField

Specifies the name of the field to use as the partition key column.

String

No

None

If mode is set to partition, this parameter is required.

Note

This parameter is supported only in VVR 8.0.5 or later of Realtime Compute for Apache Flink.

deliveryTimestampMode

Specifies the mode for delayed messages. This parameter, along with the deliveryTimestampValue parameter, determines the delivery time of delayed messages.

String

No

-

Valid values:

  • fixed: The fixed timestamp mode.

  • relative: The relative delay time mode.

  • field: The mode in which a specified field is used as the delivery time.

Note

This parameter is supported only in VVR 11.1 or later of Realtime Compute for Apache Flink.

deliveryTimestampType

Specifies the time base type for delayed messages.

String

No

processing_time

Valid values:

  • event_time: The event time.

  • processing_time: The processing time.

Note

This parameter is supported only in VVR 11.1 or later of Realtime Compute for Apache Flink.

deliveryTimestampValue

The delivery time for delayed messages.

Long

No

None

The meaning of this parameter varies based on the value of deliveryTimestampMode:

  • deliveryTimestampMode=fixed: The message is delayed until the specified timestamp in milliseconds. If the current time is later than the specified timestamp, the message is delivered immediately.

  • deliveryTimestampMode=relative: The delay time based on the time type specified by deliveryTimestampType. The default unit is milliseconds.

  • deliveryTimestampMode=field: This parameter does not take effect. The delay time is determined by the value of the field specified by deliveryTimestampField.

Note

This parameter is supported only in VVR 11.1 or later of Realtime Compute for Apache Flink.

deliveryTimestampField

Specifies the field to use for the delivery time of delayed messages. The field type must be BIGINT.

String

No

-

This parameter takes effect only when deliveryTimestampMode is set to field.

Note

This parameter is supported only in VVR 11.1 or later of Realtime Compute for Apache Flink.
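
The interaction between deliveryTimestampMode, deliveryTimestampType, deliveryTimestampValue, and deliveryTimestampField can be sketched as a small calculation. The following plain-Java code is an interpretation of the parameter descriptions above, not the connector's implementation. The class and method names are hypothetical. Two points are assumptions: in relative mode the delivery time is taken to be the base time (event time or processing time, as selected by deliveryTimestampType) plus the configured value, and in field mode the BIGINT field is taken to hold an absolute timestamp in milliseconds.

```java
// Hypothetical illustration of how the delivery time of a delayed message
// might be derived from the documented parameters.
final class DeliveryTimestampSketch {

    enum Mode { FIXED, RELATIVE, FIELD }

    /**
     * @param mode         value of deliveryTimestampMode
     * @param value        value of deliveryTimestampValue, in milliseconds
     * @param baseTimeMs   event time or processing time of the record,
     *                     depending on deliveryTimestampType
     * @param fieldValueMs value of the column named by deliveryTimestampField
     * @param nowMs        current wall-clock time
     * @return the assumed delivery time in milliseconds
     */
    static long resolve(Mode mode, long value, long baseTimeMs, long fieldValueMs, long nowMs) {
        switch (mode) {
            case FIXED:
                // Documented: a timestamp that is already in the past
                // means the message is delivered immediately.
                return Math.max(value, nowMs);
            case RELATIVE:
                // Assumption: the delay is added to the selected base time.
                return baseTimeMs + value;
            case FIELD:
                // Documented: deliveryTimestampValue does not take effect.
                // Assumption: the field holds an absolute timestamp.
                return fieldValueMs;
            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }
}
```

For example, under these assumptions, relative mode with a value of 60000 and a base time of 1000 yields a delivery time of 61000, that is, one minute after the base time.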

Type mapping

Flink field types: BOOLEAN, VARBINARY, VARCHAR, TINYINT, INTEGER, BIGINT, FLOAT, DOUBLE, and DECIMAL

ApsaraMQ for RocketMQ field type (for all of the Flink field types above): STRING

Code examples

Source table examples

  • CSV format

    Assume that you have the following message record in CSV format.

    1,name,male 
    2,name,female
    Note

    A RocketMQ message can contain zero or more data records, separated by \n.

    The following Data Definition Language (DDL) statement shows how to declare an ApsaraMQ for RocketMQ source table in a Flink job.

    • ApsaraMQ for RocketMQ 5.x

    CREATE TABLE mq_source(
      id varchar,
      name varchar,
      gender varchar,
      topic varchar metadata virtual
    ) WITH (
      'connector' = 'mq5',
      'topic' = 'mq-test',
      'endpoint' = '<yourEndpoint>',
      'consumerGroup' = 'mq-group',
      'fieldDelimiter' = ','
    );
    • ApsaraMQ for RocketMQ 4.x

    CREATE TABLE mq_source(
      id varchar,
      name varchar,
      gender varchar,
      topic varchar metadata virtual
    ) WITH (
      'connector' = 'mq',
      'topic' = 'mq-test',
      'endpoint' = '<yourEndpoint>',
      'pullIntervalMs' = '1000',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'consumerGroup' = 'mq-group',
      'fieldDelimiter' = ','
    );
  • Binary format

    • ApsaraMQ for RocketMQ 5.x

      CREATE TEMPORARY TABLE source_table (
        mess varbinary
      ) WITH (
        'connector' = 'mq5',
        'endpoint' = '<yourEndpoint>',
        'topic' = 'mq-test',
        'consumerGroup' = 'mq-group'
      );
      
      CREATE TEMPORARY TABLE out_table (
        commodity varchar
      ) WITH (
        'connector' = 'print'
      );
      
      INSERT INTO out_table
      select 
        cast(mess as varchar)
      FROM source_table;
    • ApsaraMQ for RocketMQ 4.x

      CREATE TEMPORARY TABLE source_table (
        mess varbinary
      ) WITH (
        'connector' = 'mq',
        'endpoint' = '<yourEndpoint>',
        'pullIntervalMs' = '500',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'topic' = 'mq-test',
        'consumerGroup' = 'mq-group'
      );
      
      CREATE TEMPORARY TABLE out_table (
        commodity varchar
      ) WITH (
        'connector' = 'print'
      );
      
      INSERT INTO out_table
      select 
        cast(mess as varchar)
      FROM source_table;
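
To make the CSV source example above more tangible, the following plain-Java sketch shows how a message body that holds several records could be split into rows and fields with the lineDelimiter and fieldDelimiter values. It is a simplified illustration, not the connector's parser. The class CsvMessageSketch and its parse method are hypothetical names, the body is assumed to be UTF-8 encoded (the documented default of the encoding parameter), and dropping empty lines is an assumption of this sketch.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical illustration of splitting one message body into CSV records.
final class CsvMessageSketch {

    /** Splits a message body into rows, and each row into its fields. */
    static List<String[]> parse(byte[] body, String lineDelimiter, String fieldDelimiter) {
        String text = new String(body, StandardCharsets.UTF_8);
        List<String[]> rows = new ArrayList<>();
        // Pattern.quote treats the delimiters as literal text, not as regular expressions.
        for (String line : text.split(Pattern.quote(lineDelimiter))) {
            if (line.isEmpty()) {
                // A message can contain zero records, so empty lines are ignored here.
                continue;
            }
            // The limit -1 keeps trailing empty fields instead of dropping them.
            rows.add(line.split(Pattern.quote(fieldDelimiter), -1));
        }
        return rows;
    }
}
```

Calling parse on the body 1,name,male\n2,name,female with "\n" as the line delimiter and "," as the field delimiter returns two rows of three fields each, which correspond to the id, name, and gender columns in the DDL statements above.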

Sink table examples

  • Create a sink table

    • ApsaraMQ for RocketMQ 5.x

      CREATE TABLE mq_sink (
        id INTEGER,
        len BIGINT,
        content VARCHAR
      ) WITH (
        'connector'='mq5',
        'endpoint'='<yourEndpoint>',
        'topic'='<yourTopicName>',
        'producerGroup'='<yourGroupName>'
      );
    • ApsaraMQ for RocketMQ 4.x

      CREATE TABLE mq_sink (
        id INTEGER,
        len BIGINT,
        content VARCHAR
      ) WITH (
        'connector'='mq',
        'endpoint'='<yourEndpoint>',
        'accessId'='${secret_values.ak_id}',
        'accessKey'='${secret_values.ak_secret}',
        'topic'='<yourTopicName>',
        'producerGroup'='<yourGroupName>'
      );
      Note

      If your ApsaraMQ for RocketMQ message is in binary format, you can define only one field in the DDL statement, and the field type must be VARBINARY.

  • Create a sink table that specifies the keys and tags fields as the key and tag for RocketMQ messages

    • ApsaraMQ for RocketMQ 5.x

      CREATE TABLE mq_sink (
        id INTEGER,
        len BIGINT,
        content VARCHAR,
        keys VARCHAR METADATA,
        tags VARCHAR METADATA
      ) WITH (
        'connector'='mq5',
        'endpoint'='<yourEndpoint>',
        'topic'='<yourTopicName>',
        'producerGroup'='<yourGroupName>'
      );
    • ApsaraMQ for RocketMQ 4.x

      CREATE TABLE mq_sink (
        id INTEGER,
        len BIGINT,
        content VARCHAR,
        keys VARCHAR METADATA,
        tags VARCHAR METADATA
      ) WITH (
        'connector'='mq',
        'endpoint'='<yourEndpoint>',
        'accessId'='${secret_values.ak_id}',
        'accessKey'='${secret_values.ak_secret}',
        'topic'='<yourTopicName>',
        'producerGroup'='<yourGroupName>'
      );

DataStream API

Important

When you read and write data using the DataStream API, you must use the corresponding DataStream connector to connect to fully managed Flink. For more information about how to configure the DataStream connector, see Use a DataStream connector.

Ververica Runtime (VVR) provides MetaQSource to read data from ApsaraMQ for RocketMQ. It also provides MetaQOutputFormat, which is an implementation of the OutputFormat class, to write data to ApsaraMQ for RocketMQ. The following code provides examples on how to read data from and write data to ApsaraMQ for RocketMQ:

ApsaraMQ for RocketMQ 5.x

Note

In ApsaraMQ for RocketMQ 5.x, the AccessKey pair corresponds to the username and password that are configured for the instance. If you access the ApsaraMQ for RocketMQ instance over an internal network and Access Control List (ACL) authentication is not enabled for the instance, you do not need to configure the AccessKey pair parameters.

import com.alibaba.ververica.connectors.common.sink.OutputFormatSinkFunction;
import com.alibaba.ververica.connectors.mq5.shaded.org.apache.rocketmq.common.message.MessageExt;
import com.alibaba.ververica.connectors.mq5.sink.RocketMQOutputFormat;
import com.alibaba.ververica.connectors.mq5.source.RocketMQSource;
import com.alibaba.ververica.connectors.mq5.source.reader.deserializer.RocketMQRecordDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Collections;
import java.util.List;
/**
 * A demo that illustrates how to consume messages from RocketMQ, convert
 * messages, then produce messages to RocketMQ.
 */
public class RocketMQ5DataStreamDemo {

    public static final String ENDPOINT = "<yourEndpoint>";
    public static final String ACCESS_ID = "<accessID>";
    public static final String ACCESS_KEY = "<accessKey>";
    public static final String SOURCE_TOPIC = "<sourceTopicName>";
    public static final String CONSUMER_GROUP = "<consumerGroup>";
    public static final String SINK_TOPIC = "<sinkTopicName>";
    public static final String PRODUCER_GROUP = "<producerGroup>";

    public static void main(String[] args) throws Exception {
        // Sets up the streaming execution environment
        Configuration conf = new Configuration();

        // The following two configurations are for local debugging only. Delete them before you package the job and upload it to Realtime Compute for Apache Flink.
        conf.setString("pipeline.classpaths", "file://" + "the absolute path of the uber JAR file");
        conf.setString(
                "classloader.parent-first-patterns.additional",
                "com.alibaba.ververica.connectors.mq5.source.reader.deserializer.RocketMQRecordDeserializationSchema;com.alibaba.ververica.connectors.mq5.shaded.");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        final DataStreamSource<String> ds =
                env.fromSource(
                        RocketMQSource.<String>builder()
                                .setEndpoint(ENDPOINT)
                                .setAccessId(ACCESS_ID)
                                .setAccessKey(ACCESS_KEY)
                                .setTopic(SOURCE_TOPIC)
                                .setConsumerGroup(CONSUMER_GROUP)
                                .setDeserializationSchema(new MyDeserializer())
                                .setStartOffset(1)
                                .build(),
                        WatermarkStrategy.noWatermarks(),
                        "source");

        ds.map(new ToMessage())
                .addSink(
                        new OutputFormatSinkFunction<>(
                                new RocketMQOutputFormat.Builder()
                                        .setEndpoint(ENDPOINT)
                                        .setAccessId(ACCESS_ID)
                                        .setAccessKey(ACCESS_KEY)
                                        .setTopicName(SINK_TOPIC)
                                        .setProducerGroup(PRODUCER_GROUP)
                                        .build()));

        env.execute();
    }

    private static class MyDeserializer implements RocketMQRecordDeserializationSchema<String> {
        @Override
        public void deserialize(List<MessageExt> record, Collector<String> out) {
            for (MessageExt messageExt : record) {
                out.collect(new String(messageExt.getBody()));
            }
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return Types.STRING;
        }
    }

    private static class ToMessage implements MapFunction<String, List<MessageExt>> {

        public ToMessage() {
        }

        @Override
        public List<MessageExt> map(String s) {
            final MessageExt message = new MessageExt();
            message.setBody(s.getBytes());
            message.setWaitStoreMsgOK(true);
            return Collections.singletonList(message);
        }
    }
}

ApsaraMQ for RocketMQ 4.x

import com.alibaba.ververica.connector.mq.shaded.com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.ververica.connectors.common.sink.OutputFormatSinkFunction;
import com.alibaba.ververica.connectors.metaq.sink.MetaQOutputFormat;
import com.alibaba.ververica.connectors.metaq.source.MetaQSource;
import com.alibaba.ververica.connectors.metaq.source.reader.deserializer.MetaQRecordDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import static com.alibaba.ververica.connector.mq.shaded.com.taobao.metaq.client.ExternConst.*;
/**
 * A demo that illustrates how to consume messages from RocketMQ, convert
 * messages, then produce messages to RocketMQ.
 */
public class RocketMQDataStreamDemo {

    public static final String ENDPOINT = "<yourEndpoint>";
    public static final String ACCESS_ID = "<accessID>";
    public static final String ACCESS_KEY = "<accessKey>";
    public static final String INSTANCE_ID = "<instanceID>";
    public static final String SOURCE_TOPIC = "<sourceTopicName>";
    public static final String CONSUMER_GROUP = "<consumerGroup>";
    public static final String SINK_TOPIC = "<sinkTopicName>";
    public static final String PRODUCER_GROUP = "<producerGroup>";

    public static void main(String[] args) throws Exception {
        // Sets up the streaming execution environment
        Configuration conf = new Configuration();

        // The following two configurations are for local debugging only. Delete them before you package the job and upload it to Realtime Compute for Apache Flink.
        conf.setString("pipeline.classpaths", "file://" + "the absolute path of the uber JAR file");
        conf.setString("classloader.parent-first-patterns.additional",
                "com.alibaba.ververica.connectors.metaq.source.reader.deserializer.MetaQRecordDeserializationSchema;com.alibaba.ververica.connector.mq.shaded.");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // Creates and adds RocketMQ source.
        env.fromSource(createRocketMQSource(), WatermarkStrategy.noWatermarks(), "source")
                // Converts message body to upper case.
                .map(RocketMQDataStreamDemo::convertMessages)
                // Creates and adds RocketMQ sink.
                .addSink(new OutputFormatSinkFunction<>(createRocketMQOutputFormat()))
                .name(RocketMQDataStreamDemo.class.getSimpleName());
        // Compiles and submits job.
        env.execute("RocketMQ connector end-to-end DataStream demo");
    }

    private static MetaQSource<MessageExt> createRocketMQSource() {
        Properties mqProperties = createMQProperties();

        return new MetaQSource<>(SOURCE_TOPIC,
                CONSUMER_GROUP,
                null, // always null
                null, // tag of the messages to consume
                Long.MAX_VALUE, // stop timestamp in milliseconds
                -1, // Start timestamp in milliseconds. Set to -1 to disable starting from offset.
                0, // Start offset.
                300_000, // Partition discover interval.
                mqProperties,
                Boundedness.CONTINUOUS_UNBOUNDED,
                new MyDeserializationSchema());
    }

    private static MetaQOutputFormat createRocketMQOutputFormat() {
        return new MetaQOutputFormat.Builder()
                .setTopicName(SINK_TOPIC)
                .setProducerGroup(PRODUCER_GROUP)
                .setMqProperties(createMQProperties())
                .build();
    }

    private static Properties createMQProperties() {
        Properties properties = new Properties();
        properties.put(PROPERTY_ONS_CHANNEL, "ALIYUN");
        properties.put(NAMESRV_ADDR, ENDPOINT);
        properties.put(PROPERTY_ACCESSKEY, ACCESS_ID);
        properties.put(PROPERTY_SECRETKEY, ACCESS_KEY);
        properties.put(PROPERTY_ROCKET_AUTH_ENABLED, true);
        properties.put(PROPERTY_INSTANCE_ID, INSTANCE_ID);
        return properties;
    }

    private static List<MessageExt> convertMessages(MessageExt messages) {
        return Collections.singletonList(messages);
    }

    public static class MyDeserializationSchema implements MetaQRecordDeserializationSchema<MessageExt> {
        @Override
        public void deserialize(List<MessageExt> list, Collector<MessageExt> collector) {
            for (MessageExt messageExt : list) {
                collector.collect(messageExt);
            }
        }

        @Override
        public TypeInformation<MessageExt> getProducedType() {
            return TypeInformation.of(MessageExt.class);
        }
    }
}

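Add the Maven dependency that matches your ApsaraMQ for RocketMQ version to the pom.xml file of your project: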

<!--MQ 5.x-->
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mq5</artifactId>
    <version>${vvr-version}</version>
    <scope>provided</scope>
</dependency>

<!--MQ 4.x-->
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mq</artifactId>
    <version>${vvr-version}</version>
</dependency>
Note

For more information about how to configure the endpoint for ApsaraMQ for RocketMQ, see Announcement on the settings of TCP internal endpoints.

FAQ

How does ApsaraMQ for RocketMQ detect changes in the number of topic partitions when a topic is scaled out?