全部產品
Search
文件中心

Realtime Compute for Apache Flink:ApsaraMQ for RocketMQ

更新時間:Oct 23, 2025

本文為您介紹ApsaraMQ for RocketMQ連接器。

重要

鑒於ApsaraMQ for RocketMQ 4.x標準版執行個體共用API調用彈性上限為每秒5000次,使用該版本的訊息中介軟體在與Realtime ComputeFlink版對接時,若超過上限會觸發限流機制,可能會導致Flink作業運行不穩定。因此,在選擇訊息中介軟體時,如果您正在或計劃通過標準版RocketMQ與Flink對接,請您謹慎評估。如果業務情境允許,請考慮使用Kafka、Log Service(SLS)或DataHub等其他中介軟體進行替代。如果您確實需要使用ApsaraMQ for RocketMQ 4.x標準版處理大規模的訊息,也請同時通過提交工單與RocketMQ產品取得聯絡申請提升限速上限。

背景資訊

雲訊息佇列 RocketMQ 版是阿里雲基於Apache RocketMQ構建的低延遲、高並發、高可用和高可靠的分布式訊息中介軟體。其既可為分布式應用系統提供非同步解耦和削峰填穀的能力,同時也具備互連網應用所需的海量訊息堆積、高吞吐和可靠重試等特性。

RocketMQ連接器支援的資訊如下。

類別

詳情

支援類型

源表和結果表

運行模式

僅支援流模式

資料格式

CSV和二進位格式

特有監控指標

監控指標

  • 源表

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerScond

    • currentEmitEventTimeLag

    • currentFetchEventTimeLag

    • sourceIdleTime

  • 結果表

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

說明

指標含義詳情,請參見監控指標說明

API種類

Datastream(僅支援RocketMQ 4.x)和SQL

是否支援更新或刪除結果表資料

不支援更新和刪除結果表資料,只支援插入資料。

特色功能

RocketMQ源表和結果表支援屬性欄位,具體如下。

  • 源表屬性欄位

    欄位名

    欄位類型

    說明

    topic

    VARCHAR METADATA VIRTUAL

    訊息Topic。

    queue-id

    INT METADATA VIRTUAL

    訊息佇列ID。

    queue-offset

    BIGINT METADATA VIRTUAL

    訊息佇列的消費位點。

    msg-id

    VARCHAR METADATA VIRTUAL

    訊息ID。

    store-timestamp

    TIMESTAMP(3) METADATA VIRTUAL

    訊息儲存時間。

    born-timestamp

    TIMESTAMP(3) METADATA VIRTUAL

    訊息產生時間。

    keys

    VARCHAR METADATA VIRTUAL

    訊息Keys。

    tags

    VARCHAR METADATA VIRTUAL

    訊息Tags。

  • 結果表屬性欄位

    欄位名

    欄位類型

    說明

    keys

    VARCHAR METADATA

    訊息Keys。

    tags

    VARCHAR METADATA

    訊息Tags。

前提條件

已建立了RocketMQ資源,詳情請參見建立資源

使用限制

  • 僅FlinkRealtime Compute引擎VVR 8.0.3及以上版本支援5.x版本的RocketMQ。

  • RocketMQ連接器使用Pull Consumer消費,所有的子任務分擔消費。

文法結構

CREATE TABLE mq_source(
  x varchar,
  y varchar,
  z varchar
) WITH (
  'connector' = 'mq5',
  'topic' = '<yourTopicName>',
  'endpoint' = '<yourEndpoint>',
  'consumerGroup' = '<yourConsumerGroup>'
);

WITH參數

通用

參數

說明

資料類型

是否必填

預設值

備忘

connector

connector類型。

String

  • RocketMQ 4.x固定值為mq

  • RocketMQ 5.x固定值為mq5

endPoint

EndPoint地址

String

