全部產品
Search
文件中心

Realtime Compute for Apache Flink:Python作業開發

更新時間:Mar 17, 2026

本文為您介紹Flink Python API作業開發的背景資訊、使用限制、開發方法、調試方法和連接器使用等。

背景資訊

Flink Python作業需要您在本地完成開發工作,Python作業開發完成後,再在Flink開發控制台上部署並啟動才能看到業務效果。整體的操作流程詳情請參見Flink Python作業快速入門

開發環境要求

  • Realtime Compute引擎VVR 8.0.11以下版本預裝Python 3.7.9版本,VVR 8.0.11及以上版本預裝Python 3.9.21版本。

    說明

    建議本地開發環境的Python版本與目標VVR引擎預裝的Python版本保持一致。

  • 已安裝PyFlink,且版本與目標VVR引擎使用的Flink版本一致。例如您在部署頁面選擇的引擎為vvr-8.0.9-flink-1.17,則需安裝apache-flink==1.17.*

    pip install apache-flink==1.17.2
  • 已安裝IDE開發工具,推薦PyCharm或VS Code。

  • Python作業需要您線上下完成開發,再在Realtime Compute管理主控台上部署並運行。

使用限制

由於Flink受部署環境、網路環境等因素的影響,開發Python作業需要注意以下限制:

  • 僅支援開源Flink V1.13及以上版本。

  • Flink工作空間已預裝了Python環境,且Python環境中已預裝了Pandas、NumPy、PyArrow等常用的Python庫。詳見本文末尾預裝軟體包列表

  • Flink運行環境僅支援JDK 8和JDK 11,如果Python作業中依賴第三方JAR包,請確保JAR包相容。

  • VVR 4.x僅支援開源Scala V2.11版本,VVR 6.x 及以上版本僅支援開源Scala V2.12版本。如果Python作業中依賴第三方JAR包,請確保使用Scala版本對應的JAR包依賴。

作業開發

Table API/SQL與DataStream API的選擇

PyFlink支援Table API/SQL和DataStream API兩種開發方式,推薦優先使用Table API/SQL。原因如下:

  • 效能更優:Table API/SQL的執行計畫經過最佳化後完全在JVM內執行。而DataStream API需要在JVM和Python進程之間逐條資料進行序列化/還原序列化,效能開銷較大。

  • 功能更完善:Table API/SQL對連接器、資料格式、視窗函數等支援更完整,且與SQL作業共用同一套連接器生態。

  • 社區推薦:Apache Flink社區在PyFlink的發展方向上以Table API/SQL為主。

只有在SQL無法表達的複雜自訂邏輯情境下,才建議使用DataStream API。

開發參考

您可以參見以下文檔在本地完成Flink業務代碼開發,開發完成後您需要將其上傳到Flink開發控制台,並部署上線作業。

專案結構

推薦的Python作業專案結構如下:

my-flink-python-project/
├── my_job.py                # 主作業檔案
├── udfs.py                  # 自訂函數(可選)
├── requirements.txt         # 第三方Python依賴(可選)
└── config.properties        # 設定檔(可選)

依賴管理

Python作業中使用自訂的Python虛擬環境、第三方Python包、JAR包和資料檔案的方法,請參見使用Python依賴

自訂函數(UDF)

以下是一個Python UDSF的開發樣本,用於對字串進行脫敏處理:

from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.STRING())def mask_phone(phone: str):"""手機號脫敏:保留前3位和後4位,中間用****替代"""if phone is None or len(phone) != 11:return phone
    return phone[:3] + '****' + phone[7:]

在SQL作業中使用該UDF:

CREATE TEMPORARY FUNCTION mask_phone AS 'udfs.mask_phone' LANGUAGE PYTHON;INSERT INTO sink_table
SELECT name, mask_phone(phone) AS masked_phone
FROM source_table;

UDF的註冊、更新和刪除方法,請參見管理自訂函數(UDF)

連接器使用

