全部产品
Search
文档中心

实时计算Flink版:Flink Agents开发(公测)

更新时间:Jun 03, 2026

实时计算Flink支持基于开源Apache Flink Agents框架开发事件驱动的流式 AI Agent 作业。本文介绍 Flink Agents 的核心概念、版本要求、快速上手路径、自定义开发流程,以及作业提交与依赖管理。

概述

Apache Flink Agents是 Apache Flink 社区推出的全新子项目,提供了一个用于构建事件驱动型 AI Agent 的开发框架。它基于 Flink 久经考验的流式引擎,将分布式、有状态、容错、流式处理的能力带入 Agentic AI 领域,让智能体真正走进 ToB 生产环境。

Flink Agents内置大模型调用、工具调用、记忆管理、动态编排、可观测性等核心模块,开发者可快速构建大规模、持续运行、端到端可靠的生产级 AI Agent。

Flink Agents具备如下核心优势:

能力

说明

分布式协同

不是把单机 Agent 跑成多副本,而是从框架层提供事件路由分区、状态分布一致、故障断点恢复等能力。

有状态记忆

基于 Flink State 托管 Agent 记忆,随 Checkpoint 自动持久化、恢复与重分布,无需额外存储维护一致性。

流式处理

事件持续到达、Agent 持续响应——Flink 流处理引擎为这种工作模式提供高吞吐、低延迟内核。

可信生产

基于分布式 Checkpoint,节点宕机自愈、状态不丢、事件不丢、数据exactly-once。

Agent 类型

可基于不同场景构建 Workflow 或 ReAct Agent。

类型

适用场景

说明

Workflow Agent

流程明确的场景

通过预定义事件驱动流程编排 Agent 行为。详见Workflow Agent

ReAct Agent

需要灵活决策的场景

结合推理(Reasoning)与行动(Action),由 LLM 自主决定执行步骤。详见ReAct Agent

版本说明

提交 Flink Agents 作业请选择 VVR-11.7 及以上版本,否则作业因缺少运行时依赖而失败。

VVR 引擎版本

Flink 版本

Flink Agents 版本

vvr-11.7.0-flink-1.20

1.20

0.2.1

前提条件

  • 产品与权限

    • 已开通实时计算 Flink 版并创建工作空间,详见开通实时计算Flink版

    • 使用 RAM 用户或 RAM 角色访问时,已具备 Flink 控制台相关权限,详见权限管理

  • LLM 服务

    Flink Agents 作业运行时需要调用 LLM 服务并准备 API Key。可前往阿里云百炼平台创建 API Key。

  • 网络配置

  • 本地开发环境

    • Python:Python 3.10 或 3.11。

    • Java:Java 11+、Maven 3+。

快速入门

本节通过商品评价分析示例演示完整流程:使用 Flink Agents 框架构建包含评价分析 Agent 的示例Flink 作业,作业运行时,评价分析 Agent 调用百炼大模型服务,对评价文本输出满意度评分(1-5 分)和不满意原因。

步骤一:下载示例作业文件

Python 作业

下载 quickstart-python.zip,包含以下文件:

文件

说明

main.py

作业入口。创建执行环境、注册 LLM 连接、构建流处理 pipeline。

review_analysis_agent.py

Agent 实现。定义 Prompt 模板、ChatModel 配置和 Action 处理逻辑。

下载后的 ZIP 包直接用于上传,无需解压。示例使用的 openaidashscope 等 Python 依赖已预装于 VVR 引擎镜像。

Java 作业

下载以下文件:

文件

说明

quickstart-java.jar

测试 fat JAR,可直接上传。

quickstart-java-src.zip

Java 源代码,供参考。

步骤二:上传并部署作业

  1. 登录实时计算控制台

  2. 单击目标工作空间操作列的控制台

  3. 在左侧导航栏单击文件管理,单击上传资源,上传下载的作业文件。

  4. 运维中心 > 作业运维页面,单击部署作业,按作业类型选择 Python 作业JAR 作业,填写部署信息。

Python 作业部署配置

参数

示例

部署模式

流模式

部署名称

flink-agents-quickstart-python

引擎版本

vvr-11.7.0-flink-1.20

Python 文件地址

quickstart-python.zip

Entry Module

quickstart.main

部署目标

default-queue

Java 作业部署配置

参数

示例

部署模式

流模式

部署名称

flink-agents-quickstart-java

引擎版本

vvr-11.7.0-flink-1.20

JAR URI

quickstart-java.jar

Entry Point Class

org.apache.flink.agents.quickstart.Main

部署目标

default-queue

运行参数

部署详情 > 运行参数配置 > 编辑 > 其他配置中添加 Flink 配置项。

Python 作业

python.executable: python3.10
python.client.executable: python3.10
containerized.master.env.FLINK_HOME: /flink
containerized.taskmanager.env.FLINK_HOME: /flink
containerized.master.env.OPENAI_API_KEY: <百炼平台API Key>
containerized.taskmanager.env.OPENAI_API_KEY: <百炼平台API Key>
containerized.master.env.OPENAI_API_BASE_URL: 'https://dashscope.aliyuncs.com/compatible-mode/v1'
containerized.taskmanager.env.OPENAI_API_BASE_URL: 'https://dashscope.aliyuncs.com/compatible-mode/v1'

Java 作业

containerized.master.env.OPENAI_API_KEY: <百炼平台API Key>
containerized.taskmanager.env.OPENAI_API_KEY: <百炼平台API Key>
containerized.master.env.OPENAI_API_BASE_URL: 'https://dashscope.aliyuncs.com/compatible-mode/v1'
containerized.taskmanager.env.OPENAI_API_BASE_URL: 'https://dashscope.aliyuncs.com/compatible-mode/v1'