ApsaraMQ for RocketMQ接入地址支援以下兩種類型:

  • 內網服務MQ(阿里雲傳統網路/VPC)接入地址:在MQ控制台目標執行個體詳情中,選擇存取點 > TCP協議用戶端存取點 > 內網訪問,擷取對應的EndPoint。

  • 公網服務MQ接入地址:在MQ控制台目標執行個體詳情中,選擇存取點 > TCP協議 > 用戶端存取點 > 公網訪問,擷取對應的EndPoint。

重要

由於阿里雲網路安全性原則動態變化,Realtime Compute串連公網服務MQ時可能會出現網路連接問題,推薦您使用內網服務MQ。

  • 內網服務無法跨域訪問。例如,您所購買的Realtime Compute服務的地區為華東1(杭州),但是購買的RocketMQ服務的地區為華東2(上海),則無法訪問。

  • 通過公網方式訪問RocketMQ,需要開通公網服務,詳情請參見網路連接選型

topic

topic名稱。

String

無。

accessId

  • 4.x:阿里雲帳號的AccessKey ID。

  • 5.x:

    RocketMQ執行個體使用者名稱

String

  • RocketMQ 4.x:是

  • RocketMQ 5.x:否

重要

為了避免您的AK資訊泄露,建議您使用變數的方式填寫AccessKey取值,詳情請參見專案變數

  • RocketMQ 5.x:如果是使用公網存取點訪問,需配置為RocketMQ控制台執行個體使用者名稱。如果是在阿里雲ECS內網訪問,無需填寫該配置。

accessKey

  • 4.x: 阿里雲帳號的AccessKey Secret。

  • 5.x:執行個體密碼

String

  • RocketMQ 4.x:是

  • RocketMQ 5.x:否

tag

訂閱或寫入的標籤

String

  • RocketMQ作為源表時,只能讀取單個tag。

  • RocketMQ作為結果表時,支援設定多個tag,以逗號(,)進行分隔。

說明

當作為結果表時,僅支援RocketMQ 4.x。RocketMQ 5.x請使用結果表屬性欄位來指定寫出訊息的 tag。

encoding

編碼格式。

String

UTF-8

無。

instanceID

RocketMQ執行個體ID。

String

  • 如果RocketMQ執行個體無獨立命名空間,則不可以使用instanceID參數。

  • 如果RocketMQ執行個體有獨立命名空間,則instanceID參數必選。

說明

僅RocketMQ 4.x支援該參數。

源表專屬

參數

說明

資料類型

是否必填

預設值

備忘

consumerGroup

Consumer組名。

String

無。

pullIntervalMs

上遊沒有資料可供消費時,source的休眠時間。

Int

單位為毫秒。

目前沒有限流機制,無法設定讀取RocketMQ的速率。

說明

僅RocketMQ 4.x支援該參數。

timeZone

時區。

String

例如,Asia/Shanghai。

startTimeMs

啟動時間點。

Long

時間戳記,單位為毫秒。

startMessageOffset

訊息開始的位移量。

Int

如果填寫該參數,則優先以startMessageOffset的位點開始載入資料。

lineDelimiter

解析Block時,行分隔字元。

String

\n

無。

fieldDelimiter

欄位分隔符號。

String

\u0001

根據MQ終端的模式,分隔字元分別為:

  • 在唯讀模式下(預設模式),分隔字元為\u0001。該模式下,分隔字元不可見。

  • 在編輯模式下,分隔字元為^A

lengthCheck

單列欄位條數檢查策略。

Int

NONE

取值如下:

  • NONE:預設值。

    • 解析出的欄位數大於定義欄位數時,按從左至右的順序,取定義欄位數量的資料。

    • 解析出的欄位數小於定義欄位數時,跳過這行資料。

  • SKIP:解析出的欄位數和定義欄位數不同時跳過資料。

  • EXCEPTION:解析出的欄位數和定義欄位數不同時提示異常。

  • PAD:按從左至右順序填充。

    • 解析出的欄位數大於定義欄位數時,按從左至右的順序,取定義欄位數量的資料。

    • 解析出的欄位數小於定義欄位數時,在行尾用Null填充缺少的欄位。

