全部产品
Search
文档中心

实时计算Flink版:Python作业开发

更新时间:Mar 04, 2026

本文为您介绍Flink Python API作业开发的背景信息、使用限制、开发方法、调试方法和连接器使用等。

背景信息

Flink Python作业需要您在本地完成开发工作,Python作业开发完成后,再在Flink开发控制台上部署并启动才能看到业务效果。整体的操作流程详情请参见Flink Python作业快速入门

开发环境要求

  • 实时计算引擎VVR 8.0.11以下版本预装Python 3.7.9版本,VVR 8.0.11及以上版本预装Python 3.9.21版本。

    说明

    建议本地开发环境的Python版本与目标VVR引擎预装的Python版本保持一致。

  • 已安装PyFlink,且版本与目标VVR引擎使用的Flink版本一致。例如您在部署页面选择的引擎为vvr-8.0.9-flink-1.17,则需安装apache-flink==1.17.*

    pip install apache-flink==1.17.2
  • 已安装IDE开发工具,推荐PyCharm或VS Code。

  • Python作业需要您在线下完成开发,再在实时计算管理控制台上部署并运行。

使用限制

由于Flink受部署环境、网络环境等因素的影响,开发Python作业需要注意以下限制:

  • 仅支持开源Flink V1.13及以上版本。

  • Flink工作空间已预装了Python环境,且Python环境中已预装了Pandas、NumPy、PyArrow等常用的Python库。详见本文末尾预装软件包列表

  • Flink运行环境仅支持JDK 8和JDK 11,如果Python作业中依赖第三方JAR包,请确保JAR包兼容。

  • VVR 4.x仅支持开源Scala V2.11版本,VVR 6.x 及以上版本仅支持开源Scala V2.12版本。如果Python作业中依赖第三方JAR包,请确保使用Scala版本对应的JAR包依赖。

作业开发

Table API/SQL与DataStream API的选择

PyFlink支持Table API/SQL和DataStream API两种开发方式,推荐优先使用Table API/SQL。原因如下:

  • 性能更优:Table API/SQL的执行计划经过优化后完全在JVM内执行。而DataStream API需要在JVM和Python进程之间逐条数据进行序列化/反序列化,性能开销较大。

  • 功能更完善:Table API/SQL对连接器、数据格式、窗口函数等支持更完整,且与SQL作业共享同一套连接器生态。

  • 社区推荐:Apache Flink社区在PyFlink的发展方向上以Table API/SQL为主。

只有在SQL无法表达的复杂自定义逻辑场景下,才建议使用DataStream API。

开发参考

您可以参见以下文档在本地完成Flink业务代码开发,开发完成后您需要将其上传到Flink开发控制台,并部署上线作业。

项目结构

推荐的Python作业项目结构如下:

my-flink-python-project/
├── my_job.py                # 主作业文件
├── udfs.py                  # 自定义函数(可选)
├── requirements.txt         # 第三方Python依赖(可选)
└── config.properties        # 配置文件(可选)

依赖管理

Python作业中使用自定义的Python虚拟环境、第三方Python包、JAR包和数据文件的方法,请参见使用Python依赖

自定义函数(UDF)

以下是一个Python UDSF的开发示例,用于对字符串进行脱敏处理:

from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.STRING())def mask_phone(phone: str):"""手机号脱敏:保留前3位和后4位,中间用****替代"""if phone is None or len(phone) != 11:return phone
    return phone[:3] + '****' + phone[7:]

在SQL作业中使用该UDF:

CREATE TEMPORARY FUNCTION mask_phone AS 'udfs.mask_phone' LANGUAGE PYTHON;INSERT INTO sink_table
SELECT name, mask_phone(phone) AS masked_phone
FROM source_table;

UDF的注册、更新和删除方法,请参见管理自定义函数(UDF)

连接器使用

Flink所支持的连接器列表,请参见支持的连接器。连接器使用方法如下:

  1. 登录实时计算控制台

  2. 单击目标工作空间操作列下的控制台

  3. 在左侧导航栏,单击文件管理

  4. 单击上传资源,选择您要上传的目标连接器的Python包。

    您可以上传自己开发的连接器,也可以上传Flink提供的连接器。Flink提供的连接器官方Python包的下载地址,请参见Connector列表

  5. 运维中心 > 作业运维页面,单击部署作业 > Python 作业附加依赖文件项选择目标连接器的Python包,配置其他参数并部署作业。

  6. 单击部署的作业名称,在部署详情页签运行参数配置区域,单击编辑,在其他配置中,添加Python连接器包位置信息。

    如果您的作业需要依赖多个连接器Python包,例如依赖的2个包的名字分别为connector-1.jar和connector-2.jar,则配置信息如下。

    pipeline.classpaths: 'file:///flink/usrlib/connector-1.jar;file:///flink/usrlib/connector-2.jar'
  7. 如果您需要使用内置连接器、数据格式和Catalog(仅VVR 11.2及以上版本),可在作业的运行参数配置区域其他配置项中添加配置,例如:

    pipeline.used-builtin-connectors: kafka;sls
    pipeline.used-builtin-formats: avro;parquet

