本文介紹如何使用輕量訊息佇列MNS連接器。
背景資訊
輕量訊息佇列(原 MNS)是一種高效、可靠、安全、便捷、可彈性擴充的分布式Message Service。MNS提供OSS事件通知能力,通過建立事件通知規則,MNS可以將Object Storage Service指定資源上產生的事件(例如新檔案被建立)以訊息的方式推送到MNS隊列中。
Flink作業可以使用MNS連接器消費這些事件,例如在即時影像處理情境,使用者可以使用MNS連接器即時擷取OSS Bucket中的新檔案的路徑,再配合Realtime ComputeFlink版提供的FETCH_CONTENT來下載圖片內容並調用AI大語言模型整合相關能力進行即時多模態分析。
類別 | 詳情 |
支援類型 | 源表 |
運行模式 | 流模式 |
資料格式 | Orc、Parquet、Avro、Csv、JSON和Raw |
特有監控指標 | duplicateMessages(重複訊息數)、deletedMessages(已刪除訊息數)、failedDeletes(刪除失敗數)、deserializationErrors(還原序列化錯誤數) |
API種類 | SQL |
前提條件
已開通輕量訊息佇列(原 MNS)並授權與Flink工作空間處於同一地區(內網訪問)。
MNS 通過公網訪問時,Flink 工作空間需開啟公網訪問,並將 Flink 的公網 IP 加入 MNS 隊列的白名單,詳情請參見MNS存取控制。
使用限制
僅Realtime Compute引擎VVR 11.6.0及以上版本支援MNS連接器。
MNS連接器與Kafka不同,不支援從指錨點消費訊息,也不支援回溯。詳情請參見輕量訊息佇列(原 MNS)。
並行度固定為1:MNS 連接器通過 Connector 側去重實現 Exactly-Once 語義,僅支援單並發消費。
必須開啟Checkpoint:MNS連接器依賴Checkpoint實現訊息的確認和刪除。如果未開啟Checkpoint,訊息將不會被刪除,導致無限重複消費。
訊息體大小限制:MNS單條訊息Body大小不能超過64 KB。如需處理更大訊息,請參見MNS的超大訊息傳輸最佳實務。
訊息可見度設定:使用者在建立MNS隊列時需設定訊息可見度(Visibility Timeout)。訊息可見度逾時時間建議大於 Checkpoint 間隔,避免訊息重複投遞。
MNS連接器不保證消費事件時的嚴格順序。MNS 普通隊列非嚴格 FIFO 隊列,消費逾時後訊息重投遞可能導致順序變化。
文法結構
CREATE TABLE mns_source (
data STRING
) WITH (
'connector' = 'mns',
'endpoint' = '${endpoint}',
'region' = '${region}',
'queueName' = '${queueName}',
'accessKeyId' = '${accessKeyId}',
'accessKeySecret' = '${accessKeySecret}',
'format' = 'json',
'batchSize' = '8',
'pollingWaitTime' = '10s',
'messageType' = 'RAW'
);WITH參數
通用
參數
說明
資料類型
是否必填
預設值
備忘
connector
表類型。
String
是
無
固定值為
mns。endpoint
MNS服務存取點
String
是
無
URI格式,例如
http://{account-id}.mns.{region}.aliyuncs.com。詳情請參見地區存取點。
region
MNS服務所在地區。
String
是
無
例如cn-hangzhou,支援的地區請參見存取點。
queueName
MNS隊列名。
String
是
無
在MNS控制台建立隊列時設定的隊列名稱。
accessKeyId
訪問MNS服務所需AK
String
是
無
請使用已有AccessKey或者參考建立AccessKey重新建立。
accessKeySecret
訪問MNS服務所需SK
String
是
無
format
資料格式。
String
是
無
參數取值如下:
csv
json
avro
parquet
orc
raw
batchSize
單次從MNS隊列拉取訊息的最大條數。
Integer
否
1
有效範圍:1-16。該參數影響讀效能,調大可增加吞吐。注意由於MNS服務介面限制,單次最多返回16條訊息。
pollingWaitTime
每次從MNS隊列拉取訊息時的最長等待時間。
Duration
否
10s
取值範圍0-30s。設為0則代表不等待。
messageType
MNS訊息體載荷類型。
String
否
RAW
取值為
RAW或OSS。RAW(預設值):將訊息體視為標準JSON,由配置的format還原序列化器進行解析。OSS:自動解析OSS事件通知格式的JSON(需要MNS服務訂閱OSS事件)。連接器會自動從events數組中提取第一個元素,並將嵌套欄位扁平化映射到表結構。說明當
messageType設為OSS時,format必須為json。deleteMaxRetries
當MNS SDK刪除訊息失敗時的最大重試次數。
Integer
否
3
無。
startTimeMs
訊息消費起始時間(Unix時間戳記,單位毫秒)。
Long
否
-1
-1代表不按時間戳記過濾事件,即會嘗試消費隊列中所有可見訊息。
說明MNS連接器不支援類似Kafka那樣的訊息回溯,此處的startTimeMs只用來過濾。MNS連接器不會將進入MNS隊列時間早於該時間戳記的訊息發送到下遊。
讀取OSS事件通知
當設定 'messageType' = 'OSS' 時,MNS連接器會自動解析OSS事件通知中的JSON欄位並映射為扁平的表結構。支援以下欄位名映射(不區分大小寫):
使用者表欄位名 | JSON路徑 | 說明 |
eventName | eventName | 事件類型。 |
eventSource | eventSource | 事件來源。 |
eventTime | eventTime | 事件產生的時間。 |
eventVersion | eventVersion | 事件協議的版本。 |
region | region | Bucket所在的地區。 |
ossBucketArn | oss.bucket.arn | Bucket的唯一識別碼。 |
ossBucketName | oss.bucket.name | Bucket的名稱。 |
ossBucketOwnerIdentity | oss.bucket.ownerIdentity | 建立Bucket的使用者ID。 |
ossObjectKey | oss.object.key | Object的名稱。 |
ossObjectSize | oss.object.size | Object的大小。 |
ossObjectETag | oss.object.eTag | Object 的 ETag 值,可用於校正內容是否變化。 |
ossObjectDeltaSize | oss.object.deltaSize | Object的大小變化量。 |
ossObjectReadFrom | oss.object.readFrom | 檔案開始讀取的位置。 |
ossObjectReadTo | oss.object.readTo | 檔案最後讀取的位置。 |
ossOssSchemaVersion | oss.ossSchemaVersion | OSS模式的版本號碼。 |
ossRuleId | oss.ruleId | 事件匹配的規則ID。 |
requestParametersSourceIPAddress | requestParameters.sourceIPAddress | 請求的源IP。 |
responseElementsRequestId | responseElements.requestId | 請求對應的Request ID。 |
userIdentityPrincipalId | userIdentity.principalId | 請求發起者的UID。 |
也可以通過JSON格式參數控制解析行為:
json.fail-on-missing-field:缺失欄位時是否報錯(預設false)json.ignore-parse-errors:是否忽略解析錯誤(預設false)json.timestamp-format.standard:時間戳記格式(預設iso-8601)json.timestamp-format.pattern:自訂時間戳記格式模式
以上欄位名詳細介紹參考OSS事件通知。
使用樣本
樣本一:消費標準JSON訊息
-- 建立MNS源表 -- 欄位名來自於訊息體的json格式 CREATE TEMPORARY TABLE mns_source ( `userId` BIGINT, `action` STRING, `timestamp` TIMESTAMP(3), `payload` STRING ) WITH ( 'connector' = 'mns', 'endpoint' = 'http://your-account-id.mns.cn-hangzhou.aliyuncs.com', 'region' = 'cn-hangzhou', 'queueName' = 'my-events-queue', 'accessKeyId' = 'your-ak', 'accessKeySecret' = 'your-sk', 'format' = 'json' ); -- 建立結果表用於測試輸出 CREATE TEMPORARY TABLE print_sink ( `userId` BIGINT, `action` STRING, `timestamp` TIMESTAMP(3), `payload` STRING ) WITH ( 'connector' = 'print' ); -- 消費並輸出 INSERT INTO print_sink SELECT userId, action, timestamp, payload FROM mns_source;樣本二:消費OSS事件通知訊息
-- 建立MNS源表,自動解析OSS事件通知JSON -- 欄位名定義參考“讀取OSS事件通知“ CREATE TEMPORARY TABLE oss_event_source ( `eventName` STRING, `eventSource` STRING, `eventTime` TIMESTAMP(3), `region` STRING, `ossBucketName` STRING, `ossObjectKey` STRING, `ossObjectSize` BIGINT, `responseElementsRequestId` STRING, `userIdentityPrincipalId` STRING ) WITH ( 'connector' = 'mns', 'endpoint' = 'http://123456789.mns.cn-hangzhou.aliyuncs.com', 'region' = 'cn-hangzhou', 'queueName' = 'oss-events-queue', 'accessKeyId' = '${secret_values.ak_id}', 'accessKeySecret' = '${secret_values.ak_secret}', 'format' = 'json', 'messageType' = 'OSS' ); -- 建立結果表用於測試輸出 CREATE TEMPORARY TABLE print_sink ( `eventName` STRING, `ossBucketName` STRING, `ossObjectKey` STRING, `ossObjectSize` BIGINT, `eventTime` TIMESTAMP(3) ) WITH ( 'connector' = 'print' ); -- 過濾出ObjectCreated事件並輸出 INSERT INTO print_sink SELECT eventName, ossBucketName, ossObjectKey, ossObjectSize, eventTime FROM oss_event_source WHERE eventName LIKE 'ObjectCreated:%';