columnErrorDebug

是否開啟調試開關。

Boolean

false

如果設定為true,則列印解析異常的Log。

pullBatchSize

每次拉取訊息的最大數量。

Int

64

僅Realtime Compute引擎VVR 8.0.7及以上版本支援該參數。

結果表專屬

參數

說明

資料類型

是否必填

預設值

備忘

producerGroup

寫入的群組。

String

無。

retryTimes

寫入的重試次數。

Int

10

無。

sleepTimeMs

稍候再試時間。

Long

5000

無。

partitionField

指定欄位名,將該欄位作為分區列。

String

如果modepartition,則該參數必填。

說明

僅Realtime Compute引擎VVR 8.0.5及以上版本支援該參數。

deliveryTimestampMode

指定延遲訊息的模式,該模式與deliveryTimestampValue參數共同決定延遲訊息的投遞時間。

String

取值如下:

  • fixed:固定時間戳記模式。

  • relative:相對延遲時間模式。

  • field:指定欄位作為投遞時間模式。

說明

僅Realtime Compute引擎VVR 11.1及以上版本支援該參數。

deliveryTimestampType

指定延遲訊息的時間基準類型

String

processing_time

取值如下:

  • event_time:事件時間。

  • processing_time:處理時間。

說明

僅Realtime Compute引擎VVR 11.1及以上版本支援該參數。

deliveryTimestampValue

延遲訊息的投遞時間。

Long

根據deliveryTimestampMode的取值,其含義如下:

  • deliveryTimestampMode=fixed:延遲至指定的時間戳記(毫秒級),如果目前時間超過該時間戳記,則直接投遞訊息。

  • deliveryTimestampMode=relative:基於deliveryTimestampType時間類型的延遲時間(預設單位:毫秒)。

  • deliveryTimestampMode=field:參數不生效,延遲時間由deliveryTimestampField指定的欄位值決定。

說明

僅Realtime Compute引擎VVR 11.1及以上版本支援該參數。

deliveryTimestampField

指定用作延遲訊息投遞時間的欄位。欄位類型必須為BIGINT

String

deliveryTimestampModefield時生效。

說明

僅Realtime Compute引擎VVR 11.1及以上版本支援該參數。

類型映射

Flink欄位類型

雲訊息佇列RocketMQ欄位類型

BOOLEAN

STRING

VARBINARY

VARCHAR

TINYINT

INTEGER

BIGINT

FLOAT

DOUBLE

DECIMAL

程式碼範例

源表示例

  • CSV格式

    假設您的一條CSV格式訊息記錄如下。

    1,name,male 
    2,name,female
    說明

    一條RocketMQ訊息可以包括零條到多條資料記錄,記錄之間使用\n分隔。

    Flink作業中,聲明RocketMQ資料來源表的DDL如下。

    • RocketMQ 5.x

    CREATE TABLE mq_source(
      id varchar,
      name varchar,
      gender varchar,
      topic varchar metadata virtual
    ) WITH (
      'connector' = 'mq5',
      'topic' = 'mq-test',
      'endpoint' = '<yourEndpoint>',
      'consumerGroup' = 'mq-group',
      'fieldDelimiter' = ','
    );
    • RocketMQ 4.x

    CREATE TABLE mq_source(
      id varchar,
      name varchar,
      gender varchar,
      topic varchar metadata virtual
    ) WITH (
      'connector' = 'mq',
      'topic' = 'mq-test',
      'endpoint' = '<yourEndpoint>',
      'pullIntervalMs' = '1000',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'consumerGroup' = 'mq-group',
      'fieldDelimiter' = ','
    );
  • 二進位格式

    • RocketMQ 5.x

      CREATE TEMPORARY TABLE source_table (
        mess varbinary
      ) WITH (
        'connector' = 'mq5',
        'endpoint' = '<yourEndpoint>',
        'topic' = 'mq-test',
        'consumerGroup' = 'mq-group'
      );
      
      CREATE TEMPORARY TABLE out_table (
        commodity varchar
      ) WITH (
        'connector' = 'print'
      );
      
      INSERT INTO out_table
      select 
        cast(mess as varchar)
      FROM source_table;
    • RocketMQ 4.x

      CREATE TEMPORARY TABLE source_table (
        mess varbinary
      ) WITH (
        'connector' = 'mq',
        'endpoint' = '<yourEndpoint>',
        'pullIntervalMs' = '500',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'topic' = 'mq-test',
        'consumerGroup' = 'mq-group'
      );
      
      CREATE TEMPORARY TABLE out_table (
        commodity varchar
      ) WITH (
        'connector' = 'print'
      );
      
      INSERT INTO out_table
      select 
        cast(mess as varchar)
      FROM source_table;