作业调试

您可以在Python自定义函数的代码实现中,通过logging的方式,输出日志信息,方便后期问题定位,示例如下。

import logging

@udf(result_type=DataTypes.BIGINT())
def add(i, j):
  logging.info("hello world")
  return i + j

日志输出后,您可以在TaskManager的日志文件中查看。

本地调试

由于实时计算Flink版默认不具备访问公网的能力,您的代码可能无法在本地直接连接线上数据源进行测试。建议按照以下方式进行本地调试:

  • 单元测试:对自定义函数(UDF)进行独立的单元测试,确保函数逻辑正确。

  • 本地运行:使用本地数据源(如文件或内存数据)模拟输入,在本地运行作业验证处理逻辑。示例如下:

    from pyflink.datastream import StreamExecutionEnvironment
    
    env = StreamExecutionEnvironment.get_execution_environment()# 使用本地数据源进行测试
    ds = env.from_collection([('Alice', 1), ('Bob', 2), ('Alice', 3)])
    ds.key_by(lambda x: x[0]).sum(1).print()
    env.execute("local_test")
  • 远程调试:如需连接线上数据源调试,请参见本地运行和调试包含连接器的作业

作业部署

Python作业开发完成后,需要上传到实时计算控制台进行部署。操作步骤如下:

  1. 登录实时计算控制台,进入目标工作空间。

  2. 在左侧导航栏单击文件管理,上传Python作业文件(.py或.zip)。如有第三方依赖或配置文件,也需一并上传。

  3. 运维中心 > 作业运维页面,单击部署作业 > Python作业,填写部署信息。

    参数

    说明

    Python文件地址

    选择已上传的Python作业文件。

    Entry Module

    如果作业文件为.py文件,无需填写;如果为.zip文件,需填写入口模块名称,例如my_job

    附加依赖文件

    如有连接器JAR包或配置文件,在此选择。

    Python Libraries

    如有第三方Python包(.whl或.zip),在此选择。

    Python Archives

    如有自定义Python虚拟环境(.zip),在此选择。

  4. 单击部署

更多部署参数详情请参见部署作业

完整示例代码

本示例展示一个从Kafka读取数据、进行简单处理后写入MySQL的Python流作业,仅供参考。

说明

示例中未包含Checkpoint、重启策略等运行参数的配置。上述配置可在部署作业完成后,通过部署详情页进行自定义配置。详情请参见配置作业部署信息

import logging
import sys

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

logging.basicConfig(stream=sys.stdout, level=logging.INFO)


def kafka_to_mysql():
    # 创建执行环境
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)

    # 创建 Kafka 源表
    t_env.execute_sql("""
        CREATE TABLE kafka_source (
            `id` INT,
            `name` STRING,
            `score` INT,
            `event_time` TIMESTAMP(3),
            WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'student_topic',
            'properties.bootstrap.servers' = 'your-kafka-broker:9092',
            'properties.group.id' = 'my-group',
            'scan.startup.mode' = 'latest-offset',
            'format' = 'json'
        )
    """)

    # 创建 MySQL 结果表
    t_env.execute_sql("""
        CREATE TABLE mysql_sink (
            `id` INT,
            `name` STRING,
            `score` INT,
            PRIMARY KEY (id) NOT ENFORCED
        ) WITH (
            'connector' = 'jdbc',
            'url' = 'jdbc:mysql://your-mysql-host:3306/my_database',
            'table-name' = 'student',
            'username' = 'your_username',
            'password' = 'your_password'
        )
    """)

    # 筛选分数大于60的记录并写入 MySQL
    t_env.execute_sql("""
        INSERT INTO mysql_sink
        SELECT id, name, score
        FROM kafka_source
        WHERE score >= 60
    """)


if __name__ == '__main__':
    kafka_to_mysql()

预装软件包列表

VVR-11

