Realtime Compute for Apache Flink: ApsaraMQ for RocketMQ connector

Last Updated: Feb 06, 2024

This topic describes how to use the ApsaraMQ for RocketMQ connector.

Important

ApsaraMQ for RocketMQ 4.x Standard Edition can process up to 5,000 messages per second. If you use it as the messaging middleware for Realtime Compute for Apache Flink, deployments of Realtime Compute for Apache Flink may be unstable, so evaluate the impact before you use or continue to use it. Depending on your business requirements, we recommend that you use other messaging middleware instead, such as Kafka, Simple Log Service, or DataHub. If you must use ApsaraMQ for RocketMQ 4.x Standard Edition to process a large number of messages, submit a ticket to contact technical support and request an increase in the number of messages that can be processed per second.

Background information

ApsaraMQ for RocketMQ is a distributed messaging middleware developed by Alibaba Cloud based on Apache RocketMQ. The service provides low latency, high concurrency, high availability, and high reliability. ApsaraMQ for RocketMQ provides the asynchronous decoupling and load shifting features for distributed application systems. The service also supports features for Internet applications, including massive message accumulation, high throughput, and reliable retry.

The following table describes the capabilities supported by the ApsaraMQ for RocketMQ connector.

Item

Description

Table type

Source table and result table.

Running mode

Streaming mode.

Data format

CSV and binary formats.

Metric

  • Metrics for source tables

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerSecond

    • currentEmitEventTimeLag

    • currentFetchEventTimeLag

    • sourceIdleTime

  • Metrics for result tables

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

Note

For more information about the metrics and how to view the metrics, see Report metrics of fully managed Flink to other platforms.

API type

DataStream API and SQL API. Only ApsaraMQ for RocketMQ 4.x instances support DataStream APIs.

Data update or deletion in a result table

Data in a result table cannot be deleted. Data can only be inserted into or updated in a result table.

Features

The following tables describe the fields that are supported for an ApsaraMQ for RocketMQ source table and an ApsaraMQ for RocketMQ result table.

  • Fields in an ApsaraMQ for RocketMQ source table

    Note

    Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 3.0.1 or later supports the following fields of ApsaraMQ for RocketMQ.

    Field | Data type | Description
    topic | VARCHAR METADATA VIRTUAL | The topic for a type of ApsaraMQ for RocketMQ messages.
    queue-id | INT METADATA VIRTUAL | The ID of the queue in which an ApsaraMQ for RocketMQ message is placed.
    queue-offset | BIGINT METADATA VIRTUAL | The consumption offset of an ApsaraMQ for RocketMQ message.
    msg-id | VARCHAR METADATA VIRTUAL | The ID of an ApsaraMQ for RocketMQ message.
    store-timestamp | TIMESTAMP(3) METADATA VIRTUAL | The time at which an ApsaraMQ for RocketMQ message is stored.
    born-timestamp | TIMESTAMP(3) METADATA VIRTUAL | The time at which an ApsaraMQ for RocketMQ message is generated.
    keys | VARCHAR METADATA VIRTUAL | The keys of an ApsaraMQ for RocketMQ message.
    tags | VARCHAR METADATA VIRTUAL | The tags of an ApsaraMQ for RocketMQ message.

  • Fields in an ApsaraMQ for RocketMQ result table

    Note

    Only Realtime Compute for Apache Flink that uses VVR 4.0.0 or later supports the following fields of ApsaraMQ for RocketMQ.

    Field | Data type | Description
    keys | VARCHAR METADATA | The keys of an ApsaraMQ for RocketMQ message.
    tags | VARCHAR METADATA | The tags of an ApsaraMQ for RocketMQ message.

Prerequisites

Resources are created in the ApsaraMQ for RocketMQ console. For more information, see Create a resource.

Limits

  • Only Realtime Compute for Apache Flink that uses VVR 2.0.0 or later supports the ApsaraMQ for RocketMQ connector.

  • Only Realtime Compute for Apache Flink that uses VVR 8.0.3 or later supports ApsaraMQ for RocketMQ 5.x instances.

  • In Realtime Compute for Apache Flink that uses a VVR version earlier than 6.0.2, the deployment parallelism for an ApsaraMQ for RocketMQ source table must be less than or equal to the number of partitions in the ApsaraMQ for RocketMQ topic. This limit is removed in VVR 6.0.2 and later. You can set the deployment parallelism in advance to a value that is greater than the number of partitions in the topic, and you do not need to manually adjust the deployment parallelism when an ApsaraMQ for RocketMQ instance is scaled in.

  • The ApsaraMQ for RocketMQ connector uses pull consumers to consume messages. All subtasks share data consumption.

Syntax

CREATE TABLE mq_source(
  x varchar,
  y varchar,
  z varchar
) WITH (
  'connector' = 'mq5',
  'topic' = '<yourTopicName>',
  'endpoint' = '<yourEndpoint>',
  'consumerGroup' = '<yourConsumerGroup>'
);

Parameters in the WITH clause

  • Common parameters

    Parameter

    Description

    Data type

    Required

    Default value

    Remarks

    connector

    The type of the connector.

    STRING

    Yes

    No default value

    • ApsaraMQ for RocketMQ 4.x: Set the value to mq.

    • ApsaraMQ for RocketMQ 5.x: Set the value to mq5.

    endPoint

    The endpoint of ApsaraMQ for RocketMQ.

    STRING

    Yes

    No default value

    ApsaraMQ for RocketMQ supports the following types of endpoints:

    • For deployments that run on VVR 3.0.1 or later, use Transmission Control Protocol (TCP) endpoints. To obtain the required endpoint, perform the following steps:

      • Internal endpoints of ApsaraMQ for RocketMQ that resides in the classic network or a virtual private cloud (VPC) of Alibaba Cloud: Log on to the ApsaraMQ for RocketMQ console. In the left-side navigation pane, click Instances. On the page that appears, find the required instance and click Details in the Actions column. On the Instance Details page, click the Endpoints tab. You can view the endpoint that corresponds to Internal Access in the TCP Endpoint section.

      • Public endpoint of ApsaraMQ for RocketMQ: Log on to the ApsaraMQ for RocketMQ console. In the left-side navigation pane, click Instances. On the page that appears, find the required instance, and click Details in the Actions column. On the Instance Details page, click the Endpoints tab. You can view the endpoint that corresponds to Internet Access in the TCP Endpoint section.

      Important

      Due to changes in the network security policies of Alibaba Cloud, connection issues may occur when Realtime Compute for Apache Flink connects to the public ApsaraMQ for RocketMQ service. We recommend that you use the internal ApsaraMQ for RocketMQ service.

      • The internal ApsaraMQ for RocketMQ service does not support cross-region access. For example, if your Realtime Compute for Apache Flink service resides in the China (Hangzhou) region but your ApsaraMQ for RocketMQ service resides in the China (Shanghai) region, Realtime Compute for Apache Flink cannot access ApsaraMQ for RocketMQ.

      • If you want to access ApsaraMQ for RocketMQ over the Internet, you must configure network address translation (NAT). For more information, see Create and manage Internet NAT gateways.

    • For Realtime Compute for Apache Flink deployments that run on VVR of a version earlier than 3.0.1, the old endpoint of ApsaraMQ for RocketMQ is no longer available. You must update your deployments to adapt to the VVR version.

      Important

      If your deployments use the ApsaraMQ for RocketMQ connector on a VVR version earlier than 3.0.1, you must update the deployments to VVR 3.0.1 or later and change the value of the endPoint parameter to the new endpoint of ApsaraMQ for RocketMQ. This reduces the risks of instability or unavailability that are caused by the old endpoint of ApsaraMQ for RocketMQ. For more information, see November 1, 2021: The endpoints of ApsaraMQ for RocketMQ are unavailable and Realtime Compute for Apache Flink deployments need to be updated to adapt to the change.

    topic

    The name of the topic.

    STRING

    Yes

    No default value

    N/A.

    accessId

    • ApsaraMQ for RocketMQ 4.x: the AccessKey ID of your Alibaba Cloud account.

    • ApsaraMQ for RocketMQ 5.x: the username of the ApsaraMQ for RocketMQ instance.

    STRING

    • ApsaraMQ for RocketMQ 4.x: yes

    • ApsaraMQ for RocketMQ 5.x: no

    No default value

    Important

    To protect your AccessKey pair, we recommend that you specify the AccessKey ID by using the key management method. For more information, see Manage keys.

    • ApsaraMQ for RocketMQ 5.x: If you want to access an ApsaraMQ for RocketMQ instance over the Internet, set this parameter to the username of the ApsaraMQ for RocketMQ instance in the ApsaraMQ for RocketMQ console. If you want to access an ApsaraMQ for RocketMQ instance whose client is deployed on an Elastic Compute Service (ECS) instance over an internal network, you do not need to configure this parameter.

    accessKey

    • ApsaraMQ for RocketMQ 4.x: the AccessKey secret of your Alibaba Cloud account.

    • ApsaraMQ for RocketMQ 5.x: the password of the ApsaraMQ for RocketMQ instance.

    STRING

    • ApsaraMQ for RocketMQ 4.x: yes

    • ApsaraMQ for RocketMQ 5.x: no

    No default value

    Important

    To protect your AccessKey pair, we recommend that you specify the AccessKey secret by using the key management method. For more information, see Manage keys.

    • ApsaraMQ for RocketMQ 5.x: If you want to access an ApsaraMQ for RocketMQ instance over the Internet, set this parameter to the password of the ApsaraMQ for RocketMQ instance in the ApsaraMQ for RocketMQ console. If you want to access an ApsaraMQ for RocketMQ instance whose client is deployed on an ECS instance over an internal network, you do not need to configure this parameter.

    tag

    The tag of the message that is subscribed to or written.

    STRING

    No

    No default value

    • For an ApsaraMQ for RocketMQ source table, only a single tag can be read.

    • For an ApsaraMQ for RocketMQ result table, you can configure multiple tags. Separate multiple tags with commas (,).

    Note

    For an ApsaraMQ for RocketMQ result table, only ApsaraMQ for RocketMQ 4.x instances are supported. If you use an ApsaraMQ for RocketMQ 5.x instance, you can specify the tag of a message by using the fields in an ApsaraMQ for RocketMQ result table.

    nameServerSubgroup

    The name server group.

    STRING

    No

    No default value

    • For the internal ApsaraMQ for RocketMQ service that resides in the classic network or a VPC of Alibaba Cloud, you must set this parameter to nsaddr4client-internal.

    • For the public ApsaraMQ for RocketMQ service, you do not need to configure this parameter.

    Note

    This parameter is supported only in VVR versions from 2.1.1 to 3.0.0. In VVR 3.0.1 or later, this parameter is not supported.

    encoding

    The encoding format.

    STRING

    No

    UTF-8

    N/A.

    instanceID

    The ID of the ApsaraMQ for RocketMQ instance.

    STRING

    No

    No default value

    • If the ApsaraMQ for RocketMQ instance does not use a separate namespace, the instanceID parameter cannot be configured.

    • If the ApsaraMQ for RocketMQ instance uses a separate namespace, you must configure the instanceID parameter.

    Note

    Only ApsaraMQ for RocketMQ 4.x instances support this parameter. If you use an ApsaraMQ for RocketMQ 5.x instance, you do not need to configure this parameter.

  • Parameters only for source tables

    Parameter

    Description

    Data type

    Required

    Default value

    Remarks

    consumerGroup

    The name of the consumer group.

    STRING

    Yes

    No default value

    N/A.

    pullIntervalMs

    The interval at which data is pulled.

    INT

    Yes

    No default value

    Unit: milliseconds.

    Note

    Only ApsaraMQ for RocketMQ 4.x instances support this parameter. If you use an ApsaraMQ for RocketMQ 5.x instance, you do not need to configure this parameter.

    timeZone

    The time zone of the instance.

    STRING

    No

    No default value

    Example: Asia/Shanghai.

    startTimeMs

    The time to start reading data.

    LONG

    No

    No default value

    The timestamp. Unit: milliseconds.

    startMessageOffset

    The start offset to consume messages.

    INT

    No

    No default value

    This parameter is optional. If you configure this parameter, messages are consumed from the offset specified by this parameter.

    lineDelimiter

    The row delimiter used when a message block is parsed.

    STRING

    No

    \n

    N/A.

    fieldDelimiter

    The field delimiter.

    STRING

    No

    \u0001

    The delimiter varies based on the mode in which the terminal of ApsaraMQ for RocketMQ works.

    • In read-only mode, the delimiter is \u0001. This is the default mode. In this mode, the delimiter is not visible.

    • In edit mode, the delimiter is ^A.

    lengthCheck

    The rule for checking the number of fields parsed from a row of data.

    INT

    No

    NONE

    Valid values:

    • NONE: This is the default value.

      • If the number of fields that are parsed from a row of data is greater than the specified number of fields, data is extracted from left to right based on the order of specified fields.

      • If the number of fields that are parsed from a row of data is less than the specified number of fields, this row of data is skipped.

    • SKIP: If the number of fields that are parsed from a row of data does not match the number of specified fields, this row of data is skipped.

    • EXCEPTION: If the number of fields that are parsed from a row of data is different from the specified number of fields, an exception is reported.

    • PAD: Data is padded from left to right based on the order of specified fields.

      • If the number of fields that are parsed from a row of data is greater than the specified number of fields, data is extracted from left to right based on the order of specified fields.

      • If the number of fields parsed from a row of data is less than the number of defined fields, the values of the missing fields are padded with null.

    Note

    SKIP, EXCEPTION, and PAD are optional values.

    columnErrorDebug

    Specifies whether to enable debugging.

    BOOLEAN

    No

    false

    If you set this parameter to true, a log entry that indicates a parsing exception is returned.
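The interplay of lineDelimiter, fieldDelimiter, and lengthCheck can be easier to follow in code. The following is a minimal sketch that models the rules described in the table above; it is not the connector's implementation. The class name CsvLengthCheckSketch, the parse method, and the choice to treat an empty line as "no record" are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical model of lineDelimiter, fieldDelimiter, and lengthCheck; not the connector's code. */
final class CsvLengthCheckSketch {

    enum LengthCheck { NONE, SKIP, EXCEPTION, PAD }

    /** Splits one message body into rows that each hold exactly fieldCount values. */
    static List<String[]> parse(String body, String lineDelimiter, String fieldDelimiter,
                                int fieldCount, LengthCheck check) {
        List<String[]> rows = new ArrayList<>();
        for (String line : body.split(Pattern.quote(lineDelimiter))) {
            if (line.isEmpty()) {
                continue; // Assumption: an empty line carries no record.
            }
            // A limit of -1 keeps trailing empty fields such as "1,name,".
            String[] parsed = line.split(Pattern.quote(fieldDelimiter), -1);
            if (parsed.length == fieldCount) {
                rows.add(parsed);
                continue;
            }
            switch (check) {
                case NONE:
                    // Extra fields are dropped from the right; a short row is skipped.
                    if (parsed.length > fieldCount) {
                        rows.add(Arrays.copyOf(parsed, fieldCount));
                    }
                    break;
                case SKIP:
                    break; // Any mismatch skips the row.
                case EXCEPTION:
                    throw new IllegalArgumentException(
                            "Expected " + fieldCount + " fields but parsed " + parsed.length);
                case PAD:
                    // copyOf drops extra fields and pads missing ones with null.
                    rows.add(Arrays.copyOf(parsed, fieldCount));
                    break;
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        // Second record is one field short of the three declared columns.
        String body = "1,name,male\n2,name";
        for (LengthCheck check : new LengthCheck[] {LengthCheck.NONE, LengthCheck.PAD}) {
            for (String[] row : parse(body, "\n", ",", 3, check)) {
                System.out.println(check + " " + Arrays.toString(row));
            }
        }
    }
}
```

In this sketch, NONE and PAD differ only for short rows: NONE drops them, while PAD keeps them and fills the missing columns with null.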

  • Parameters only for result tables

    Parameter

    Description

    Data type

    Required

    Default value

    Remarks

    producerGroup

    The name of the producer group.

    STRING

    Yes

    No default value

    N/A.

    retryTimes

    The number of retries for writing data to the table.

    INT

    No

    10

    N/A.

    sleepTimeMs

    The retry interval.

    LONG

    No

    5000

    N/A.

    partitionField

    The partition key column. You can specify a field as a partition key column.

    STRING

    No

    No default value

    This parameter is required if the mode parameter is set to partition.
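The retryTimes and sleepTimeMs parameters together describe a fixed-interval retry policy for writes. The sketch below illustrates that idea only; it is not the connector's implementation. The class SinkRetrySketch, the WriteAttempt interface, and the reading that the first attempt is not counted as a retry are assumptions, and the millisecond unit is inferred from the parameter name.

```java
/** Hypothetical fixed-interval retry loop; not the connector's actual implementation. */
final class SinkRetrySketch {

    /** Stand-in for one write attempt; signals failure by throwing a RuntimeException. */
    interface WriteAttempt {
        void run();
    }

    /**
     * Runs the attempt once, then retries up to retryTimes more times,
     * sleeping sleepTimeMs between attempts. Returns the number of attempts used.
     */
    static int writeWithRetry(WriteAttempt attempt, int retryTimes, long sleepTimeMs) {
        if (retryTimes < 0 || sleepTimeMs < 0) {
            throw new IllegalArgumentException("retryTimes and sleepTimeMs must be non-negative");
        }
        RuntimeException lastFailure = null;
        for (int retry = 0; retry <= retryTimes; retry++) {
            try {
                attempt.run();
                return retry + 1;
            } catch (RuntimeException e) {
                lastFailure = e;
            }
            if (retry < retryTimes) {
                try {
                    Thread.sleep(sleepTimeMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting to retry", e);
                }
            }
        }
        // All attempts failed: surface the last failure to the caller.
        throw lastFailure;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated write that fails twice and then succeeds. The documented defaults
        // are 10 retries and 5000; a short interval keeps the demo fast.
        int used = writeWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("simulated send failure");
            }
        }, 10, 10L);
        System.out.println("attempts used: " + used);
    }
}
```

Under these assumptions, the default values allow a write to be attempted for roughly 50 seconds of waiting (10 retries at 5000 ms each) before the failure is surfaced.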

Data type mappings

Data type of Flink | Data type of ApsaraMQ for RocketMQ
VARCHAR | STRING

Sample code

  • Source table

    • CSV format

      For example, a CSV message is recorded in the following format:

      1,name,male 
      2,name,female
      Note

      An ApsaraMQ for RocketMQ message can contain zero or more data records. Multiple data records are separated by \n.

      You can use the following DDL statement to declare an ApsaraMQ for RocketMQ source table in a Flink deployment:

      • ApsaraMQ for RocketMQ 5.x

      CREATE TABLE mq_source(
        id varchar,
        name varchar,
        gender varchar,
        topic varchar metadata virtual
      ) WITH (
        'connector' = 'mq5',
        'topic' = 'mq-test',
        'endpoint' = '<yourEndpoint>',
        'consumerGroup' = 'mq-group',
        'fieldDelimiter' = ','
      );
      • ApsaraMQ for RocketMQ 4.x

      CREATE TABLE mq_source(
        id varchar,
        name varchar,
        gender varchar,
        topic varchar metadata virtual
      ) WITH (
        'connector' = 'mq',
        'topic' = 'mq-test',
        'endpoint' = '<yourEndpoint>',
        'pullIntervalMs' = '1000',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'consumerGroup' = 'mq-group',
        'fieldDelimiter' = ','
      );
    • Binary format

      • ApsaraMQ for RocketMQ 5.x

        CREATE TEMPORARY TABLE source_table (
          mess varbinary
        ) WITH (
          'connector' = 'mq5',
          'endpoint' = '<yourEndpoint>',
          'topic' = 'mq-test',
          'consumerGroup' = 'mq-group'
        );
        
        CREATE TEMPORARY TABLE out_table (
          commodity varchar
        ) WITH (
          'connector' = 'print'
        );
        
        INSERT INTO out_table
        select 
          cast(mess as varchar)
        FROM source_table;

      • ApsaraMQ for RocketMQ 4.x

        CREATE TEMPORARY TABLE source_table (
          mess varbinary
        ) WITH (
          'connector' = 'mq',
          'endpoint' = '<yourEndpoint>',
          'pullIntervalMs' = '500',
          'accessId' = '${secret_values.ak_id}',
          'accessKey' = '${secret_values.ak_secret}',
          'topic' = 'mq-test',
          'consumerGroup' = 'mq-group'
        );
        
        CREATE TEMPORARY TABLE out_table (
          commodity varchar
        ) WITH (
          'connector' = 'print'
        );
        
        INSERT INTO out_table
        select 
          cast(mess as varchar)
        FROM source_table;
  • Result table

    • Create an ApsaraMQ for RocketMQ result table.

      • ApsaraMQ for RocketMQ 5.x

        CREATE TABLE mq_sink (
          id INTEGER,
          len BIGINT,
          content VARCHAR
        ) WITH (
          'connector'='mq5',
          'endpoint'='<yourEndpoint>',
          'topic'='<yourTopicName>',
          'producerGroup'='<yourGroupName>'
        );
      • ApsaraMQ for RocketMQ 4.x

        CREATE TABLE mq_sink (
          id INTEGER,
          len BIGINT,
          content VARCHAR
        ) WITH (
          'connector'='mq',
          'endpoint'='<yourEndpoint>',
          'accessId'='${secret_values.ak_id}',
          'accessKey'='${secret_values.ak_secret}',
          'topic'='<yourTopicName>',
          'producerGroup'='<yourGroupName>'
        );
        Note

        If the messages of your ApsaraMQ for RocketMQ instance are in the binary format, only one field can be defined in the DDL statement and the field must be of the VARBINARY data type.

    • Create an ApsaraMQ for RocketMQ result table in which the keys and tags fields are defined as the keys and tags of ApsaraMQ for RocketMQ messages.

      • ApsaraMQ for RocketMQ 5.x

        CREATE TABLE mq_sink (
          id INTEGER,
          len BIGINT,
          content VARCHAR,
          keys VARCHAR METADATA,
          tags VARCHAR METADATA
        ) WITH (
          'connector'='mq5',
          'endpoint'='<yourEndpoint>',
          'topic'='<yourTopicName>',
          'producerGroup'='<yourGroupName>'
        );
      • ApsaraMQ for RocketMQ 4.x

        CREATE TABLE mq_sink (
          id INTEGER,
          len BIGINT,
          content VARCHAR,
          keys VARCHAR METADATA,
          tags VARCHAR METADATA
        ) WITH (
          'connector'='mq',
          'endpoint'='<yourEndpoint>',
          'accessId'='${secret_values.ak_id}',
          'accessKey'='${secret_values.ak_secret}',
          'topic'='<yourTopicName>',
          'producerGroup'='<yourGroupName>'
        );

DataStream API

Important
  • Only ApsaraMQ for RocketMQ 4.x instances support DataStream APIs.

  • If you want to call a DataStream API to read or write data, you must use a DataStream connector of the related type to connect to fully managed Flink. For more information about how to configure a DataStream connector, see Usage of DataStream connectors.

The VVR engine of Realtime Compute for Apache Flink provides the MetaQSource class to read data from ApsaraMQ for RocketMQ and the MetaQOutputFormat class, an implementation of OutputFormat, to write data to ApsaraMQ for RocketMQ. The following sample code shows how to use the ApsaraMQ for RocketMQ DataStream connector to read data from and write data to ApsaraMQ for RocketMQ.

/**
 * A demo that illustrates how to consume messages from RocketMQ, convert
 * messages, then produce messages to RocketMQ.
 */
public class RocketMQDataStreamDemo2 {

    public static final String ENDPOINT = "<yourEndpoint>";
    public static final String ACCESS_ID = "<accessID>";
    public static final String ACCESS_KEY = "<accessKey>";
    public static final String INSTANCE_ID = "<instanceID>";
    public static final String SOURCE_TOPIC = "<sourceTopicName>";
    public static final String CONSUMER_GROUP = "<consumerGroup>";
    public static final String SINK_TOPIC = "<sinkTopicName>";
    public static final String PRODUCER_GROUP = "<producerGroup>";

    public static void main(String[] args) throws Exception {
        // Sets up the streaming execution environment
        Configuration config = new Configuration();
        // This is only required for local testing.
        config.setString("classloader.parent-first-patterns.additional",
                "com.alibaba.ververica.connectors.metaq.source.reader.deserializer.MetaQRecordDeserializationSchema;com.alibaba.ververica.connector.mq.shaded.");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);

        // Creates and adds RocketMQ source.
        env.fromSource(createRocketMQSource(), WatermarkStrategy.noWatermarks(), "source")
                // Wraps each message in a single-element list (see convertMessages).
                .map(RocketMQDataStreamDemo2::convertMessages)
                // Creates and adds RocketMQ sink.
                .addSink(new OutputFormatSinkFunction<>(createRocketMQOutputFormat()))
                .name(RocketMQDataStreamDemo2.class.getSimpleName());
        // Compiles and submits job.
        env.execute("RocketMQ connector end-to-end DataStream demo");
    }

    private static MetaQSource<MessageExt> createRocketMQSource() {
        Properties mqProperties = createMQProperties();

        return new MetaQSource<>(SOURCE_TOPIC,
                CONSUMER_GROUP,
                null, // always null
                null, // Tag of the messages to consume.
                Long.MAX_VALUE, // Stop timestamp in milliseconds.
                -1, // Start timestamp in milliseconds. Set to -1 to disable starting from offset.
                0, // Start offset.
                300_000, // Partition discovery interval.
                mqProperties,
                Boundedness.CONTINUOUS_UNBOUNDED,
                new MyDeserializationSchema());
    }

    private static MetaQOutputFormat createRocketMQOutputFormat() {
        return new MetaQOutputFormat.Builder()
                .setTopicName(SINK_TOPIC)
                .setProducerGroup(PRODUCER_GROUP)
                .setMqProperties(createMQProperties())
                .build();
    }

    private static Properties createMQProperties() {
        Properties properties = new Properties();
        properties.put(PROPERTY_ONS_CHANNEL, "ALIYUN");
        properties.put(NAMESRV_ADDR, ENDPOINT);
        properties.put(PROPERTY_ACCESSKEY, ACCESS_ID);
        properties.put(PROPERTY_SECRETKEY, ACCESS_KEY);
        properties.put(PROPERTY_ROCKET_AUTH_ENABLED, true);
        properties.put(PROPERTY_INSTANCE_ID, INSTANCE_ID);
        return properties;
    }

    // Wraps a message in a single-element list; the message itself is passed through unchanged.
    private static List<MessageExt> convertMessages(MessageExt messages) {
        return Collections.singletonList(messages);
    }

    public static class MyDeserializationSchema implements MetaQRecordDeserializationSchema<MessageExt> {
        @Override
        public void deserialize(List<MessageExt> list, Collector<MessageExt> collector) {
            for (MessageExt messageExt : list) {
                collector.collect(messageExt);
            }
        }

        @Override
        public TypeInformation<MessageExt> getProducedType() {
            return TypeInformation.of(MessageExt.class);
        }
    }
}
Note

For more information about the endpoints of ApsaraMQ for RocketMQ, see Announcement on the settings of internal TCP endpoints.
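The start timestamp passed to MetaQSource in the sample above, like the startTimeMs parameter of the SQL connector, is expressed in milliseconds. Assuming the value is a Unix epoch timestamp (the parameter description gives only the unit), the following sketch shows one way to derive it from a wall-clock time in a given time zone. The class StartTimestampSketch, the toEpochMillis helper, and the sample date are illustrative and not part of the connector.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;

/** Hypothetical helper that converts a local date-time to epoch milliseconds. */
final class StartTimestampSketch {

    /**
     * Interprets an ISO-8601 local date-time (for example "2024-02-06T00:00:00")
     * in the given time zone and returns milliseconds since 1970-01-01T00:00:00Z.
     */
    static long toEpochMillis(String localDateTime, String zoneId) {
        return LocalDateTime.parse(localDateTime)
                .atZone(ZoneId.of(zoneId))
                .toInstant()
                .toEpochMilli();
    }

    public static void main(String[] args) {
        long startTimeMs = toEpochMillis("2024-02-06T00:00:00", "Asia/Shanghai");
        // The value can be used as the start timestamp argument or as the
        // 'startTimeMs' option in the WITH clause.
        System.out.println("'startTimeMs' = '" + startTimeMs + "'");
    }
}
```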

FAQ

How does the connector for the ApsaraMQ for RocketMQ source table learn the change in the number of partitions in a topic when new partitions are created in the topic?