全部產品
Search
文件中心

Realtime Compute for Apache Flink:輕量訊息佇列MNS

更新時間:Apr 11, 2026

本文介紹如何使用輕量訊息佇列MNS連接器。

背景資訊

輕量訊息佇列(原 MNS)是一種高效、可靠、安全、便捷、可彈性擴充的分布式Message Service。MNS提供OSS事件通知能力,通過建立事件通知規則,MNS可以將Object Storage Service指定資源上產生的事件(例如新檔案被建立)以訊息的方式推送到MNS隊列中。

Flink作業可以使用MNS連接器消費這些事件,例如在即時影像處理情境,使用者可以使用MNS連接器即時擷取OSS Bucket中的新檔案的路徑,再配合Realtime ComputeFlink版提供的FETCH_CONTENT來下載圖片內容並調用AI大語言模型整合相關能力進行即時多模態分析。

類別

詳情

支援類型

源表

運行模式

流模式

資料格式

Orc、Parquet、Avro、Csv、JSON和Raw

特有監控指標

duplicateMessages(重複訊息數)、deletedMessages(已刪除訊息數)、failedDeletes(刪除失敗數)、deserializationErrors(還原序列化錯誤數)

API種類

SQL

前提條件

使用限制

  • 僅Realtime Compute引擎VVR 11.6.0及以上版本支援MNS連接器。

  • MNS連接器與Kafka不同,不支援從指錨點消費訊息,也不支援回溯。詳情請參見輕量訊息佇列(原 MNS)

  • 並行度固定為1:MNS 連接器通過 Connector 側去重實現 Exactly-Once 語義,僅支援單並發消費。

  • 必須開啟Checkpoint:MNS連接器依賴Checkpoint實現訊息的確認和刪除。如果未開啟Checkpoint,訊息將不會被刪除,導致無限重複消費。

  • 訊息體大小限制:MNS單條訊息Body大小不能超過64 KB。如需處理更大訊息,請參見MNS的超大訊息傳輸最佳實務。

  • 訊息可見度設定:使用者在建立MNS隊列時需設定訊息可見度(Visibility Timeout)。訊息可見度逾時時間建議大於 Checkpoint 間隔,避免訊息重複投遞。

  • MNS連接器不保證消費事件時的嚴格順序。MNS 普通隊列非嚴格 FIFO 隊列,消費逾時後訊息重投遞可能導致順序變化。

文法結構

CREATE TABLE mns_source (
    data STRING
) WITH (
    'connector' = 'mns',
    'endpoint' = '${endpoint}',
    'region' = '${region}',
    'queueName' = '${queueName}',
    'accessKeyId' = '${accessKeyId}',
    'accessKeySecret' = '${accessKeySecret}',
    'format' = 'json',
    'batchSize' = '8',
    'pollingWaitTime' = '10s',
    'messageType' = 'RAW'
);

WITH參數

  • 通用

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    connector

    表類型。

    String

    固定值為mns

    endpoint

    MNS服務存取點

    String

    URI格式,例如http://{account-id}.mns.{region}.aliyuncs.com

    詳情請參見地區存取點

    region

    MNS服務所在地區。

    String

    例如cn-hangzhou,支援的地區請參見存取點

    queueName

    MNS隊列名。

    String

    在MNS控制台建立隊列時設定的隊列名稱。

    accessKeyId

    訪問MNS服務所需AK

    String

    請使用已有AccessKey或者參考建立AccessKey重新建立。

    accessKeySecret

    訪問MNS服務所需SK

    String

    format

    資料格式。

    String

    參數取值如下:

    • csv

    • json

    • avro

    • parquet

    • orc

    • raw

    batchSize

    單次從MNS隊列拉取訊息的最大條數。

    Integer

    1

    有效範圍:1-16。該參數影響讀效能,調大可增加吞吐。注意由於MNS服務介面限制,單次最多返回16條訊息。

    pollingWaitTime

    每次從MNS隊列拉取訊息時的最長等待時間。

    Duration

    10s

    取值範圍0-30s。設為0則代表不等待。

    messageType

    MNS訊息體載荷類型。

    String

    RAW

    取值為RAWOSS

    RAW(預設值):將訊息體視為標準JSON,由配置的 format 還原序列化器進行解析。

    OSS:自動解析OSS事件通知格式的JSON(需要MNS服務訂閱OSS事件)。連接器會自動從 events 數組中提取第一個元素,並將嵌套欄位扁平化映射到表結構。

    說明

    當 messageType 設為 OSS 時,format 必須為 json

    deleteMaxRetries

    當MNS SDK刪除訊息失敗時的最大重試次數。

    Integer

    3

    無。

    startTimeMs

    訊息消費起始時間(Unix時間戳記,單位毫秒)。

    Long

    -1

    -1代表不按時間戳記過濾事件,即會嘗試消費隊列中所有可見訊息。

    說明

    MNS連接器不支援類似Kafka那樣的訊息回溯,此處的startTimeMs只用來過濾。MNS連接器不會將進入MNS隊列時間早於該時間戳記的訊息發送到下遊。

