
Realtime Compute for Apache Flink:Python Job Development

Last Updated: Mar 04, 2026

This topic describes background information, limits, development methods, debugging techniques, and connector usage for Flink Python API jobs.

Background information

Develop Flink Python jobs locally. After development, deploy and start the job in the Flink development console to view the results. For the complete workflow, see Quick Start for Flink Python jobs.

Development environment requirements

  • Ververica Runtime (VVR) versions earlier than 8.0.11 include Python 3.7.9. VVR 8.0.11 and later versions include Python 3.9.21.

    Note

    Use a local Python version that matches the Python version pre-installed in your target VVR engine.

  • Install PyFlink with a version that matches the Flink version used by your target VVR engine. For example, if you select vvr-8.0.9-flink-1.17 on the deployment page, install apache-flink==1.17.*.

    pip install apache-flink==1.17.2
  • Install an IDE. We recommend PyCharm or VS Code.

  • Complete Python job development locally, then deploy and run the job in the Realtime Compute console.

Limits

Due to deployment and network environment constraints, observe the following limits when developing Python jobs:

  • Only Apache Flink 1.13 and later are supported.

  • The Flink workspace includes a pre-installed Python environment with common libraries such as pandas, NumPy, and PyArrow. For details, see the pre-installed software package list at the end of this topic.

  • The Flink runtime environment supports only JDK 8 and JDK 11. If your Python job depends on third-party JARs, ensure they are compatible.

  • VVR 4.x supports only Scala 2.11. VVR 6.x and later support only Scala 2.12. If your Python job depends on third-party JARs, ensure the JAR dependencies match the appropriate Scala version.
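Before packaging dependencies, it helps to confirm what your local environment actually has installed and compare it against the pre-installed software package list at the end of this topic. A minimal sketch using only the standard library (the package names below are examples):

```python
from importlib import metadata

def installed_version(package: str):
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None

# Compare a few packages against the pre-installed list for your VVR version.
for pkg in ("numpy", "pandas", "apache-flink"):
    print(pkg, installed_version(pkg))
```

If a locally installed version differs from the pre-installed one, either align your local environment or ship the package with the job as described in Use Python dependencies.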

Job development

Choosing between Table API/SQL and DataStream API

PyFlink supports both the Table API/SQL and the DataStream API. We recommend the Table API/SQL for the following reasons:

  • Better performance: Table API/SQL execution plans are optimized and run entirely within the JVM. DataStream API requires serialization and deserialization of each record between the JVM and Python processes, which incurs significant overhead.

  • More complete features: Table API/SQL offers fuller support for connectors, data formats, and window functions, and shares the same connector ecosystem as SQL jobs.

  • Community direction: The Apache Flink community focuses PyFlink development primarily on Table API/SQL.

Use DataStream API only for complex custom logic that cannot be expressed in SQL.

Development reference

Use the following documents to develop Flink business code locally. After development, upload your code to the Flink development console and deploy the job.

Project structure

We recommend the following project structure for Python jobs:

my-flink-python-project/
├── my_job.py                # Main job file
├── udfs.py                  # User-defined functions (optional)
├── requirements.txt         # Third-party Python dependencies (optional)
└── config.properties        # Configuration file (optional)
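The optional config.properties file can hold connection settings (hosts, credentials) so they are not hard-coded in my_job.py. A minimal sketch for loading a simple key=value properties file; the helper names and keys are illustrative:

```python
def parse_properties(lines):
    """Parse key=value lines, ignoring blank lines and # comments."""
    props = {}
    for raw in lines:
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

def load_properties(path):
    """Load a .properties-style file into a dict."""
    with open(path, encoding="utf-8") as f:
        return parse_properties(f)

# Example (hypothetical key):
# kafka_servers = load_properties("config.properties")["kafka.bootstrap.servers"]
```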

Dependency management

For instructions on using custom Python virtual environments, third-party Python packages, JARs, and data files in Python jobs, see Use Python dependencies.
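For example, a requirements.txt that pins third-party packages can be uploaded in File Management and referenced when you deploy the job; see Use Python dependencies for the exact mechanism. The package names and versions below are illustrative:

```
# requirements.txt (example third-party dependencies)
redis==4.5.4
nltk==3.8.1
```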

User-defined functions (UDFs)

The following example shows how to develop a Python UDF (a scalar function) that masks sensitive data in strings:

from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.STRING())
def mask_phone(phone: str):
    """Mask phone numbers: keep the first 3 and last 4 digits, replace the middle with ****"""
    if phone is None or len(phone) != 11:
        return phone
    return phone[:3] + '****' + phone[7:]

Use this UDF in a SQL job as follows:

CREATE TEMPORARY FUNCTION mask_phone AS 'udfs.mask_phone' LANGUAGE PYTHON;
INSERT INTO sink_table
SELECT name, mask_phone(phone) AS masked_phone
FROM source_table;

For instructions on registering, updating, and deleting UDFs, see Manage user-defined functions (UDFs).
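Because the body of mask_phone is ordinary Python, the masking rule can be verified with plain unit tests before deployment, without a Flink runtime. A sketch that duplicates the rule as a regular function for testing (the helper name is illustrative):

```python
def mask_phone_impl(phone):
    """Same masking rule as the UDF body: keep the first 3 and last 4 digits."""
    if phone is None or len(phone) != 11:
        return phone
    return phone[:3] + '****' + phone[7:]

assert mask_phone_impl('13812345678') == '138****5678'
assert mask_phone_impl(None) is None
assert mask_phone_impl('123') == '123'  # wrong length passes through unchanged
print("all masking checks passed")
```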

Use connectors

For a list of supported connectors, see Supported connectors. To use a connector, follow these steps:

  1. Log on to the Realtime Compute console.

  2. Click Console in the Actions column of the target workspace.

  3. In the left navigation pane, click File Management.

  4. Click Upload Artifact and select the Python package for your target connector.

    You can upload a custom connector or a built-in Flink connector. For download links to official Flink connector Python packages, see Connector list.

  5. On the O&M > Job O&M page, click Deploy Job > Python Job. In the Additional Dependencies field, select your connector package, configure other parameters, and deploy the job.

  6. Click the deployed job name. On the Deployment Details tab, in the Runtime Parameters section, click Edit. In the Other Configuration field, add the path to your Python connector package.

    If your job depends on multiple connector packages—for example, connector-1.jar and connector-2.jar—configure them as follows:

    pipeline.classpaths: 'file:///flink/usrlib/connector-1.jar;file:///flink/usrlib/connector-2.jar'
  7. To use built-in connectors, data formats, and catalogs (VVR 11.2 or later only), add the following configuration in the Other Configuration field under Runtime Parameters:

    pipeline.used-builtin-connectors: kafka;sls
    pipeline.used-builtin-formats: avro;parquet

Job debugging

In your Python user-defined function implementation, use the logging module to output log information for troubleshooting. For example:

import logging

from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.BIGINT())
def add(i, j):
    logging.info("hello world")
    return i + j

After logging, view the output in the TaskManager log files.
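Beyond the root logger, you can attach a dedicated logger per module so that UDF log lines are easy to find in the TaskManager logs. A sketch using only the standard library; the logger name and message are illustrative, and the string buffer stands in for the TaskManager log file during local runs:

```python
import io
import logging

# A dedicated logger; on the cluster its output lands in the TaskManager logs.
logger = logging.getLogger("my_udfs")
logger.setLevel(logging.INFO)

# For this local demonstration, capture output in a string buffer.
buf = io.StringIO()
handler = logging.StreamHandler(buf)
handler.setFormatter(logging.Formatter("%(levelname)s %(name)s %(message)s"))
logger.addHandler(handler)
logger.propagate = False

logger.info("processed record id=%s", 42)
print(buf.getvalue().strip())  # INFO my_udfs processed record id=42
```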

Local debugging

By default, Realtime Compute for Apache Flink cannot access the public network, so your code might not connect directly to online data sources for testing. Use one of these approaches for local debugging:

  • Unit tests: Test user-defined functions (UDFs) independently to verify logic.

  • Local execution: Simulate input using local data sources such as files or in-memory data, then run the job locally to validate processing logic. For example:

    from pyflink.datastream import StreamExecutionEnvironment
    
    env = StreamExecutionEnvironment.get_execution_environment()
    # Use a local data source for testing
    ds = env.from_collection([('Alice', 1), ('Bob', 2), ('Alice', 3)])
    ds.key_by(lambda x: x[0]).sum(1).print()
    env.execute("local_test")
  • Remote debugging: To debug with online data sources, see Run and debug connector-based jobs locally.

Job deployment

After developing a Python job, upload it to the Realtime Compute console for deployment. Follow these steps:

  1. Log on to the Realtime Compute console and go to your target workspace.

  2. In the navigation pane on the left, click File Management. Upload your Python job file (.py or .zip), along with any third-party dependencies or configuration files.

  3. On the O&M > Job O&M page, click Deploy Job > Python Job and enter deployment information.

    • Python file path: Select the uploaded Python job file.

    • Entry Module: Leave blank if your job file is a .py file. If it is a .zip file, enter the entry module name, such as my_job.

    • Additional Dependencies: Select any connector JARs or configuration files.

    • Python Libraries: Select any third-party Python packages (.whl or .zip).

    • Python Archives: Select a custom Python virtual environment (.zip), if any.

  4. Click Deploy.

For more deployment parameter details, see Deploy a job.

Complete sample code

This example shows a Python streaming job that reads from Kafka, processes data, and writes to MySQL. Use it for reference only.

Note

This example does not include checkpointing or restart policy configurations. You can customize these settings after deployment on the Deployment Details page. For details, see Configure job deployment information.

import logging
import sys

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

logging.basicConfig(stream=sys.stdout, level=logging.INFO)


def kafka_to_mysql():
    # Create execution environment
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)

    # Create Kafka source table
    t_env.execute_sql("""
        CREATE TABLE kafka_source (
            `id` INT,
            `name` STRING,
            `score` INT,
            `event_time` TIMESTAMP(3),
            WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'student_topic',
            'properties.bootstrap.servers' = 'your-kafka-broker:9092',
            'properties.group.id' = 'my-group',
            'scan.startup.mode' = 'latest-offset',
            'format' = 'json'
        )
    """)

    # Create MySQL sink table
    t_env.execute_sql("""
        CREATE TABLE mysql_sink (
            `id` INT,
            `name` STRING,
            `score` INT,
            PRIMARY KEY (id) NOT ENFORCED
        ) WITH (
            'connector' = 'jdbc',
            'url' = 'jdbc:mysql://your-mysql-host:3306/my_database',
            'table-name' = 'student',
            'username' = 'your_username',
            'password' = 'your_password'
        )
    """)

    # Filter records with score >= 60 and write to MySQL
    t_env.execute_sql("""
        INSERT INTO mysql_sink
        SELECT id, name, score
        FROM kafka_source
        WHERE score >= 60
    """)


if __name__ == '__main__':
    kafka_to_mysql()

Pre-installed software package list

VVR-11

The following software packages are installed in the Flink workspace.

Software package          Version
apache-beam               2.48.0
avro-python3              1.10.2
brotlipy                  0.7.0
certifi                   2022.12.7
cffi                      1.15.1
charset-normalizer        2.0.4
cloudpickle               2.2.1
conda                     22.11.1
conda-content-trust       0.1.3
conda-package-handling    1.9.0
crcmod                    1.7
cryptography              38.0.1
Cython                    3.0.12
dill                      0.3.1.1
dnspython                 2.7.0
docopt                    0.6.2
exceptiongroup            1.3.0
fastavro                  1.12.1
fasteners                 0.20
find_libpython            0.5.0
grpcio                    1.56.2
grpcio-tools              1.56.2
hdfs                      2.7.3
httplib2                  0.22.0
idna                      3.4
importlib_metadata        8.7.0
iniconfig                 2.1.0
isort                     6.1.0
numpy                     1.24.4
objsize                   0.6.1
orjson                    3.9.15
packaging                 25.0
pandas                    2.3.3
pemja                     0.5.5
pip                       22.3.1
pluggy                    1.0.0
proto-plus                1.26.1
protobuf                  4.25.8
py-spy                    0.4.0
py4j                      0.10.9.7
pyarrow                   11.0.0
pyarrow-hotfix            0.6
pycodestyle               2.14.0
pycosat                   0.6.4
pycparser                 2.21
pydot                     1.4.2
pymongo                   4.15.4
pyOpenSSL                 22.0.0
pyparsing                 3.2.5
PySocks                   1.7.1
pytest                    7.4.4
python-dateutil           2.9.0
pytz                      2025.2
regex                     2025.11.3
requests                  2.32.5
ruamel.yaml               0.18.16
ruamel.yaml.clib          0.2.14
setuptools                70.0.0
six                       1.16.0
tomli                     2.3.0
toolz                     0.12.0
tqdm                      4.64.1
typing_extensions         4.15.0
tzdata                    2025.2
urllib3                   1.26.13
wheel                     0.38.4
zipp                      3.23.0
zstandard                 0.25.0

VVR-8

The following software packages are installed in the Flink workspace.

Software package          Version
apache-beam               2.43.0
avro-python3              1.9.2.1
certifi                   2025.7.9
charset-normalizer        3.4.2
cloudpickle               2.2.0
crcmod                    1.7
Cython                    0.29.24
dill                      0.3.1.1
docopt                    0.6.2
fastavro                  1.4.7
fasteners                 0.19
find_libpython            0.4.1
grpcio                    1.46.3
grpcio-tools              1.46.3
hdfs                      2.7.3
httplib2                  0.20.4
idna                      3.10
isort                     6.0.1
numpy                     1.21.6
objsize                   0.5.2
orjson                    3.10.18
pandas                    1.3.5
pemja                     0.3.2
pip                       22.3.1
proto-plus                1.26.1
protobuf                  3.20.3
py4j                      0.10.9.7
pyarrow                   8.0.0
pycodestyle               2.14.0
pydot                     1.4.2
pymongo                   3.13.0
pyparsing                 3.2.3
python-dateutil           2.9.0
pytz                      2025.2
regex                     2024.11.6
requests                  2.32.4
setuptools                58.1.0
six                       1.17.0
typing_extensions         4.14.1
urllib3                   2.5.0
wheel                     0.33.4
zstandard                 0.23.0

VVR-6

The following software packages are installed in the Flink workspace.

Software package          Version
apache-beam               2.27.0
avro-python3              1.9.2.1
certifi                   2024.8.30
charset-normalizer        3.3.2
cloudpickle               1.2.2
crcmod                    1.7
Cython                    0.29.16
dill                      0.3.1.1
docopt                    0.6.2
fastavro                  0.23.6
future                    0.18.3
grpcio                    1.29.0
hdfs                      2.7.3
httplib2                  0.17.4
idna                      3.8
importlib-metadata        6.7.0
isort                     5.11.5
jsonpickle                2.0.0
mock                      2.0.0
numpy                     1.19.5
oauth2client              4.1.3
pandas                    1.1.5
pbr                       6.1.0
pemja                     0.1.4
pip                       20.1.1
protobuf                  3.17.3
py4j                      0.10.9.3
pyarrow                   2.0.0
pyasn1                    0.5.1
pyasn1-modules            0.3.0
pycodestyle               2.10.0
pydot                     1.4.2
pymongo                   3.13.0
pyparsing                 3.1.4
python-dateutil           2.8.0
pytz                      2024.1
requests                  2.31.0
rsa                       4.9
setuptools                47.1.0
six                       1.16.0
typing-extensions         3.7.4.3
urllib3                   2.0.7
wheel                     0.42.0
zipp                      3.15.0
