This topic describes the ApsaraMQ for RocketMQ connector.
ApsaraMQ for RocketMQ 4.x Standard Edition instances share an elastic API call limit of 5,000 calls per second. If you connect Realtime Compute for Apache Flink to such an instance and this limit is exceeded, throttling is triggered, which may make your Flink jobs unstable. Therefore, evaluate the impact before you use ApsaraMQ for RocketMQ 4.x Standard Edition. If your business scenario allows, consider an alternative such as Kafka, Simple Log Service (SLS), or DataHub. If you must use ApsaraMQ for RocketMQ 4.x Standard Edition to process large volumes of messages, submit a ticket to the ApsaraMQ for RocketMQ product team to request a higher throttling limit.
Background information
ApsaraMQ for RocketMQ is a distributed middleware service developed by Alibaba Cloud based on Apache RocketMQ. It provides low latency, high concurrency, high availability (HA), and high reliability. It offers asynchronous decoupling and peak-load shifting for distributed applications. It also supports features required by internet applications, such as massive message accumulation, high throughput, and reliable retries.
The following table describes the capabilities of the ApsaraMQ for RocketMQ connector.
Category | Details |
Supported type | Source table and sink table |
Run mode | Streaming mode only |
Data format | CSV and binary |
Specific monitoring metrics | |
API type | DataStream (ApsaraMQ for RocketMQ 4.x only) and SQL |
Data updates and deletions in sink tables | Not supported. Sink tables support only data insertion. |
Features
ApsaraMQ for RocketMQ source tables and sink tables support property fields.
Source table property fields
Field name | Field type | Description |
topic | VARCHAR METADATA VIRTUAL | The message topic. |
queue-id | INT METADATA VIRTUAL | The ID of the message queue. |
queue-offset | BIGINT METADATA VIRTUAL | The offset of the message in the message queue. |
msg-id | VARCHAR METADATA VIRTUAL | The message ID. |
store-timestamp | TIMESTAMP(3) METADATA VIRTUAL | The time when the message was stored. |
born-timestamp | TIMESTAMP(3) METADATA VIRTUAL | The time when the message was generated. |
keys | VARCHAR METADATA VIRTUAL | The message keys. |
tags | VARCHAR METADATA VIRTUAL | The message tags. |
Sink table property fields
Field name | Field type | Description |
keys | VARCHAR METADATA | The message keys. |
tags | VARCHAR METADATA | The message tags. |
Prerequisites
An ApsaraMQ for RocketMQ resource has been created. For more information, see Create a resource.
Limits
Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 8.0.3 or later supports ApsaraMQ for RocketMQ 5.x.
The ApsaraMQ for RocketMQ connector uses a pull consumer to consume messages. All subtasks share the consumption load.
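To give a rough mental model of how subtasks can share the consumption load, the following Java sketch assigns each queue ID to exactly one subtask in round-robin order. The round-robin strategy and the class name `QueueAssignmentSketch` are assumptions made for illustration. The connector's actual assignment logic is not described in this topic and may differ.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of load sharing in a pull-based source: every queue is
 * owned by exactly one subtask, assigned in round-robin order. This is an
 * illustration only, not the connector's actual assignment algorithm.
 */
final class QueueAssignmentSketch {

    private QueueAssignmentSketch() {
    }

    /**
     * Returns the IDs of the queues that subtask {@code subtaskIndex} would pull
     * from when {@code queueCount} queues are spread over {@code parallelism} subtasks.
     */
    static List<Integer> queuesForSubtask(int queueCount, int parallelism, int subtaskIndex) {
        if (queueCount < 0 || parallelism <= 0 || subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("invalid queue count, parallelism, or subtask index");
        }
        List<Integer> owned = new ArrayList<>();
        // Queue i is owned by subtask (i % parallelism), so no queue is pulled by two subtasks.
        for (int queueId = subtaskIndex; queueId < queueCount; queueId += parallelism) {
            owned.add(queueId);
        }
        return owned;
    }
}
```

In this model, 8 queues and a parallelism of 3 give subtask 0 the queues [0, 3, 6], subtask 1 the queues [1, 4, 7], and subtask 2 the queues [2, 5]. A parallelism higher than the number of queues leaves the extra subtasks without any queue to pull from.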
Syntax
CREATE TABLE mq_source(
  x varchar,
  y varchar,
  z varchar
) WITH (
  'connector' = 'mq5',
  'topic' = '<yourTopicName>',
  'endpoint' = '<yourEndpoint>',
  'consumerGroup' = '<yourConsumerGroup>'
);
WITH parameters
General
Parameter | Description | Data type | Required | Default value | Remarks |
connector | The connector type. | String | Yes | - | Set this parameter to mq for ApsaraMQ for RocketMQ 4.x or to mq5 for ApsaraMQ for RocketMQ 5.x. |
endPoint | The endpoint of the ApsaraMQ for RocketMQ instance. | String | Yes | - | The ApsaraMQ for RocketMQ endpoint can be one of the following types: Important: Because Alibaba Cloud network security policies change dynamically, Realtime Compute for Apache Flink may fail to connect to ApsaraMQ for RocketMQ over a public endpoint. We recommend that you use an internal endpoint. |
topic | The topic name. | String | Yes | None | None. |
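accessId | The AccessKey ID. | String | See Remarks | None | Important: To prevent your AccessKey information from being leaked, use variables to specify the AccessKey pair. For more information, see Project variables. For ApsaraMQ for RocketMQ 5.x, the AccessKey pair corresponds to the username and password configured for the instance, and you do not need to specify it if you access the instance over an internal network and Access Control List (ACL) authentication is not enabled. |
accessKey | The AccessKey secret. | String | See Remarks | - | See the remarks for accessId. |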
tag | The tag of the messages to subscribe to or to write. | String | No | - | Note: When the table is used as a sink table, this parameter is supported only for ApsaraMQ for RocketMQ 4.x. For ApsaraMQ for RocketMQ 5.x, use the tags sink table property field to specify the tag of each output message. |
encoding | The encoding format. | String | No | UTF-8 | None. |
instanceID | The ApsaraMQ for RocketMQ instance ID. | String | No | - | Note: This parameter is supported only for ApsaraMQ for RocketMQ 4.x. |
Source table specific
Parameter | Description | Data type | Required | Default value | Remarks |
consumerGroup | The name of the consumer group. | String | Yes | - | None. |
pullIntervalMs | The duration for which the source sleeps when no data is available for consumption in the upstream. | Int | Yes | None | Unit: milliseconds. No throttling mechanism is available, so you cannot limit the rate at which data is read from ApsaraMQ for RocketMQ. Note: This parameter is supported only for ApsaraMQ for RocketMQ 4.x. |
timeZone | The time zone. | String | No | - | Example: Asia/Shanghai. |
startTimeMs | The time from which consumption starts. | Long | No | None | A UNIX timestamp. Unit: milliseconds. |
startMessageOffset | The starting offset of messages. | Int | No | - | If this parameter is set, data loading preferentially starts from the offset of |
lineDelimiter | The row delimiter used to split a message into records. | String | No | \n | None. |
fieldDelimiter | The field delimiter. | String | No | \u0001 | The delimiter varies based on the mode of the ApsaraMQ for RocketMQ client: |
lengthCheck | The policy for checking the number of fields parsed from a single row. | String | No | NONE | Valid values: |
columnErrorDebug | Specifies whether to enable debugging. | Boolean | No | false | If you set this parameter to true, logs of parsing exceptions are printed. |
pullBatchSize | The maximum number of messages to pull at a time. | Int | No | 64 | This parameter is supported only in VVR 8.0.7 or later of Realtime Compute for Apache Flink. |
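The startTimeMs parameter expects a UNIX timestamp in milliseconds. The following Java sketch, which uses only java.time, shows one way to compute that value from a local date and time in a named time zone. The time zone ID uses the same format as the timeZone example (Asia/Shanghai). The class and method names are hypothetical. For example, midnight on January 1, 2024 in Asia/Shanghai (UTC+8) corresponds to 1704038400000.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;

/**
 * Sketch: converts a local wall-clock time in a named time zone into the
 * millisecond UNIX timestamp format that startTimeMs expects.
 * The class and method names are hypothetical.
 */
final class StartTimeSketch {

    private StartTimeSketch() {
    }

    static long toStartTimeMs(String isoLocalDateTime, String timeZone) {
        // Parses an ISO-8601 local date-time, for example "2024-01-01T00:00:00".
        LocalDateTime local = LocalDateTime.parse(isoLocalDateTime);
        // Interprets it in the given zone (for example, Asia/Shanghai) and converts it to epoch milliseconds.
        return local.atZone(ZoneId.of(timeZone)).toInstant().toEpochMilli();
    }
}
```

You could then set the computed value in the WITH clause, for example 'startTimeMs' = '1704038400000'.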
Sink table specific
Parameter | Description | Data type | Required | Default value | Remarks |
producerGroup | The name of the producer group. | String | Yes | - | None. |
retryTimes | The number of retries for a write operation. | Int | No | 10 | None. |
sleepTimeMs | The interval between retries. | Long | No | 5000 | Unit: milliseconds. |
partitionField | Specifies the name of the field to use as the partition key column. | String | No | None | If Note This parameter is supported only in VVR 8.0.5 or later of Realtime Compute for Apache Flink. |
deliveryTimestampMode | Specifies the mode for delayed messages. This parameter, along with the | String | No | - | Valid values:
Note This parameter is supported only in VVR 11.1 or later of Realtime Compute for Apache Flink. |
deliveryTimestampType | Specifies the time base type for delayed messages. | String | No | processing_time | Valid values:
Note This parameter is supported only in VVR 11.1 or later of Realtime Compute for Apache Flink. |
deliveryTimestampValue | The delivery time for delayed messages. | Long | No | None | The meaning of this parameter varies based on the value of
Note This parameter is supported only in VVR 11.1 or later of Realtime Compute for Apache Flink. |
deliveryTimestampField | Specifies the field to use for the delivery time of delayed messages. The field type must be | String | No | - | This parameter takes effect only when Note This parameter is supported only in VVR 11.1 or later of Realtime Compute for Apache Flink. |
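The retryTimes and sleepTimeMs options describe a fixed-interval retry policy for writes. The following Java sketch models such a policy with standard library types only. It assumes that retryTimes counts the retries that follow the first attempt, so a write is tried at most retryTimes + 1 times. It also assumes that sleepTimeMs is in milliseconds. `RetrySketch` and its `Sleeper` interface are hypothetical names, and the connector's internal retry logic may differ.

```java
import java.util.function.Supplier;

/**
 * Hypothetical sketch of a fixed-interval retry policy modeled on the
 * retryTimes and sleepTimeMs sink options. Assumption: retryTimes counts the
 * retries that follow the first attempt. Not the connector's implementation.
 */
final class RetrySketch {

    /** Waits between attempts; injectable so that tests do not really sleep. */
    interface Sleeper {
        void sleep(long millis);
    }

    /** A Sleeper backed by Thread.sleep. */
    static final Sleeper THREAD_SLEEPER = millis -> {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting to retry", e);
        }
    };

    private RetrySketch() {
    }

    static <T> T writeWithRetry(Supplier<T> write, int retryTimes, long sleepTimeMs, Sleeper sleeper) {
        if (retryTimes < 0 || sleepTimeMs < 0) {
            throw new IllegalArgumentException("retryTimes and sleepTimeMs must not be negative");
        }
        RuntimeException lastFailure = null;
        // One initial attempt plus up to retryTimes retries.
        for (int attempt = 0; attempt <= retryTimes; attempt++) {
            try {
                return write.get();
            } catch (RuntimeException e) {
                lastFailure = e;
                if (attempt < retryTimes) {
                    // Waits for the fixed interval before the next attempt.
                    sleeper.sleep(sleepTimeMs);
                }
            }
        }
        // All attempts failed, so the last error is surfaced to the caller.
        throw lastFailure;
    }
}
```

Under this model, with the defaults (retryTimes = 10, sleepTimeMs = 5000), a write that keeps failing is attempted 11 times. It waits through 10 intervals of 5,000 ms, which is 50 seconds in total, before the error is propagated.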
Type mapping
Flink field type | ApsaraMQ for RocketMQ field type |
BOOLEAN | STRING |
VARBINARY | STRING |
VARCHAR | STRING |
TINYINT | STRING |
INTEGER | STRING |
BIGINT | STRING |
FLOAT | STRING |
DOUBLE | STRING |
DECIMAL | STRING |
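Because the Flink types in the type mapping table above map to STRING on the ApsaraMQ for RocketMQ side, a field value travels as text and must be parsed into the declared Flink type. The following Java sketch illustrates this idea with standard JDK parsers. It is not the connector's parser. The class name and the text formats it accepts are assumptions. VARBINARY is left out because, in the binary format examples, the whole message body is read as raw bytes.

```java
import java.math.BigDecimal;

/**
 * Hypothetical sketch: turns the STRING form of a field back into a Java value
 * for the declared Flink type, using standard JDK parsers. The accepted text
 * formats are assumptions; the connector's own parser may behave differently.
 */
final class FieldParserSketch {

    private FieldParserSketch() {
    }

    static Object parse(String flinkType, String text) {
        switch (flinkType) {
            case "BOOLEAN":
                // Any text other than "true" (ignoring case) yields false.
                return Boolean.parseBoolean(text);
            case "TINYINT":
                return Byte.parseByte(text);
            case "INTEGER":
                return Integer.parseInt(text);
            case "BIGINT":
                return Long.parseLong(text);
            case "FLOAT":
                return Float.parseFloat(text);
            case "DOUBLE":
                return Double.parseDouble(text);
            case "DECIMAL":
                return new BigDecimal(text);
            case "VARCHAR":
                return text;
            default:
                throw new IllegalArgumentException("Type not covered by this sketch: " + flinkType);
        }
    }
}
```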
Code examples
Source table examples
CSV format
Assume that an ApsaraMQ for RocketMQ message contains the following records in CSV format:
1,name,male
2,name,female
Note: A RocketMQ message can contain zero or more data records, separated by \n.
The following Data Definition Language (DDL) statements show how to declare an ApsaraMQ for RocketMQ source table in a Flink job.
ApsaraMQ for RocketMQ 5.x
CREATE TABLE mq_source(
  id varchar,
  name varchar,
  gender varchar,
  topic varchar metadata virtual
) WITH (
  'connector' = 'mq5',
  'topic' = 'mq-test',
  'endpoint' = '<yourEndpoint>',
  'consumerGroup' = 'mq-group',
  'fieldDelimiter' = ','
);
ApsaraMQ for RocketMQ 4.x
CREATE TABLE mq_source(
  id varchar,
  name varchar,
  gender varchar,
  topic varchar metadata virtual
) WITH (
  'connector' = 'mq',
  'topic' = 'mq-test',
  'endpoint' = '<yourEndpoint>',
  'pullIntervalMs' = '1000',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'consumerGroup' = 'mq-group',
  'fieldDelimiter' = ','
);
Binary format
ApsaraMQ for RocketMQ 5.x
CREATE TEMPORARY TABLE source_table (
  mess varbinary
) WITH (
  'connector' = 'mq5',
  'endpoint' = '<yourEndpoint>',
  'topic' = 'mq-test',
  'consumerGroup' = 'mq-group'
);

CREATE TEMPORARY TABLE out_table (
  commodity varchar
) WITH (
  'connector' = 'print'
);

INSERT INTO out_table SELECT CAST(mess AS varchar) FROM source_table;
ApsaraMQ for RocketMQ 4.x
CREATE TEMPORARY TABLE source_table (
  mess varbinary
) WITH (
  'connector' = 'mq',
  'endpoint' = '<yourEndpoint>',
  'pullIntervalMs' = '500',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'topic' = 'mq-test',
  'consumerGroup' = 'mq-group'
);

CREATE TEMPORARY TABLE out_table (
  commodity varchar
) WITH (
  'connector' = 'print'
);

INSERT INTO out_table SELECT CAST(mess AS varchar) FROM source_table;
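In the CSV source examples above, with 'fieldDelimiter' = ',' and the default lineDelimiter of \n, the message body 1,name,male\n2,name,female yields two rows for the id, name, and gender columns. The following Java sketch models this splitting with the JDK only. `CsvMessageSketch` is a hypothetical name. The sketch does not handle quoted fields and does not reproduce the connector's lengthCheck policy or any other parsing rule that this topic does not describe.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/**
 * Hypothetical sketch: splits one message body into records by lineDelimiter
 * and each record into fields by fieldDelimiter. Not the connector's parser;
 * it does not handle quoted fields or the lengthCheck policy.
 */
final class CsvMessageSketch {

    private CsvMessageSketch() {
    }

    static List<List<String>> parse(String body, String lineDelimiter, String fieldDelimiter) {
        List<List<String>> records = new ArrayList<>();
        // Pattern.quote makes the delimiters literal text rather than regular expressions.
        for (String line : body.split(Pattern.quote(lineDelimiter))) {
            if (line.isEmpty()) {
                // A message can contain zero records, so empty lines are skipped.
                continue;
            }
            // The limit of -1 keeps trailing empty fields, such as the last one in "1,name,".
            records.add(Arrays.asList(line.split(Pattern.quote(fieldDelimiter), -1)));
        }
        return records;
    }
}
```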
Sink table examples
Create a sink table
ApsaraMQ for RocketMQ 5.x
CREATE TABLE mq_sink (
  id INTEGER,
  len BIGINT,
  content VARCHAR
) WITH (
  'connector'='mq5',
  'endpoint'='<yourEndpoint>',
  'topic'='<yourTopicName>',
  'producerGroup'='<yourGroupName>'
);
ApsaraMQ for RocketMQ 4.x
CREATE TABLE mq_sink (
  id INTEGER,
  len BIGINT,
  content VARCHAR
) WITH (
  'connector'='mq',
  'endpoint'='<yourEndpoint>',
  'accessId'='${secret_values.ak_id}',
  'accessKey'='${secret_values.ak_secret}',
  'topic'='<yourTopicName>',
  'producerGroup'='<yourGroupName>'
);
Note: If your ApsaraMQ for RocketMQ message is in binary format, you can define only one field in the DDL statement, and the field type must be VARBINARY.
Create a sink table that uses the keys and tags fields as the keys and tags of RocketMQ messages
ApsaraMQ for RocketMQ 5.x
CREATE TABLE mq_sink (
  id INTEGER,
  len BIGINT,
  content VARCHAR,
  keys VARCHAR METADATA,
  tags VARCHAR METADATA
) WITH (
  'connector'='mq5',
  'endpoint'='<yourEndpoint>',
  'topic'='<yourTopicName>',
  'producerGroup'='<yourGroupName>'
);
ApsaraMQ for RocketMQ 4.x
CREATE TABLE mq_sink (
  id INTEGER,
  len BIGINT,
  content VARCHAR,
  keys VARCHAR METADATA,
  tags VARCHAR METADATA
) WITH (
  'connector'='mq',
  'endpoint'='<yourEndpoint>',
  'accessId'='${secret_values.ak_id}',
  'accessKey'='${secret_values.ak_secret}',
  'topic'='<yourTopicName>',
  'producerGroup'='<yourGroupName>'
);
DataStream API
To read and write data by using the DataStream API, you must use the corresponding DataStream connector to connect to Realtime Compute for Apache Flink. For more information about how to configure a DataStream connector, see Use a DataStream connector.
For ApsaraMQ for RocketMQ 5.x, Ververica Runtime (VVR) provides RocketMQSource to read data and RocketMQOutputFormat to write data. For ApsaraMQ for RocketMQ 4.x, VVR provides MetaQSource to read data and MetaQOutputFormat, an implementation of the OutputFormat interface, to write data. The following code provides examples on how to read data from and write data to ApsaraMQ for RocketMQ:
ApsaraMQ for RocketMQ 5.x
In ApsaraMQ for RocketMQ 5.x, the AccessKey pair corresponds to the username and password that are configured for the instance. If you access the ApsaraMQ for RocketMQ instance over an internal network and Access Control List (ACL) authentication is not enabled for the instance, you do not need to configure the AccessKey pair parameters.
import com.alibaba.ververica.connectors.common.sink.OutputFormatSinkFunction;
import com.alibaba.ververica.connectors.mq5.shaded.org.apache.rocketmq.common.message.MessageExt;
import com.alibaba.ververica.connectors.mq5.sink.RocketMQOutputFormat;
import com.alibaba.ververica.connectors.mq5.source.RocketMQSource;
import com.alibaba.ververica.connectors.mq5.source.reader.deserializer.RocketMQRecordDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

/**
 * A demo that illustrates how to consume messages from RocketMQ, convert
 * messages, then produce messages to RocketMQ.
 */
public class RocketMQ5DataStreamDemo {

    public static final String ENDPOINT = "<yourEndpoint>";
    public static final String ACCESS_ID = "<accessID>";
    public static final String ACCESS_KEY = "<accessKey>";
    public static final String SOURCE_TOPIC = "<sourceTopicName>";
    public static final String CONSUMER_GROUP = "<consumerGroup>";
    public static final String SINK_TOPIC = "<sinkTopicName>";
    public static final String PRODUCER_GROUP = "<producerGroup>";

    public static void main(String[] args) throws Exception {
        // Sets up the streaming execution environment.
        Configuration conf = new Configuration();
        // The following two configurations are for local debugging only. Delete them before you package the job and upload it to Realtime Compute for Apache Flink.
        conf.setString("pipeline.classpaths", "file://" + "the absolute path of the uber JAR file");
        conf.setString(
                "classloader.parent-first-patterns.additional",
                "com.alibaba.ververica.connectors.mq5.source.reader.deserializer.RocketMQRecordDeserializationSchema;com.alibaba.ververica.connectors.mq5.shaded.");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // Creates the RocketMQ source.
        final DataStreamSource<String> ds =
                env.fromSource(
                        RocketMQSource.<String>builder()
                                .setEndpoint(ENDPOINT)
                                .setAccessId(ACCESS_ID)
                                .setAccessKey(ACCESS_KEY)
                                .setTopic(SOURCE_TOPIC)
                                .setConsumerGroup(CONSUMER_GROUP)
                                .setDeserializationSchema(new MyDeserializer())
                                .setStartOffset(1)
                                .build(),
                        WatermarkStrategy.noWatermarks(),
                        "source");

        // Converts each string into a RocketMQ message and writes it to the sink topic.
        ds.map(new ToMessage())
                .addSink(
                        new OutputFormatSinkFunction<>(
                                new RocketMQOutputFormat.Builder()
                                        .setEndpoint(ENDPOINT)
                                        .setAccessId(ACCESS_ID)
                                        .setAccessKey(ACCESS_KEY)
                                        .setTopicName(SINK_TOPIC)
                                        .setProducerGroup(PRODUCER_GROUP)
                                        .build()));

        env.execute();
    }

    private static class MyDeserializer implements RocketMQRecordDeserializationSchema<String> {
        @Override
        public void deserialize(List<MessageExt> record, Collector<String> out) {
            for (MessageExt messageExt : record) {
                // Decodes the message body as UTF-8 text.
                out.collect(new String(messageExt.getBody(), StandardCharsets.UTF_8));
            }
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return Types.STRING;
        }
    }

    private static class ToMessage implements MapFunction<String, List<MessageExt>> {
        public ToMessage() {
        }

        @Override
        public List<MessageExt> map(String s) {
            final MessageExt message = new MessageExt();
            // Encodes the string as UTF-8 bytes for the message body.
            message.setBody(s.getBytes(StandardCharsets.UTF_8));
            message.setWaitStoreMsgOK(true);
            return Collections.singletonList(message);
        }
    }
}
ApsaraMQ for RocketMQ 4.x
import com.alibaba.ververica.connector.mq.shaded.com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.ververica.connectors.common.sink.OutputFormatSinkFunction;
import com.alibaba.ververica.connectors.metaq.sink.MetaQOutputFormat;
import com.alibaba.ververica.connectors.metaq.source.MetaQSource;
import com.alibaba.ververica.connectors.metaq.source.reader.deserializer.MetaQRecordDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Collections;
import java.util.List;
import java.util.Properties;

import static com.alibaba.ververica.connector.mq.shaded.com.taobao.metaq.client.ExternConst.*;

/**
 * A demo that illustrates how to consume messages from RocketMQ, convert
 * messages, then produce messages to RocketMQ.
 */
public class RocketMQDataStreamDemo {

    public static final String ENDPOINT = "<yourEndpoint>";
    public static final String ACCESS_ID = "<accessID>";
    public static final String ACCESS_KEY = "<accessKey>";
    public static final String INSTANCE_ID = "<instanceID>";
    public static final String SOURCE_TOPIC = "<sourceTopicName>";
    public static final String CONSUMER_GROUP = "<consumerGroup>";
    public static final String SINK_TOPIC = "<sinkTopicName>";
    public static final String PRODUCER_GROUP = "<producerGroup>";

    public static void main(String[] args) throws Exception {
        // Sets up the streaming execution environment.
        Configuration conf = new Configuration();
        // The following two configurations are for local debugging only. Delete them before you package the job and upload it to Realtime Compute for Apache Flink.
        conf.setString("pipeline.classpaths", "file://" + "the absolute path of the uber JAR file");
        conf.setString(
                "classloader.parent-first-patterns.additional",
                "com.alibaba.ververica.connectors.metaq.source.reader.deserializer.MetaQRecordDeserializationSchema;com.alibaba.ververica.connector.mq.shaded.");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // Creates and adds the RocketMQ source.
        env.fromSource(createRocketMQSource(), WatermarkStrategy.noWatermarks(), "source")
                // Wraps each message in a single-element list, which is the input type of the sink.
                .map(RocketMQDataStreamDemo::convertMessages)
                // Creates and adds the RocketMQ sink.
                .addSink(new OutputFormatSinkFunction<>(createRocketMQOutputFormat()))
                .name(RocketMQDataStreamDemo.class.getSimpleName());

        // Compiles and submits the job.
        env.execute("RocketMQ connector end-to-end DataStream demo");
    }

    private static MetaQSource<MessageExt> createRocketMQSource() {
        Properties mqProperties = createMQProperties();
        return new MetaQSource<>(SOURCE_TOPIC,
                CONSUMER_GROUP,
                null, // always null
                null, // tag of the messages to consume
                Long.MAX_VALUE, // stop timestamp in milliseconds
                -1, // Start timestamp in milliseconds. Set to -1 to disable starting from offset.
                0, // Start offset.
                300_000, // Partition discover interval.
                mqProperties,
                Boundedness.CONTINUOUS_UNBOUNDED,
                new MyDeserializationSchema());
    }

    private static MetaQOutputFormat createRocketMQOutputFormat() {
        return new MetaQOutputFormat.Builder()
                .setTopicName(SINK_TOPIC)
                .setProducerGroup(PRODUCER_GROUP)
                .setMqProperties(createMQProperties())
                .build();
    }

    private static Properties createMQProperties() {
        Properties properties = new Properties();
        properties.put(PROPERTY_ONS_CHANNEL, "ALIYUN");
        properties.put(NAMESRV_ADDR, ENDPOINT);
        properties.put(PROPERTY_ACCESSKEY, ACCESS_ID);
        properties.put(PROPERTY_SECRETKEY, ACCESS_KEY);
        properties.put(PROPERTY_ROCKET_AUTH_ENABLED, true);
        properties.put(PROPERTY_INSTANCE_ID, INSTANCE_ID);
        return properties;
    }

    private static List<MessageExt> convertMessages(MessageExt messages) {
        return Collections.singletonList(messages);
    }

    public static class MyDeserializationSchema implements MetaQRecordDeserializationSchema<MessageExt> {
        @Override
        public void deserialize(List<MessageExt> list, Collector<MessageExt> collector) {
            for (MessageExt messageExt : list) {
                collector.collect(messageExt);
            }
        }

        @Override
        public TypeInformation<MessageExt> getProducedType() {
            return TypeInformation.of(MessageExt.class);
        }
    }
}
XML
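Add the DataStream connector dependency that matches your ApsaraMQ for RocketMQ version to your project:
ApsaraMQ for RocketMQ 4.x: MQ DataStream connector
ApsaraMQ for RocketMQ 5.x: MQ DataStream connector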
<!--MQ 5.x-->
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mq5</artifactId>
<version>${vvr-version}</version>
<scope>provided</scope>
</dependency>
<!--MQ 4.x-->
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mq</artifactId>
<version>${vvr-version}</version>
</dependency>
For more information about how to configure the endpoint for ApsaraMQ for RocketMQ, see Announcement on the settings of TCP internal endpoints.