You can use the Message Queue for Apache RocketMQ DataStream connector to read data from and write data to Message Queue for Apache RocketMQ in DataStream API jobs. This topic describes how to use the Message Queue for Apache RocketMQ DataStream connector in fully managed Flink.

Background information

The Message Queue for Apache RocketMQ DataStream connector is based on the Message Queue for Apache RocketMQ SQL connector, and the two connectors use the same connection configuration. For more information about the connection configuration, see the "Parameters in the WITH clause" section in Create a Message Queue for Apache RocketMQ source table and Create a Message Queue for Apache RocketMQ result table.

The Message Queue for Apache RocketMQ SQL connector of Ververica Runtime (VVR) is not stored in the Maven central repository. If you want to use the connector to develop a job, submit a ticket to obtain the ververica-connector-mq JAR package. You can use the Message Queue for Apache RocketMQ DataStream connector in one of the following ways:

(Recommended) Package the connector as a project dependency into the JAR file of your job

  1. Submit a ticket to obtain the ververica-connector-mq JAR package.
  2. Store the ververica-connector-mq JAR package in the local Maven repository.
  3. Add the connector that you want to use to the Maven POM file as a project dependency.
    <dependency>
      <groupId>com.alibaba.ververica</groupId>
      <artifactId>ververica-connector-mq</artifactId>
      <version>${connector.version}</version>
    </dependency>
    Different connector versions may correspond to different connector types. We recommend that you use the latest version for the type of the connector that you use. For more information about the mappings among connector versions, VVR or Flink versions, and connector types, see DataStream connectors. For more information about the dependencies, see the POM file in the Message Queue for Apache RocketMQ sample code.
    Notice
    • Connector versions that contain the SNAPSHOT keyword are available only in the SNAPSHOT repository oss.sonatype.org. You cannot find them in the Maven central repository search.maven.org.
    • If you use multiple connectors, you must merge the files in the META-INF directories of the connector JAR files. To merge the files, add the following transformers to the configuration of the maven-shade-plugin in the POM file:
      <transformers>
          <!-- The service transformer is needed to merge META-INF/services files -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
          <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
              <projectName>Apache Flink</projectName>
              <encoding>UTF-8</encoding>
          </transformer>
      </transformers>
  4. Configure the connection to Message Queue for Apache RocketMQ in your job code.
    The VVR engine of Realtime Compute for Apache Flink provides MetaQSourceFunction, an implementation of SourceFunction that reads data from Message Queue for Apache RocketMQ, and MetaQOutputFormat, an implementation of OutputFormat that writes data to Message Queue for Apache RocketMQ. The following sample code shows how to use the Message Queue for Apache RocketMQ DataStream connector to read data from and write data to Message Queue for Apache RocketMQ.
    // Imports used by the demo. The connector-specific classes (MetaQSourceFunction,
    // MetaQOutputFormat, MetaQConnect, MetaPullConsumer), the static
    // createConsumerInstance helper, the PROPERTY_* and NAMESRV_ADDR property keys,
    // and the RocketMQ message classes (MessageExt, MessageQueue, MQClientException)
    // are provided by the ververica-connector-mq dependency and are imported from
    // the packages in that JAR.
    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import java.util.Properties;
    import java.util.Set;
    
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * A {@link DataStream} demo that illustrates how to consume messages from RocketMQ, convert
     * messages, then produce messages to RocketMQ.
     *
     * <pre>
     * Arguments
     * mqSourceTopic: The topic that the RocketMQ source consumes.
     * mqSourceConsumerGroup: The consumer group of the RocketMQ source.
     * mqSourceEndpoint: The endpoint of the RocketMQ instance that contains the source topic.
     * mqSourceAccessId: The AccessKey ID used to access the source topic.
     * mqSourceAccessKey: The AccessKey secret used to access the source topic.
     * mqSourceInstanceId: The ID of the RocketMQ instance that contains the source topic.
     * startMessageOffset: The offset at which the RocketMQ source starts to consume messages.
     * startTime: The time at which the RocketMQ source starts to consume messages.
     * mqSinkTopic: The topic that the RocketMQ sink produces to.
     * mqSinkProducerGroup: The producer group of the RocketMQ sink.
     * mqSinkEndpoint: The endpoint of the RocketMQ instance that contains the sink topic.
     * mqSinkAccessId: The AccessKey ID used to access the sink topic.
     * mqSinkAccessKey: The AccessKey secret used to access the sink topic.
     * mqSinkInstanceId: The ID of the RocketMQ instance that contains the sink topic.
     * </pre>
     */
    public class RocketMQDataStreamDemo {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQDataStreamDemo.class);
    
        public static void main(String[] args) throws Exception {
            // Sets up the streaming execution environment
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
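            // Parses job arguments of the form "--key value".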
            final ParameterTool parameters = ParameterTool.fromArgs(args);
            // Creates and adds RocketMQ source.
            env.addSource(createRocketMQSource(parameters))
                    // Converts message body to upper case.
                    .map(RocketMQDataStreamDemo::convertMessages)
                    // Creates and adds RocketMQ sink.
                    .addSink(new OutputFormatSinkFunction<>(createRocketMQOutputFormat(parameters)))
                    .name(RocketMQDataStreamDemo.class.getSimpleName());
            // Compiles and submits job.
            env.execute("RocketMQ connector end-to-end DataStream demo");
        }
    
        private static MetaQSourceFunction createRocketMQSource(ParameterTool parameters) {
            String sourceTopic = parameters.get("mqSourceTopic");
            String consumerGroup = parameters.get("mqSourceConsumerGroup");
            Properties mqProperties = createSourceMQProperties(parameters);
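            // Fetches the number of message queues (partitions) of the source topic;
            // the count is passed to the MetaQSourceFunction constructor below.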
            int partitionCount = 0;
            MetaPullConsumer consumer = null;
            try {
                consumer = createConsumerInstance(sourceTopic, consumerGroup, mqProperties, null, -1);
                Set<MessageQueue> queues = consumer.fetchSubscribeMessageQueues(sourceTopic);
                partitionCount = queues == null ? 0 : queues.size();
            } catch (MQClientException e) {
                LOGGER.error(
                        "Failed to fetch the partition count of topic [{}] for the RocketMQ source.",
                        sourceTopic,
                        e);
            } finally {
                if (consumer != null) {
                    try {
                        MetaQConnect.shutdownConsumer(consumer);
                    } catch (Exception ignored) {
                    }
                }
            }
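            // Constructs the source function. The startMessageOffset and startTime job
            // arguments control where consumption starts; the remaining arguments are
            // positional and are described in the connector documentation.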
            return new MetaQSourceFunction(
                    sourceTopic,
                    consumerGroup,
                    null,
                    null,
                    100,
                    partitionCount,
                    Long.MAX_VALUE,
                    Long.parseLong(parameters.get("startMessageOffset")),
                    Long.parseLong(parameters.get("startTime")),
                    mqProperties);
        }
    
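        // Builds the OutputFormat that writes the converted messages to the sink topic.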
        private static MetaQOutputFormat createRocketMQOutputFormat(ParameterTool parameters) {
            return new MetaQOutputFormat.Builder()
                    .setTopicName(parameters.get("mqSinkTopic"))
                    .setProducerGroup(parameters.get("mqSinkProducerGroup"))
                    .setMqProperties(createSinkMQProperties(parameters))
                    .build();
        }
    
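        // Builds the connection properties for the RocketMQ source. The endpoint,
        // AccessKey pair, and instance ID are taken from the job arguments.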
        private static Properties createSourceMQProperties(ParameterTool parameters) {
            Properties properties = new Properties();
            properties.put(PROPERTY_ONS_CHANNEL, "ALIYUN");
            properties.put(NAMESRV_ADDR, parameters.get("mqSourceEndpoint"));
            properties.put(PROPERTY_ACCESSKEY, parameters.get("mqSourceAccessId"));
            properties.put(PROPERTY_SECRETKEY, parameters.get("mqSourceAccessKey"));
            properties.put(PROPERTY_ROCKET_AUTH_ENABLED, true);
            properties.put(PROPERTY_INSTANCE_ID, parameters.get("mqSourceInstanceId"));
            return properties;
        }
    
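        // Builds the connection properties for the RocketMQ sink.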
        private static Properties createSinkMQProperties(ParameterTool parameters) {
            Properties properties = new Properties();
            properties.put(PROPERTY_ONS_CHANNEL, "ALIYUN");
            properties.put(NAMESRV_ADDR, parameters.get("mqSinkEndpoint"));
            properties.put(PROPERTY_ACCESSKEY, parameters.get("mqSinkAccessId"));
            properties.put(PROPERTY_SECRETKEY, parameters.get("mqSinkAccessKey"));
            properties.put(PROPERTY_ROCKET_AUTH_ENABLED, true);
            properties.put(PROPERTY_INSTANCE_ID, parameters.get("mqSinkInstanceId"));
            return properties;
        }
    
        // Converts each message body to upper case, using an explicit charset.
        private static List<MessageExt> convertMessages(List<MessageExt> messages) {
            messages.forEach(
                    message ->
                            message.setBody(
                                    new String(message.getBody(), StandardCharsets.UTF_8)
                                            .toUpperCase()
                                            .getBytes(StandardCharsets.UTF_8)));
            return messages;
        }
    }
    The connection configuration of the Message Queue for Apache RocketMQ DataStream connector is the same as that of the Message Queue for Apache RocketMQ SQL connector. For more information, see the "Parameters in the WITH clause" section in Create a Message Queue for Apache RocketMQ source table and Create a Message Queue for Apache RocketMQ result table.
    Note For more information about the endpoints of Message Queue for Apache RocketMQ, see Announcement on the settings of internal TCP endpoints.
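    The ParameterTool.fromArgs method used in the demo parses arguments of the form --key value. The following sketch shows one way to launch the demo with such arguments. The launcher class name RocketMQDataStreamDemoLauncher and all argument values are placeholders; replace them with your own topics, groups, endpoint, instance ID, and AccessKey pair.
    public class RocketMQDataStreamDemoLauncher {
    
        public static void main(String[] args) throws Exception {
            // All values below are placeholders.
            RocketMQDataStreamDemo.main(
                    new String[] {
                        "--mqSourceTopic", "demo-source-topic",
                        "--mqSourceConsumerGroup", "demo-consumer-group",
                        "--mqSourceEndpoint", "<internal-TCP-endpoint>",
                        "--mqSourceAccessId", "<AccessKey-ID>",
                        "--mqSourceAccessKey", "<AccessKey-secret>",
                        "--mqSourceInstanceId", "<instance-ID>",
                        "--startMessageOffset", "0",
                        "--startTime", "0",
                        "--mqSinkTopic", "demo-sink-topic",
                        "--mqSinkProducerGroup", "demo-producer-group",
                        "--mqSinkEndpoint", "<internal-TCP-endpoint>",
                        "--mqSinkAccessId", "<AccessKey-ID>",
                        "--mqSinkAccessKey", "<AccessKey-secret>",
                        "--mqSinkInstanceId", "<instance-ID>"
                    });
        }
    }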

Upload the JAR package of the Message Queue for Apache RocketMQ DataStream connector to the console of fully managed Flink

  1. Log on to the Realtime Compute for Apache Flink console.
  2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.
  3. In the left-side navigation pane, click Artifacts.
  4. Click Upload Artifact and select the JAR package that you want to upload.
    You can upload the JAR package of your self-managed connector or the JAR package of a connector that is provided by fully managed Flink.
  5. In the Additional Dependencies section of the Draft Editor page, select the JAR package that you want to use.