本文為您介紹ApsaraMQ for RocketMQ連接器。
鑒於ApsaraMQ for RocketMQ 4.x標準版執行個體共用API調用彈性上限為每秒5000次,使用該版本的訊息中介軟體在與Realtime ComputeFlink版對接時,若超過上限會觸發限流機制,可能會導致Flink作業運行不穩定。因此,在選擇訊息中介軟體時,如果您正在或計劃通過標準版RocketMQ與Flink對接,請您謹慎評估。如果業務情境允許,請考慮使用Kafka、Log Service(SLS)或DataHub等其他中介軟體進行替代。如果您確實需要使用ApsaraMQ for RocketMQ 4.x標準版處理大規模的訊息,也請同時通過提交工單與RocketMQ產品取得聯絡申請提升限速上限。
背景資訊
雲訊息佇列 RocketMQ 版是阿里雲基於Apache RocketMQ構建的低延遲、高並發、高可用和高可靠的分布式訊息中介軟體。其既可為分布式應用系統提供非同步解耦和削峰填穀的能力,同時也具備互連網應用所需的海量訊息堆積、高吞吐和可靠重試等特性。
RocketMQ連接器支援的資訊如下。
類別 | 詳情 |
支援類型 | 源表和結果表 |
運行模式 | 僅支援流模式 |
資料格式 | CSV和二進位格式 |
特有監控指標 | |
API種類 | Datastream(僅支援RocketMQ 4.x)和SQL |
是否支援更新或刪除結果表資料 | 不支援更新和刪除結果表資料,只支援插入資料。 |
特色功能
RocketMQ源表和結果表支援屬性欄位,具體如下。
源表屬性欄位
欄位名
欄位類型
說明
topic
VARCHAR METADATA VIRTUAL
訊息Topic。
queue-id
INT METADATA VIRTUAL
訊息佇列ID。
queue-offset
BIGINT METADATA VIRTUAL
訊息佇列的消費位點。
msg-id
VARCHAR METADATA VIRTUAL
訊息ID。
store-timestamp
TIMESTAMP(3) METADATA VIRTUAL
訊息儲存時間。
born-timestamp
TIMESTAMP(3) METADATA VIRTUAL
訊息產生時間。
keys
VARCHAR METADATA VIRTUAL
訊息Keys。
tags
VARCHAR METADATA VIRTUAL
訊息Tags。
結果表屬性欄位
欄位名
欄位類型
說明
keys
VARCHAR METADATA
訊息Keys。
tags
VARCHAR METADATA
訊息Tags。
前提條件
已建立了RocketMQ資源,詳情請參見建立資源。
使用限制
僅FlinkRealtime Compute引擎VVR 8.0.3及以上版本支援5.x版本的RocketMQ。
RocketMQ連接器使用Pull Consumer消費,所有的子任務分擔消費。
文法結構
CREATE TABLE mq_source(
x varchar,
y varchar,
z varchar
) WITH (
'connector' = 'mq5',
'topic' = '<yourTopicName>',
'endpoint' = '<yourEndpoint>',
'consumerGroup' = '<yourConsumerGroup>'
);WITH參數
通用
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
connector | connector類型。 | String | 是 | 無 |
|
endPoint | EndPoint地址 | String | 是 | 無 | ApsaraMQ for RocketMQ接入地址支援以下兩種類型:
重要 由於阿里雲網路安全性原則動態變化,Realtime Compute串連公網服務MQ時可能會出現網路連接問題,推薦您使用內網服務MQ。
|
topic | topic名稱。 | String | 是 | 無 | 無。 |
accessId |
| String |
| 無 |
重要 為了避免您的AK資訊泄露,建議您使用變數的方式填寫AccessKey取值,詳情請參見專案變數。
|
accessKey |
| String |
| 無 | |
tag | 訂閱或寫入的標籤 | String | 否 | 無 |
說明 當作為結果表時,僅支援RocketMQ 4.x。RocketMQ 5.x請使用結果表屬性欄位來指定寫出訊息的 tag。 |
encoding | 編碼格式。 | String | 否 | UTF-8 | 無。 |
instanceID | RocketMQ執行個體ID。 | String | 否 | 無 |
說明 僅RocketMQ 4.x支援該參數。 |
源表專屬
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
consumerGroup | Consumer組名。 | String | 是 | 無 | 無。 |
pullIntervalMs | 上遊沒有資料可供消費時,source的休眠時間。 | Int | 是 | 無 | 單位為毫秒。 目前沒有限流機制,無法設定讀取RocketMQ的速率。 說明 僅RocketMQ 4.x支援該參數。 |
timeZone | 時區。 | String | 否 | 無 | 例如,Asia/Shanghai。 |
startTimeMs | 啟動時間點。 | Long | 否 | 無 | 時間戳記,單位為毫秒。 |
startMessageOffset | 訊息開始的位移量。 | Int | 否 | 無 | 如果填寫該參數,則優先以 |
lineDelimiter | 解析Block時,行分隔字元。 | String | 否 | \n | 無。 |
fieldDelimiter | 欄位分隔符號。 | String | 否 | \u0001 | 根據MQ終端的模式,分隔字元分別為:
|
lengthCheck | 單列欄位條數檢查策略。 | Int | 否 | NONE | 取值如下:
|
columnErrorDebug | 是否開啟調試開關。 | Boolean | 否 | false | 如果設定為true,則列印解析異常的Log。 |
pullBatchSize | 每次拉取訊息的最大數量。 | Int | 否 | 64 | 僅Realtime Compute引擎VVR 8.0.7及以上版本支援該參數。 |
結果表專屬
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
producerGroup | 寫入的群組。 | String | 是 | 無 | 無。 |
retryTimes | 寫入的重試次數。 | Int | 否 | 10 | 無。 |
sleepTimeMs | 稍候再試時間。 | Long | 否 | 5000 | 無。 |
partitionField | 指定欄位名,將該欄位作為分區列。 | String | 否 | 無 | 如果 說明 僅Realtime Compute引擎VVR 8.0.5及以上版本支援該參數。 |
deliveryTimestampMode | 指定延遲訊息的模式,該模式與 | String | 否 | 無 | 取值如下:
說明 僅Realtime Compute引擎VVR 11.1及以上版本支援該參數。 |
deliveryTimestampType | 指定延遲訊息的時間基準類型。 | String | 否 | processing_time | 取值如下:
說明 僅Realtime Compute引擎VVR 11.1及以上版本支援該參數。 |
deliveryTimestampValue | 延遲訊息的投遞時間。 | Long | 否 | 無 | 根據
說明 僅Realtime Compute引擎VVR 11.1及以上版本支援該參數。 |
deliveryTimestampField | 指定用作延遲訊息投遞時間的欄位。欄位類型必須為 | String | 否 | 無 |
說明 僅Realtime Compute引擎VVR 11.1及以上版本支援該參數。 |
類型映射
Flink欄位類型 | 雲訊息佇列RocketMQ欄位類型 |
BOOLEAN | STRING |
VARBINARY | |
VARCHAR | |
TINYINT | |
INTEGER | |
BIGINT | |
FLOAT | |
DOUBLE | |
DECIMAL |
程式碼範例
源表示例
CSV格式
假設您的一條CSV格式訊息記錄如下。
1,name,male 2,name,female說明一條RocketMQ訊息可以包括零條到多條資料記錄,記錄之間使用
\n分隔。Flink作業中,聲明RocketMQ資料來源表的DDL如下。
RocketMQ 5.x
CREATE TABLE mq_source( id varchar, name varchar, gender varchar, topic varchar metadata virtual ) WITH ( 'connector' = 'mq5', 'topic' = 'mq-test', 'endpoint' = '<yourEndpoint>', 'consumerGroup' = 'mq-group', 'fieldDelimiter' = ',' );RocketMQ 4.x
CREATE TABLE mq_source( id varchar, name varchar, gender varchar, topic varchar metadata virtual ) WITH ( 'connector' = 'mq', 'topic' = 'mq-test', 'endpoint' = '<yourEndpoint>', 'pullIntervalMs' = '1000', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'consumerGroup' = 'mq-group', 'fieldDelimiter' = ',' );二進位格式
RocketMQ 5.x
CREATE TEMPORARY TABLE source_table ( mess varbinary ) WITH ( 'connector' = 'mq5', 'endpoint' = '<yourEndpoint>', 'topic' = 'mq-test', 'consumerGroup' = 'mq-group' ); CREATE TEMPORARY TABLE out_table ( commodity varchar ) WITH ( 'connector' = 'print' ); INSERT INTO out_table select cast(mess as varchar) FROM source_table;RocketMQ 4.x
CREATE TEMPORARY TABLE source_table ( mess varbinary ) WITH ( 'connector' = 'mq', 'endpoint' = '<yourEndpoint>', 'pullIntervalMs' = '500', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'topic' = 'mq-test', 'consumerGroup' = 'mq-group' ); CREATE TEMPORARY TABLE out_table ( commodity varchar ) WITH ( 'connector' = 'print' ); INSERT INTO out_table select cast(mess as varchar) FROM source_table;
結果表示例
建立結果表
RocketMQ 5.x
CREATE TABLE mq_sink ( id INTEGER, len BIGINT, content VARCHAR ) WITH ( 'connector'='mq5', 'endpoint'='<yourEndpoint>', 'topic'='<yourTopicName>', 'producerGroup'='<yourGroupName>' );RocketMQ 4.x
CREATE TABLE mq_sink ( id INTEGER, len BIGINT, content VARCHAR ) WITH ( 'connector'='mq', 'endpoint'='<yourEndpoint>', 'accessId'='${secret_values.ak_id}', 'accessKey'='${secret_values.ak_secret}', 'topic'='<yourTopicName>', 'producerGroup'='<yourGroupName>' );說明如果您的MQ訊息為二進位格式,則DDL中只能定義一個欄位,且欄位類型必須為VARBINARY。
建立將
keys和tags欄位指定為RocketMQ訊息的key和tag的結果表RocketMQ 5.x
CREATE TABLE mq_sink ( id INTEGER, len BIGINT, content VARCHAR, keys VARCHAR METADATA, tags VARCHAR METADATA ) WITH ( 'connector'='mq5', 'endpoint'='<yourEndpoint>', 'topic'='<yourTopicName>', 'producerGroup'='<yourGroupName>' );RocketMQ 4.x
CREATE TABLE mq_sink ( id INTEGER, len BIGINT, content VARCHAR, keys VARCHAR METADATA, tags VARCHAR METADATA ) WITH ( 'connector'='mq', 'endpoint'='<yourEndpoint>', 'accessId'='${secret_values.ak_id}', 'accessKey'='${secret_values.ak_secret}', 'topic'='<yourTopicName>', 'producerGroup'='<yourGroupName>' );
DataStream API
通過DataStream的方式讀寫資料時,則需要使用對應的DataStream連接器串連Flink全託管,DataStream連接器設定方法請參見DataStream連接器使用方法。
Realtime Compute引擎VVR提供MetaQSource,用於讀取RocketMQ;提供OutputFormat的實作類別MetaQOutputFormat,用於寫入RocketMQ。讀取RocketMQ和寫入RocketMQ的樣本如下:
RocketMQ 5.x
在RocketMQ 5.x 中,存取金鑰對應的是執行個體中配置的使用者名稱和密碼。若通過內網訪問RocketMQ執行個體,且執行個體未開啟 ACL 認證,則可不填寫AK/SK參數。
import com.alibaba.ververica.connectors.common.sink.OutputFormatSinkFunction;
import com.alibaba.ververica.connectors.mq5.shaded.org.apache.rocketmq.common.message.MessageExt;
import com.alibaba.ververica.connectors.mq5.sink.RocketMQOutputFormat;
import com.alibaba.ververica.connectors.mq5.source.RocketMQSource;
import com.alibaba.ververica.connectors.mq5.source.reader.deserializer.RocketMQRecordDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Collections;
import java.util.List;
/**
* A demo that illustrates how to consume messages from RocketMQ, convert
* messages, then produce messages to RocketMQ.
*/
public class RocketMQ5DataStreamDemo {
public static final String ENDPOINT = "<yourEndpoint>";
public static final String ACCESS_ID = "<accessID>";
public static final String ACCESS_KEY = "<accessKey>";
public static final String SOURCE_TOPIC = "<sourceTopicName>";
public static final String CONSUMER_GROUP = "<consumerGroup>";
public static final String SINK_TOPIC = "<sinkTopicName>";
public static final String PRODUCER_GROUP = "<producerGroup>";
public static void main(String[] args) throws Exception {
// Sets up the streaming execution environment
Configuration conf = new Configuration();
// 以下兩個配置僅本地調試時使用,需要在作業打包上傳到阿里雲Realtime ComputeFlink版之前刪除
conf.setString("pipeline.classpaths", "file://" + "uber jar絕對路徑");
conf.setString(
"classloader.parent-first-patterns.additional",
"com.alibaba.ververica.connectors.mq5.source.reader.deserializer.RocketMQRecordDeserializationSchema;com.alibaba.ververica.connectors.mq5.shaded.");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
final DataStreamSource<String> ds =
env.fromSource(
RocketMQSource.<String>builder()
.setEndpoint(ENDPOINT)
.setAccessId(ACCESS_ID)
.setAccessKey(ACCESS_KEY)
.setTopic(SOURCE_TOPIC)
.setConsumerGroup(CONSUMER_GROUP)
.setDeserializationSchema(new MyDeserializer())
.setStartOffset(1)
.build(),
WatermarkStrategy.noWatermarks(),
"source");
ds.map(new ToMessage())
.addSink(
new OutputFormatSinkFunction<>(
new RocketMQOutputFormat.Builder()
.setEndpoint(ENDPOINT)
.setAccessId(ACCESS_ID)
.setAccessKey(ACCESS_KEY)
.setTopicName(SINK_TOPIC)
.setProducerGroup(PRODUCER_GROUP)
.build()));
env.execute();
}
private static class MyDeserializer implements RocketMQRecordDeserializationSchema<String> {
@Override
public void deserialize(List<MessageExt> record, Collector<String> out) {
for (MessageExt messageExt : record) {
out.collect(new String(messageExt.getBody()));
}
}
@Override
public TypeInformation<String> getProducedType() {
return Types.STRING;
}
}
private static class ToMessage implements MapFunction<String, List<MessageExt>> {
public ToMessage() {
}
@Override
public List<MessageExt> map(String s) {
final MessageExt message = new MessageExt();
message.setBody(s.getBytes());
message.setWaitStoreMsgOK(true);
return Collections.singletonList(message);
}
}
}RocketMQ 4.x
import com.alibaba.ververica.connector.mq.shaded.com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.ververica.connectors.common.sink.OutputFormatSinkFunction;
import com.alibaba.ververica.connectors.metaq.sink.MetaQOutputFormat;
import com.alibaba.ververica.connectors.metaq.source.MetaQSource;
import com.alibaba.ververica.connectors.metaq.source.reader.deserializer.MetaQRecordDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.io.IOException;
import java.util.List;
import java.util.Properties;
import static com.alibaba.ververica.connector.mq.shaded.com.taobao.metaq.client.ExternConst.*;
/**
* A demo that illustrates how to consume messages from RocketMQ, convert
* messages, then produce messages to RocketMQ.
*/
public class RocketMQDataStreamDemo {
public static final String ENDPOINT = "<yourEndpoint>";
public static final String ACCESS_ID = "<accessID>";
public static final String ACCESS_KEY = "<accessKey>";
public static final String INSTANCE_ID = "<instanceID>";
public static final String SOURCE_TOPIC = "<sourceTopicName>";
public static final String CONSUMER_GROUP = "<consumerGroup>";
public static final String SINK_TOPIC = "<sinkTopicName>";
public static final String PRODUCER_GROUP = "<producerGroup>";
public static void main(String[] args) throws Exception {
// Sets up the streaming execution environment
Configuration conf = new Configuration();
// 以下兩個配置僅本地調試時使用,需要在作業打包上傳到阿里雲Realtime ComputeFlink版之前刪除
conf.setString("pipeline.classpaths", "file://" + "uber jar絕對路徑");
conf.setString("classloader.parent-first-patterns.additional",
"com.alibaba.ververica.connectors.metaq.source.reader.deserializer.MetaQRecordDeserializationSchema;com.alibaba.ververica.connector.mq.shaded.");
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
// Creates and adds RocketMQ source.
env.fromSource(createRocketMQSource(), WatermarkStrategy.noWatermarks(), "source")
// Converts message body to upper case.
.map(RocketMQDataStreamDemo2::convertMessages)
// Creates and adds RocketMQ sink.
.addSink(new OutputFormatSinkFunction<>(createRocketMQOutputFormat()))
.name(RocketMQDataStreamDemo2.class.getSimpleName());
// Compiles and submits job.
env.execute("RocketMQ connector end-to-end DataStream demo");
}
private static MetaQSource<MessageExt> createRocketMQSource() {
Properties mqProperties = createMQProperties();
return new MetaQSource<>(SOURCE_TOPIC,
CONSUMER_GROUP,
null, // always null
null, // tag of the messages to consumer
Long.MAX_VALUE, // stop timestamp in milliseconds
-1, // Start timestamp in milliseconds. Set to -1 to disable starting from offset.
0, // Start offset.
300_000, // Partition discover interval.
mqProperties,
Boundedness.CONTINUOUS_UNBOUNDED,
new MyDeserializationSchema());
}
private static MetaQOutputFormat createRocketMQOutputFormat() {
return new MetaQOutputFormat.Builder()
.setTopicName(SINK_TOPIC)
.setProducerGroup(PRODUCER_GROUP)
.setMqProperties(createMQProperties())
.build();
}
private static Properties createMQProperties() {
Properties properties = new Properties();
properties.put(PROPERTY_ONS_CHANNEL, "ALIYUN");
properties.put(NAMESRV_ADDR, ENDPOINT);
properties.put(PROPERTY_ACCESSKEY, ACCESS_ID);
properties.put(PROPERTY_SECRETKEY, ACCESS_KEY);
properties.put(PROPERTY_ROCKET_AUTH_ENABLED, true);
properties.put(PROPERTY_INSTANCE_ID, INSTANCE_ID);
return properties;
}
private static List<MessageExt> convertMessages(MessageExt messages) {
return Collections.singletonList(messages);
}
public static class MyDeserializationSchema implements MetaQRecordDeserializationSchema<MessageExt> {
@Override
public void deserialize(List<MessageExt> list, Collector<MessageExt> collector) {
for (MessageExt messageExt : list) {
collector.collect(messageExt);
}
}
@Override
public TypeInformation<MessageExt> getProducedType() {
return TypeInformation.of(MessageExt.class);
}
}
}
}
}XML
MQ 4.x:MQ DataStream連接器。
MQ 5.x:MQ DataStream連接器。
<!--MQ5-->
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mq5</artifactId>
<version>${vvr-version}</version>
<scope>provided</scope>
</dependency>
<!--MQ4-->
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mq</artifactId>
<version>${vvr-version}</version>
</dependency>RocketMQ存取點Endpoint配置詳情請參見關於TCP內網存取點設定的公告。