讀取OSS事件通知

當設定 'messageType' = 'OSS' 時,MNS連接器會自動解析OSS事件通知中的JSON欄位並映射為扁平的表結構。支援以下欄位名映射(不區分大小寫):

使用者表欄位名

JSON路徑

說明

eventName

eventName

事件類型。

eventSource

eventSource

事件來源。

eventTime

eventTime

事件產生的時間。

eventVersion

eventVersion

事件協議的版本。

region

region

Bucket所在的地區。

ossBucketArn

oss.bucket.arn

Bucket的唯一識別碼。

ossBucketName

oss.bucket.name

Bucket的名稱。

ossBucketOwnerIdentity

oss.bucket.ownerIdentity

建立Bucket的使用者ID。

ossObjectKey

oss.object.key

Object的名稱。

ossObjectSize

oss.object.size

Object的大小。

ossObjectETag

oss.object.eTag

Object 的 ETag 值,可用於校正內容是否變化。

ossObjectDeltaSize

oss.object.deltaSize

Object的大小變化量。

ossObjectReadFrom

oss.object.readFrom

檔案開始讀取的位置。

ossObjectReadTo

oss.object.readTo

檔案最後讀取的位置。

ossOssSchemaVersion

oss.ossSchemaVersion

OSS模式的版本號碼。

ossRuleId

oss.ruleId

事件匹配的規則ID。

requestParametersSourceIPAddress

requestParameters.sourceIPAddress

請求的源IP。

responseElementsRequestId

responseElements.requestId

請求對應的Request ID。

userIdentityPrincipalId

userIdentity.principalId

請求發起者的UID。

也可以通過JSON格式參數控制解析行為:

  • json.fail-on-missing-field:缺失欄位時是否報錯(預設false)

  • json.ignore-parse-errors:是否忽略解析錯誤(預設false)

  • json.timestamp-format.standard:時間戳記格式(預設iso-8601)

  • json.timestamp-format.pattern:自訂時間戳記格式模式

以上欄位名詳細介紹參考OSS事件通知

使用樣本

  • 樣本一:消費標準JSON訊息

    -- 建立MNS源表
    -- 欄位名來自於訊息體的json格式
    CREATE TEMPORARY TABLE mns_source (
      `userId` BIGINT,
      `action` STRING,
      `timestamp` TIMESTAMP(3),
      `payload` STRING
    ) WITH (
      'connector' = 'mns',
      'endpoint' = 'http://your-account-id.mns.cn-hangzhou.aliyuncs.com',
      'region' = 'cn-hangzhou',
      'queueName' = 'my-events-queue',
      'accessKeyId' = 'your-ak',
      'accessKeySecret' = 'your-sk',
      'format' = 'json'
    );
    
    -- 建立結果表用於測試輸出
    CREATE TEMPORARY TABLE print_sink (
      `userId` BIGINT,
      `action` STRING,
      `timestamp` TIMESTAMP(3),
      `payload` STRING
    ) WITH (
      'connector' = 'print'
    );
    
    -- 消費並輸出
    INSERT INTO print_sink
    SELECT userId, action, timestamp, payload
    FROM mns_source;
  • 樣本二:消費OSS事件通知訊息

    -- 建立MNS源表,自動解析OSS事件通知JSON
    -- 欄位名定義參考“讀取OSS事件通知“
    CREATE TEMPORARY TABLE oss_event_source (
      `eventName` STRING,
      `eventSource` STRING,
      `eventTime` TIMESTAMP(3),
      `region` STRING,
      `ossBucketName` STRING,
      `ossObjectKey` STRING,
      `ossObjectSize` BIGINT,
      `responseElementsRequestId` STRING,
      `userIdentityPrincipalId` STRING
    ) WITH (
      'connector' = 'mns',
      'endpoint' = 'http://123456789.mns.cn-hangzhou.aliyuncs.com',
      'region' = 'cn-hangzhou',
      'queueName' = 'oss-events-queue',
      'accessKeyId' = '${secret_values.ak_id}',
      'accessKeySecret' = '${secret_values.ak_secret}',
      'format' = 'json',
      'messageType' = 'OSS'
    );
    
    -- 建立結果表用於測試輸出
    CREATE TEMPORARY TABLE print_sink (
      `eventName` STRING,
      `ossBucketName` STRING,
      `ossObjectKey` STRING,
      `ossObjectSize` BIGINT,
      `eventTime` TIMESTAMP(3)
    ) WITH (
      'connector' = 'print'
    );
    
    -- 過濾出ObjectCreated事件並輸出
    INSERT INTO print_sink
    SELECT eventName, ossBucketName, ossObjectKey, ossObjectSize, eventTime
    FROM oss_event_source
    WHERE eventName LIKE 'ObjectCreated:%';