全部产品
Search
文档中心

实时计算Flink版:基于MNS和AI Function的实时图片分类与信息提取

更新时间:Apr 29, 2026

本文介绍如何通过轻量消息队列MNS连接器订阅OSS事件通知,结合AI Function对新上传的图片进行实时分类和结构化信息提取。

方案

在实时图像处理场景中,用户将图片上传到对象存储OSS后,需要自动完成以下处理:

  1. 实时感知新文件上传事件。

  2. 下载图片内容,调用大语言模型进行分类。

  3. 根据分类结果提取结构化信息。

image

该场景通过以下组件协同实现:

  • 轻量消息队列MNS:订阅OSS Bucket的对象创建事件,将事件消息投递到MNS队列。

  • MNS连接器:Flink作业通过MNS连接器实时消费队列中的事件,获取新上传文件的路径。

  • AI Function:调用阿里云百炼平台的多模态大模型,使用AI_CLASSIFY对图片分类,使用AI_EXTRACT提取结构化信息。

本文以电商商品图片处理为例,上传一张商品图片到OSS后,Flink作业自动识别商品类别(服装或电子产品),根据分类结果提取不同的结构化字段,并分别写入对应的结果表。

前提条件

  • 已开通实时计算Flink版并创建工作空间,详情请参见开通实时计算Flink版。实时计算引擎版本为VVR 11.6.0及以上。

  • 已开通开通轻量消息队列,且与Flink工作空间处于同一地域。

  • 已创建OSS Bucket,且与MNS队列处于同一地域。

  • 已开通阿里云百炼并获取API Key,详情请参见获取API Key

  • 已为Flink工作空间配置网络访问:

    • 访问百炼平台:测试阶段建议开启公网访问,配置简单,详情请参见网络连接选型。私网访问配置较复杂,详情请参见模型设置中的网络配置说明。

      公网访问配置后可以进行网络探测,Host:dashscope.aliyuncs.com,Prot:443。
    • 访问MNS服务:通过内网Endpoint访问无需额外配置;通过公网访问时,需开启公网访问并配置白名单,详情请参见访问控制

使用限制

  • MNS连接器仅支持源表,运行模式为流模式。

  • MNS连接器并行度固定为1,必须开启Checkpoint。详情请参见轻量消息队列MNS

  • AI_CLASSIFY和AI_EXTRACT函数的吞吐量受百炼平台限流限制。触及流量上限时,Flink作业可能出现反压或超时重启。详情请参见百炼平台限流

步骤一:创建MNS队列并配置OSS事件通知

1. 创建MNS队列

  1. 登录轻量消息队列(原 MNS)控制台

  2. 在左侧导航栏,选择队列模型 > 队列列表

  3. 在顶部菜单栏选择目标地域。

  4. 单击创建队列,配置队列参数后单击确定

主要配置项说明:

参数

说明

示例值

名称

队列名称,例如flink-mns-queue

flink-mns-queue

消息最大长度

发送到队列的消息体最大长度。

64(默认值)

消息可见性超时时间

建议大于Flink Checkpoint间隔,避免消息重复投递。

300(Flink Checkpoint间隔默认值为180s)

消息保存时长

消息在队列中的最长存活时间。

4(默认值)

2. 添加OSS事件通知规则

  1. 在MNS控制台左侧导航栏,单击事件通知

  2. 事件通知页面,单击对象存储OSS页签,然后单击创建规则

  3. 创建规则-对象存储OSS面板,配置以下参数后单击确定

参数

说明

名称

规则名称。

事件类型

选择针对创建文件的操作事件(如ObjectCreated:*)。

匹配规则

设置需要监听的OSS资源匹配规则。例如设置后缀为.png,仅监听PNG图片上传事件。

接收终端

选择一对一订阅,输入上一步创建的队列名称。

详情请参见OSS事件通知

3. 记录连接信息