Flink工作空间已安装下列软件包。

软件包

版本

apache-beam

2.48.0

avro-python3

1.10.2

brotlipy

0.7.0

certifi

2022.12.7

cffi

1.15.1

charset-normalizer

2.0.4

cloudpickle

2.2.1

conda

22.11.1

conda-content-trust

0.1.3

conda-package-handling

1.9.0

crcmod

1.7

cryptography

38.0.1

Cython

3.0.12

dill

0.3.1.1

dnspython

2.7.0

docopt

0.6.2

exceptiongroup

1.3.0

fastavro

1.12.1

fasteners

0.20

find_libpython

0.5.0

grpcio

1.56.2

grpcio-tools

1.56.2

hdfs

2.7.3

httplib2

0.22.0

idna

3.4

importlib_metadata

8.7.0

iniconfig

2.1.0

isort

6.1.0

numpy

1.24.4

objsize

0.6.1

orjson

3.9.15

packaging

25.0

pandas

2.3.3

pemja

0.5.5

pip

22.3.1

pluggy

1.0.0

proto-plus

1.26.1

protobuf

4.25.8

py-spy

0.4.0

py4j

0.10.9.7

pyarrow

11.0.0

pyarrow-hotfix

0.6

pycodestyle

2.14.0

pycosat

0.6.4

pycparser

2.21

pydot

1.4.2

pymongo

4.15.4

pyOpenSSL

22.0.0

pyparsing

3.2.5

PySocks

1.7.1

pytest

7.4.4

python-dateutil

2.9.0

pytz

2025.2

regex

2025.11.3

requests

2.32.5

ruamel.yaml

0.18.16

ruamel.yaml.clib

0.2.14

setuptools

70.0.0

six

1.16.0

tomli

2.3.0

toolz

0.12.0

tqdm

4.64.1

typing_extensions

4.15.0

tzdata

2025.2

urllib3

1.26.13

wheel

0.38.4

zipp

3.23.0

zstandard

0.25.0

VVR-8

Flink工作空间已安装下列软件包。

软件包

版本

apache-beam

2.43.0

avro-python3

1.9.2.1

certifi

2025.7.9

charset-normalizer

3.4.2

cloudpickle

2.2.0

crcmod

1.7

Cython

0.29.24

dill

0.3.1.1

docopt

0.6.2

fastavro

1.4.7

fasteners

0.19

find_libpython

0.4.1

grpcio

1.46.3

grpcio-tools

1.46.3

hdfs

2.7.3

httplib2

0.20.4

idna

3.10

isort

6.0.1

numpy

1.21.6

objsize

0.5.2

orjson

3.10.18

pandas

1.3.5

pemja

0.3.2

pip

22.3.1

proto-plus

1.26.1

protobuf

3.20.3

py4j

0.10.9.7

pyarrow

8.0.0

pycodestyle

2.14.0

pydot

1.4.2

pymongo

3.13.0

pyparsing

3.2.3

python-dateutil

2.9.0

pytz

2025.2

regex

2024.11.6

requests

2.32.4

setuptools

58.1.0

six

1.17.0

typing_extensions

4.14.1

urllib3

2.5.0

wheel

0.33.4

zstandard

0.23.0

VVR-6

Flink工作空间已安装下列软件包。

软件包

版本

apache-beam

2.27.0

avro-python3

1.9.2.1

certifi

2024.8.30

charset-normalizer

3.3.2

cloudpickle

1.2.2

crcmod

1.7

Cython

0.29.16

dill

0.3.1.1

docopt

0.6.2

fastavro

0.23.6

future

0.18.3

grpcio

1.29.0

hdfs

2.7.3

httplib2

0.17.4

idna

3.8

importlib-metadata

6.7.0

isort

5.11.5

jsonpickle

2.0.0

mock

2.0.0

numpy

1.19.5

oauth2client

4.1.3

pandas

1.1.5

pbr

6.1.0

pemja

0.1.4

pip

20.1.1

protobuf

3.17.3

py4j

0.10.9.3

pyarrow

2.0.0

pyasn1

0.5.1

pyasn1-modules

0.3.0

pycodestyle

2.10.0

pydot

1.4.2

pymongo

3.13.0

pyparsing

3.1.4

python-dateutil

2.8.0

pytz

2024.1

requests

2.31.0

rsa

4.9

setuptools

47.1.0

six

1.16.0

typing-extensions

3.7.4.3

urllib3

2.0.7

wheel

0.42.0

zipp

3.15.0

相关文档