結果表示例

  • 建立結果表

    • RocketMQ 5.x

      CREATE TABLE mq_sink (
        id INTEGER,
        len BIGINT,
        content VARCHAR
      ) WITH (
        'connector'='mq5',
        'endpoint'='<yourEndpoint>',
        'topic'='<yourTopicName>',
        'producerGroup'='<yourGroupName>'
      );
    • RocketMQ 4.x

      CREATE TABLE mq_sink (
        id INTEGER,
        len BIGINT,
        content VARCHAR
      ) WITH (
        'connector'='mq',
        'endpoint'='<yourEndpoint>',
        'accessId'='${secret_values.ak_id}',
        'accessKey'='${secret_values.ak_secret}',
        'topic'='<yourTopicName>',
        'producerGroup'='<yourGroupName>'
      );
      說明

      如果您的MQ訊息為二進位格式,則DDL中只能定義一個欄位,且欄位類型必須為VARBINARY。

  • 建立將keystags欄位指定為RocketMQ訊息的key和tag的結果表

    • RocketMQ 5.x

      CREATE TABLE mq_sink (
        id INTEGER,
        len BIGINT,
        content VARCHAR,
        keys VARCHAR METADATA,
        tags VARCHAR METADATA
      ) WITH (
        'connector'='mq5',
        'endpoint'='<yourEndpoint>',
        'topic'='<yourTopicName>',
        'producerGroup'='<yourGroupName>'
      );
    • RocketMQ 4.x

      CREATE TABLE mq_sink (
        id INTEGER,
        len BIGINT,
        content VARCHAR,
        keys VARCHAR METADATA,
        tags VARCHAR METADATA
      ) WITH (
        'connector'='mq',
        'endpoint'='<yourEndpoint>',
        'accessId'='${secret_values.ak_id}',
        'accessKey'='${secret_values.ak_secret}',
        'topic'='<yourTopicName>',
        'producerGroup'='<yourGroupName>'
      );

DataStream API

重要

通過DataStream的方式讀寫資料時,則需要使用對應的DataStream連接器串連Flink全託管,DataStream連接器設定方法請參見DataStream連接器使用方法

Realtime Compute引擎VVR提供MetaQSource,用於讀取RocketMQ;提供OutputFormat的實作類別MetaQOutputFormat,用於寫入RocketMQ。讀取RocketMQ和寫入RocketMQ的樣本如下:

RocketMQ 5.x

說明

在RocketMQ 5.x 中,存取金鑰對應的是執行個體中配置的使用者名稱和密碼。若通過內網訪問RocketMQ執行個體,且執行個體未開啟 ACL 認證,則可不填寫AK/SK參數。

import com.alibaba.ververica.connectors.common.sink.OutputFormatSinkFunction;
import com.alibaba.ververica.connectors.mq5.shaded.org.apache.rocketmq.common.message.MessageExt;
import com.alibaba.ververica.connectors.mq5.sink.RocketMQOutputFormat;
import com.alibaba.ververica.connectors.mq5.source.RocketMQSource;
import com.alibaba.ververica.connectors.mq5.source.reader.deserializer.RocketMQRecordDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Collections;
import java.util.List;
/**
 * A demo that illustrates how to consume messages from RocketMQ, convert
 * messages, then produce messages to RocketMQ.
 */
