
Realtime Compute for Apache Flink:Develop PyFlink jobs

Last Updated: Mar 26, 2026

Develop, debug, and deploy PyFlink jobs on Realtime Compute for Apache Flink. This guide covers environment setup, API selection, user-defined functions (UDFs), connector configuration, debugging, and deployment.

For a complete walkthrough, see Quick start with PyFlink jobs.

Prerequisites

Before you begin, make sure you have a Realtime Compute for Apache Flink workspace that you can log on to, and a local development environment set up as described below.

Constraints and limits

  • Flink version: Apache Flink 1.13 and later.

  • JDK compatibility: JDK 8 and JDK 11 only; make sure third-party JARs are compatible.

  • Scala compatibility: VVR 4.x supports Scala 2.11 only; VVR 6.x and later support Scala 2.12 only.

  • Pre-installed Python libraries: pandas, NumPy, and PyArrow are included. For the full list, see Pre-installed software packages.

  • Built-in connectors: available in VVR 11.2 and later.

Set up your local environment

Match the Python version

The Python version in your local environment must match the one pre-installed in your target Ververica Runtime (VVR) engine:

  • VVR earlier than 8.0.11: Python 3.7.9

  • VVR 8.0.11 and later: Python 3.9.21

Check your current Python version:

python --version

If the version does not match, install the required version or use a tool such as pyenv to manage multiple Python versions.
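
As a quick sanity check, you can compare the local interpreter against the expected engine version in plain Python. This is an illustrative sketch; the `(3, 9)` target below assumes VVR 8.0.11 or later:

```python
import sys

# Expected interpreter version for the target engine:
# (3, 9) for VVR 8.0.11 and later, (3, 7) for earlier VVR versions.
REQUIRED = (3, 9)

if sys.version_info[:2] != REQUIRED:
    print(f"Warning: local Python is {sys.version_info[0]}.{sys.version_info[1]}, "
          f"but the target engine expects {REQUIRED[0]}.{REQUIRED[1]}")
else:
    print("Python version matches the target engine.")
```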

Install PyFlink

Install PyFlink with a version that matches the Apache Flink version in your target VVR engine. For example, if your deployment uses vvr-8.0.9-flink-1.17, install:

pip install apache-flink==1.17.2

The naming pattern is consistent: an engine named vvr-X.X.X-flink-Y.YY maps to apache-flink==Y.YY.*.
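
The mapping can be expressed as a small helper. This is an illustrative sketch only; the function name `flink_requirement` is not part of any official tooling:

```python
import re

def flink_requirement(engine_version: str) -> str:
    """Derive a pip requirement from a VVR engine name,
    e.g. 'vvr-8.0.9-flink-1.17' -> 'apache-flink==1.17.*'."""
    match = re.search(r'flink-(\d+\.\d+)', engine_version)
    if match is None:
        raise ValueError(f"Unrecognized engine name: {engine_version}")
    return f"apache-flink=={match.group(1)}.*"

print(flink_requirement('vvr-8.0.9-flink-1.17'))  # apache-flink==1.17.*
```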

Choose an API

PyFlink supports Table API/SQL and DataStream API. Table API/SQL is strongly recommended for most use cases:

  • Performance: Table API/SQL execution plans run entirely in the JVM after optimization, whereas the DataStream API incurs significant overhead from per-record serialization and deserialization between the JVM and Python.

  • Connector support: Table API/SQL fully supports connectors, data formats, and window functions; DataStream API support is limited.

  • Community priority: The Apache Flink community prioritizes Table API/SQL over the Python DataStream API.

Use DataStream API only when SQL cannot express your complex custom logic.

Set up your project

The recommended project structure:

my-flink-python-project/
├── my_job.py                # Main job file
├── udfs.py                  # User-defined functions (optional)
├── requirements.txt         # Third-party Python dependencies (optional)
└── config.properties        # Configuration file (optional)
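
If you use a config.properties file, you can load it with plain Python at job startup. This is a minimal sketch assuming simple key=value lines; the helper name load_properties is illustrative:

```python
def load_properties(path: str) -> dict:
    """Parse a key=value properties file into a dict,
    skipping blank lines and '#' comments."""
    props = {}
    with open(path, encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith('#'):
                continue
            key, _, value = line.partition('=')
            props[key.strip()] = value.strip()
    return props
```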

Manage dependencies

To use custom Python virtual environments, third-party Python packages, JARs, or data files in PyFlink jobs, see Use Python dependencies.

Write user-defined functions

The following example shows a user-defined scalar function (UDSF) that masks sensitive data in phone numbers:

from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.STRING())
def mask_phone(phone: str):
    """Mask phone number: keep the first three and last four digits; replace middle digits with ****."""
    if phone is None or len(phone) != 11:
        return phone
    return phone[:3] + '****' + phone[7:]

Register the UDF in a SQL job:

CREATE TEMPORARY FUNCTION mask_phone AS 'udfs.mask_phone' LANGUAGE PYTHON;
INSERT INTO sink_table
SELECT name, mask_phone(phone) AS masked_phone
FROM source_table;

To register, update, or delete UDFs, see Manage user-defined functions (UDFs).
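
Because the function body is plain Python, you can unit test the masking logic without a Flink cluster by testing an undecorated copy of it, for example:

```python
def mask_phone_logic(phone):
    """Same logic as the mask_phone UDF body, without the @udf decorator."""
    if phone is None or len(phone) != 11:
        return phone
    return phone[:3] + '****' + phone[7:]

assert mask_phone_logic('13812345678') == '138****5678'
assert mask_phone_logic(None) is None
assert mask_phone_logic('123') == '123'  # too short: returned unchanged
```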

Configure connectors

Connectors let your PyFlink job read from and write to external systems. For supported connectors, see Supported connectors.

  1. Log on to the Realtime Compute for Apache Flink console.

  2. Click Console in the Actions column of the target workspace.

  3. In the left navigation pane, click Artifacts.

  4. Click Upload Artifact, and select the Python package for the target connector. Download official Python packages for Flink connectors from the connector list.

  5. On the O&M > Deployments page, click Create Deployment > Python Job, select the connector package in Additional Dependencies, configure the other parameters, and deploy the job.

  6. Click the job name. On the Configuration tab, go to Parameters, click Edit, and add the connector package path in Other Configuration. For multiple connector JARs, separate them with semicolons:

    pipeline.classpaths: 'file:///flink/usrlib/connector-1.jar;file:///flink/usrlib/connector-2.jar'
  7. Optional: To use built-in connectors, formats, or catalogs (available in VVR 11.2 and later), add the following configuration in Other Configuration under Parameters:

    ## Use multiple built-in connectors
    pipeline.used-builtin-connectors: kafka;sls
    ## Use multiple data formats
    pipeline.used-builtin-formats: avro;parquet
    ## Use multiple existing catalogs
    pipeline.used-builtin-catalogs: catalogname1;catalogname2

For complete connector usage examples, see Complete sample code.

Debug a job

When you implement a Python UDF, use the logging module to output log information for troubleshooting:

import logging

@udf(result_type=DataTypes.BIGINT())
def add(i, j):
    logging.info("Computing add: %s + %s", i, j)
    return i + j

View the output in the TaskManager log files.

Debug locally

By default, Realtime Compute for Apache Flink cannot access the internet, so your code may not connect directly to online data sources for testing. Use one of the following approaches:

  • Unit tests: Run independent unit tests on your UDFs to verify logic correctness.

  • Local execution: Simulate input with in-memory data and run the job locally to validate processing logic:

    from pyflink.datastream import StreamExecutionEnvironment
    
    env = StreamExecutionEnvironment.get_execution_environment()
    # Test using in-memory data
    ds = env.from_collection([('Alice', 1), ('Bob', 2), ('Alice', 3)])
    ds.key_by(lambda x: x[0]).sum(1).print()
    env.execute("local_test")
  • Remote debugging: To debug with online data sources, see Run and debug jobs with connectors locally.

Deploy a job

After you finish developing your PyFlink job, deploy it to Realtime Compute for Apache Flink:

  1. Log on to the Realtime Compute for Apache Flink console and go to the target workspace.

  2. In the left navigation pane, click Artifacts. Upload your Python job files (.py or .zip), third-party dependencies, and configuration files.

  3. On the O&M > Deployments page, click Create Deployment > Python Job, and fill in the deployment parameters:

    • Python URI: Select the uploaded Python job file.
    • Entry Module: Leave blank for .py files. For .zip files, enter the entry module name (for example, my_job).
    • Additional Dependencies: Select connector JARs or configuration files, if any.
    • Python Libraries: Select third-party Python packages (.whl or .zip), if any.
    • Python Archives: Select custom Python virtual environments (.zip), if any.
  4. Click Deploy.

For details about deployment parameters, see Deploy a job.

Complete sample code

The following example reads data from Kafka, filters records, and writes results to MySQL. All table definitions use the Table API and SQL.

This example does not include configurations for checkpoints or restart strategies. Add these after deployment on the Configuration tab. For details, see Configure job deployment settings.
import logging
import sys

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

logging.basicConfig(stream=sys.stdout, level=logging.INFO)


def kafka_to_mysql():
    # Create execution environments
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)

    # Create Kafka source table
    t_env.execute_sql("""
        CREATE TABLE kafka_source (
            `id` INT,
            `name` STRING,
            `score` INT,
            `event_time` TIMESTAMP(3),
            WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'student_topic',
            'properties.bootstrap.servers' = 'your-kafka-broker:9092',
            'properties.group.id' = 'my-group',
            'scan.startup.mode' = 'latest-offset',
            'format' = 'json'
        )
    """)

    # Create MySQL sink table
    t_env.execute_sql("""
        CREATE TABLE mysql_sink (
            `id` INT,
            `name` STRING,
            `score` INT,
            PRIMARY KEY (id) NOT ENFORCED
        ) WITH (
            'connector' = 'jdbc',
            'url' = 'jdbc:mysql://your-mysql-host:3306/my_database',
            'table-name' = 'student',
            'username' = 'your_username',
            'password' = 'your_password'
        )
    """)

    # Filter records with score >= 60 and write to MySQL
    t_env.execute_sql("""
        INSERT INTO mysql_sink
        SELECT id, name, score
        FROM kafka_source
        WHERE score >= 60
    """)


if __name__ == '__main__':
    kafka_to_mysql()

Replace the following placeholders:

  • your-kafka-broker:9092: Kafka bootstrap server address (for example, broker.example.com:9092).

  • your-mysql-host:3306/my_database: MySQL host and database (for example, db.example.com:3306/students).

  • your_username: MySQL username (for example, admin).

  • your_password: MySQL password.

Pre-installed software packages

VVR-11

The following software packages are installed in the Flink workspace.

Software package Version
apache-beam 2.48.0
avro-python3 1.10.2
brotlipy 0.7.0
certifi 2022.12.7
cffi 1.15.1
charset-normalizer 2.0.4
cloudpickle 2.2.1
conda 22.11.1
conda-content-trust 0.1.3
conda-package-handling 1.9.0
crcmod 1.7
cryptography 38.0.1
Cython 3.0.12
dill 0.3.1.1
dnspython 2.7.0
docopt 0.6.2
exceptiongroup 1.3.0
fastavro 1.12.1
fasteners 0.20
find_libpython 0.5.0
grpcio 1.56.2
grpcio-tools 1.56.2
hdfs 2.7.3
httplib2 0.22.0
idna 3.4
importlib_metadata 8.7.0
iniconfig 2.1.0
isort 6.1.0
numpy 1.24.4
objsize 0.6.1
orjson 3.9.15
packaging 25.0
pandas 2.3.3
pemja 0.5.5
pip 22.3.1
pluggy 1.0.0
proto-plus 1.26.1
protobuf 4.25.8
py-spy 0.4.0
py4j 0.10.9.7
pyarrow 11.0.0
pyarrow-hotfix 0.6
pycodestyle 2.14.0
pycosat 0.6.4
pycparser 2.21
pydot 1.4.2
pymongo 4.15.4
pyOpenSSL 22.0.0
pyparsing 3.2.5
PySocks 1.7.1
pytest 7.4.4
python-dateutil 2.9.0
pytz 2025.2
regex 2025.11.3
requests 2.32.5
ruamel.yaml 0.18.16
ruamel.yaml.clib 0.2.14
setuptools 70.0.0
six 1.16.0
tomli 2.3.0
toolz 0.12.0
tqdm 4.64.1
typing_extensions 4.15.0
tzdata 2025.2
urllib3 1.26.13
wheel 0.38.4
zipp 3.23.0
zstandard 0.25.0

VVR-8

The following software packages are pre-installed in the Flink workspace.

Software package Version
apache-beam 2.43.0
avro-python3 1.9.2.1
certifi 2025.7.9
charset-normalizer 3.4.2
cloudpickle 2.2.0
crcmod 1.7
Cython 0.29.24
dill 0.3.1.1
docopt 0.6.2
fastavro 1.4.7
fasteners 0.19
find_libpython 0.4.1
grpcio 1.46.3
grpcio-tools 1.46.3
hdfs 2.7.3
httplib2 0.20.4
idna 3.10
isort 6.0.1
numpy 1.21.6
objsize 0.5.2
orjson 3.10.18
pandas 1.3.5
pemja 0.3.2
pip 22.3.1
proto-plus 1.26.1
protobuf 3.20.3
py4j 0.10.9.7
pyarrow 8.0.0
pycodestyle 2.14.0
pydot 1.4.2
pymongo 3.13.0
pyparsing 3.2.3
python-dateutil 2.9.0
pytz 2025.2
regex 2024.11.6
requests 2.32.4
setuptools 58.1.0
six 1.17.0
typing_extensions 4.14.1
urllib3 2.5.0
wheel 0.33.4
zstandard 0.23.0

VVR-6

The following software packages are pre-installed in the Flink workspace.

Software package Version
apache-beam 2.27.0
avro-python3 1.9.2.1
certifi 2024.8.30
charset-normalizer 3.3.2
cloudpickle 1.2.2
crcmod 1.7
Cython 0.29.16
dill 0.3.1.1
docopt 0.6.2
fastavro 0.23.6
future 0.18.3
grpcio 1.29.0
hdfs 2.7.3
httplib2 0.17.4
idna 3.8
importlib-metadata 6.7.0
isort 5.11.5
jsonpickle 2.0.0
mock 2.0.0
numpy 1.19.5
oauth2client 4.1.3
pandas 1.1.5
pbr 6.1.0
pemja 0.1.4
pip 20.1.1
protobuf 3.17.3
py4j 0.10.9.3
pyarrow 2.0.0
pyasn1 0.5.1
pyasn1-modules 0.3.0
pycodestyle 2.10.0
pydot 1.4.2
pymongo 3.13.0
pyparsing 3.1.4
python-dateutil 2.8.0
pytz 2024.1
requests 2.31.0
rsa 4.9
setuptools 47.1.0
six 1.16.0
typing-extensions 3.7.4.3
urllib3 2.0.7
wheel 0.42.0
zipp 3.15.0

What's next