配置项说明

配置项

说明

python.executable / python.client.executable

指定 Python 解释器版本(仅 Python 作业)。

containerized.{master,taskmanager}.env.FLINK_HOME

设置 JobManager 与 TaskManager 的 FLINK_HOME 环境变量(仅 Python 作业)。

containerized.{master,taskmanager}.env.OPENAI_API_KEY

百炼平台 API Key。

containerized.{master,taskmanager}.env.OPENAI_API_BASE_URL

百炼平台接口地址。公网为 https://dashscope.aliyuncs.com/compatible-mode/v1;私网地址参见私网访问阿里云百炼模型或应用 API

说明

环境变量需同时配置 containerized.master.env.containerized.taskmanager.env. 两个前缀,确保 JobManager 和 TaskManager 均能读取。

步骤三:启动作业并查看结果

  1. 运维中心 > 作业运维页面,单击目标作业操作列的启动

  2. 作业启动对话框中选择无状态启动,单击启动

  3. 示例使用内存数据源,处理完成后作业自动结束。等待状态变为已完成后,单击作业名称进入详情。

  4. TaskManager 页签查看日志,搜索 Review analysis result: 关键字查看输出结果。

说明

生产环境通常使用 Kafka 等流式数据源,作业持续运行。

开发自定义 Agent

安装 Flink Agents

Python

建议使用虚拟环境:

python3 -m venv flink-agents-env
source flink-agents-env/bin/activate
pip install flink-agents       

详见Flink Agents 安装指南

Java

pom.xml 中添加依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-agents-api</artifactId>
    <version>${flink-agents.version}</version>
    <scope>provided</scope>
</dependency>
说明

Flink Agents 相关依赖必须声明 <scope>provided</scope>,由 VVR 引擎提供。

需要在本地 IDE 调试时,额外添加:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-agents-ide-support</artifactId>
    <version>${flink-agents.version}</version>
    <scope>provided</scope>
</dependency>

编写 Agent

Flink Agents 作业的核心结构如下:

  1. 注册 LLM 连接:通过 AgentsExecutionEnvironment 注册 ChatModel 连接,连接信息(API Key、端点地址)通过 ResourceDescriptor 配置。详见Chat Models

  2. 定义 Agent:实现自定义 Agent,配置 Prompt 模板,通过 @action(Python)或 @Action(Java)注解定义事件处理逻辑。

  3. 构建 Pipeline:将输入数据流接入 Agent 处理,输出分析结果。

Agent、Prompt、Tool、Memory 等概念详见Flink Agents 开发文档

本地测试

本地测试由 Flink MiniCluster 自动启动,无需部署集群。

Python

python your_agent_job.py        

Java

mvn exec:java -Dexec.mainClass="com.example.YourAgentJob"       

详见Flink Agents 部署文档

依赖管理

镜像预装依赖

VVR 引擎镜像预装 Flink Agents 核心库及部分依赖。

Python 侧

依赖

说明

flink-agents 核心库

Flink Agents Python API

openai

OpenAI 兼容接口(含百炼平台)

dashscope

阿里云百炼平台(通义千问原生接口)

mcp

Model Context Protocol

镜像未预装的依赖需在提交作业时一并上传。详见使用Python依赖

Java 侧

VVR 引擎仅包含 Flink Agents thin JAR(核心代码),所有 LLM integration 第三方依赖需要打入作业 JAR。

Java 作业依赖管理

使用 maven-shade-plugin 打包为 fat JAR,遵循以下规则:

  • Flink Agents 核心依赖(flink-agents-apiflink-agents-runtime 等):<scope>provided</scope>,不打入 fat JAR。

  • Flink 核心依赖(flink-streaming-java 等):<scope>provided</scope>

  • LLM integration 依赖(如 flink-agents-integrations-chat-models-openai):compile scope,打入 fat JAR。

  • 与 Flink 冲突的库(如 Jackson)需在 maven-shade-plugin 中配置 relocation。

完整 pom.xml 配置示例见 quickstart-java-src.zip

提交作业到实时计算 Flink

Python 作业

提交方式与 PyFlink 作业一致,详见Python作业开发。关键参数:

参数

说明

引擎版本

选择支持 Flink Agents 的 VVR 引擎版本,详见版本说明。

Python 文件地址

作业入口 Python 文件或 ZIP 包。

Entry Module

ZIP 包入口模块名。

Python Libraries

额外依赖包。

Java 作业

提交方式与 Flink JAR 作业一致,详见JAR作业开发。关键参数:

参数

说明

引擎版本

选择支持 Flink Agents 的 VVR 引擎版本,详见版本说明。

JAR URI

fat JAR 路径。

Entry Point Class

作业入口类全限定名。

通用运行参数

Python 解释器版本

VVR 引擎镜像默认 Python 版本为 3.9,Flink Agents 需要 3.10 或 3.11。Python 作业必须配置:

python.executable: python3.10
python.client.executable: python3.10
containerized.master.env.FLINK_HOME: /flink
containerized.taskmanager.env.FLINK_HOME: /flink

自定义环境变量

作业代码通过环境变量读取配置(如 LLM API Key、端点地址)时,需同时配置 master 与 taskmanager 两个前缀:

containerized.master.env.<ENV_VAR_NAME>: <value>
containerized.taskmanager.env.<ENV_VAR_NAME>: <value>