本文为您介绍Flink Python API作业开发的背景信息、使用限制、开发方法、调试方法和连接器使用等。
背景信息
Flink Python作业需要您在本地完成开发工作,Python作业开发完成后,再在Flink开发控制台上部署并启动才能看到业务效果。整体的操作流程详情请参见Flink Python作业快速入门。
开发环境要求
实时计算引擎VVR 8.0.11以下版本预装Python 3.7.9版本,VVR 8.0.11及以上版本预装Python 3.9.21版本。
说明建议本地开发环境的Python版本与目标VVR引擎预装的Python版本保持一致。
已安装PyFlink,且版本与目标VVR引擎使用的Flink版本一致。例如您在部署页面选择的引擎为
vvr-8.0.9-flink-1.17,则需安装apache-flink==1.17.*。pip install apache-flink==1.17.2已安装IDE开发工具,推荐PyCharm或VS Code。
Python作业需要您在线下完成开发,再在实时计算管理控制台上部署并运行。
使用限制
由于Flink受部署环境、网络环境等因素的影响,开发Python作业需要注意以下限制:
仅支持开源Flink V1.13及以上版本。
Flink工作空间已预装了Python环境,且Python环境中已预装了Pandas、NumPy、PyArrow等常用的Python库。详见本文末尾预装软件包列表。
Flink运行环境仅支持JDK 8和JDK 11,如果Python作业中依赖第三方JAR包,请确保JAR包兼容。
VVR 4.x仅支持开源Scala V2.11版本,VVR 6.x 及以上版本仅支持开源Scala V2.12版本。如果Python作业中依赖第三方JAR包,请确保使用Scala版本对应的JAR包依赖。
作业开发
Table API/SQL与DataStream API的选择
PyFlink支持Table API/SQL和DataStream API两种开发方式,推荐优先使用Table API/SQL。原因如下:
性能更优:Table API/SQL的执行计划经过优化后完全在JVM内执行。而DataStream API需要在JVM和Python进程之间逐条数据进行序列化/反序列化,性能开销较大。
功能更完善:Table API/SQL对连接器、数据格式、窗口函数等支持更完整,且与SQL作业共享同一套连接器生态。
社区推荐:Apache Flink社区在PyFlink的发展方向上以Table API/SQL为主。
只有在SQL无法表达的复杂自定义逻辑场景下,才建议使用DataStream API。
开发参考
您可以参见以下文档在本地完成Flink业务代码开发,开发完成后您需要将其上传到Flink开发控制台,并部署上线作业。
Apache Flink V1.20业务代码开发,请参见Flink Python API开发指南。
Apache Flink编码过程中遇到的问题及解决方法,请参见常见问题。
项目结构
推荐的Python作业项目结构如下:
my-flink-python-project/
├── my_job.py # 主作业文件
├── udfs.py # 自定义函数(可选)
├── requirements.txt # 第三方Python依赖(可选)
└── config.properties # 配置文件(可选)依赖管理
Python作业中使用自定义的Python虚拟环境、第三方Python包、JAR包和数据文件的方法,请参见使用Python依赖。
自定义函数(UDF)
以下是一个Python UDSF的开发示例,用于对字符串进行脱敏处理:
from pyflink.table import DataTypes
from pyflink.table.udf import udf
@udf(result_type=DataTypes.STRING())def mask_phone(phone: str):"""手机号脱敏:保留前3位和后4位,中间用****替代"""if phone is None or len(phone) != 11:return phone
return phone[:3] + '****' + phone[7:]在SQL作业中使用该UDF:
CREATE TEMPORARY FUNCTION mask_phone AS 'udfs.mask_phone' LANGUAGE PYTHON;INSERT INTO sink_table
SELECT name, mask_phone(phone) AS masked_phone
FROM source_table;UDF的注册、更新和删除方法,请参见管理自定义函数(UDF)。
连接器使用
Flink所支持的连接器列表,请参见支持的连接器。连接器使用方法如下:
登录实时计算控制台。
单击目标工作空间操作列下的控制台。
在左侧导航栏,单击文件管理。
单击上传资源,选择您要上传的目标连接器的Python包。
您可以上传自己开发的连接器,也可以上传Flink提供的连接器。Flink提供的连接器官方Python包的下载地址,请参见Connector列表。
在页面,单击,附加依赖文件项选择目标连接器的Python包,配置其他参数并部署作业。
单击部署的作业名称,在部署详情页签运行参数配置区域,单击编辑,在其他配置中,添加Python连接器包位置信息。
如果您的作业需要依赖多个连接器Python包,例如依赖的2个包的名字分别为connector-1.jar和connector-2.jar,则配置信息如下。
pipeline.classpaths: 'file:///flink/usrlib/connector-1.jar;file:///flink/usrlib/connector-2.jar'如果您需要使用内置连接器、数据格式和Catalog(仅VVR 11.2及以上版本),可在作业的运行参数配置区域其他配置项中添加配置,例如:
pipeline.used-builtin-connectors: kafka;sls pipeline.used-builtin-formats: avro;parquet
作业调试
您可以在Python自定义函数的代码实现中,通过logging的方式,输出日志信息,方便后期问题定位,示例如下。
import logging
@udf(result_type=DataTypes.BIGINT())
def add(i, j):
logging.info("hello world")
return i + j日志输出后,您可以在TaskManager的日志文件中查看。
本地调试
由于实时计算Flink版默认不具备访问公网的能力,您的代码可能无法在本地直接连接线上数据源进行测试。建议按照以下方式进行本地调试:
单元测试:对自定义函数(UDF)进行独立的单元测试,确保函数逻辑正确。
本地运行:使用本地数据源(如文件或内存数据)模拟输入,在本地运行作业验证处理逻辑。示例如下:
from pyflink.datastream import StreamExecutionEnvironment env = StreamExecutionEnvironment.get_execution_environment()# 使用本地数据源进行测试 ds = env.from_collection([('Alice', 1), ('Bob', 2), ('Alice', 3)]) ds.key_by(lambda x: x[0]).sum(1).print() env.execute("local_test")远程调试:如需连接线上数据源调试,请参见本地运行和调试包含连接器的作业。
作业部署
Python作业开发完成后,需要上传到实时计算控制台进行部署。操作步骤如下:
登录实时计算控制台,进入目标工作空间。
在左侧导航栏单击文件管理,上传Python作业文件(.py或.zip)。如有第三方依赖或配置文件,也需一并上传。
在页面,单击,填写部署信息。
参数
说明
Python文件地址
选择已上传的Python作业文件。
Entry Module
如果作业文件为.py文件,无需填写;如果为.zip文件,需填写入口模块名称,例如
my_job。附加依赖文件
如有连接器JAR包或配置文件,在此选择。
Python Libraries
如有第三方Python包(.whl或.zip),在此选择。
Python Archives
如有自定义Python虚拟环境(.zip),在此选择。
单击部署。
更多部署参数详情请参见部署作业。
完整示例代码
本示例展示一个从Kafka读取数据、进行简单处理后写入MySQL的Python流作业,仅供参考。
示例中未包含Checkpoint、重启策略等运行参数的配置。上述配置可在部署作业完成后,通过部署详情页进行自定义配置。详情请参见配置作业部署信息。
import logging
import sys
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
def kafka_to_mysql():
# 创建执行环境
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
# 创建 Kafka 源表
t_env.execute_sql("""
CREATE TABLE kafka_source (
`id` INT,
`name` STRING,
`score` INT,
`event_time` TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'student_topic',
'properties.bootstrap.servers' = 'your-kafka-broker:9092',
'properties.group.id' = 'my-group',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
)
""")
# 创建 MySQL 结果表
t_env.execute_sql("""
CREATE TABLE mysql_sink (
`id` INT,
`name` STRING,
`score` INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://your-mysql-host:3306/my_database',
'table-name' = 'student',
'username' = 'your_username',
'password' = 'your_password'
)
""")
# 筛选分数大于60的记录并写入 MySQL
t_env.execute_sql("""
INSERT INTO mysql_sink
SELECT id, name, score
FROM kafka_source
WHERE score >= 60
""")
if __name__ == '__main__':
kafka_to_mysql()预装软件包列表
VVR-11
Flink工作空间已安装下列软件包。
软件包 | 版本 |
apache-beam | 2.48.0 |
avro-python3 | 1.10.2 |
brotlipy | 0.7.0 |
certifi | 2022.12.7 |
cffi | 1.15.1 |
charset-normalizer | 2.0.4 |
cloudpickle | 2.2.1 |
conda | 22.11.1 |
conda-content-trust | 0.1.3 |
conda-package-handling | 1.9.0 |
crcmod | 1.7 |
cryptography | 38.0.1 |
Cython | 3.0.12 |
dill | 0.3.1.1 |
dnspython | 2.7.0 |
docopt | 0.6.2 |
exceptiongroup | 1.3.0 |
fastavro | 1.12.1 |
fasteners | 0.20 |
find_libpython | 0.5.0 |
grpcio | 1.56.2 |
grpcio-tools | 1.56.2 |
hdfs | 2.7.3 |
httplib2 | 0.22.0 |
idna | 3.4 |
importlib_metadata | 8.7.0 |
iniconfig | 2.1.0 |
isort | 6.1.0 |
numpy | 1.24.4 |
objsize | 0.6.1 |
orjson | 3.9.15 |
packaging | 25.0 |
pandas | 2.3.3 |
pemja | 0.5.5 |
pip | 22.3.1 |
pluggy | 1.0.0 |
proto-plus | 1.26.1 |
protobuf | 4.25.8 |
py-spy | 0.4.0 |
py4j | 0.10.9.7 |
pyarrow | 11.0.0 |
pyarrow-hotfix | 0.6 |
pycodestyle | 2.14.0 |
pycosat | 0.6.4 |
pycparser | 2.21 |
pydot | 1.4.2 |
pymongo | 4.15.4 |
pyOpenSSL | 22.0.0 |
pyparsing | 3.2.5 |
PySocks | 1.7.1 |
pytest | 7.4.4 |
python-dateutil | 2.9.0 |
pytz | 2025.2 |
regex | 2025.11.3 |
requests | 2.32.5 |
ruamel.yaml | 0.18.16 |
ruamel.yaml.clib | 0.2.14 |
setuptools | 70.0.0 |
six | 1.16.0 |
tomli | 2.3.0 |
toolz | 0.12.0 |
tqdm | 4.64.1 |
typing_extensions | 4.15.0 |
tzdata | 2025.2 |
urllib3 | 1.26.13 |
wheel | 0.38.4 |
zipp | 3.23.0 |
zstandard | 0.25.0 |
VVR-8
Flink工作空间已安装下列软件包。
软件包 | 版本 |
apache-beam | 2.43.0 |
avro-python3 | 1.9.2.1 |
certifi | 2025.7.9 |
charset-normalizer | 3.4.2 |
cloudpickle | 2.2.0 |
crcmod | 1.7 |
Cython | 0.29.24 |
dill | 0.3.1.1 |
docopt | 0.6.2 |
fastavro | 1.4.7 |
fasteners | 0.19 |
find_libpython | 0.4.1 |
grpcio | 1.46.3 |
grpcio-tools | 1.46.3 |
hdfs | 2.7.3 |
httplib2 | 0.20.4 |
idna | 3.10 |
isort | 6.0.1 |
numpy | 1.21.6 |
objsize | 0.5.2 |
orjson | 3.10.18 |
pandas | 1.3.5 |
pemja | 0.3.2 |
pip | 22.3.1 |
proto-plus | 1.26.1 |
protobuf | 3.20.3 |
py4j | 0.10.9.7 |
pyarrow | 8.0.0 |
pycodestyle | 2.14.0 |
pydot | 1.4.2 |
pymongo | 3.13.0 |
pyparsing | 3.2.3 |
python-dateutil | 2.9.0 |
pytz | 2025.2 |
regex | 2024.11.6 |
requests | 2.32.4 |
setuptools | 58.1.0 |
six | 1.17.0 |
typing_extensions | 4.14.1 |
urllib3 | 2.5.0 |
wheel | 0.33.4 |
zstandard | 0.23.0 |
VVR-6
Flink工作空间已安装下列软件包。
软件包 | 版本 |
apache-beam | 2.27.0 |
avro-python3 | 1.9.2.1 |
certifi | 2024.8.30 |
charset-normalizer | 3.3.2 |
cloudpickle | 1.2.2 |
crcmod | 1.7 |
Cython | 0.29.16 |
dill | 0.3.1.1 |
docopt | 0.6.2 |
fastavro | 0.23.6 |
future | 0.18.3 |
grpcio | 1.29.0 |
hdfs | 2.7.3 |
httplib2 | 0.17.4 |
idna | 3.8 |
importlib-metadata | 6.7.0 |
isort | 5.11.5 |
jsonpickle | 2.0.0 |
mock | 2.0.0 |
numpy | 1.19.5 |
oauth2client | 4.1.3 |
pandas | 1.1.5 |
pbr | 6.1.0 |
pemja | 0.1.4 |
pip | 20.1.1 |
protobuf | 3.17.3 |
py4j | 0.10.9.3 |
pyarrow | 2.0.0 |
pyasn1 | 0.5.1 |
pyasn1-modules | 0.3.0 |
pycodestyle | 2.10.0 |
pydot | 1.4.2 |
pymongo | 3.13.0 |
pyparsing | 3.1.4 |
python-dateutil | 2.8.0 |
pytz | 2024.1 |
requests | 2.31.0 |
rsa | 4.9 |
setuptools | 47.1.0 |
six | 1.16.0 |
typing-extensions | 3.7.4.3 |
urllib3 | 2.0.7 |
wheel | 0.42.0 |
zipp | 3.15.0 |
相关文档
Flink Python作业的完整开发流程示例,请参见Flink Python作业快速入门。
在Flink Python作业中使用自定义的Python虚拟环境、第三方Python包、JAR包和数据文件,详情请参见使用Python依赖。