
Realtime Compute for Apache Flink: ApsaraMQ for RocketMQ connector

Last Updated: Sep 11, 2024

This topic describes how to use the ApsaraMQ for RocketMQ connector.

Important

An ApsaraMQ for RocketMQ 4.x Standard Edition instance supports up to 5,000 API calls per second. If a Realtime Compute for Apache Flink deployment uses an ApsaraMQ for RocketMQ 4.x Standard Edition instance as its messaging middleware and exceeds this limit, throttling is triggered and the Flink job may become unstable. If you use or plan to use ApsaraMQ for RocketMQ 4.x Standard Edition with Realtime Compute for Apache Flink, evaluate the impact of this limit in advance. Depending on your business requirements, we recommend that you use another messaging middleware service instead, such as Kafka, Simple Log Service, or DataHub. If you must use ApsaraMQ for RocketMQ 4.x Standard Edition to process a large number of messages, submit a ticket and ask technical support to increase the number of messages that the instance can process per second.

Background information

ApsaraMQ for RocketMQ is a distributed messaging middleware service that Alibaba Cloud developed based on Apache RocketMQ. It provides low latency, high concurrency, high availability, and high reliability. ApsaraMQ for RocketMQ provides asynchronous decoupling and load shifting for distributed application systems. It also supports features that Internet applications require, such as message accumulation, high throughput, and reliable retries.

The following table describes the capabilities that are supported by the ApsaraMQ for RocketMQ connector.

Table type: Source table and result table.

Running mode: Streaming mode only.

Data format: CSV and binary formats.

Metric:

  • Metrics for source tables

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerSecond

    • currentEmitEventTimeLag

    • currentFetchEventTimeLag

    • sourceIdleTime

  • Metrics for result tables

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

  Note

  For more information about the metrics, see Metrics.

API type: DataStream API and SQL API. Only ApsaraMQ for RocketMQ 4.x instances support the DataStream API.

Data update or deletion in a result table: Data cannot be deleted from a result table. Data can only be inserted into or updated in a result table.

Features

The following tables describe the fields that are supported for an ApsaraMQ for RocketMQ source table and an ApsaraMQ for RocketMQ result table.

  • Fields in an ApsaraMQ for RocketMQ source table

    Note

    Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 3.0.1 or later supports the following fields of ApsaraMQ for RocketMQ.

    • topic (VARCHAR METADATA VIRTUAL): The topic of the message.

    • queue-id (INT METADATA VIRTUAL): The ID of the queue in which the message is stored.

    • queue-offset (BIGINT METADATA VIRTUAL): The consumption offset of the message in its queue.

    • msg-id (VARCHAR METADATA VIRTUAL): The ID of the message.

    • store-timestamp (TIMESTAMP(3) METADATA VIRTUAL): The time at which the message is stored.

    • born-timestamp (TIMESTAMP(3) METADATA VIRTUAL): The time at which the message is generated.

    • keys (VARCHAR METADATA VIRTUAL): The keys of the message.

    • tags (VARCHAR METADATA VIRTUAL): The tags of the message.

  • Fields in an ApsaraMQ for RocketMQ result table

    Note

    Only Realtime Compute for Apache Flink that uses VVR 4.0.0 or later supports the following fields of ApsaraMQ for RocketMQ.

    • keys (VARCHAR METADATA): The keys of the message.

    • tags (VARCHAR METADATA): The tags of the message.

Prerequisites

Resources are created in the ApsaraMQ for RocketMQ console. For more information, see Create resources.

Limits

  • Only Realtime Compute for Apache Flink that uses VVR 2.0.0 or later supports the ApsaraMQ for RocketMQ connector.

  • Only Realtime Compute for Apache Flink that uses VVR 8.0.3 or later supports ApsaraMQ for RocketMQ 5.x instances.

  • In Realtime Compute for Apache Flink that uses a VVR version earlier than 6.0.2, the deployment parallelism of an ApsaraMQ for RocketMQ source table must be less than or equal to the number of partitions in the ApsaraMQ for RocketMQ topic. This limit is removed in VVR 6.0.2 or later. In these versions, you can set the deployment parallelism to a value greater than the number of partitions in the topic in advance, and you do not need to manually adjust the parallelism when the ApsaraMQ for RocketMQ instance is scaled in.

  • The ApsaraMQ for RocketMQ connector uses pull consumers to consume messages, and all subtasks share the consumption workload.
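The parallelism limit above is easier to reason about with a concrete assignment. The following Java sketch assumes that queues (partitions) are distributed to source subtasks in round-robin order. This topic does not document the connector's actual assignment strategy, and the class and method names are hypothetical. The sketch shows that when the parallelism exceeds the queue count, some subtasks receive no queue and stay idle.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration: round-robin assignment of topic queues to source subtasks. */
public class QueueAssignmentSketch {

    /** Returns, for each subtask index, the queue IDs it reads (assumes round-robin). */
    public static List<List<Integer>> assign(int queueCount, int parallelism) {
        if (queueCount < 0 || parallelism <= 0) {
            throw new IllegalArgumentException("queueCount must be >= 0 and parallelism > 0");
        }
        List<List<Integer>> assignment = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            assignment.add(new ArrayList<>());
        }
        // Queue i goes to subtask i % parallelism (assumption, not the connector's documented behavior).
        for (int queueId = 0; queueId < queueCount; queueId++) {
            assignment.get(queueId % parallelism).add(queueId);
        }
        return assignment;
    }

    /** Counts subtasks that received no queue and therefore have nothing to consume. */
    public static int idleSubtasks(int queueCount, int parallelism) {
        int idle = 0;
        for (List<Integer> queues : assign(queueCount, parallelism)) {
            if (queues.isEmpty()) {
                idle++;
            }
        }
        return idle;
    }

    public static void main(String[] args) {
        // 8 queues with parallelism 12: subtasks 8 to 11 receive no queue.
        System.out.println(assign(8, 12));
        System.out.println("Idle subtasks: " + idleSubtasks(8, 12));
    }
}
```

Under this assumption, a parallelism that is less than or equal to the queue count (the pre-6.0.2 rule) gives every subtask at least one queue. A higher parallelism leaves spare subtasks. Those subtasks can pick up queues if the topic later gains partitions, and the job keeps running without a parallelism change if the instance is scaled in.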

Syntax

CREATE TABLE mq_source(
  x varchar,
  y varchar,
  z varchar
) WITH (
  'connector' = 'mq5',
  'topic' = '<yourTopicName>',
  'endpoint' = '<yourEndpoint>',
  'consumerGroup' = '<yourConsumerGroup>'
);

Parameters in the WITH clause

  • Common parameters

    Parameter

    Description

    Data type

    Required

    Default value

    Remarks

    connector

    The type of the connector.

    STRING

    Yes

    No default value

    • In ApsaraMQ for RocketMQ 4.x, set the value to mq.

    • In ApsaraMQ for RocketMQ 5.x, set the value to mq5.

    endPoint

    The endpoint of ApsaraMQ for RocketMQ.

    STRING

    Yes

    No default value

    ApsaraMQ for RocketMQ supports the following types of endpoints:

    • For deployments that run on VVR 3.0.1 or later, use Transmission Control Protocol (TCP) endpoints. To obtain the required endpoint, perform the following steps:

      • Internal endpoint of ApsaraMQ for RocketMQ that resides in the classic network or a virtual private cloud (VPC) of Alibaba Cloud: Log on to the ApsaraMQ for RocketMQ console. In the left-side navigation pane, click Instances. On the Instances page, find the required instance and click Details in the Actions column. On the Instance Details page, click the Basic Information tab. You can view the endpoint that corresponds to Internal Access in the TCP Endpoint section.

      • Public endpoint of ApsaraMQ for RocketMQ: Log on to the ApsaraMQ for RocketMQ console. In the left-side navigation pane, click Instances. On the Instances page, find the required instance, and click Details in the Actions column. On the Instance Details page, click the Basic Information tab. You can view the endpoint that corresponds to Internet Access in the TCP Endpoint section.

      Important

      Due to changes in the network security policies of Alibaba Cloud, connection issues may occur when Realtime Compute for Apache Flink connects to the public ApsaraMQ for RocketMQ service. We recommend that you use the internal ApsaraMQ for RocketMQ service.

      • The ApsaraMQ for RocketMQ service that resides in the internal network does not support cross-region access. For example, if your Realtime Compute for Apache Flink service resides in the China (Hangzhou) region and your ApsaraMQ for RocketMQ service resides in the China (Shanghai) region, Realtime Compute for Apache Flink cannot access ApsaraMQ for RocketMQ over the internal network.

      • If you want to access ApsaraMQ for RocketMQ over the Internet, you must configure network address translation (NAT). For more information, see Create and manage an Internet NAT gateway.

    • For Realtime Compute for Apache Flink deployments that run on a VVR version earlier than 3.0.1, the old endpoints of ApsaraMQ for RocketMQ are no longer available. You must upgrade these deployments to VVR 3.0.1 or later.

      Important

      If your deployment uses the ApsaraMQ for RocketMQ connector on a VVR version earlier than 3.0.1, you must upgrade the deployment to VVR 3.0.1 or later and change the value of the endPoint parameter to the new endpoint of ApsaraMQ for RocketMQ. This reduces the risk of instability or unavailability caused by the old endpoint. For more information, see Service notices of Realtime Compute for Apache Flink.

    topic

    The name of the topic.

    STRING

    Yes

    No default value

    N/A

    accessId

    • ApsaraMQ for RocketMQ 4.x: the AccessKey ID of your Alibaba Cloud account.

    • ApsaraMQ for RocketMQ 5.x: the username of the ApsaraMQ for RocketMQ instance.

    STRING

    • ApsaraMQ for RocketMQ 4.x: yes

    • ApsaraMQ for RocketMQ 5.x: no

    No default value

    • ApsaraMQ for RocketMQ 4.x: For more information about how to obtain the AccessKey ID of your Alibaba Cloud account, see the "How do I view information about the AccessKey ID and AccessKey secret of the account?" section of the Reference topic.

    Important

    To protect your AccessKey pair, we recommend that you configure the AccessKey ID by using the key management method. For more information, see Manage variables and keys.

    • ApsaraMQ for RocketMQ 5.x: If you access the ApsaraMQ for RocketMQ instance over the Internet, set this parameter to the username of the instance that is shown in the ApsaraMQ for RocketMQ console. If your client is deployed on an Elastic Compute Service (ECS) instance and accesses the ApsaraMQ for RocketMQ instance over an internal network, you do not need to set this parameter.

    accessKey

    • ApsaraMQ for RocketMQ 4.x: the AccessKey secret of your Alibaba Cloud account.

    • ApsaraMQ for RocketMQ 5.x: the password of the ApsaraMQ for RocketMQ instance.

    STRING

    • ApsaraMQ for RocketMQ 4.x: yes

    • ApsaraMQ for RocketMQ 5.x: no

    No default value

    • ApsaraMQ for RocketMQ 4.x: For more information about how to obtain the AccessKey secret of your Alibaba Cloud account, see the "How do I view information about the AccessKey ID and AccessKey secret of the account?" section of the Reference topic.

    Important

    To protect your AccessKey pair, we recommend that you configure the AccessKey secret by using the key management method. For more information, see Manage variables and keys.

    • ApsaraMQ for RocketMQ 5.x: If you access the ApsaraMQ for RocketMQ instance over the Internet, set this parameter to the password of the instance that is shown in the ApsaraMQ for RocketMQ console. If your client is deployed on an ECS instance and accesses the ApsaraMQ for RocketMQ instance over an internal network, you do not need to set this parameter.

    tag

    The tag of the message that is subscribed to or written.

    STRING

    No

    No default value

    • For an ApsaraMQ for RocketMQ source table, only a single tag can be read.

    • For an ApsaraMQ for RocketMQ result table, you can configure multiple tags. Separate multiple tags with commas (,).

    Note

    For result tables, this parameter is supported only for ApsaraMQ for RocketMQ 4.x instances. If you use an ApsaraMQ for RocketMQ 5.x instance, specify the tags of a message by using the tags metadata field of the result table.

    nameServerSubgroup

    The name server group.

    STRING

    No

    No default value

    • For the ApsaraMQ for RocketMQ service that resides in the classic network or a VPC of Alibaba Cloud, you must set this parameter to nsaddr4client-internal.

    • For the ApsaraMQ for RocketMQ service that can be accessed over the Internet, you do not need to configure this parameter.

    Note

    This parameter is supported only in VVR 2.1.1 to VVR 3.0.0. It is not supported in VVR 3.0.1 or later.

    encoding

    The encoding format.

    STRING

    No

    UTF-8

    N/A

    instanceID

    The ID of the ApsaraMQ for RocketMQ instance.

    STRING

    No

    No default value

    • If the ApsaraMQ for RocketMQ instance does not use a separate namespace, the instanceID parameter cannot be configured.

    • If the ApsaraMQ for RocketMQ instance uses a separate namespace, you must configure the instanceID parameter.

    Note

    Only ApsaraMQ for RocketMQ 4.x instances support this parameter. If you use an ApsaraMQ for RocketMQ 5.x instance, you do not need to set this parameter.

  • Parameters that are exclusive to source tables

    Parameter

    Description

    Data type

    Required

    Default value

    Remarks

    consumerGroup

    The name of the consumer group.

    STRING

    Yes

    No default value

    N/A

    pullIntervalMs

    The interval for which the source waits when no data can be consumed from the upstream storage system.

    INT

    Yes

    No default value

    Unit: milliseconds.

    You cannot configure the rate at which ApsaraMQ for RocketMQ messages are read because no throttling mechanism is available.

    Note

    Only ApsaraMQ for RocketMQ 4.x instances support this parameter. If you use an ApsaraMQ for RocketMQ 5.x instance, you do not need to set this parameter.

    timeZone

    The time zone of the instance.

    STRING

    No

    No default value

    Example: Asia/Shanghai.

    startTimeMs

    The time to start reading data.

    LONG

    No

    No default value

    A UNIX timestamp. Unit: milliseconds.

    startMessageOffset

    The start offset to consume messages.

    INT

    No

    No default value

    If you set this parameter, messages are consumed starting from the specified offset.

    lineDelimiter

    The row delimiter used when a message block is parsed.

    STRING

    No

    \n

    N/A

    fieldDelimiter

    The field delimiter.

    STRING

    No

    \u0001

    How the delimiter is displayed depends on the mode of the ApsaraMQ for RocketMQ terminal.

    • In read-only mode, which is the default mode, the delimiter \u0001 is invisible.

    • In edit mode, the delimiter \u0001 is displayed as ^A.

    lengthCheck

    The rule for checking the number of fields parsed from a row of data.

    STRING

    No

    NONE

    Valid values:

    • NONE: This is the default value.

      • If the number of fields that are parsed from a row of data is greater than the specified number of fields, data is extracted from left to right based on the order of specified fields.

      • If the number of fields that are parsed from a row of data is smaller than the specified number of fields, this row of data is skipped.

    • SKIP: If the number of fields that are parsed from a row of data does not match the number of specified fields, this row of data is skipped.

    • EXCEPTION: If the number of fields that are parsed from a row of data is different from the specified number of fields, an exception is reported.

    • PAD: Data is padded from left to right based on the order of specified fields.

      • If the number of fields that are parsed from a row of data is greater than the specified number of fields, data is extracted from left to right based on the order of specified fields.

      • If the number of fields parsed from a row of data is smaller than the number of defined fields, the values of the missing fields are padded with null.

    Note

    SKIP, EXCEPTION, and PAD are optional values.

    columnErrorDebug

    Specifies whether to enable debugging.

    BOOLEAN

    No

    false

    If you set this parameter to true, log entries are generated for parsing exceptions.

    pullBatchSize

    The maximum number of messages that can be pulled at a time.

    INT

    No

    64

    Only Realtime Compute for Apache Flink that uses VVR 8.0.7 or later supports this parameter.

  • Parameters that are exclusive to result tables

    Parameter

    Description

    Data type

    Required

    Default value

    Remarks

    producerGroup

    The name of the producer group.

    STRING

    Yes

    No default value

    N/A

    retryTimes

    The number of retries for writing data to the table.

    INT

    No

    10

    N/A

    sleepTimeMs

    The retry interval.

    LONG

    No

    5000

    Unit: milliseconds.

    partitionField

    The partition key column. You can specify a field as a partition key column.

    STRING

    No

    No default value

    This parameter is required if the mode parameter is set to partition.
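The startTimeMs parameter takes a UNIX timestamp in milliseconds, and the timeZone parameter takes a region ID such as Asia/Shanghai. The following Java sketch uses only java.time to convert a local date and time in a given zone into a millisecond value that you could pass to startTimeMs. The class and method names are hypothetical. This topic does not describe how the connector combines timeZone with startTimeMs, so the sketch computes the instant on its own.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;

/** Hypothetical helper: computes a startTimeMs value from a local date-time in a time zone. */
public class StartTimeMsSketch {

    /**
     * Converts an ISO-8601 local date-time (for example, 2024-09-11T00:00) in the given
     * zone (for example, Asia/Shanghai) to milliseconds since the UNIX epoch.
     */
    public static long toStartTimeMs(String localDateTime, String zoneId) {
        return LocalDateTime.parse(localDateTime)
                .atZone(ZoneId.of(zoneId))
                .toInstant()
                .toEpochMilli();
    }

    public static void main(String[] args) {
        long startTimeMs = toStartTimeMs("2024-09-11T00:00", "Asia/Shanghai");
        // Prints 1725984000000, which you could use as 'startTimeMs' = '1725984000000'.
        System.out.println(startTimeMs);
    }
}
```

Asia/Shanghai has no daylight saving time, so each local time maps to exactly one instant. For zones with daylight saving transitions, atZone resolves skipped and repeated local times by the default java.time rules.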

Data type mappings

The VARCHAR data type of Realtime Compute for Apache Flink maps to the STRING data type of ApsaraMQ for RocketMQ.

Examples

  • Source table

    • CSV format

      For example, a CSV message is recorded in the following format:

      1,name,male
      2,name,female
      Note

      An ApsaraMQ for RocketMQ message can contain zero or more data records. Multiple data records are separated by \n.

      You can execute the following DDL statement to declare an ApsaraMQ for RocketMQ source table in a Realtime Compute for Apache Flink deployment:

      • ApsaraMQ for RocketMQ 5.x

      CREATE TABLE mq_source(
        id varchar,
        name varchar,
        gender varchar,
        topic varchar metadata virtual
      ) WITH (
        'connector' = 'mq5',
        'topic' = 'mq-test',
        'endpoint' = '<yourEndpoint>',
        'consumerGroup' = 'mq-group',
        'fieldDelimiter' = ','
      );
      • ApsaraMQ for RocketMQ 4.x

      CREATE TABLE mq_source(
        id varchar,
        name varchar,
        gender varchar,
        topic varchar metadata virtual
      ) WITH (
        'connector' = 'mq',
        'topic' = 'mq-test',
        'endpoint' = '<yourEndpoint>',
        'pullIntervalMs' = '1000',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'consumerGroup' = 'mq-group',
        'fieldDelimiter' = ','
      );
    • Binary format

      • ApsaraMQ for RocketMQ 5.x

        CREATE TEMPORARY TABLE source_table (
          mess varbinary
        ) WITH (
          'connector' = 'mq5',
          'endpoint' = '<yourEndpoint>',
          'topic' = 'mq-test',
          'consumerGroup' = 'mq-group'
        );
        
        CREATE TEMPORARY TABLE out_table (
          commodity varchar
        ) WITH (
          'connector' = 'print'
        );
        
        INSERT INTO out_table
        select 
          cast(mess as varchar)
        FROM source_table;

      • ApsaraMQ for RocketMQ 4.x

        CREATE TEMPORARY TABLE source_table (
          mess varbinary
        ) WITH (
          'connector' = 'mq',
          'endpoint' = '<yourEndpoint>',
          'pullIntervalMs' = '500',
          'accessId' = '${secret_values.ak_id}',
          'accessKey' = '${secret_values.ak_secret}',
          'topic' = 'mq-test',
          'consumerGroup' = 'mq-group'
        );
        
        CREATE TEMPORARY TABLE out_table (
          commodity varchar
        ) WITH (
          'connector' = 'print'
        );
        
        INSERT INTO out_table
        select 
          cast(mess as varchar)
        FROM source_table;
  • Result table

    • Create an ApsaraMQ for RocketMQ result table.

      • ApsaraMQ for RocketMQ 5.x

        CREATE TABLE mq_sink (
          id INTEGER,
          len BIGINT,
          content VARCHAR
        ) WITH (
          'connector'='mq5',
          'endpoint'='<yourEndpoint>',
          'topic'='<yourTopicName>',
          'producerGroup'='<yourGroupName>'
        );
      • ApsaraMQ for RocketMQ 4.x

        CREATE TABLE mq_sink (
          id INTEGER,
          len BIGINT,
          content VARCHAR
        ) WITH (
          'connector'='mq',
          'endpoint'='<yourEndpoint>',
          'accessId'='${secret_values.ak_id}',
          'accessKey'='${secret_values.ak_secret}',
          'topic'='<yourTopicName>',
          'producerGroup'='<yourGroupName>'
        );
        Note

        If the messages of your ApsaraMQ for RocketMQ instance are in the binary format, only one field can be defined in the DDL statement and the field must be of the VARBINARY data type.

    • Create an ApsaraMQ for RocketMQ result table in which the values of the keys and tags fields are defined as the keys and tags of ApsaraMQ for RocketMQ messages.

      • ApsaraMQ for RocketMQ 5.x

        CREATE TABLE mq_sink (
          id INTEGER,
          len BIGINT,
          content VARCHAR,
          keys VARCHAR METADATA,
          tags VARCHAR METADATA
        ) WITH (
          'connector'='mq5',
          'endpoint'='<yourEndpoint>',
          'topic'='<yourTopicName>',
          'producerGroup'='<yourGroupName>'
        );
      • ApsaraMQ for RocketMQ 4.x

        CREATE TABLE mq_sink (
          id INTEGER,
          len BIGINT,
          content VARCHAR,
          keys VARCHAR METADATA,
          tags VARCHAR METADATA
        ) WITH (
          'connector'='mq',
          'endpoint'='<yourEndpoint>',
          'accessId'='${secret_values.ak_id}',
          'accessKey'='${secret_values.ak_secret}',
          'topic'='<yourTopicName>',
          'producerGroup'='<yourGroupName>'
        );
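The CSV source examples above use lineDelimiter (default \n) and fieldDelimiter to split a message body into rows and fields. They use lengthCheck to handle rows whose field count does not match the table schema. The following Java sketch models those rules as the parameter table describes them, so you can check how a message body maps to rows. It is a hypothetical illustration, not the connector's parser. For example, it does not handle quoting or escaping, and its handling of empty lines and trailing empty fields is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical model of splitting a CSV message body into rows under the lengthCheck rules. */
public class CsvMessageParserSketch {

    enum LengthCheck { NONE, SKIP, EXCEPTION, PAD }

    /**
     * Splits body into rows by lineDelimiter, splits each row into fields by fieldDelimiter,
     * and applies the lengthCheck rule for a schema with expectedFields columns.
     * Every returned row has exactly expectedFields entries; padded entries are null.
     */
    public static List<String[]> parse(String body, String lineDelimiter, String fieldDelimiter,
                                       int expectedFields, LengthCheck mode) {
        List<String[]> rows = new ArrayList<>();
        for (String line : body.split(Pattern.quote(lineDelimiter))) {
            if (line.isEmpty()) {
                continue; // Assumption: empty lines carry no record.
            }
            // Limit -1 keeps trailing empty fields, so "a,b," yields three fields.
            String[] fields = line.split(Pattern.quote(fieldDelimiter), -1);
            if (fields.length == expectedFields) {
                rows.add(fields);
                continue;
            }
            switch (mode) {
                case SKIP:
                    break; // Any mismatch: the row is skipped.
                case EXCEPTION:
                    throw new IllegalArgumentException(
                            "Expected " + expectedFields + " fields but got " + fields.length + ": " + line);
                case NONE:
                    if (fields.length > expectedFields) {
                        rows.add(Arrays.copyOf(fields, expectedFields)); // Keep the leftmost fields.
                    } // Fewer fields: the row is skipped.
                    break;
                case PAD:
                    // Longer rows keep the leftmost fields; shorter rows are padded with null.
                    rows.add(Arrays.copyOf(fields, expectedFields));
                    break;
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        String body = "1,name,male\n2,name\n3,name,female,extra";
        for (LengthCheck mode : Arrays.asList(LengthCheck.NONE, LengthCheck.SKIP, LengthCheck.PAD)) {
            System.out.println(mode + ":");
            for (String[] row : parse(body, "\n", ",", 3, mode)) {
                System.out.println("  " + Arrays.toString(row));
            }
        }
    }
}
```

For this sample body, NONE keeps the first row and the truncated third row, SKIP keeps only the first row, and PAD keeps all three rows with a null gender in the second row.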

DataStream API

Important

If you want to call a DataStream API to read or write data, you must use a DataStream connector of the related type to connect to fully managed Flink. For more information about how to configure a DataStream connector, see Settings of DataStream connectors.

The VVR engine of Realtime Compute for Apache Flink provides MetaQSource for reading data from ApsaraMQ for RocketMQ and MetaQOutputFormat, an implementation of OutputFormat, for writing data to ApsaraMQ for RocketMQ. The RocketMQ 5.x sample uses the corresponding RocketMQSource and RocketMQOutputFormat classes. The following sample code uses the ApsaraMQ for RocketMQ DataStream connector to read data from and write data to ApsaraMQ for RocketMQ.

RocketMQ 4.x

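import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
import org.apache.flink.util.Collector;

import java.util.Collections;
import java.util.List;
import java.util.Properties;

// MetaQSource, MetaQOutputFormat, MetaQRecordDeserializationSchema, MessageExt, and the property-name
// constants used in createMQProperties() (PROPERTY_ONS_CHANNEL, NAMESRV_ADDR, and so on) come from the
// VVR ApsaraMQ for RocketMQ DataStream connector. Import them from the packages in your connector JAR.

/**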
 * A demo that illustrates how to consume messages from RocketMQ, convert
 * messages, then produce messages to RocketMQ.
 */
public class RocketMQDataStreamDemo {

    public static final String ENDPOINT = "<yourEndpoint>";
    public static final String ACCESS_ID = "<accessID>";
    public static final String ACCESS_KEY = "<accessKey>";
    public static final String INSTANCE_ID = "<instanceID>";
    public static final String SOURCE_TOPIC = "<sourceTopicName>";
    public static final String CONSUMER_GROUP = "<consumerGroup>";
    public static final String SINK_TOPIC = "<sinkTopicName>";
    public static final String PRODUCER_GROUP = "<producerGroup>";

    public static void main(String[] args) throws Exception {
        // Sets up the streaming execution environment
        Configuration conf = new Configuration();

        // The following two configurations are used only for local debugging. You need to delete the two configurations before you package the draft and upload it to Alibaba Cloud Realtime Compute for Apache Flink.
        conf.setString("pipeline.classpaths", "file://" + "<Absolute path of the uber JAR package>");
        conf.setString("classloader.parent-first-patterns.additional",
                "com.alibaba.ververica.connectors.metaq.source.reader.deserializer.MetaQRecordDeserializationSchema;com.alibaba.ververica.connector.mq.shaded.");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // Creates and adds RocketMQ source.
        env.fromSource(createRocketMQSource(), WatermarkStrategy.noWatermarks(), "source")
                // Wraps each message in a single-element list for the sink. Replace this with your own conversion logic.
                .map(RocketMQDataStreamDemo::convertMessages)
                // Creates and adds RocketMQ sink.
                .addSink(new OutputFormatSinkFunction<>(createRocketMQOutputFormat()))
                .name(RocketMQDataStreamDemo.class.getSimpleName());
        // Compiles and submits job.
        env.execute("RocketMQ connector end-to-end DataStream demo");
    }

    private static MetaQSource<MessageExt> createRocketMQSource() {
        Properties mqProperties = createMQProperties();

        return new MetaQSource<>(SOURCE_TOPIC,
                CONSUMER_GROUP,
                null, // always null
                null, // Tag of the messages to consume.
                Long.MAX_VALUE, // stop timestamp in milliseconds
                -1, // Start timestamp in milliseconds. Set to -1 to disable starting from offset.
                0, // Start offset.
                300_000, // Partition discover interval.
                mqProperties,
                Boundedness.CONTINUOUS_UNBOUNDED,
                new MyDeserializationSchema());
    }

    private static MetaQOutputFormat createRocketMQOutputFormat() {
        return new MetaQOutputFormat.Builder()
                .setTopicName(SINK_TOPIC)
                .setProducerGroup(PRODUCER_GROUP)
                .setMqProperties(createMQProperties())
                .build();
    }

    private static Properties createMQProperties() {
        Properties properties = new Properties();
        properties.put(PROPERTY_ONS_CHANNEL, "ALIYUN");
        properties.put(NAMESRV_ADDR, ENDPOINT);
        properties.put(PROPERTY_ACCESSKEY, ACCESS_ID);
        properties.put(PROPERTY_SECRETKEY, ACCESS_KEY);
        properties.put(PROPERTY_ROCKET_AUTH_ENABLED, true);
        properties.put(PROPERTY_INSTANCE_ID, INSTANCE_ID);
        return properties;
    }

    private static List<MessageExt> convertMessages(MessageExt messages) {
        return Collections.singletonList(messages);
    }

    public static class MyDeserializationSchema implements MetaQRecordDeserializationSchema<MessageExt> {
        @Override
        public void deserialize(List<MessageExt> list, Collector<MessageExt> collector) {
            for (MessageExt messageExt : list) {
                collector.collect(messageExt);
            }
        }

        @Override
        public TypeInformation<MessageExt> getProducedType() {
            return TypeInformation.of(MessageExt.class);
        }
    }
}

RocketMQ 5.x

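import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
import org.apache.flink.util.Collector;

import java.util.Collections;
import java.util.List;

// RocketMQSource, RocketMQOutputFormat, RocketMQRecordDeserializationSchema, and MessageExt come from the
// VVR ApsaraMQ for RocketMQ 5.x DataStream connector. Import them from the packages in your connector JAR.

/**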
 * A demo that illustrates how to consume messages from RocketMQ, convert
 * messages, then produce messages to RocketMQ.
 */
public class RocketMQ5DataStreamDemo {

    public static final String ENDPOINT = "<yourEndpoint>";
    public static final String ACCESS_ID = "<accessID>";
    public static final String ACCESS_KEY = "<accessKey>";
    public static final String SOURCE_TOPIC = "<sourceTopicName>";
    public static final String CONSUMER_GROUP = "<consumerGroup>";
    public static final String SINK_TOPIC = "<sinkTopicName>";
    public static final String PRODUCER_GROUP = "<producerGroup>";

    public static void main(String[] args) throws Exception {
        // Sets up the streaming execution environment
        Configuration conf = new Configuration();

        // The following two configurations are used only for local debugging. You need to delete the two configurations before you package the draft and upload it to Alibaba Cloud Realtime Compute for Apache Flink.
        conf.setString("pipeline.classpaths", "file://" + "<Absolute path of the uber JAR package>");
        conf.setString(
                "classloader.parent-first-patterns.additional",
                "com.alibaba.ververica.connectors.mq5.source.reader.deserializer.RocketMQRecordDeserializationSchema;com.alibaba.ververica.connectors.mq5.shaded.");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        final DataStreamSource<String> ds =
                env.fromSource(
                        RocketMQSource.<String>builder()
                                .setEndpoint(ENDPOINT)
                                .setAccessId(ACCESS_ID)
                                .setAccessKey(ACCESS_KEY)
                                .setTopic(SOURCE_TOPIC)
                                .setConsumerGroup(CONSUMER_GROUP)
                                .setDeserializationSchema(new MyDeserializer())
                                .setStartOffset(1)
                                .build(),
                        WatermarkStrategy.noWatermarks(),
                        "source");

        ds.map(new ToMessage())
                .addSink(
                        new OutputFormatSinkFunction<>(
                                new RocketMQOutputFormat.Builder()
                                        .setEndpoint(ENDPOINT)
                                        .setAccessId(ACCESS_ID)
                                        .setAccessKey(ACCESS_KEY)
                                        .setTopicName(SINK_TOPIC)
                                        .setProducerGroup(PRODUCER_GROUP)
                                        .build()));

        env.execute();
    }

    private static class MyDeserializer implements RocketMQRecordDeserializationSchema<String> {
        @Override
        public void deserialize(List<MessageExt> record, Collector<String> out) {
            for (MessageExt messageExt : record) {
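                // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                out.collect(new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8));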
            }
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return Types.STRING;
        }
    }

    private static class ToMessage implements MapFunction<String, List<MessageExt>> {

        public ToMessage() {
        }

        @Override
        public List<MessageExt> map(String s) {
            final MessageExt message = new MessageExt();
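            // Encode explicitly as UTF-8 instead of relying on the platform default charset.
            message.setBody(s.getBytes(java.nio.charset.StandardCharsets.UTF_8));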
            message.setWaitStoreMsgOK(true);
            return Collections.singletonList(message);
        }
    }
}
Note

For more information about the endpoints of ApsaraMQ for RocketMQ, see Announcement on the settings of internal TCP endpoints.

FAQ

How does the connector for the ApsaraMQ for RocketMQ source table learn the change in the number of partitions in a topic when new partitions are created in the topic?