This topic describes background information, limits, development methods, debugging techniques, and connector usage for Flink Python API jobs.
Background information
Develop Flink Python jobs locally. After development, deploy and start the job in the Flink development console to see business results. For the complete workflow, see Quick Start for Flink Python jobs.
Development environment requirements
Ververica Runtime (VVR) versions earlier than 8.0.11 include Python 3.7.9. VVR 8.0.11 and later include Python 3.9.21.
Note: Use a local Python version that matches the Python version pre-installed in your target VVR engine.
Install a PyFlink version that matches the Flink version used by your target VVR engine. For example, if you select vvr-8.0.9-flink-1.17 on the deployment page, install apache-flink==1.17.*:
pip install apache-flink==1.17.2
Install an IDE. We recommend PyCharm or VS Code.
Complete Python job development offline, then deploy and run the job in the Realtime Compute Management Console.
Limits
Due to deployment and network environment constraints, observe the following limits when developing Python jobs:
Only Apache Flink 1.13 and later are supported.
The Flink workspace includes a pre-installed Python environment with common libraries such as pandas, NumPy, and PyArrow. For details, see the pre-installed software package list at the end of this topic.
The Flink runtime environment supports only JDK 8 and JDK 11. If your Python job depends on third-party JARs, ensure they are compatible.
VVR 4.x supports only Scala 2.11. VVR 6.x and later support only Scala 2.12. If your Python job depends on third-party JARs, ensure the JAR dependencies match the appropriate Scala version.
Job development
Choosing between Table API/SQL and DataStream API
PyFlink supports both Table API/SQL and DataStream API. We recommend using Table API/SQL first for these reasons:
Better performance: Table API/SQL execution plans are optimized and run entirely within the JVM. DataStream API requires serialization and deserialization of each record between the JVM and Python processes, which incurs significant overhead.
More complete features: Table API/SQL offers fuller support for connectors, data formats, and window functions, and shares the same connector ecosystem as SQL jobs.
Community direction: The Apache Flink community focuses PyFlink development primarily on Table API/SQL.
Use DataStream API only for complex custom logic that cannot be expressed in SQL.
Development reference
Use the following documents to develop Flink business code locally. After development, upload your code to the Flink development console and deploy the job.
For Apache Flink 1.20 business code development, see Flink Python API Development Guide.
For common issues and solutions during Apache Flink coding, see FAQ.
Project structure
We recommend the following project structure for Python jobs:
my-flink-python-project/
├── my_job.py # Main job file
├── udfs.py # User-defined functions (optional)
├── requirements.txt # Third-party Python dependencies (optional)
└── config.properties # Configuration file (optional)
Dependency management
For instructions on using custom Python virtual environments, third-party Python packages, JARs, and data files in Python jobs, see Use Python dependencies.
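For example, one common way to declare third-party Python dependencies (described in Use Python dependencies) is a requirements.txt file with pinned versions that you upload alongside the job. The package names below are illustrative only:

```text
# requirements.txt -- example third-party dependencies (illustrative names/versions)
redis==4.5.4
requests==2.32.5
```

Pinning exact versions keeps the cluster environment reproducible and avoids surprises when a dependency releases a breaking change.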
User-defined functions (UDFs)
The following example shows how to develop a Python user-defined scalar function (UDSF) that masks sensitive strings:
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.STRING())
def mask_phone(phone: str):
    """Mask phone numbers: keep the first 3 and last 4 digits, replace the middle with ****"""
    if phone is None or len(phone) != 11:
        return phone
    return phone[:3] + '****' + phone[7:]
Use this UDF in a SQL job as follows:
CREATE TEMPORARY FUNCTION mask_phone AS 'udfs.mask_phone' LANGUAGE PYTHON;
INSERT INTO sink_table
SELECT name, mask_phone(phone) AS masked_phone
FROM source_table;
For instructions on registering, updating, and deleting UDFs, see Manage user-defined functions (UDFs).
Use connectors
For a list of supported connectors, see Supported connectors. To use a connector, follow these steps:
Log on to the Realtime Compute console.
Click Console in the Actions column of the target workspace.
In the left navigation pane, click File Management.
Click Upload Artifact and select the Python package for your target connector.
You can upload a custom connector or a built-in Flink connector. For download links to official Flink connector Python packages, see Connector list.
On the page, click . In the Additional Dependencies field, select your connector package, configure other parameters, and deploy the job.
Click the deployed job name. On the Deployment Details tab, in the Runtime Parameters section, click Edit. In the Other Configuration field, add the path to your Python connector package.
If your job depends on multiple connector packages—for example, connector-1.jar and connector-2.jar—configure them as follows:
pipeline.classpaths: 'file:///flink/usrlib/connector-1.jar;file:///flink/usrlib/connector-2.jar'
To use built-in connectors, data formats, and catalogs (VVR 11.2 or later only), add the following configuration in the Other Configuration field under Runtime Parameters:
pipeline.used-builtin-connectors: kafka;sls
pipeline.used-builtin-formats: avro;parquet
Job debugging
In your Python user-defined function implementation, use the logging module to output log information for troubleshooting. For example:
import logging

from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.BIGINT())
def add(i, j):
    logging.info("hello world")
    return i + j
After logging, view the output in the TaskManager log files.
Local debugging
By default, Realtime Compute for Apache Flink cannot access the public network, so your code might not connect directly to online data sources for testing. Use one of these approaches for local debugging:
Unit tests: Test user-defined functions (UDFs) independently to verify logic.
Local execution: Simulate input using local data sources such as files or in-memory data, then run the job locally to validate processing logic. For example:
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# Use a local data source for testing
ds = env.from_collection([('Alice', 1), ('Bob', 2), ('Alice', 3)])
ds.key_by(lambda x: x[0]).sum(1).print()
env.execute("local_test")
Remote debugging: To debug with online data sources, see Run and debug connector-based jobs locally.
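Following the unit-test suggestion above, a simple pattern is to keep business logic in a plain Python function so it can be tested without a Flink runtime; the @udf wrapper from the masking example earlier is omitted here, and mask_phone_impl is an illustrative name. A minimal sketch:

```python
def mask_phone_impl(phone):
    """Same masking logic as the mask_phone UDF: keep the first 3 and last 4 digits."""
    if phone is None or len(phone) != 11:
        return phone
    return phone[:3] + '****' + phone[7:]

# Plain assertions double as a lightweight unit test, runnable with any test tool.
assert mask_phone_impl('13812345678') == '138****5678'
assert mask_phone_impl(None) is None            # None passes through unchanged
assert mask_phone_impl('12345') == '12345'      # non-11-digit strings pass through
print('all mask_phone tests passed')
```

Once the plain function is verified, wrap it with the @udf decorator when registering it in the job, so the tested logic and the Flink integration stay separate.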
Job deployment
After developing a Python job, upload it to the Realtime Compute console for deployment. Follow these steps:
Log on to the Realtime Compute console and go to your target workspace.
In the navigation pane on the left, click File Management. Upload your Python job file (.py or .zip), along with any third-party dependencies or configuration files.
On the page, click and enter deployment information.
Parameter | Description |
Python file path | Select the uploaded Python job file. |
Entry Module | Leave blank if your job file is a .py file. If it is a .zip file, enter the entry module name, such as my_job. |
Additional Dependencies | Select any connector JARs or configuration files here. |
Python Libraries | Select any third-party Python packages (.whl or .zip) here. |
Python Archives | Select any custom Python virtual environment (.zip) here. |
Click Deploy.
For more deployment parameter details, see Deploy a job.
Complete sample code
This example shows a Python streaming job that reads from Kafka, processes data, and writes to MySQL. Use it for reference only.
This example does not include checkpointing or restart policy configurations. You can customize these settings after deployment on the Deployment Details page. For details, see Configure job deployment information.
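For reference, checkpointing and restart behavior are typically controlled with standard Flink configuration entries like the following, which can be added under Runtime Parameters. This is a sketch only; the exact keys and values to use depend on your Flink/VVR version:

```text
execution.checkpointing.interval: 60s
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10s
```
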
import logging
import sys

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

logging.basicConfig(stream=sys.stdout, level=logging.INFO)

def kafka_to_mysql():
    # Create execution environment
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)

    # Create Kafka source table
    t_env.execute_sql("""
        CREATE TABLE kafka_source (
            `id` INT,
            `name` STRING,
            `score` INT,
            `event_time` TIMESTAMP(3),
            WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'student_topic',
            'properties.bootstrap.servers' = 'your-kafka-broker:9092',
            'properties.group.id' = 'my-group',
            'scan.startup.mode' = 'latest-offset',
            'format' = 'json'
        )
    """)

    # Create MySQL sink table
    t_env.execute_sql("""
        CREATE TABLE mysql_sink (
            `id` INT,
            `name` STRING,
            `score` INT,
            PRIMARY KEY (id) NOT ENFORCED
        ) WITH (
            'connector' = 'jdbc',
            'url' = 'jdbc:mysql://your-mysql-host:3306/my_database',
            'table-name' = 'student',
            'username' = 'your_username',
            'password' = 'your_password'
        )
    """)

    # Filter records with score >= 60 and write to MySQL
    t_env.execute_sql("""
        INSERT INTO mysql_sink
        SELECT id, name, score
        FROM kafka_source
        WHERE score >= 60
    """)

if __name__ == '__main__':
    kafka_to_mysql()
Pre-installed software package list
VVR-11
The following software packages are installed in the Flink workspace.
Software package | Version |
apache-beam | 2.48.0 |
avro-python3 | 1.10.2 |
brotlipy | 0.7.0 |
certifi | 2022.12.7 |
cffi | 1.15.1 |
charset-normalizer | 2.0.4 |
cloudpickle | 2.2.1 |
conda | 22.11.1 |
conda-content-trust | 0.1.3 |
conda-package-handling | 1.9.0 |
crcmod | 1.7 |
cryptography | 38.0.1 |
Cython | 3.0.12 |
dill | 0.3.1.1 |
dnspython | 2.7.0 |
docopt | 0.6.2 |
exceptiongroup | 1.3.0 |
fastavro | 1.12.1 |
fasteners | 0.20 |
find_libpython | 0.5.0 |
grpcio | 1.56.2 |
grpcio-tools | 1.56.2 |
hdfs | 2.7.3 |
httplib2 | 0.22.0 |
idna | 3.4 |
importlib_metadata | 8.7.0 |
iniconfig | 2.1.0 |
isort | 6.1.0 |
numpy | 1.24.4 |
objsize | 0.6.1 |
orjson | 3.9.15 |
packaging | 25.0 |
pandas | 2.3.3 |
pemja | 0.5.5 |
pip | 22.3.1 |
pluggy | 1.0.0 |
proto-plus | 1.26.1 |
protobuf | 4.25.8 |
py-spy | 0.4.0 |
py4j | 0.10.9.7 |
pyarrow | 11.0.0 |
pyarrow-hotfix | 0.6 |
pycodestyle | 2.14.0 |
pycosat | 0.6.4 |
pycparser | 2.21 |
pydot | 1.4.2 |
pymongo | 4.15.4 |
pyOpenSSL | 22.0.0 |
pyparsing | 3.2.5 |
PySocks | 1.7.1 |
pytest | 7.4.4 |
python-dateutil | 2.9.0 |
pytz | 2025.2 |
regex | 2025.11.3 |
requests | 2.32.5 |
ruamel.yaml | 0.18.16 |
ruamel.yaml.clib | 0.2.14 |
setuptools | 70.0.0 |
six | 1.16.0 |
tomli | 2.3.0 |
toolz | 0.12.0 |
tqdm | 4.64.1 |
typing_extensions | 4.15.0 |
tzdata | 2025.2 |
urllib3 | 1.26.13 |
wheel | 0.38.4 |
zipp | 3.23.0 |
zstandard | 0.25.0 |
VVR-8
The following software packages are installed in the Flink workspace.
Software package | Version |
apache-beam | 2.43.0 |
avro-python3 | 1.9.2.1 |
certifi | 2025.7.9 |
charset-normalizer | 3.4.2 |
cloudpickle | 2.2.0 |
crcmod | 1.7 |
Cython | 0.29.24 |
dill | 0.3.1.1 |
docopt | 0.6.2 |
fastavro | 1.4.7 |
fasteners | 0.19 |
find_libpython | 0.4.1 |
grpcio | 1.46.3 |
grpcio-tools | 1.46.3 |
hdfs | 2.7.3 |
httplib2 | 0.20.4 |
idna | 3.10 |
isort | 6.0.1 |
numpy | 1.21.6 |
objsize | 0.5.2 |
orjson | 3.10.18 |
pandas | 1.3.5 |
pemja | 0.3.2 |
pip | 22.3.1 |
proto-plus | 1.26.1 |
protobuf | 3.20.3 |
py4j | 0.10.9.7 |
pyarrow | 8.0.0 |
pycodestyle | 2.14.0 |
pydot | 1.4.2 |
pymongo | 3.13.0 |
pyparsing | 3.2.3 |
python-dateutil | 2.9.0 |
pytz | 2025.2 |
regex | 2024.11.6 |
requests | 2.32.4 |
setuptools | 58.1.0 |
six | 1.17.0 |
typing_extensions | 4.14.1 |
urllib3 | 2.5.0 |
wheel | 0.33.4 |
zstandard | 0.23.0 |
VVR-6
The following software packages are installed in the Flink workspace.
Software package | Version |
apache-beam | 2.27.0 |
avro-python3 | 1.9.2.1 |
certifi | 2024.8.30 |
charset-normalizer | 3.3.2 |
cloudpickle | 1.2.2 |
crcmod | 1.7 |
Cython | 0.29.16 |
dill | 0.3.1.1 |
docopt | 0.6.2 |
fastavro | 0.23.6 |
future | 0.18.3 |
grpcio | 1.29.0 |
hdfs | 2.7.3 |
httplib2 | 0.17.4 |
idna | 3.8 |
importlib-metadata | 6.7.0 |
isort | 5.11.5 |
jsonpickle | 2.0.0 |
mock | 2.0.0 |
numpy | 1.19.5 |
oauth2client | 4.1.3 |
pandas | 1.1.5 |
pbr | 6.1.0 |
pemja | 0.1.4 |
pip | 20.1.1 |
protobuf | 3.17.3 |
py4j | 0.10.9.3 |
pyarrow | 2.0.0 |
pyasn1 | 0.5.1 |
pyasn1-modules | 0.3.0 |
pycodestyle | 2.10.0 |
pydot | 1.4.2 |
pymongo | 3.13.0 |
pyparsing | 3.1.4 |
python-dateutil | 2.8.0 |
pytz | 2024.1 |
requests | 2.31.0 |
rsa | 4.9 |
setuptools | 47.1.0 |
six | 1.16.0 |
typing-extensions | 3.7.4.3 |
urllib3 | 2.0.7 |
wheel | 0.42.0 |
zipp | 3.15.0 |
References
For a complete walkthrough of the Flink Python job development process, see Quick Start for Flink Python jobs.
For details on using custom Python virtual environments, third-party Python packages, JARs, and data files in Flink Python jobs, see Use Python dependencies.
Realtime Compute for Apache Flink also supports SQL and DataStream jobs. For development guides, see Job development map and Develop a JAR job.