Topik ini menjelaskan informasi latar belakang, catatan penggunaan, metode pengembangan, metode debugging, dan penggunaan konektor untuk pekerjaan Flink Python API (PyFlink).
Informasi latar belakang
Anda mengembangkan pekerjaan PyFlink secara lokal. Setelah selesai, terapkan dan jalankan pekerjaan tersebut di Konsol Realtime Compute for Apache Flink untuk melihat hasil bisnisnya. Untuk panduan lengkap proses ini, lihat Panduan Cepat untuk Pekerjaan Python Flink.
Persyaratan lingkungan pengembangan
Ververica Runtime (VVR) versi sebelum 8.0.11 menyertakan Python 3.7.9, sedangkan VVR 8.0.11 atau yang lebih baru menyertakan Python 3.9.21.
CatatanKami menyarankan menggunakan versi Python yang sama di lingkungan pengembangan lokal Anda dengan versi yang telah dipra-instal di mesin VVR target Anda.
Instal PyFlink. Versi PyFlink harus sesuai dengan versi Apache Flink yang digunakan oleh mesin VVR target Anda. Misalnya, jika Anda memilih
vvr-8.0.9-flink-1.17pada halaman penyebaran, instalapache-flink==1.17.*.pip install apache-flink==1.17.2Instal IDE. Kami merekomendasikan PyCharm atau VS Code.
Anda mengembangkan pekerjaan PyFlink secara offline, lalu menerapkan dan menjalankannya di Konsol Realtime Compute for Apache Flink.
Batasan
Saat mengembangkan pekerjaan PyFlink, perhatikan persyaratan berikut yang diberlakukan oleh lingkungan penyebaran dan jaringan:
Hanya Apache Flink 1.13 dan yang lebih baru yang didukung.
Ruang kerja Flink mencakup lingkungan Python yang telah dipra-instal dengan pustaka umum seperti pandas, NumPy, dan PyArrow. Untuk daftar lengkapnya, lihat Paket perangkat lunak yang telah dipra-instal di akhir topik ini.
Lingkungan runtime Flink hanya mendukung JDK 8 dan JDK 11. Jika pekerjaan PyFlink Anda menggunakan JAR pihak ketiga, pastikan JAR tersebut kompatibel dengan versi JDK ini.
VVR 4.x hanya mendukung Scala 2.11, sedangkan VVR 6.x dan yang lebih baru hanya mendukung Scala 2.12. Jika pekerjaan PyFlink Anda menggunakan JAR pihak ketiga, pastikan dependensi JAR sesuai dengan versi Scala yang tepat.
Mengembangkan pekerjaan
Pilih antara Table API/SQL dan DataStream API
PyFlink mendukung Table API/SQL dan DataStream API. Kami merekomendasikan menggunakan Table API/SQL terlebih dahulu. Berikut alasannya:
Kinerja lebih baik: Rencana eksekusi Table API/SQL berjalan sepenuhnya di dalam JVM setelah dioptimalkan. DataStream API memerlukan serialisasi dan deserialisasi per-record antara proses JVM dan Python, yang menambahkan overhead signifikan.
Fitur lebih lengkap: Table API/SQL menyediakan dukungan penuh untuk konektor, format data, dan fungsi jendela. API ini juga berbagi ekosistem konektor yang sama dengan pekerjaan SQL.
Rekomendasi komunitas: Komunitas Apache Flink memprioritaskan Table API/SQL untuk pengembangan PyFlink.
Gunakan DataStream API hanya jika SQL tidak dapat mengekspresikan logika kustom yang kompleks.
Referensi pengembangan
Gunakan sumber daya berikut untuk mengembangkan logika bisnis Flink secara lokal. Setelah selesai, unggah kode ke Konsol Realtime Compute for Apache Flink dan terapkan pekerjaan tersebut.
Untuk cara mengembangkan logika bisnis Apache Flink 1.20, lihat Panduan Pengembangan API Python Flink.
Untuk masalah umum dan solusinya selama pengkodean Apache Flink, lihat FAQ.
Struktur proyek
Struktur proyek pekerjaan Python yang direkomendasikan adalah sebagai berikut:
my-flink-python-project/
├── my_job.py # File pekerjaan utama
├── udfs.py # Fungsi yang ditentukan pengguna (opsional)
├── requirements.txt # Dependensi Python pihak ketiga (opsional)
└── config.properties # File konfigurasi (opsional)Manajemen dependensi
Untuk cara menggunakan lingkungan virtual Python kustom, paket Python pihak ketiga, JAR, dan file data dalam pekerjaan PyFlink, lihat Menggunakan dependensi Python.
Fungsi yang ditentukan pengguna (UDF)
Contoh berikut menunjukkan cara mengembangkan UDSF Python untuk mendesensitisasi string:
from pyflink.table import DataTypes
from pyflink.table.udf import udf
@udf(result_type=DataTypes.STRING())
def mask_phone(phone: str):
"""Menyamarkan nomor telepon: pertahankan tiga digit pertama dan empat digit terakhir; ganti digit tengah dengan ****."""
if phone is None or len(phone) != 11:
return phone
return phone[:3] + '****' + phone[7:]Untuk menggunakan UDF ini dalam pekerjaan SQL:
CREATE TEMPORARY FUNCTION mask_phone AS 'udfs.mask_phone' LANGUAGE PYTHON;
INSERT INTO sink_table
SELECT name, mask_phone(phone) AS masked_phone
FROM source_table;Untuk cara mendaftarkan, memperbarui, dan menghapus UDF, lihat Mengelola fungsi yang ditentukan pengguna (UDF).
Menggunakan konektor
Untuk daftar konektor yang didukung Flink, lihat Konektor yang didukung. Untuk menggunakan konektor, ikuti langkah-langkah berikut:
Masuk ke Konsol Realtime Compute for Apache Flink.
Klik Console di kolom Actions ruang kerja target.
Di panel navigasi kiri, klik File Management.
Klik Upload Artifact, lalu pilih paket Python untuk konektor target.
Anda dapat mengunggah konektor kustom atau konektor bawaan dari Realtime Compute for Apache Flink. Untuk tautan unduhan paket Python resmi untuk konektor Flink, lihat Daftar konektor.
Di halaman , klik , pilih paket Python untuk konektor target di kolom Additional Dependencies, konfigurasikan parameter lainnya, lalu terapkan pekerjaan tersebut.
Klik nama pekerjaan yang telah diterapkan. Di tab Deployment Details, pada bagian Runtime Parameter Settings, klik Edit, lalu di Other Configuration, tambahkan path ke paket konektor Python.
Jika pekerjaan Anda bergantung pada beberapa paket konektor—misalnya, connector-1.jar dan connector-2.jar—konfigurasikan sebagai berikut:
pipeline.classpaths: 'file:///flink/usrlib/connector-1.jar;file:///flink/usrlib/connector-2.jar'Untuk menggunakan konektor bawaan, format data, atau katalog (hanya tersedia di VVR 11.2 dan yang lebih baru), tambahkan konfigurasi di kolom Other Configuration di bawah Parameters. Contohnya:
## Gunakan beberapa konektor bawaan pipeline.used-builtin-connectors: kafka;sls ## Gunakan beberapa format data pipeline.used-builtin-formats: avro;parquet ## Gunakan beberapa katalog yang ada pipeline.used-builtin-catalogs: catalogname1;catalogname2
Untuk contoh penggunaan konektor secara detail, lihat Kode contoh lengkap.
Men-debug Pekerjaan
Saat mengimplementasikan fungsi yang ditentukan pengguna (UDF) Python, gunakan modul logging untuk mengeluarkan informasi log guna troubleshooting. Potongan kode berikut menunjukkan contohnya:
import logging
@udf(result_type=DataTypes.BIGINT())
def add(i, j):
logging.info("hello world")
return i + jSetelah melakukan logging, lihat output-nya di file log TaskManager.
Debugging lokal
Secara default, Realtime Compute for Apache Flink tidak dapat mengakses Internet. Kode Anda mungkin tidak dapat terhubung langsung ke sumber data online untuk pengujian. Kami merekomendasikan pendekatan berikut untuk debugging lokal:
Pengujian unit: Jalankan pengujian unit independen pada UDF Anda untuk memverifikasi kebenaran logika.
Eksekusi lokal: Simulasikan input menggunakan sumber data lokal seperti file atau data in-memory. Lalu jalankan pekerjaan secara lokal untuk memvalidasi logika pemrosesan. Contoh:
from pyflink.datastream import StreamExecutionEnvironment env = StreamExecutionEnvironment.get_execution_environment() # Uji menggunakan sumber data lokal ds = env.from_collection([('Alice', 1), ('Bob', 2), ('Alice', 3)]) ds.key_by(lambda x: x[0]).sum(1).print() env.execute("local_test")Debugging jarak jauh: Untuk debugging dengan sumber data online, lihat Menjalankan dan mendebug pekerjaan dengan konektor secara lokal.
Menyebarkan pekerjaan
Setelah mengembangkan pekerjaan PyFlink, unggah ke Konsol Realtime Compute for Apache Flink untuk diterapkan. Ikuti langkah-langkah berikut:
Masuk ke Konsol Realtime Compute for Apache Flink dan buka ruang kerja target.
Di panel navigasi kiri, klik File Management, lalu unggah file pekerjaan Python (.py atau .zip). Unggah juga dependensi pihak ketiga atau file konfigurasi apa pun.
Di halaman , klik , lalu isi informasi penyebaran.
Parameter
Deskripsi
Python file path
Pilih file pekerjaan Python yang telah diunggah.
Entry Module
Biarkan kosong untuk file .py. Untuk file .zip, masukkan nama modul entri—misalnya,
my_job.Additional Dependencies
Pilih JAR konektor atau file konfigurasi, jika ada.
Python Libraries
Pilih paket Python pihak ketiga (.whl atau .zip), jika ada.
Python Archives
Pilih lingkungan virtual Python kustom (.zip), jika ada.
Klik Deploy.
Untuk detail lebih lanjut tentang parameter penyebaran, lihat Menyebarkan pekerjaan.
Kode contoh lengkap
Contoh ini menunjukkan pekerjaan streaming Python yang membaca data dari Kafka, memprosesnya, lalu menulis hasilnya ke MySQL. Gunakan hanya sebagai referensi.
Contoh ini tidak mencakup konfigurasi untuk checkpoint atau strategi restart. Anda dapat menambahkannya setelah penyebaran di tab Configuration. Untuk detailnya, lihat Mengonfigurasi pengaturan penyebaran pekerjaan.
import logging
import sys
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
def kafka_to_mysql():
# Buat lingkungan eksekusi
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
# Buat tabel sumber Kafka
t_env.execute_sql("""
CREATE TABLE kafka_source (
`id` INT,
`name` STRING,
`score` INT,
`event_time` TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'student_topic',
'properties.bootstrap.servers' = 'your-kafka-broker:9092',
'properties.group.id' = 'my-group',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
)
""")
# Buat tabel sink MySQL
t_env.execute_sql("""
CREATE TABLE mysql_sink (
`id` INT,
`name` STRING,
`score` INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://your-mysql-host:3306/my_database',
'table-name' = 'student',
'username' = 'your_username',
'password' = 'your_password'
)
""")
# Filter catatan dengan skor >= 60 dan tulis ke MySQL
t_env.execute_sql("""
INSERT INTO mysql_sink
SELECT id, name, score
FROM kafka_source
WHERE score >= 60
""")
if __name__ == '__main__':
kafka_to_mysql()Paket perangkat lunak yang telah dipra-instal
VVR-11
Paket perangkat lunak berikut diinstal di ruang kerja Flink.
Paket perangkat lunak | Versi |
apache-beam | 2.48.0 |
avro-python3 | 1.10.2 |
brotlipy | 0.7.0 |
certifi | 2022.12.7 |
cffi | 1.15.1 |
charset-normalizer | 2.0.4 |
cloudpickle | 2.2.1 |
conda | 22.11.1 |
conda-content-trust | 0.1.3 |
conda-package-handling | 1.9.0 |
crcmod | 1.7 |
cryptography | 38.0.1 |
Cython | 3.0.12 |
dill | 0.3.1.1 |
dnspython | 2.7.0 |
docopt | 0.6.2 |
exceptiongroup | 1.3.0 |
fastavro | 1.12.1 |
fasteners | 0.20 |
find_libpython | 0.5.0 |
grpcio | 1.56.2 |
grpcio-tools | 1.56.2 |
hdfs | 2.7.3 |
httplib2 | 0.22.0 |
idna | 3.4 |
importlib_metadata | 8.7.0 |
iniconfig | 2.1.0 |
isort | 6.1.0 |
numpy | 1.24.4 |
objsize | 0.6.1 |
orjson | 3.9.15 |
packaging | 25.0 |
pandas | 2.3.3 |
pemja | 0.5.5 |
pip | 22.3.1 |
pluggy | 1.0.0 |
proto-plus | 1.26.1 |
protobuf | 4.25.8 |
py-spy | 0.4.0 |
py4j | 0.10.9.7 |
pyarrow | 11.0.0 |
pyarrow-hotfix | 0.6 |
pycodestyle | 2.14.0 |
pycosat | 0.6.4 |
pycparser | 2.21 |
pydot | 1.4.2 |
pymongo | 4.15.4 |
pyOpenSSL | 22.0.0 |
pyparsing | 3.2.5 |
PySocks | 1.7.1 |
pytest | 7.4.4 |
python-dateutil | 2.9.0 |
pytz | 2025.2 |
regex | 2025.11.3 |
requests | 2.32.5 |
ruamel.yaml | 0.18.16 |
ruamel.yaml.clib | 0.2.14 |
setuptools | 70.0.0 |
six | 1.16.0 |
tomli | 2.3.0 |
toolz | 0.12.0 |
tqdm | 4.64.1 |
typing_extensions | 4.15.0 |
tzdata | 2025.2 |
urllib3 | 1.26.13 |
wheel | 0.38.4 |
zipp | 3.23.0 |
zstandard | 0.25.0 |
VVR-8
Paket perangkat lunak berikut telah dipra-instal di lingkungan ruang kerja Flink.
Paket perangkat lunak | Versi |
apache-beam | 2.43.0 |
avro-python3 | 1.9.2.1 |
certifi | 2025.7.9 |
charset-normalizer | 3.4.2 |
cloudpickle | 2.2.0 |
crcmod | 1.7 |
Cython | 0.29.24 |
dill | 0.3.1.1 |
docopt | 0.6.2 |
fastavro | 1.4.7 |
fasteners | 0.19 |
find_libpython | 0.4.1 |
grpcio | 1.46.3 |
grpcio-tools | 1.46.3 |
hdfs | 2.7.3 |
httplib2 | 0.20.4 |
idna | 3.10 |
isort | 6.0.1 |
numpy | 1.21.6 |
objsize | 0.5.2 |
orjson | 3.10.18 |
pandas | 1.3.5 |
pemja | 0.3.2 |
pip | 22.3.1 |
proto-plus | 1.26.1 |
protobuf | 3.20.3 |
py4j | 0.10.9.7 |
pyarrow | 8.0.0 |
pycodestyle | 2.14.0 |
pydot | 1.4.2 |
pymongo | 3.13.0 |
pyparsing | 3.2.3 |
python-dateutil | 2.9.0 |
pytz | 2025.2 |
regex | 2024.11.6 |
requests | 2.32.4 |
setuptools | 58.1.0 |
six | 1.17.0 |
typing_extensions | 4.14.1 |
urllib3 | 2.5.0 |
wheel | 0.33.4 |
zstandard | 0.23.0 |
VVR-6
Paket perangkat lunak berikut telah dipra-instal di lingkungan ruang kerja Flink.
Paket perangkat lunak | Versi |
apache-beam | 2.27.0 |
avro-python3 | 1.9.2.1 |
certifi | 2024.8.30 |
charset-normalizer | 3.3.2 |
cloudpickle | 1.2.2 |
crcmod | 1.7 |
Cython | 0.29.16 |
dill | 0.3.1.1 |
docopt | 0.6.2 |
fastavro | 0.23.6 |
future | 0.18.3 |
grpcio | 1.29.0 |
hdfs | 2.7.3 |
httplib2 | 0.17.4 |
idna | 3.8 |
importlib-metadata | 6.7.0 |
isort | 5.11.5 |
jsonpickle | 2.0.0 |
mock | 2.0.0 |
numpy | 1.19.5 |
oauth2client | 4.1.3 |
pandas | 1.1.5 |
pbr | 6.1.0 |
pemja | 0.1.4 |
pip | 20.1.1 |
protobuf | 3.17.3 |
py4j | 0.10.9.3 |
pyarrow | 2.0.0 |
pyasn1 | 0.5.1 |
pyasn1-modules | 0.3.0 |
pycodestyle | 2.10.0 |
pydot | 1.4.2 |
pymongo | 3.13.0 |
pyparsing | 3.1.4 |
python-dateutil | 2.8.0 |
pytz | 2024.1 |
requests | 2.31.0 |
rsa | 4.9 |
setuptools | 47.1.0 |
six | 1.16.0 |
typing-extensions | 3.7.4.3 |
urllib3 | 2.0.7 |
wheel | 0.42.0 |
zipp | 3.15.0 |
Referensi
Untuk panduan lengkap proses pengembangan pekerjaan PyFlink, lihat Panduan Cepat untuk Pekerjaan Python Flink.
Untuk detail penggunaan lingkungan virtual Python kustom, pustaka Python pihak ketiga, JAR, dan file data dalam pekerjaan Python Flink, lihat Menggunakan dependensi Python.
Realtime Compute for Apache Flink juga mendukung pekerjaan SQL dan DataStream. Untuk panduan pengembangan jenis pekerjaan ini, lihat Ikhtisar pengembangan pekerjaan dan Mengembangkan pekerjaan JAR.