队列列表页面,单击目标队列名称进入详情页,记录以下信息供后续Flink作业使用:

  • Endpoint:MNS队列的访问地址。与Flink处于同一地域时使用内网地址,格式为http://<account-id>.mns.<region>-internal.aliyuncs.com

  • 队列名称:创建队列时设置的名称。

步骤二:创建并启动Flink SQL作业

  1. 登录实时计算控制台

  2. 创建SQL流作业草稿,详情请参见Flink SQL作业

  3. 在SQL编辑器中输入以下SQL。

    本文以商品图片分类为例,实际业务中可根据场景替换分类类别和提取字段。例如:

    • 发票识别:将LABELS改为ARRAY['增值税专用发票', '增值税普通发票'],提取字段改为{"发票号码":"string","开票日期":"string","金额":"string","税额":"string"}。

    • 订单识别:将LABELS改为ARRAY['采购订单', '销售订单'],提取字段改为{"订单编号":"string","下单日期":"string","商品名称":"string","数量":"string","总金额":"string"}。

    -- ===========================
    -- 1. 注册百炼多模态模型
    -- ===========================
    CREATE TEMPORARY MODEL image_model
    INPUT (`input` STRING)
    OUTPUT (`result` VARIANT)
    WITH (
        'provider' = 'openai-compat',
        # 私网访问请替换为自身的endpoint
        'endpoint' = 'https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions',
        'api-key' = '<YOUR_API_KEY>',
        'model' = 'qwen3-vl-flash',
        'content-type' = 'IMAGE_URL'
    );
    
    -- ===========================
    -- 2. 创建MNS源表(消费OSS事件通知)
    -- ===========================
    CREATE TEMPORARY TABLE oss_events (
        eventTime STRING,
        region STRING,
        ossBucketName STRING,
        ossObjectKey STRING,
        ossObjectSize BIGINT
    ) WITH (
        'connector' = 'mns',
        'endpoint' = '<YOUR_MNS_ENDPOINT>',
        'region' = '<YOUR_REGION>',
        'queueName' = '<YOUR_QUEUE_NAME>',
        'accessKeyId' = '${secret_values.mns_ak_id}',
        'accessKeySecret' = '${secret_values.mns_ak_secret}',
        'format' = 'json',
        'messageType' = 'OSS'
    );
    
    -- ===========================
    -- 3. 创建结果表
    -- ===========================
    
    -- 服装结果表
    CREATE TEMPORARY TABLE clothing_results (
        image_path STRING,
        category STRING,
        extracted_contents STRING
    ) WITH (
        'connector' = 'print'
    );
    
    -- 电子产品结果表
    CREATE TEMPORARY TABLE electronics_results (
        image_path STRING,
        category STRING,
        extracted_contents STRING
    ) WITH (
        'connector' = 'print'
    );
    
    -- ===========================
    -- 4. 图片分类与信息提取管道
    -- ===========================
    
    -- 4.1 从OSS下载图片内容并转为Base64编码
    CREATE TEMPORARY VIEW image_in_base64(image_path, image_in_str)
    AS SELECT
        CONCAT('oss://', ossBucketName, '/', ossObjectKey) AS image_path,
        CONCAT(
            'data:', MIME_TYPE(ossObjectKey),
            ';base64,',
            TO_BASE64(FETCH_CONTENT(CONCAT('oss://', ossBucketName, '/', ossObjectKey)))
        ) AS image_in_str
    FROM oss_events
    WHERE MIME_TYPE(ossObjectKey) IN ('image/png', 'image/jpeg', 'image/tiff');
    
    -- 4.2 使用AI_CLASSIFY对图片进行分类
    CREATE TEMPORARY VIEW classified_images(image_path, category, image_in_str)
    AS SELECT image_path, category, image_in_str
    FROM image_in_base64,
        LATERAL TABLE(AI_CLASSIFY(MODEL image_model, image_in_str, ARRAY['服装', '电子产品']));
    
    -- 4.3 按分类结果分流,使用不同的提取schema写入对应结果表
    BEGIN STATEMENT SET;
    
    -- 服装:颜色、款式、材质
    INSERT INTO clothing_results
    SELECT image_path, category, extracted_json
    FROM classified_images,
        LATERAL TABLE(AI_EXTRACT(
            MODEL image_model,
            image_in_str,
            '{"颜色":"string","款式":"string","材质":"string"}'
        ))
    WHERE category = '服装';
    
    -- 电子产品:品牌、型号、颜色
    INSERT INTO electronics_results
    SELECT image_path, category, extracted_json
    FROM classified_images,
        LATERAL TABLE(AI_EXTRACT(
            MODEL image_model,
            image_in_str,
            '{"品牌":"string","型号":"string","颜色":"string"}'
        ))
    WHERE category = '电子产品';
    
    END;

    步骤

    说明

    注册模型

    使用CREATE MODEL注册百炼平台的多模态模型qwen-vl-flash。设置content-typeIMAGE_URL以支持图片输入。详情请参见模型设置

    MNS源表

    设置messageTypeOSS,连接器自动解析OSS事件通知JSON并映射为扁平表结构。字段定义请参见OSS事件通知

    图片下载与编码

    使用FETCH_CONTENT函数下载OSS中的图片,通过TO_BASE64转为Base64编码,拼接Data URI供模型识别。使用MIME_TYPE函数过滤仅处理图片格式文件。

    AI分类

    调用AI_CLASSIFY函数将图片分为"医院发票"或"酒店发票"类别。详情请参见文本分类

    分流提取

    通过WHERE category条件将分类结果分流,对不同类别调用AI_EXTRACT函数提取各自的结构化字段,分别写入对应的结果表。使用BEGIN STATEMENT SET ... END语法在同一个作业中执行多条INSERT语句。详情请参见信息提取

    说明
    • <YOUR_API_KEY>替换为百炼平台的API Key。

    • <YOUR_MNS_ENDPOINT><YOUR_REGION><YOUR_QUEUE_NAME>替换为步骤一中记录的MNS连接信息。

    • MNS源表的accessKeyIdaccessKeySecret建议通过变量管理,避免明文存储。

  4. 将作业部署后,在运维中心 > 作业运维 > 部署详情,单击对应作业,部署详情 > 运行参数配置,单击编辑,在其他配置中添加以下参数,配置OSS Bucket的访问鉴权信息,供FETCH_CONTENT函数下载图片使用

    fs.oss.bucket.<YOUR_BUCKET_NAME>.accessKeyId: <YOUR_OSS_AK_ID>
    fs.oss.bucket.<YOUR_BUCKET_NAME>.accessKeySecret: <YOUR_OSS_AK_SECRET>

    <YOUR_BUCKET_NAME>替换为目标OSS Bucket名称。详情请参见配置Bucket鉴权信息

  5. 开启Checkpoint。MNS连接器依赖Checkpoint实现消息确认和删除,未开启时消息将无限重复消费。

    已经默认开启,系统检查点间隔 默认值为180s。
  6. 保存配置,启动作业。

步骤三:验证结果

  1. 上传测试图片到目标OSS Bucket。分别上传一张服装商品图片和一张电子产品图片,支持PNG、JPEG、TIFF、BMP、WEBP、HEIC格式。

    事件通知规则需设置相同的规则以匹配对应的格式。
  2. 在实时计算控制台,进入目标作业的作业运维页面。

  3. 作业日志页签,选择job Manager > Stdout中,搜索PrintSinkOutputWriter相关日志,查看分类和信息提取结果。

输出示例:

+I[oss://my-bucket/clothing-001.png, 服装, {"颜色":"黑色","款式":"连衣裙","材质":"纯棉"}]
+I[oss://my-bucket/electronics-001.png, 电子产品, {"品牌":"XX品牌","型号":"Pro Max","颜色":"黑色"}]