实时计算Flink支持基于开源Apache Flink Agents框架开发事件驱动的流式 AI Agent 作业。本文介绍 Flink Agents 的核心概念、版本要求、快速上手路径、自定义开发流程,以及作业提交与依赖管理。
概述
Apache Flink Agents是 Apache Flink 社区推出的全新子项目,提供了一个用于构建事件驱动型 AI Agent 的开发框架。它基于 Flink 久经考验的流式引擎,将分布式、有状态、容错、流式处理的能力带入 Agentic AI 领域,让智能体真正走进 ToB 生产环境。
Flink Agents内置大模型调用、工具调用、记忆管理、动态编排、可观测性等核心模块,开发者可快速构建大规模、持续运行、端到端可靠的生产级 AI Agent。
Flink Agents具备如下核心优势:
能力 | 说明 |
分布式协同 | 不是把单机 Agent 跑成多副本,而是从框架层提供事件路由分区、状态分布一致、故障断点恢复等能力。 |
有状态记忆 | 基于 Flink State 托管 Agent 记忆,随 Checkpoint 自动持久化、恢复与重分布,无需额外存储维护一致性。 |
流式处理 | 事件持续到达、Agent 持续响应——Flink 流处理引擎为这种工作模式提供高吞吐、低延迟内核。 |
可信生产 | 基于分布式 Checkpoint,节点宕机自愈、状态不丢、事件不丢、数据exactly-once。 |
Agent 类型
可基于不同场景构建 Workflow 或 ReAct Agent。
类型 | 适用场景 | 说明 |
Workflow Agent | 流程明确的场景 | 通过预定义事件驱动流程编排 Agent 行为。详见Workflow Agent。 |
ReAct Agent | 需要灵活决策的场景 | 结合推理(Reasoning)与行动(Action),由 LLM 自主决定执行步骤。详见ReAct Agent。 |
版本说明
提交 Flink Agents 作业请选择 VVR-11.7 及以上版本,否则作业因缺少运行时依赖而失败。
VVR 引擎版本 | Flink 版本 | Flink Agents 版本 |
vvr-11.7.0-flink-1.20 | 1.20 | 0.2.1 |
前提条件
产品与权限
已开通实时计算 Flink 版并创建工作空间,详见开通实时计算Flink版。
使用 RAM 用户或 RAM 角色访问时,已具备 Flink 控制台相关权限,详见权限管理。
LLM 服务
Flink Agents 作业运行时需要调用 LLM 服务并准备 API Key。可前往阿里云百炼平台创建 API Key。
网络配置
公网访问:Flink 工作空间需开通公网访问,详见网络连接选型。
私网访问(PrivateLink):通过 VPC 私网连接百炼平台,详见私网访问阿里云百炼模型或应用 API。
本地开发环境
Python:Python 3.10 或 3.11。
Java:Java 11+、Maven 3+。
快速入门
本节通过商品评价分析示例演示完整流程:使用 Flink Agents 框架构建包含评价分析 Agent 的示例Flink 作业,作业运行时,评价分析 Agent 调用百炼大模型服务,对评价文本输出满意度评分(1-5 分)和不满意原因。
步骤一:下载示例作业文件
Python 作业
下载 quickstart-python.zip,包含以下文件:
文件 | 说明 |
main.py | 作业入口。创建执行环境、注册 LLM 连接、构建流处理 pipeline。 |
review_analysis_agent.py | Agent 实现。定义 Prompt 模板、ChatModel 配置和 Action 处理逻辑。 |
下载后的 ZIP 包直接用于上传,无需解压。示例使用的 openai、dashscope 等 Python 依赖已预装于 VVR 引擎镜像。
Java 作业
下载以下文件:
文件 | 说明 |
测试 fat JAR,可直接上传。 | |
Java 源代码,供参考。 |
步骤二:上传并部署作业
登录实时计算控制台。
单击目标工作空间操作列的控制台。
在左侧导航栏单击文件管理,单击上传资源,上传下载的作业文件。
在运维中心 > 作业运维页面,单击部署作业,按作业类型选择 Python 作业或 JAR 作业,填写部署信息。
Python 作业部署配置
参数 | 示例 |
部署模式 | 流模式 |
部署名称 | flink-agents-quickstart-python |
引擎版本 | vvr-11.7.0-flink-1.20 |
Python 文件地址 | quickstart-python.zip |
Entry Module | quickstart.main |
部署目标 | default-queue |
Java 作业部署配置
参数 | 示例 |
部署模式 | 流模式 |
部署名称 | flink-agents-quickstart-java |
引擎版本 | vvr-11.7.0-flink-1.20 |
JAR URI | quickstart-java.jar |
Entry Point Class | org.apache.flink.agents.quickstart.Main |
部署目标 | default-queue |
运行参数
在部署详情 > 运行参数配置 > 编辑 > 其他配置中添加 Flink 配置项。
Python 作业
python.executable: python3.10
python.client.executable: python3.10
containerized.master.env.FLINK_HOME: /flink
containerized.taskmanager.env.FLINK_HOME: /flink
containerized.master.env.OPENAI_API_KEY: <百炼平台API Key>
containerized.taskmanager.env.OPENAI_API_KEY: <百炼平台API Key>
containerized.master.env.OPENAI_API_BASE_URL: 'https://dashscope.aliyuncs.com/compatible-mode/v1'
containerized.taskmanager.env.OPENAI_API_BASE_URL: 'https://dashscope.aliyuncs.com/compatible-mode/v1'Java 作业
containerized.master.env.OPENAI_API_KEY: <百炼平台API Key>
containerized.taskmanager.env.OPENAI_API_KEY: <百炼平台API Key>
containerized.master.env.OPENAI_API_BASE_URL: 'https://dashscope.aliyuncs.com/compatible-mode/v1'
containerized.taskmanager.env.OPENAI_API_BASE_URL: 'https://dashscope.aliyuncs.com/compatible-mode/v1'配置项说明
配置项 | 说明 |
| 指定 Python 解释器版本(仅 Python 作业)。 |
| 设置 JobManager 与 TaskManager 的 |
| 百炼平台 API Key。 |
| 百炼平台接口地址。公网为 |
环境变量需同时配置 containerized.master.env. 与 containerized.taskmanager.env. 两个前缀,确保 JobManager 和 TaskManager 均能读取。
步骤三:启动作业并查看结果
在运维中心 > 作业运维页面,单击目标作业操作列的启动。
在作业启动对话框中选择无状态启动,单击启动。
示例使用内存数据源,处理完成后作业自动结束。等待状态变为已完成后,单击作业名称进入详情。
在 TaskManager 页签查看日志,搜索
Review analysis result:关键字查看输出结果。
生产环境通常使用 Kafka 等流式数据源,作业持续运行。
开发自定义 Agent
安装 Flink Agents
Python
建议使用虚拟环境:
python3 -m venv flink-agents-env
source flink-agents-env/bin/activate
pip install flink-agents Java
在 pom.xml 中添加依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-agents-api</artifactId>
<version>${flink-agents.version}</version>
<scope>provided</scope>
</dependency>Flink Agents 相关依赖必须声明 <scope>provided</scope>,由 VVR 引擎提供。
需要在本地 IDE 调试时,额外添加:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-agents-ide-support</artifactId>
<version>${flink-agents.version}</version>
<scope>provided</scope>
</dependency>编写 Agent
Flink Agents 作业的核心结构如下:
注册 LLM 连接:通过
AgentsExecutionEnvironment注册 ChatModel 连接,连接信息(API Key、端点地址)通过ResourceDescriptor配置。详见Chat Models。定义 Agent:实现自定义 Agent,配置 Prompt 模板,通过
@action(Python)或@Action(Java)注解定义事件处理逻辑。构建 Pipeline:将输入数据流接入 Agent 处理,输出分析结果。
Agent、Prompt、Tool、Memory 等概念详见Flink Agents 开发文档。
本地测试
本地测试由 Flink MiniCluster 自动启动,无需部署集群。
Python
python your_agent_job.py Java
mvn exec:java -Dexec.mainClass="com.example.YourAgentJob" 依赖管理
镜像预装依赖
VVR 引擎镜像预装 Flink Agents 核心库及部分依赖。
Python 侧
依赖 | 说明 |
flink-agents 核心库 | Flink Agents Python API |
openai | OpenAI 兼容接口(含百炼平台) |
dashscope | 阿里云百炼平台(通义千问原生接口) |
mcp | Model Context Protocol |
镜像未预装的依赖需在提交作业时一并上传。详见使用Python依赖。
Java 侧
VVR 引擎仅包含 Flink Agents thin JAR(核心代码),所有 LLM integration 第三方依赖需要打入作业 JAR。
Java 作业依赖管理
使用 maven-shade-plugin 打包为 fat JAR,遵循以下规则:
Flink Agents 核心依赖(
flink-agents-api、flink-agents-runtime等):<scope>provided</scope>,不打入 fat JAR。Flink 核心依赖(
flink-streaming-java等):<scope>provided</scope>。LLM integration 依赖(如
flink-agents-integrations-chat-models-openai):compile scope,打入 fat JAR。与 Flink 冲突的库(如 Jackson)需在
maven-shade-plugin中配置 relocation。
完整 pom.xml 配置示例见 quickstart-java-src.zip。
提交作业到实时计算 Flink
Python 作业
提交方式与 PyFlink 作业一致,详见Python作业开发。关键参数:
参数 | 说明 |
引擎版本 | 选择支持 Flink Agents 的 VVR 引擎版本,详见版本说明。 |
Python 文件地址 | 作业入口 Python 文件或 ZIP 包。 |
Entry Module | ZIP 包入口模块名。 |
Python Libraries | 额外依赖包。 |
Java 作业
提交方式与 Flink JAR 作业一致,详见JAR作业开发。关键参数:
参数 | 说明 |
引擎版本 | 选择支持 Flink Agents 的 VVR 引擎版本,详见版本说明。 |
JAR URI | fat JAR 路径。 |
Entry Point Class | 作业入口类全限定名。 |
通用运行参数
Python 解释器版本
VVR 引擎镜像默认 Python 版本为 3.9,Flink Agents 需要 3.10 或 3.11。Python 作业必须配置:
python.executable: python3.10
python.client.executable: python3.10
containerized.master.env.FLINK_HOME: /flink
containerized.taskmanager.env.FLINK_HOME: /flink自定义环境变量
作业代码通过环境变量读取配置(如 LLM API Key、端点地址)时,需同时配置 master 与 taskmanager 两个前缀:
containerized.master.env.<ENV_VAR_NAME>: <value>
containerized.taskmanager.env.<ENV_VAR_NAME>: <value>