Flink所支援的連接器列表,請參見支援的連接器。連接器使用方法如下:

  1. 登入Realtime Compute控制台

  2. 單擊目標工作空間操作列下的控制台

  3. 在左側導覽列,單擊檔案管理

  4. 單擊上傳資源,選擇您要上傳的目標連接器的Python包。

    您可以上傳自己開發的連接器,也可以上傳Flink提供的連接器。Flink提供的連接器官方Python包的下載地址,請參見Connector列表

  5. 營運中心 > 作業營運頁面,單擊部署作業 > Python 作業附加依賴檔案項選擇目標連接器的Python包,配置其他參數並部署作業。

  6. 單擊部署的作業名稱,在部署詳情頁簽運行參數配置地區,單擊編輯,在其他配置中,添加Python連接器包位置資訊。

    如果您的作業需要依賴多個連接器Python包,例如依賴的2個包的名字分別為connector-1.jar和connector-2.jar,則配置資訊如下。

    pipeline.classpaths: 'file:///flink/usrlib/connector-1.jar;file:///flink/usrlib/connector-2.jar'
  7. 如果您需要使用內建連接器、資料格式和Catalog(僅VVR 11.2及以上版本),可在作業的運行參數配置地區其他配置項中添加配置,例如:

    ## 多個連接器使用
    pipeline.used-builtin-connectors: kafka;sls
    ## 傳輸資料的多種資料格式
    pipeline.used-builtin-formats: avro;parquet
    ## 使用了多個已經建立的Catalog
    pipeline.used-builtin-catalogs: catalogname1;catalogname2

具體的連接器使用方式詳情請參見完整範例程式碼

作業調試

您可以在Python自訂函數的代碼實現中,通過logging的方式,輸出日誌資訊,方便後期問題定位,樣本如下。

import logging

@udf(result_type=DataTypes.BIGINT())
def add(i, j):
  logging.info("hello world")
  return i + j

日誌輸出後,您可以在TaskManager的記錄檔中查看。

本地調試

由於Realtime ComputeFlink版預設不具備訪問公網的能力,您的代碼可能無法在本地直接連接線上資料來源進行測試。建議按照以下方式進行本地調試:

  • 單元測試:對自訂函數(UDF)進行獨立的單元測試,確保函數邏輯正確。

  • 本地運行:使用本機資料源(如檔案或記憶體資料)類比輸入,在本地運行作業驗證處理邏輯。樣本如下:

    from pyflink.datastream import StreamExecutionEnvironment
    
    env = StreamExecutionEnvironment.get_execution_environment()# 使用本機資料源進行測試
    ds = env.from_collection([('Alice', 1), ('Bob', 2), ('Alice', 3)])
    ds.key_by(lambda x: x[0]).sum(1).print()
    env.execute("local_test")
  • 遠端偵錯:如需連接線上資料來源調試,請參見本地運行和調試包含連接器的作業

作業部署

Python作業開發完成後,需要上傳到Realtime Compute控制台進行部署。操作步驟如下:

  1. 登入Realtime Compute控制台,進入目標工作空間。

  2. 在左側導覽列單擊檔案管理,上傳Python作業檔案(.py或.zip)。如有第三方依賴或設定檔,也需一併上傳。

  3. 營運中心 > 作業營運頁面,單擊部署作業 > Python作業,填寫部署資訊。

    參數

    說明

    Python檔案地址

    選擇已上傳的Python作業檔案。

    Entry Module

    如果作業檔案為.py檔案,無需填寫;如果為.zip檔案,需填寫入口模組名稱,例如my_job

    附加依賴檔案

    如有連接器JAR包或設定檔,在此選擇。

    Python Libraries

    如有第三方Python包(.whl或.zip),在此選擇。

    Python Archives

    如有自訂Python虛擬環境(.zip),在此選擇。

  4. 單擊部署

更多部署參數詳情請參見部署作業

完整範例程式碼

本樣本展示一個從Kafka讀取資料、進行簡單處理後寫入MySQL的Python流作業,僅供參考。

說明

樣本中未包含Checkpoint、重啟策略等運行參數的配置。上述配置可在部署作業完成後,通過部署詳情頁進行自訂配置。詳情請參見配置作業部署資訊

import logging
import sys

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

logging.basicConfig(stream=sys.stdout, level=logging.INFO)


def kafka_to_mysql():
    # 建立執行環境
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)

    # 建立 Kafka 源表
    t_env.execute_sql("""
        CREATE TABLE kafka_source (
            `id` INT,
            `name` STRING,
            `score` INT,
            `event_time` TIMESTAMP(3),
            WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'student_topic',
            'properties.bootstrap.servers' = 'your-kafka-broker:9092',
            'properties.group.id' = 'my-group',
            'scan.startup.mode' = 'latest-offset',
            'format' = 'json'
        )
    """)

    # 建立 MySQL 結果表
    t_env.execute_sql("""
        CREATE TABLE mysql_sink (
            `id` INT,
            `name` STRING,
            `score` INT,
            PRIMARY KEY (id) NOT ENFORCED
        ) WITH (
            'connector' = 'jdbc',
            'url' = 'jdbc:mysql://your-mysql-host:3306/my_database',
            'table-name' = 'student',
            'username' = 'your_username',
            'password' = 'your_password'
        )
    """)

    # 篩選分數大於60的記錄並寫入 MySQL
    t_env.execute_sql("""
        INSERT INTO mysql_sink
        SELECT id, name, score
        FROM kafka_source
        WHERE score >= 60
    """)