public class RocketMQ5DataStreamDemo {

    public static final String ENDPOINT = "<yourEndpoint>";
    public static final String ACCESS_ID = "<accessID>";
    public static final String ACCESS_KEY = "<accessKey>";
    public static final String SOURCE_TOPIC = "<sourceTopicName>";
    public static final String CONSUMER_GROUP = "<consumerGroup>";
    public static final String SINK_TOPIC = "<sinkTopicName>";
    public static final String PRODUCER_GROUP = "<producerGroup>";

    public static void main(String[] args) throws Exception {
        // Sets up the streaming execution environment
        Configuration conf = new Configuration();

        // 以下兩個配置僅本地調試時使用,需要在作業打包上傳到阿里雲Realtime ComputeFlink版之前刪除
        conf.setString("pipeline.classpaths", "file://" + "uber jar絕對路徑");
        conf.setString(
                "classloader.parent-first-patterns.additional",
                "com.alibaba.ververica.connectors.mq5.source.reader.deserializer.RocketMQRecordDeserializationSchema;com.alibaba.ververica.connectors.mq5.shaded.");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        final DataStreamSource<String> ds =
                env.fromSource(
                        RocketMQSource.<String>builder()
                                .setEndpoint(ENDPOINT)
                                .setAccessId(ACCESS_ID)
                                .setAccessKey(ACCESS_KEY)
                                .setTopic(SOURCE_TOPIC)
                                .setConsumerGroup(CONSUMER_GROUP)
                                .setDeserializationSchema(new MyDeserializer())
                                .setStartOffset(1)
                                .build(),
                        WatermarkStrategy.noWatermarks(),
                        "source");

        ds.map(new ToMessage())
                .addSink(
                        new OutputFormatSinkFunction<>(
                                new RocketMQOutputFormat.Builder()
                                        .setEndpoint(ENDPOINT)
                                        .setAccessId(ACCESS_ID)
                                        .setAccessKey(ACCESS_KEY)
                                        .setTopicName(SINK_TOPIC)
                                        .setProducerGroup(PRODUCER_GROUP)
                                        .build()));

        env.execute();
    }

    private static class MyDeserializer implements RocketMQRecordDeserializationSchema<String> {
        @Override
        public void deserialize(List<MessageExt> record, Collector<String> out) {
            for (MessageExt messageExt : record) {
                out.collect(new String(messageExt.getBody()));
            }
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return Types.STRING;
        }
    }

    private static class ToMessage implements MapFunction<String, List<MessageExt>> {

        public ToMessage() {
        }

        @Override
        public List<MessageExt> map(String s) {
            final MessageExt message = new MessageExt();
            message.setBody(s.getBytes());
            message.setWaitStoreMsgOK(true);
            return Collections.singletonList(message);
        }
    }
}

RocketMQ 4.x

import com.alibaba.ververica.connector.mq.shaded.com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.ververica.connectors.common.sink.OutputFormatSinkFunction;
import com.alibaba.ververica.connectors.metaq.sink.MetaQOutputFormat;
import com.alibaba.ververica.connectors.metaq.source.MetaQSource;
import com.alibaba.ververica.connectors.metaq.source.reader.deserializer.MetaQRecordDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.io.IOException;
import java.util.List;
import java.util.Properties;
import static com.alibaba.ververica.connector.mq.shaded.com.taobao.metaq.client.ExternConst.*;
/**
 * A demo that illustrates how to consume messages from RocketMQ, convert
 * messages, then produce messages to RocketMQ.
 */
public class RocketMQDataStreamDemo {

