Develop, debug, and deploy PyFlink jobs on Realtime Compute for Apache Flink. This guide covers environment setup, API selection, user-defined functions (UDFs), connector configuration, debugging, and deployment.
For a complete walkthrough, see Quick start with PyFlink jobs.
Prerequisites
Before you begin, make sure you have:
- A workspace in Realtime Compute for Apache Flink
- An IDE such as PyCharm or VS Code
Constraints and limits
| Constraint | Details |
|---|---|
| Flink version | Apache Flink 1.13 and later |
| JDK compatibility | JDK 8 and JDK 11 only; make sure third-party JARs are compatible |
| Scala compatibility | VVR 4.x: Scala 2.11 only; VVR 6.x and later: Scala 2.12 only |
| Pre-installed Python libraries | Includes pandas, NumPy, and PyArrow. For the full list, see Pre-installed software packages |
| Built-in connectors | Available in VVR 11.2 and later |
Set up your local environment
Match the Python version
The Python version in your local environment must match the one pre-installed in your target Ververica Runtime (VVR) engine:
- VVR earlier than 8.0.11: Python 3.7.9
- VVR 8.0.11 and later: Python 3.9.21
Check your current Python version:
python --version
If the version does not match, install the required version or use a tool such as pyenv to manage multiple Python versions.
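The version requirement above can be expressed as a small helper, for example when scripting environment checks. This is a hypothetical sketch, not part of any official tooling; the cut-off at VVR 8.0.11 follows the list above:

```python
import sys

def required_python(vvr: tuple) -> tuple:
    """Python version pre-installed in a given VVR engine version:
    3.7 before VVR 8.0.11, 3.9 from VVR 8.0.11 onward."""
    return (3, 9) if vvr >= (8, 0, 11) else (3, 7)

def local_python_matches(vvr: tuple) -> bool:
    """Check whether the local interpreter matches the target engine."""
    return sys.version_info[:2] == required_python(vvr)

print(required_python((8, 0, 9)))   # → (3, 7)
print(required_python((8, 0, 11)))  # → (3, 9)
```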
Install PyFlink
Install PyFlink with a version that matches the Apache Flink version in your target VVR engine. For example, if your deployment uses vvr-8.0.9-flink-1.17, install:
pip install apache-flink==1.17.2
The version pattern is: vvr-X.X.X-flink-Y.YY maps to apache-flink==Y.YY.*.
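The mapping can be mechanized. This hypothetical helper extracts the Flink version from an engine name and builds the matching pip requirement; note that in practice you still pick a concrete patch release, such as 1.17.2, to match the engine:

```python
def pip_requirement(vvr_engine: str) -> str:
    """Turn an engine name such as 'vvr-8.0.9-flink-1.17' into a pip
    version specifier for apache-flink (any patch release of that minor)."""
    # The engine name ends with '-flink-<major.minor>'
    flink_version = vvr_engine.rsplit("flink-", 1)[1]
    return f"apache-flink=={flink_version}.*"

print(pip_requirement("vvr-8.0.9-flink-1.17"))  # → apache-flink==1.17.*
```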
Choose an API
PyFlink supports Table API/SQL and DataStream API. Table API/SQL is strongly recommended for most use cases:
| Criterion | Table API/SQL | DataStream API |
|---|---|---|
| Performance | Execution plans run entirely in the JVM after optimization | Per-record serialization and deserialization between JVM and Python adds significant overhead |
| Connector support | Full support for connectors, data formats, and window functions | Limited |
| Community priority | Prioritized by the Apache Flink community | Lower priority |
Use DataStream API only when SQL cannot express your complex custom logic.
Set up your project
The recommended project structure:
my-flink-python-project/
├── my_job.py # Main job file
├── udfs.py # User-defined functions (optional)
├── requirements.txt # Third-party Python dependencies (optional)
└── config.properties # Configuration file (optional)
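If the job reads settings from the optional config.properties file, a minimal loader might look like the following. This is a hypothetical sketch; the key=value format with `#` comments is an assumption, not a requirement of the platform:

```python
def load_properties(path: str) -> dict:
    """Parse a simple key=value properties file; '#' lines are comments."""
    props = {}
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#"):
                continue  # skip blank lines and comments
            key, _, value = line.partition("=")
            props[key.strip()] = value.strip()
    return props
```

The main job file could then call `load_properties("config.properties")` at startup instead of hard-coding connection settings.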
Manage dependencies
To use custom Python virtual environments, third-party Python packages, JARs, or data files in PyFlink jobs, see Use Python dependencies.
Write user-defined functions
The following example shows a user-defined scalar function (UDSF) that masks sensitive data in phone numbers:
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.STRING())
def mask_phone(phone: str):
    """Mask a phone number: keep the first three and last four digits; replace the middle digits with ****."""
    if phone is None or len(phone) != 11:
        return phone
    return phone[:3] + '****' + phone[7:]
Register the UDF in a SQL job:
CREATE TEMPORARY FUNCTION mask_phone AS 'udfs.mask_phone' LANGUAGE PYTHON;
INSERT INTO sink_table
SELECT name, mask_phone(phone) AS masked_phone
FROM source_table;
To register, update, or delete UDFs, see Manage user-defined functions (UDFs).
Configure connectors
Connectors let your PyFlink job read from and write to external systems. For supported connectors, see Supported connectors.
1. Log on to the Realtime Compute for Apache Flink console.
2. Click Console in the Actions column of the target workspace.
3. In the left navigation pane, click Artifacts.
4. Click Upload Artifact and select the Python package for the target connector. Download official Python packages for Flink connectors from the connector list.
5. On the O&M > Deployments page, click Create Deployment > Python Job, select the connector package in Additional Dependencies, configure the other parameters, and deploy the job.
6. Click the job name. On the Configuration tab, go to Parameters, click Edit, and add the connector package path in Other Configuration. For multiple connector JARs, separate them with semicolons:

       pipeline.classpaths: 'file:///flink/usrlib/connector-1.jar;file:///flink/usrlib/connector-2.jar'

7. Optional: To use built-in connectors, formats, or catalogs (available in VVR 11.2 and later), add the following configuration in Other Configuration under Parameters:

       ## Use multiple built-in connectors
       pipeline.used-builtin-connectors: kafka;sls
       ## Use multiple data formats
       pipeline.used-builtin-formats: avro;parquet
       ## Use multiple existing catalogs
       pipeline.used-builtin-catalogs: catalogname1;catalogname2
For complete connector usage examples, see Complete sample code.
Debug a job
When you implement a Python UDF, use the logging module to output log information for troubleshooting:
import logging

from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.BIGINT())
def add(i, j):
    logging.info("Computing add: %s + %s", i, j)
    return i + j
View the output in the TaskManager log files.
Debug locally
By default, Realtime Compute for Apache Flink cannot access the internet, so your code may not connect directly to online data sources for testing. Use one of the following approaches:
- Unit tests: Run independent unit tests on your UDFs to verify logic correctness.
- Local execution: Simulate input with in-memory data and run the job locally to validate processing logic:

      from pyflink.datastream import StreamExecutionEnvironment

      env = StreamExecutionEnvironment.get_execution_environment()
      # Test using in-memory data
      ds = env.from_collection([('Alice', 1), ('Bob', 2), ('Alice', 3)])
      ds.key_by(lambda x: x[0]).sum(1).print()
      env.execute("local_test")

- Remote debugging: To debug with online data sources, see Run and debug jobs with connectors locally.
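As an example of the unit-test approach, the masking logic from the UDF section can be kept as a plain Python function and tested without any Flink runtime. This is a sketch; in the actual job the function would additionally be wrapped with `@udf`:

```python
def mask_phone(phone):
    """Keep the first three and last four digits; replace the middle with ****."""
    if phone is None or len(phone) != 11:
        return phone
    return phone[:3] + '****' + phone[7:]

# Plain unit tests: no Flink cluster or PyFlink installation required
assert mask_phone('13812345678') == '138****5678'
assert mask_phone(None) is None
assert mask_phone('123') == '123'  # unexpected length passes through unchanged
```

Keeping UDF logic in plain functions like this makes it testable with any standard test runner before the job is deployed.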
Deploy a job
After you finish developing your PyFlink job, deploy it to Realtime Compute for Apache Flink:
1. Log on to the Realtime Compute for Apache Flink console and go to the target workspace.
2. In the left navigation pane, click Artifacts. Upload your Python job files (.py or .zip), third-party dependencies, and configuration files.
3. On the O&M > Deployments page, click Create Deployment > Python Job, and fill in the deployment parameters:

   | Parameter | Description |
   |---|---|
   | Python URI | Select the uploaded Python job file. |
   | Entry Module | Leave blank for .py files. For .zip files, enter the entry module name, for example, my_job. |
   | Additional Dependencies | Select connector JARs or configuration files, if any. |
   | Python Libraries | Select third-party Python packages (.whl or .zip), if any. |
   | Python Archives | Select custom Python virtual environments (.zip), if any. |

4. Click Deploy.
For details about deployment parameters, see Deploy a job.
Complete sample code
The following example reads data from Kafka, filters records, and writes results to MySQL. All table definitions use the Table API and SQL.
This example does not include configurations for checkpoints or restart strategies. Add these after deployment on the Configuration tab. For details, see Configure job deployment settings.
import logging
import sys

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

logging.basicConfig(stream=sys.stdout, level=logging.INFO)

def kafka_to_mysql():
    # Create execution environments
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)

    # Create Kafka source table
    t_env.execute_sql("""
        CREATE TABLE kafka_source (
            `id` INT,
            `name` STRING,
            `score` INT,
            `event_time` TIMESTAMP(3),
            WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'student_topic',
            'properties.bootstrap.servers' = 'your-kafka-broker:9092',
            'properties.group.id' = 'my-group',
            'scan.startup.mode' = 'latest-offset',
            'format' = 'json'
        )
    """)

    # Create MySQL sink table
    t_env.execute_sql("""
        CREATE TABLE mysql_sink (
            `id` INT,
            `name` STRING,
            `score` INT,
            PRIMARY KEY (id) NOT ENFORCED
        ) WITH (
            'connector' = 'jdbc',
            'url' = 'jdbc:mysql://your-mysql-host:3306/my_database',
            'table-name' = 'student',
            'username' = 'your_username',
            'password' = 'your_password'
        )
    """)

    # Filter records with score >= 60 and write to MySQL
    t_env.execute_sql("""
        INSERT INTO mysql_sink
        SELECT id, name, score
        FROM kafka_source
        WHERE score >= 60
    """)

if __name__ == '__main__':
    kafka_to_mysql()
Replace the following placeholders:
| Placeholder | Description | Example |
|---|---|---|
| `your-kafka-broker:9092` | Kafka bootstrap server address | `broker.example.com:9092` |
| `your-mysql-host:3306/my_database` | MySQL host and database | `db.example.com:3306/students` |
| `your_username` | MySQL username | `admin` |
| `your_password` | MySQL password | — |
Pre-installed software packages
VVR-11
The following software packages are installed in the Flink workspace.
| Software package | Version |
|---|---|
| apache-beam | 2.48.0 |
| avro-python3 | 1.10.2 |
| brotlipy | 0.7.0 |
| certifi | 2022.12.7 |
| cffi | 1.15.1 |
| charset-normalizer | 2.0.4 |
| cloudpickle | 2.2.1 |
| conda | 22.11.1 |
| conda-content-trust | 0.1.3 |
| conda-package-handling | 1.9.0 |
| crcmod | 1.7 |
| cryptography | 38.0.1 |
| Cython | 3.0.12 |
| dill | 0.3.1.1 |
| dnspython | 2.7.0 |
| docopt | 0.6.2 |
| exceptiongroup | 1.3.0 |
| fastavro | 1.12.1 |
| fasteners | 0.20 |
| find_libpython | 0.5.0 |
| grpcio | 1.56.2 |
| grpcio-tools | 1.56.2 |
| hdfs | 2.7.3 |
| httplib2 | 0.22.0 |
| idna | 3.4 |
| importlib_metadata | 8.7.0 |
| iniconfig | 2.1.0 |
| isort | 6.1.0 |
| numpy | 1.24.4 |
| objsize | 0.6.1 |
| orjson | 3.9.15 |
| packaging | 25.0 |
| pandas | 2.3.3 |
| pemja | 0.5.5 |
| pip | 22.3.1 |
| pluggy | 1.0.0 |
| proto-plus | 1.26.1 |
| protobuf | 4.25.8 |
| py-spy | 0.4.0 |
| py4j | 0.10.9.7 |
| pyarrow | 11.0.0 |
| pyarrow-hotfix | 0.6 |
| pycodestyle | 2.14.0 |
| pycosat | 0.6.4 |
| pycparser | 2.21 |
| pydot | 1.4.2 |
| pymongo | 4.15.4 |
| pyOpenSSL | 22.0.0 |
| pyparsing | 3.2.5 |
| PySocks | 1.7.1 |
| pytest | 7.4.4 |
| python-dateutil | 2.9.0 |
| pytz | 2025.2 |
| regex | 2025.11.3 |
| requests | 2.32.5 |
| ruamel.yaml | 0.18.16 |
| ruamel.yaml.clib | 0.2.14 |
| setuptools | 70.0.0 |
| six | 1.16.0 |
| tomli | 2.3.0 |
| toolz | 0.12.0 |
| tqdm | 4.64.1 |
| typing_extensions | 4.15.0 |
| tzdata | 2025.2 |
| urllib3 | 1.26.13 |
| wheel | 0.38.4 |
| zipp | 3.23.0 |
| zstandard | 0.25.0 |
VVR-8
The following software packages are pre-installed in the Flink workspace.
| Software package | Version |
|---|---|
| apache-beam | 2.43.0 |
| avro-python3 | 1.9.2.1 |
| certifi | 2025.7.9 |
| charset-normalizer | 3.4.2 |
| cloudpickle | 2.2.0 |
| crcmod | 1.7 |
| Cython | 0.29.24 |
| dill | 0.3.1.1 |
| docopt | 0.6.2 |
| fastavro | 1.4.7 |
| fasteners | 0.19 |
| find_libpython | 0.4.1 |
| grpcio | 1.46.3 |
| grpcio-tools | 1.46.3 |
| hdfs | 2.7.3 |
| httplib2 | 0.20.4 |
| idna | 3.10 |
| isort | 6.0.1 |
| numpy | 1.21.6 |
| objsize | 0.5.2 |
| orjson | 3.10.18 |
| pandas | 1.3.5 |
| pemja | 0.3.2 |
| pip | 22.3.1 |
| proto-plus | 1.26.1 |
| protobuf | 3.20.3 |
| py4j | 0.10.9.7 |
| pyarrow | 8.0.0 |
| pycodestyle | 2.14.0 |
| pydot | 1.4.2 |
| pymongo | 3.13.0 |
| pyparsing | 3.2.3 |
| python-dateutil | 2.9.0 |
| pytz | 2025.2 |
| regex | 2024.11.6 |
| requests | 2.32.4 |
| setuptools | 58.1.0 |
| six | 1.17.0 |
| typing_extensions | 4.14.1 |
| urllib3 | 2.5.0 |
| wheel | 0.33.4 |
| zstandard | 0.23.0 |
VVR-6
The following software packages are pre-installed in the Flink workspace.
| Software package | Version |
|---|---|
| apache-beam | 2.27.0 |
| avro-python3 | 1.9.2.1 |
| certifi | 2024.8.30 |
| charset-normalizer | 3.3.2 |
| cloudpickle | 1.2.2 |
| crcmod | 1.7 |
| Cython | 0.29.16 |
| dill | 0.3.1.1 |
| docopt | 0.6.2 |
| fastavro | 0.23.6 |
| future | 0.18.3 |
| grpcio | 1.29.0 |
| hdfs | 2.7.3 |
| httplib2 | 0.17.4 |
| idna | 3.8 |
| importlib-metadata | 6.7.0 |
| isort | 5.11.5 |
| jsonpickle | 2.0.0 |
| mock | 2.0.0 |
| numpy | 1.19.5 |
| oauth2client | 4.1.3 |
| pandas | 1.1.5 |
| pbr | 6.1.0 |
| pemja | 0.1.4 |
| pip | 20.1.1 |
| protobuf | 3.17.3 |
| py4j | 0.10.9.3 |
| pyarrow | 2.0.0 |
| pyasn1 | 0.5.1 |
| pyasn1-modules | 0.3.0 |
| pycodestyle | 2.10.0 |
| pydot | 1.4.2 |
| pymongo | 3.13.0 |
| pyparsing | 3.1.4 |
| python-dateutil | 2.8.0 |
| pytz | 2024.1 |
| requests | 2.31.0 |
| rsa | 4.9 |
| setuptools | 47.1.0 |
| six | 1.16.0 |
| typing-extensions | 3.7.4.3 |
| urllib3 | 2.0.7 |
| wheel | 0.42.0 |
| zipp | 3.15.0 |
What's next
- Quick start with PyFlink jobs — end-to-end walkthrough of the PyFlink job development process
- Use Python dependencies — custom Python virtual environments, third-party libraries, JARs, and data files
- Job development overview and Develop JAR jobs — SQL and DataStream job development guides