Topik ini menjelaskan informasi latar belakang, batasan, metode pengembangan, teknik debugging, dan penggunaan konektor untuk pekerjaan Flink Python API.
Informasi latar belakang
Kembangkan pekerjaan Flink Python secara lokal. Setelah pengembangan selesai, terapkan dan jalankan pekerjaan tersebut di Konsol pengembangan Flink untuk melihat hasil bisnisnya. Untuk alur kerja lengkap, lihat Panduan Cepat untuk Pekerjaan Python Flink.
Persyaratan lingkungan pengembangan
Ververica Runtime (VVR) versi sebelum 8.0.11 menyertakan Python 3.7.9. VVR 8.0.11 atau yang lebih baru menyertakan Python 3.9.21.
CatatanGunakan versi Python lokal yang sesuai dengan versi Python yang telah dipra-instal di engine VVR target Anda.
Instal PyFlink dengan versi yang sesuai dengan versi Flink yang digunakan oleh engine VVR target Anda. Misalnya, jika Anda memilih
vvr-8.0.9-flink-1.17pada halaman deployment, instalapache-flink==1.17.*.pip install apache-flink==1.17.2Instal IDE. Kami merekomendasikan PyCharm atau VS Code.
Selesaikan pengembangan pekerjaan Python secara offline, lalu terapkan dan jalankan pekerjaan tersebut di Realtime Compute Management Console.
Batasan
Karena keterbatasan deployment dan lingkungan jaringan, perhatikan batasan berikut saat mengembangkan pekerjaan Python:
Hanya Apache Flink 1.13 dan yang lebih baru yang didukung.
Ruang kerja Flink mencakup lingkungan Python yang telah dipra-instal dengan library umum seperti pandas, NumPy, dan PyArrow. Untuk detailnya, lihat daftar paket perangkat lunak yang telah dipra-instal di akhir topik ini.
Lingkungan runtime Flink hanya mendukung JDK 8 dan JDK 11. Jika pekerjaan Python Anda bergantung pada JAR pihak ketiga, pastikan kompatibilitasnya.
VVR 4.x hanya mendukung Scala 2.11. VVR 6.x dan yang lebih baru hanya mendukung Scala 2.12. Jika pekerjaan Python Anda bergantung pada JAR pihak ketiga, pastikan dependensi JAR tersebut sesuai dengan versi Scala yang tepat.
Pengembangan pekerjaan
Memilih antara Table API/SQL dan DataStream API
PyFlink mendukung Table API/SQL maupun DataStream API. Kami merekomendasikan menggunakan Table API/SQL terlebih dahulu karena alasan berikut:
Kinerja lebih baik: Rencana eksekusi Table API/SQL dioptimalkan dan dijalankan sepenuhnya dalam JVM. DataStream API memerlukan serialisasi dan deserialisasi setiap record antara proses JVM dan Python, yang menimbulkan overhead signifikan.
Fitur lebih lengkap: Table API/SQL menawarkan dukungan lebih lengkap untuk konektor, format data, dan fungsi jendela, serta berbagi ekosistem konektor yang sama dengan pekerjaan SQL.
Arah komunitas: Komunitas Apache Flink memfokuskan pengembangan PyFlink terutama pada Table API/SQL.
Gunakan DataStream API hanya untuk logika kustom kompleks yang tidak dapat diekspresikan dalam SQL.
Referensi pengembangan
Gunakan dokumen berikut untuk mengembangkan kode bisnis Flink secara lokal. Setelah pengembangan selesai, unggah kode Anda ke Konsol pengembangan Flink dan terapkan pekerjaan tersebut.
Untuk pengembangan kode bisnis Apache Flink 1.20, lihat Panduan Pengembangan Flink Python API.
Untuk masalah umum dan solusinya selama pengkodean Apache Flink, lihat FAQ.
Struktur proyek
Kami merekomendasikan struktur proyek berikut untuk pekerjaan Python:
my-flink-python-project/
├── my_job.py # File pekerjaan utama
├── udfs.py # Fungsi yang ditentukan pengguna (opsional)
├── requirements.txt # Dependensi Python pihak ketiga (opsional)
└── config.properties # File konfigurasi (opsional)Manajemen dependensi
Untuk petunjuk penggunaan lingkungan virtual Python kustom, paket Python pihak ketiga, JAR, dan file data dalam pekerjaan Python, lihat Menggunakan dependensi Python.
Fungsi yang ditentukan pengguna (UDF)
Contoh berikut menunjukkan cara mengembangkan UDSF Python yang mendesensitisasi string:
from pyflink.table import DataTypes
from pyflink.table.udf import udf
@udf(result_type=DataTypes.STRING())
def mask_phone(phone: str):
"""Menyamarkan nomor telepon: pertahankan 3 digit pertama dan 4 digit terakhir, ganti bagian tengah dengan ****"""
if phone is None or len(phone) != 11:
return phone
return phone[:3] + '****' + phone[7:]Gunakan UDF ini dalam pekerjaan SQL sebagai berikut:
CREATE TEMPORARY FUNCTION mask_phone AS 'udfs.mask_phone' LANGUAGE PYTHON;
INSERT INTO sink_table
SELECT name, mask_phone(phone) AS masked_phone
FROM source_table;Untuk petunjuk mendaftarkan, memperbarui, dan menghapus UDF, lihat Mengelola fungsi yang ditentukan pengguna (UDF).
Menggunakan konektor
Untuk daftar konektor yang didukung, lihat Konektor yang didukung. Untuk menggunakan konektor, ikuti langkah-langkah berikut:
Masuk ke Konsol Realtime Compute.
Klik Console di kolom Actions ruang kerja target.
Di panel navigasi kiri, klik File Management.
Klik Upload Artifact dan pilih paket Python untuk konektor target Anda.
Anda dapat mengunggah konektor kustom atau konektor Flink bawaan. Untuk tautan unduhan paket Python konektor resmi Flink, lihat Daftar konektor.
Pada halaman , klik . Di kolom Additional Dependencies, pilih paket konektor Anda, konfigurasikan parameter lainnya, lalu terapkan pekerjaan tersebut.
Klik nama pekerjaan yang telah diterapkan. Pada tab Deployment Details, di bagian Runtime Parameters, klik Edit. Di kolom Other Configuration, tambahkan path ke paket konektor Python Anda.
Jika pekerjaan Anda bergantung pada beberapa paket konektor—misalnya, connector-1.jar dan connector-2.jar—konfigurasikan sebagai berikut:
pipeline.classpaths: 'file:///flink/usrlib/connector-1.jar;file:///flink/usrlib/connector-2.jar'Untuk menggunakan konektor bawaan, format data, dan katalog (hanya untuk VVR 11.2 atau yang lebih baru), tambahkan konfigurasi berikut di kolom Other Configuration pada bagian Runtime Parameters:
pipeline.used-builtin-connectors: kafka;sls pipeline.used-builtin-formats: avro;parquet
Debugging Tugas
Dalam implementasi fungsi yang ditentukan pengguna Python Anda, gunakan modul logging untuk menghasilkan informasi log guna troubleshooting. Contohnya:
import logging
@udf(result_type=DataTypes.BIGINT())
def add(i, j):
logging.info("hello world")
return i + jSetelah logging, lihat outputnya di file log TaskManager.
Debugging lokal
Secara default, Realtime Compute for Apache Flink tidak dapat mengakses jaringan publik, sehingga kode Anda mungkin tidak dapat langsung terhubung ke sumber data online untuk pengujian. Gunakan salah satu pendekatan berikut untuk debugging lokal:
Unit test: Uji fungsi yang ditentukan pengguna (UDF) secara terpisah untuk memverifikasi logika.
Eksekusi lokal: Simulasikan input menggunakan sumber data lokal seperti file atau data in-memory, lalu jalankan pekerjaan secara lokal untuk memvalidasi logika pemrosesan. Contohnya:
from pyflink.datastream import StreamExecutionEnvironment env = StreamExecutionEnvironment.get_execution_environment() # Gunakan sumber data lokal untuk pengujian ds = env.from_collection([('Alice', 1), ('Bob', 2), ('Alice', 3)]) ds.key_by(lambda x: x[0]).sum(1).print() env.execute("local_test")Debugging jarak jauh: Untuk debugging dengan sumber data online, lihat Menjalankan dan mendebug pekerjaan berbasis konektor secara lokal.
Deployment pekerjaan
Setelah mengembangkan pekerjaan Python, unggah ke Konsol Realtime Compute untuk deployment. Ikuti langkah-langkah berikut:
Masuk ke Konsol Realtime Compute dan buka ruang kerja target Anda.
Di panel navigasi kiri, klik File Management. Unggah file pekerjaan Python Anda (.py atau .zip), beserta dependensi pihak ketiga atau file konfigurasi apa pun.
Pada halaman , klik dan masukkan informasi deployment.
Parameter
Deskripsi
Python file path
Pilih file pekerjaan Python yang telah diunggah.
Entry Module
Biarkan kosong jika file pekerjaan Anda berupa file .py. Jika berupa file .zip, masukkan nama modul entri, misalnya
my_job.Additional Dependencies
Pilih JAR konektor atau file konfigurasi apa pun di sini.
Python Libraries
Pilih paket Python pihak ketiga (.whl atau .zip) di sini.
Python Archives
Pilih lingkungan virtual Python kustom (.zip) di sini.
Klik Deploy.
Untuk detail parameter deployment lebih lanjut, lihat Menerapkan pekerjaan.
Kode contoh lengkap
Contoh ini menunjukkan pekerjaan streaming Python yang membaca dari Kafka, memproses data, dan menulis ke MySQL. Gunakan hanya sebagai referensi.
Contoh ini tidak mencakup konfigurasi checkpointing atau kebijakan restart. Anda dapat menyesuaikan pengaturan ini setelah deployment di halaman Deployment Details. Untuk detailnya, lihat Mengonfigurasi informasi deployment pekerjaan.
import logging
import sys
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
def kafka_to_mysql():
# Buat lingkungan eksekusi
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
# Buat tabel sumber Kafka
t_env.execute_sql("""
CREATE TABLE kafka_source (
`id` INT,
`name` STRING,
`score` INT,
`event_time` TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'student_topic',
'properties.bootstrap.servers' = 'your-kafka-broker:9092',
'properties.group.id' = 'my-group',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
)
""")
# Buat tabel sink MySQL
t_env.execute_sql("""
CREATE TABLE mysql_sink (
`id` INT,
`name` STRING,
`score` INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://your-mysql-host:3306/my_database',
'table-name' = 'student',
'username' = 'your_username',
'password' = 'your_password'
)
""")
# Filter catatan dengan skor >= 60 dan tulis ke MySQL
t_env.execute_sql("""
INSERT INTO mysql_sink
SELECT id, name, score
FROM kafka_source
WHERE score >= 60
""")
if __name__ == '__main__':
kafka_to_mysql()Daftar paket perangkat lunak yang telah dipra-instal
VVR-11
Paket perangkat lunak berikut diinstal di ruang kerja Flink.
Software package | Version |
apache-beam | 2.48.0 |
avro-python3 | 1.10.2 |
brotlipy | 0.7.0 |
certifi | 2022.12.7 |
cffi | 1.15.1 |
charset-normalizer | 2.0.4 |
cloudpickle | 2.2.1 |
conda | 22.11.1 |
conda-content-trust | 0.1.3 |
conda-package-handling | 1.9.0 |
crcmod | 1.7 |
cryptography | 38.0.1 |
Cython | 3.0.12 |
dill | 0.3.1.1 |
dnspython | 2.7.0 |
docopt | 0.6.2 |
exceptiongroup | 1.3.0 |
fastavro | 1.12.1 |
fasteners | 0.20 |
find_libpython | 0.5.0 |
grpcio | 1.56.2 |
grpcio-tools | 1.56.2 |
hdfs | 2.7.3 |
httplib2 | 0.22.0 |
idna | 3.4 |
importlib_metadata | 8.7.0 |
iniconfig | 2.1.0 |
isort | 6.1.0 |
numpy | 1.24.4 |
objsize | 0.6.1 |
orjson | 3.9.15 |
packaging | 25.0 |
pandas | 2.3.3 |
pemja | 0.5.5 |
pip | 22.3.1 |
pluggy | 1.0.0 |
proto-plus | 1.26.1 |
protobuf | 4.25.8 |
py-spy | 0.4.0 |
py4j | 0.10.9.7 |
pyarrow | 11.0.0 |
pyarrow-hotfix | 0.6 |
pycodestyle | 2.14.0 |
pycosat | 0.6.4 |
pycparser | 2.21 |
pydot | 1.4.2 |
pymongo | 4.15.4 |
pyOpenSSL | 22.0.0 |
pyparsing | 3.2.5 |
PySocks | 1.7.1 |
pytest | 7.4.4 |
python-dateutil | 2.9.0 |
pytz | 2025.2 |
regex | 2025.11.3 |
requests | 2.32.5 |
ruamel.yaml | 0.18.16 |
ruamel.yaml.clib | 0.2.14 |
setuptools | 70.0.0 |
six | 1.16.0 |
tomli | 2.3.0 |
toolz | 0.12.0 |
tqdm | 4.64.1 |
typing_extensions | 4.15.0 |
tzdata | 2025.2 |
urllib3 | 1.26.13 |
wheel | 0.38.4 |
zipp | 3.23.0 |
zstandard | 0.25.0 |
VVR-8
Paket perangkat lunak berikut diinstal di ruang kerja Flink.
Software package | Version |
apache-beam | 2.43.0 |
avro-python3 | 1.9.2.1 |
certifi | 2025.7.9 |
charset-normalizer | 3.4.2 |
cloudpickle | 2.2.0 |
crcmod | 1.7 |
Cython | 0.29.24 |
dill | 0.3.1.1 |
docopt | 0.6.2 |
fastavro | 1.4.7 |
fasteners | 0.19 |
find_libpython | 0.4.1 |
grpcio | 1.46.3 |
grpcio-tools | 1.46.3 |
hdfs | 2.7.3 |
httplib2 | 0.20.4 |
idna | 3.10 |
isort | 6.0.1 |
numpy | 1.21.6 |
objsize | 0.5.2 |
orjson | 3.10.18 |
pandas | 1.3.5 |
pemja | 0.3.2 |
pip | 22.3.1 |
proto-plus | 1.26.1 |
protobuf | 3.20.3 |
py4j | 0.10.9.7 |
pyarrow | 8.0.0 |
pycodestyle | 2.14.0 |
pydot | 1.4.2 |
pymongo | 3.13.0 |
pyparsing | 3.2.3 |
python-dateutil | 2.9.0 |
pytz | 2025.2 |
regex | 2024.11.6 |
requests | 2.32.4 |
setuptools | 58.1.0 |
six | 1.17.0 |
typing_extensions | 4.14.1 |
urllib3 | 2.5.0 |
wheel | 0.33.4 |
zstandard | 0.23.0 |
VVR-6
Paket perangkat lunak berikut diinstal di ruang kerja Flink.
Software package | Version |
apache-beam | 2.27.0 |
avro-python3 | 1.9.2.1 |
certifi | 2024.8.30 |
charset-normalizer | 3.3.2 |
cloudpickle | 1.2.2 |
crcmod | 1.7 |
Cython | 0.29.16 |
dill | 0.3.1.1 |
docopt | 0.6.2 |
fastavro | 0.23.6 |
future | 0.18.3 |
grpcio | 1.29.0 |
hdfs | 2.7.3 |
httplib2 | 0.17.4 |
idna | 3.8 |
importlib-metadata | 6.7.0 |
isort | 5.11.5 |
jsonpickle | 2.0.0 |
mock | 2.0.0 |
numpy | 1.19.5 |
oauth2client | 4.1.3 |
pandas | 1.1.5 |
pbr | 6.1.0 |
pemja | 0.1.4 |
pip | 20.1.1 |
protobuf | 3.17.3 |
py4j | 0.10.9.3 |
pyarrow | 2.0.0 |
pyasn1 | 0.5.1 |
pyasn1-modules | 0.3.0 |
pycodestyle | 2.10.0 |
pydot | 1.4.2 |
pymongo | 3.13.0 |
pyparsing | 3.1.4 |
python-dateutil | 2.8.0 |
pytz | 2024.1 |
requests | 2.31.0 |
rsa | 4.9 |
setuptools | 47.1.0 |
six | 1.16.0 |
typing-extensions | 3.7.4.3 |
urllib3 | 2.0.7 |
wheel | 0.42.0 |
zipp | 3.15.0 |
Referensi
Untuk panduan lengkap proses pengembangan pekerjaan Python Flink, lihat Panduan Cepat untuk Pekerjaan Python Flink.
Untuk detail penggunaan lingkungan virtual Python kustom, paket Python pihak ketiga, JAR, dan file data dalam pekerjaan Python Flink, lihat Menggunakan dependensi Python.
Realtime Compute for Apache Flink juga mendukung pekerjaan SQL dan DataStream. Untuk panduan pengembangan, lihat Peta pengembangan pekerjaan dan Mengembangkan pekerjaan JAR.