本文为您介绍消息队列RocketMQ版结果表DDL定义、WITH参数和示例代码等。

说明 RocketMQ Connector可以作为Stream作业和Batch作业的结果表使用。

什么是消息队列RocketMQ版

消息队列RocketMQ版是阿里云基于Apache RocketMQ构建的低延迟、高并发、高可用和高可靠的分布式消息中间件。消息队列RocketMQ版既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐和可靠重试等特性。Flink支持将消息队列MQ作为流式数据的输出。

前提条件

已创建了RocketMQ资源,详情请参见创建资源

使用限制

仅Flink计算引擎VVR 2.0.0及以上版本支持消息队列RocketMQ Connector。

DDL定义

create table mq_sink(
  x varchar,
  y varchar,
  z varchar
) with (
  'connector'='mq',
  'topic'='<yourTopicName>',
  'endpoint'='<yourEndpoint>',
  'accessId'='<yourAccessId>',
  'accessKey'='<yourAccessSecret>'
);
说明 MQ是非结构化存储格式的消息中间件,对于数据的Schema不提供强制定义,完全由业务层指定。Flink仅支持CSV和二进制格式的MQ消息。

WITH参数

参数 说明 是否必填 备注
connector 结果表类型。 固定值为mq
topic topic名称。 无。
endpoint 地址。 阿里云消息队列RocketMQ版接入地址支持以下两种类型:
  • VVR 3.0.1及以上版本的作业:需要使用TCP协议客户端接入点,详情请参见 关于TCP内网接入点设置的公告。接入点获取方式如下:
    • 内网服务MQ(阿里云经典网络/VPC)接入地址:在MQ控制台目标实例详情中,选择接入点 > TCP协议客户端接入点 > 内网访问,获取对应的endPoint。
    • 公网服务MQ接入地址:在MQ控制台目标实例详情中,选择接入点 > TCP协议客户端接入点 > 公网访问,获取对应的endPoint。
      说明
      • 由于阿里云网络安全策略动态变化,实时计算连接公网服务MQ时可能会出现网络连接问题,推荐您使用内网服务MQ。
      • 内网服务无法跨域访问。例如,您所购买的实时计算服务的地域为华东1,但是购买的消息队列MQ服务的地域为华东2,则无法访问。
      • 通过公网方式访问MQ,需要配置NAT,详情请参见创建和管理公网NAT网关实例
  • VVR 3.0.1(不含)以下版本的作业:RocketMQ旧的接入点已不可用,您需要适配升级实时计算作业。
    说明 如果您已使用了VVR 3.0.1(不含)以下版本的RocketMQ Connector,则您需要将您的实时计算作业升级至VVR 3.0.1及以上版本,并将作业中EndPoint参数取值更改为新的RocketMQ接入点,旧的RocketMQ接入点存在稳定性风险或不可用的问题,详情请参见2021年11月1日,RocketMQ旧的接入点将不可用,您需要适配升级实时计算作业。
accessId AccessKey ID。 无。
accessKey AccessKey Secret。 无。
producerGroup 写入的群组。 无。
tag 写入的标签。 支持设置多个tag,以逗号(,)进行分隔。
nameServerSubgroup NameServer组。
  • 内网服务(阿里云经典网络/VPC):nsaddr4client-internal
  • 公网服务:nsaddr4client-internet
说明 仅VVR 2.1.1 ~ VVR 3.0.0版本支持该参数,VVR 3.0.1及以后版本不支持该参数。
encoding 编码类型。 默认值为utf-8
retryTimes 写入的重试次数。 默认值为10。
sleepTimeMs 重试间隔时间。 默认值为1000(毫秒)。
instanceID MQ实例ID。
  • 如果MQ实例无独立命名空间,则不可以使用instanceID参数。
  • 如果MQ实例有独立命名空间,则instanceID参数必选。

属性字段

字段名 字段类型 说明
keys VARCHAR METADATA RocketMQ消息Keys。
tags VARCHAR METADATA RocketMQ消息Tags。
说明 仅实时计算引擎VVR 4.0.0及以上版本支持以上RocketMQ属性字段。

代码示例

  • 创建RocketMQ结果表:
    CREATE TABLE mq_sink (
      id INTEGER,
      len BIGINT,
      content VARCHAR
    ) WITH (
      'connector'='mq',
      'endpoint'='<yourEndpoint>',
      'accessId'='<yourAccessId>',
      'accessKey'='<yourAccessSecret>',
      'topic'='<yourTopicName>',
      'producerGroup'='<yourGroupName>',
      'tag'='<yourTagName>',
      'encoding'='utf-8',
      'retryTimes'='5',
      'sleepTimeMs'='500'
    );
    说明 如果您的MQ消息为二进制格式,则DDL中只能定义1个字段,且字段类型必须为VARBINARY。
  • 创建将keys和tags字段指定为RocketMQ消息的key和tag的结果表:
    CREATE TABLE mq_sink (
      id INTEGER,
      len BIGINT,
      content VARCHAR,
      keys VARCHAR METADATA,
      tags VARCHAR METADATA
    ) WITH (
      'connector'='mq',
      'endpoint'='<yourEndpoint>',
      'accessId'='<yourAccessId>',
      'accessKey'='<yourAccessSecret>',
      'topic'='<yourTopicName>',
      'producerGroup'='<yourGroupName>',
      'encoding'='utf-8',
      'retryTimes'='5',
      'sleepTimeMs'='500'
    );