if __name__ == '__main__':
    kafka_to_mysql()

預裝軟體包列表

VVR-11

Flink工作空間已安裝下列軟體包。

軟體包

版本

apache-beam

2.48.0

avro-python3

1.10.2

brotlipy

0.7.0

certifi

2022.12.7

cffi

1.15.1

charset-normalizer

2.0.4

cloudpickle

2.2.1

conda

22.11.1

conda-content-trust

0.1.3

conda-package-handling

1.9.0

crcmod

1.7

cryptography

38.0.1

Cython

3.0.12

dill

0.3.1.1

dnspython

2.7.0

docopt

0.6.2

exceptiongroup

1.3.0

fastavro

1.12.1

fasteners

0.20

find_libpython

0.5.0

grpcio

1.56.2

grpcio-tools

1.56.2

hdfs

2.7.3

httplib2

0.22.0

idna

3.4

importlib_metadata

8.7.0

iniconfig

2.1.0

isort

6.1.0

numpy

1.24.4

objsize

0.6.1

orjson

3.9.15

packaging

25.0

pandas

2.3.3

pemja

0.5.5

pip

22.3.1

pluggy

1.0.0

proto-plus

1.26.1

protobuf

4.25.8

py-spy

0.4.0

py4j

0.10.9.7

pyarrow

11.0.0

pyarrow-hotfix

0.6

pycodestyle

2.14.0

pycosat

0.6.4

pycparser

2.21

pydot

1.4.2

pymongo

4.15.4

pyOpenSSL

22.0.0

pyparsing

3.2.5

PySocks

1.7.1

pytest

7.4.4

python-dateutil

2.9.0

pytz

2025.2

regex

2025.11.3

requests

2.32.5

ruamel.yaml

0.18.16

ruamel.yaml.clib

0.2.14

setuptools

70.0.0

six

1.16.0

tomli

2.3.0

toolz

0.12.0

tqdm

4.64.1

typing_extensions

4.15.0

tzdata

2025.2

urllib3

1.26.13

wheel

0.38.4

zipp

3.23.0

zstandard

0.25.0

VVR-8

Flink工作空間已安裝下列軟體包。

軟體包

版本

apache-beam

2.43.0

avro-python3

1.9.2.1

certifi

2025.7.9

charset-normalizer

3.4.2

cloudpickle

2.2.0

crcmod

1.7

Cython

0.29.24

dill

0.3.1.1

docopt

0.6.2

fastavro

1.4.7

fasteners

0.19

find_libpython

0.4.1

grpcio

1.46.3

grpcio-tools

1.46.3

hdfs

2.7.3

httplib2

0.20.4

idna

3.10

isort

6.0.1

numpy

1.21.6

objsize

0.5.2

orjson

3.10.18

pandas

1.3.5

pemja

0.3.2

pip

22.3.1

proto-plus

1.26.1

protobuf

3.20.3

py4j

0.10.9.7

pyarrow

8.0.0

pycodestyle

2.14.0

pydot

1.4.2

pymongo

3.13.0

pyparsing

3.2.3

python-dateutil

2.9.0

pytz

2025.2

regex

2024.11.6

requests

2.32.4

setuptools

58.1.0

six

1.17.0

typing_extensions

4.14.1

urllib3

2.5.0

wheel

0.33.4

zstandard

0.23.0

VVR-6

Flink工作空間已安裝下列軟體包。

軟體包

版本

apache-beam

2.27.0

avro-python3

1.9.2.1

certifi

2024.8.30

charset-normalizer

3.3.2

cloudpickle

1.2.2

crcmod

1.7

Cython

0.29.16

dill

0.3.1.1

docopt

0.6.2

fastavro

0.23.6

future

0.18.3

grpcio

1.29.0

hdfs

2.7.3

httplib2

0.17.4

idna

3.8

importlib-metadata

6.7.0

isort

5.11.5

jsonpickle

2.0.0

mock

2.0.0

numpy

1.19.5

oauth2client

4.1.3

pandas

1.1.5

pbr

6.1.0

pemja

0.1.4

pip

20.1.1

protobuf

3.17.3

py4j

0.10.9.3

pyarrow

2.0.0

pyasn1

0.5.1

pyasn1-modules

0.3.0

pycodestyle

2.10.0

pydot

1.4.2

pymongo

3.13.0

pyparsing

3.1.4

python-dateutil

2.8.0

pytz

2024.1

requests

2.31.0

rsa

4.9

setuptools

47.1.0

six

1.16.0

typing-extensions

3.7.4.3

urllib3

2.0.7

wheel

0.42.0

zipp

3.15.0

相關文檔