    public static final String ENDPOINT = "<yourEndpoint>";
    public static final String ACCESS_ID = "<accessID>";
    public static final String ACCESS_KEY = "<accessKey>";
    public static final String INSTANCE_ID = "<instanceID>";
    public static final String SOURCE_TOPIC = "<sourceTopicName>";
    public static final String CONSUMER_GROUP = "<consumerGroup>";
    public static final String SINK_TOPIC = "<sinkTopicName>";
    public static final String PRODUCER_GROUP = "<producerGroup>";

    public static void main(String[] args) throws Exception {
        // Sets up the streaming execution environment
        Configuration conf = new Configuration();

        // 以下兩個配置僅本地調試時使用,需要在作業打包上傳到阿里雲Realtime ComputeFlink版之前刪除
        conf.setString("pipeline.classpaths", "file://" + "uber jar絕對路徑");
        conf.setString("classloader.parent-first-patterns.additional",
                "com.alibaba.ververica.connectors.metaq.source.reader.deserializer.MetaQRecordDeserializationSchema;com.alibaba.ververica.connector.mq.shaded.");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // Creates and adds RocketMQ source.
        env.fromSource(createRocketMQSource(), WatermarkStrategy.noWatermarks(), "source")
                // Converts message body to upper case.
                .map(RocketMQDataStreamDemo2::convertMessages)
                // Creates and adds RocketMQ sink.
                .addSink(new OutputFormatSinkFunction<>(createRocketMQOutputFormat()))
                .name(RocketMQDataStreamDemo2.class.getSimpleName());
        // Compiles and submits job.
        env.execute("RocketMQ connector end-to-end DataStream demo");
    }

    private static MetaQSource<MessageExt> createRocketMQSource() {
        Properties mqProperties = createMQProperties();

        return new MetaQSource<>(SOURCE_TOPIC,
                CONSUMER_GROUP,
                null, // always null
                null, // tag of the messages to consumer
                Long.MAX_VALUE, // stop timestamp in milliseconds
                -1, // Start timestamp in milliseconds. Set to -1 to disable starting from offset.
                0, // Start offset.
                300_000, // Partition discover interval.
                mqProperties,
                Boundedness.CONTINUOUS_UNBOUNDED,
                new MyDeserializationSchema());
    }

    private static MetaQOutputFormat createRocketMQOutputFormat() {
        return new MetaQOutputFormat.Builder()
                .setTopicName(SINK_TOPIC)
                .setProducerGroup(PRODUCER_GROUP)
                .setMqProperties(createMQProperties())
                .build();
    }

    private static Properties createMQProperties() {
        Properties properties = new Properties();
        properties.put(PROPERTY_ONS_CHANNEL, "ALIYUN");
        properties.put(NAMESRV_ADDR, ENDPOINT);
        properties.put(PROPERTY_ACCESSKEY, ACCESS_ID);
        properties.put(PROPERTY_SECRETKEY, ACCESS_KEY);
        properties.put(PROPERTY_ROCKET_AUTH_ENABLED, true);
        properties.put(PROPERTY_INSTANCE_ID, INSTANCE_ID);
        return properties;
    }

    private static List<MessageExt> convertMessages(MessageExt messages) {
        return Collections.singletonList(messages);
    }

    public static class MyDeserializationSchema implements MetaQRecordDeserializationSchema<MessageExt> {
        @Override
        public void deserialize(List<MessageExt> list, Collector<MessageExt> collector) {
            for (MessageExt messageExt : list) {
                collector.collect(messageExt);
            }
        }

        @Override
        public TypeInformation<MessageExt> getProducedType() {
            return TypeInformation.of(MessageExt.class);
        }
    }
}
    }
}

XML

<!--MQ5-->
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mq5</artifactId>
    <version>${vvr-version}</version>
    <scope>provided</scope>
</dependency>

<!--MQ4-->
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mq</artifactId>
    <version>${vvr-version}</version>
</dependency>
說明

RocketMQ存取點Endpoint配置詳情請參見關於TCP內網存取點設定的公告

常見問題

RocketMQ Topic擴容時,RocketMQ如何感知Topic分區數變化?