本文介绍如何通过轻量消息队列MNS连接器订阅OSS事件通知,结合AI Function对新上传的图片进行实时分类和结构化信息提取。
方案
在实时图像处理场景中,用户将图片上传到对象存储OSS后,需要自动完成以下处理:
实时感知新文件上传事件。
下载图片内容,调用大语言模型进行分类。
根据分类结果提取结构化信息。
该场景通过以下组件协同实现:
轻量消息队列MNS:订阅OSS Bucket的对象创建事件,将事件消息投递到MNS队列。
MNS连接器:Flink作业通过MNS连接器实时消费队列中的事件,获取新上传文件的路径。
AI Function:调用阿里云百炼平台的多模态大模型,使用
AI_CLASSIFY对图片分类,使用AI_EXTRACT提取结构化信息。
本文以电商商品图片处理为例,上传一张商品图片到OSS后,Flink作业自动识别商品类别(服装或电子产品),根据分类结果提取不同的结构化字段,并分别写入对应的结果表。
前提条件
已开通实时计算Flink版并创建工作空间,详情请参见开通实时计算Flink版。实时计算引擎版本为VVR 11.6.0及以上。
已开通开通轻量消息队列,且与Flink工作空间处于同一地域。
已创建OSS Bucket,且与MNS队列处于同一地域。
已为Flink工作空间配置网络访问:
使用限制
步骤一:创建MNS队列并配置OSS事件通知
1. 创建MNS队列
在左侧导航栏,选择队列模型 > 队列列表。
在顶部菜单栏选择目标地域。
单击创建队列,配置队列参数后单击确定。
主要配置项说明:
参数 | 说明 | 示例值 |
名称 | 队列名称,例如 | flink-mns-queue |
消息最大长度 | 发送到队列的消息体最大长度。 | 64(默认值) |
消息可见性超时时间 | 建议大于Flink Checkpoint间隔,避免消息重复投递。 | 300(Flink Checkpoint间隔默认值为180s) |
消息保存时长 | 消息在队列中的最长存活时间。 | 4(默认值) |
2. 添加OSS事件通知规则
在MNS控制台左侧导航栏,单击事件通知。
在事件通知页面,单击对象存储OSS页签,然后单击创建规则。
在创建规则-对象存储OSS面板,配置以下参数后单击确定。
参数 | 说明 |
名称 | 规则名称。 |
事件类型 | 选择针对创建文件的操作事件(如 |
匹配规则 | 设置需要监听的OSS资源匹配规则。例如设置后缀为 |
接收终端 | 选择一对一订阅,输入上一步创建的队列名称。 |
详情请参见OSS事件通知。
3. 记录连接信息
在队列列表页面,单击目标队列名称进入详情页,记录以下信息供后续Flink作业使用:
Endpoint:MNS队列的访问地址。与Flink处于同一地域时使用内网地址,格式为
http://<account-id>.mns.<region>-internal.aliyuncs.com。队列名称:创建队列时设置的名称。
步骤二:创建并启动Flink SQL作业
登录实时计算控制台。
创建SQL流作业草稿,详情请参见Flink SQL作业。
在SQL编辑器中输入以下SQL。
本文以商品图片分类为例,实际业务中可根据场景替换分类类别和提取字段。例如:
发票识别:将LABELS改为ARRAY['增值税专用发票', '增值税普通发票'],提取字段改为{"发票号码":"string","开票日期":"string","金额":"string","税额":"string"}。
订单识别:将LABELS改为ARRAY['采购订单', '销售订单'],提取字段改为{"订单编号":"string","下单日期":"string","商品名称":"string","数量":"string","总金额":"string"}。
-- =========================== -- 1. 注册百炼多模态模型 -- =========================== CREATE TEMPORARY MODEL image_model INPUT (`input` STRING) OUTPUT (`result` VARIANT) WITH ( 'provider' = 'openai-compat', # 私网访问请替换为自身的endpoint 'endpoint' = 'https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions', 'api-key' = '<YOUR_API_KEY>', 'model' = 'qwen3-vl-flash', 'content-type' = 'IMAGE_URL' ); -- =========================== -- 2. 创建MNS源表(消费OSS事件通知) -- =========================== CREATE TEMPORARY TABLE oss_events ( eventTime STRING, region STRING, ossBucketName STRING, ossObjectKey STRING, ossObjectSize BIGINT ) WITH ( 'connector' = 'mns', 'endpoint' = '<YOUR_MNS_ENDPOINT>', 'region' = '<YOUR_REGION>', 'queueName' = '<YOUR_QUEUE_NAME>', 'accessKeyId' = '${secret_values.mns_ak_id}', 'accessKeySecret' = '${secret_values.mns_ak_secret}', 'format' = 'json', 'messageType' = 'OSS' ); -- =========================== -- 3. 创建结果表 -- =========================== -- 服装结果表 CREATE TEMPORARY TABLE clothing_results ( image_path STRING, category STRING, extracted_contents STRING ) WITH ( 'connector' = 'print' ); -- 电子产品结果表 CREATE TEMPORARY TABLE electronics_results ( image_path STRING, category STRING, extracted_contents STRING ) WITH ( 'connector' = 'print' ); -- =========================== -- 4. 图片分类与信息提取管道 -- =========================== -- 4.1 从OSS下载图片内容并转为Base64编码 CREATE TEMPORARY VIEW image_in_base64(image_path, image_in_str) AS SELECT CONCAT('oss://', ossBucketName, '/', ossObjectKey) AS image_path, CONCAT( 'data:', MIME_TYPE(ossObjectKey), ';base64,', TO_BASE64(FETCH_CONTENT(CONCAT('oss://', ossBucketName, '/', ossObjectKey))) ) AS image_in_str FROM oss_events WHERE MIME_TYPE(ossObjectKey) IN ('image/png', 'image/jpeg', 'image/tiff'); -- 4.2 使用AI_CLASSIFY对图片进行分类 CREATE TEMPORARY VIEW classified_images(image_path, category, image_in_str) AS SELECT image_path, category, image_in_str FROM image_in_base64, LATERAL TABLE(AI_CLASSIFY(MODEL image_model, image_in_str, ARRAY['服装', '电子产品'])); -- 4.3 按分类结果分流,使用不同的提取schema写入对应结果表 BEGIN STATEMENT SET; -- 服装:颜色、款式、材质 INSERT INTO clothing_results SELECT image_path, category, extracted_json FROM classified_images, LATERAL TABLE(AI_EXTRACT( MODEL image_model, image_in_str, '{"颜色":"string","款式":"string","材质":"string"}' )) WHERE category = '服装'; -- 电子产品:品牌、型号、颜色 INSERT INTO electronics_results SELECT image_path, category, extracted_json FROM classified_images, LATERAL TABLE(AI_EXTRACT( MODEL image_model, image_in_str, '{"品牌":"string","型号":"string","颜色":"string"}' )) WHERE category = '电子产品'; END;步骤
说明
注册模型
使用
CREATE MODEL注册百炼平台的多模态模型qwen-vl-flash。设置content-type为IMAGE_URL以支持图片输入。详情请参见模型设置。MNS源表
设置
messageType为OSS,连接器自动解析OSS事件通知JSON并映射为扁平表结构。字段定义请参见OSS事件通知。图片下载与编码
使用
FETCH_CONTENT函数下载OSS中的图片,通过TO_BASE64转为Base64编码,拼接Data URI供模型识别。使用MIME_TYPE函数过滤仅处理图片格式文件。AI分类
调用
AI_CLASSIFY函数将图片分为"医院发票"或"酒店发票"类别。详情请参见文本分类。分流提取
通过
WHERE category条件将分类结果分流,对不同类别调用AI_EXTRACT函数提取各自的结构化字段,分别写入对应的结果表。使用BEGIN STATEMENT SET ... END语法在同一个作业中执行多条INSERT语句。详情请参见信息提取。说明将
<YOUR_API_KEY>替换为百炼平台的API Key。将
<YOUR_MNS_ENDPOINT>、<YOUR_REGION>、<YOUR_QUEUE_NAME>替换为步骤一中记录的MNS连接信息。MNS源表的
accessKeyId和accessKeySecret建议通过变量管理,避免明文存储。
将作业部署后,在,单击对应作业,,单击编辑,在其他配置中添加以下参数,配置OSS Bucket的访问鉴权信息,供
FETCH_CONTENT函数下载图片使用fs.oss.bucket.<YOUR_BUCKET_NAME>.accessKeyId: <YOUR_OSS_AK_ID> fs.oss.bucket.<YOUR_BUCKET_NAME>.accessKeySecret: <YOUR_OSS_AK_SECRET>将
<YOUR_BUCKET_NAME>替换为目标OSS Bucket名称。详情请参见配置Bucket鉴权信息。开启Checkpoint。MNS连接器依赖Checkpoint实现消息确认和删除,未开启时消息将无限重复消费。
已经默认开启,系统检查点间隔 默认值为180s。
保存配置,启动作业。
步骤三:验证结果
上传测试图片到目标OSS Bucket。分别上传一张服装商品图片和一张电子产品图片,支持PNG、JPEG、TIFF、BMP、WEBP、HEIC格式。
事件通知规则需设置相同的规则以匹配对应的格式。
在实时计算控制台,进入目标作业的作业运维页面。
在作业日志页签,选择中,搜索
PrintSinkOutputWriter相关日志,查看分类和信息提取结果。
输出示例:
+I[oss://my-bucket/clothing-001.png, 服装, {"颜色":"黑色","款式":"连衣裙","材质":"纯棉"}]
+I[oss://my-bucket/electronics-001.png, 电子产品, {"品牌":"XX品牌","型号":"Pro Max","颜色":"黑色"}]