Realtime Compute for Apache Flink memungkinkan Anda menggunakan user-defined function (UDF) Python dalam pekerjaan Flink SQL. Topik ini menjelaskan jenis-jenis UDF Python, cara menggunakan dependensi Python, serta cara menyetel kinerjanya.
Jenis user-defined function
Type | Description |
User-defined scalar function (UDSF) | UDSF memetakan nol, satu, atau beberapa nilai skalar menjadi nilai skalar baru. Fungsi ini memiliki hubungan satu-ke-satu antara input dan output, membaca satu baris data, dan mengembalikan satu nilai output. Untuk informasi selengkapnya, lihat User-defined scalar functions (UDSFs). |
User-defined aggregate function (UDAF) | UDAF menggabungkan beberapa catatan menjadi satu catatan. Fungsi ini memiliki hubungan banyak-ke-satu antara input dan output, mengagregasi beberapa catatan input menjadi satu nilai output. Untuk informasi selengkapnya, lihat User-defined aggregate functions (UDAFs). |
User-defined table-valued function (UDTF) | UDTF menerima nol, satu, atau beberapa nilai skalar sebagai parameter input yang dapat memiliki panjang variabel. Fungsi ini mirip dengan UDSF, tetapi dapat mengembalikan sejumlah baris apa pun sebagai output, bukan hanya satu nilai. Baris yang dikembalikan dapat terdiri dari satu atau beberapa kolom. Satu pemanggilan fungsi dapat menghasilkan beberapa baris atau kolom. Untuk informasi selengkapnya, lihat User-defined table-valued functions (UDTFs). |
Gunakan dependensi Python
Kluster Realtime Compute for Apache Flink telah dilengkapi paket-paket Python umum seperti Pandas, NumPy, dan PyArrow secara pra-instal. Untuk daftar lengkap paket Python pihak ketiga yang tersedia di Realtime Compute for Apache Flink, lihat Develop a Python job. Untuk menggunakan paket Python yang telah dipra-instal, Anda dapat mengimpornya ke dalam fungsi Python Anda. Contoh berikut menunjukkan cara melakukannya.
@udf(result_type=DataTypes.FLOAT())
def percentile(values: List[float], percentile: float):
import numpy as np
return np.percentile(values, percentile)Anda juga dapat menggunakan paket Python pihak ketiga lainnya dalam UDF Python Anda. Jika paket tersebut tidak tersedia secara pra-instal, Anda harus mengunggahnya sebagai file dependensi saat mendaftarkan UDF Python tersebut. Untuk informasi selengkapnya, lihat Manage user-defined functions (UDFs) dan Use Python dependencies.
Debug kode
Anda dapat menerapkan logging dalam UDF Python Anda untuk menghasilkan informasi yang membantu mengidentifikasi masalah. Contoh berikut menunjukkan cara melakukannya.
@udf(result_type=DataTypes.BIGINT())
def add(i, j):
logging.info("hello world")
return i + jLog yang dihasilkan dapat dilihat dalam file log TaskManager. Untuk informasi selengkapnya, lihat View operational logs.
Penyetelan kinerja
Pre-load resources
Pre-loading resources memungkinkan Anda memuatnya selama inisialisasi UDF, sehingga menghindari pemuatan ulang setiap kali metode `eval` dieksekusi. Sebagai contoh, Anda dapat memuat model pembelajaran mendalam yang besar hanya sekali, lalu menjalankan prediksi batch berulang kali di atasnya. Kode berikut memberikan contohnya.
from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf
class Predict(ScalarFunction):
def open(self, function_context):
import pickle
with open("resources.zip/resources/model.pkl", "rb") as f:
self.model = pickle.load(f)
def eval(self, x):
return self.model.predict(x)
predict = udf(Predict(), result_type=DataTypes.DOUBLE(), func_type="pandas")Untuk informasi tentang cara mengunggah file dependensi Python, lihat Use Python dependencies.
Gunakan library Pandas
Selain UDF Python biasa, Realtime Compute for Apache Flink juga mendukung Pandas UDF. Tipe data input untuk Pandas UDF adalah struktur data yang didefinisikan dalam Pandas, seperti `pandas.Series` dan `pandas.DataFrame`. Anda dapat memanfaatkan library Python berkinerja tinggi seperti Pandas dan NumPy dalam Pandas UDF untuk menghasilkan UDF Python berkinerja tinggi. Untuk informasi selengkapnya, lihat Vectorized User-defined Functions.
Konfigurasikan parameter
Kinerja UDF Python sangat bergantung pada implementasinya. Jika Anda mengalami masalah kinerja, optimalkan implementasi UDF tersebut. Selain itu, kinerja UDF Python juga dipengaruhi oleh nilai parameter berikut.
Parameter | Description |
python.fn-execution.bundle.size | UDF Python dihitung secara asinkron. Selama eksekusi, operator Java mengirim data ke proses Python untuk diproses secara asinkron. Sebelum mengirim data, operator Java menyimpannya dalam cache. Ketika cache mencapai ambang batas tertentu, data dikirim ke proses Python. Parameter `python.fn-execution.bundle.size` mengontrol jumlah maksimum catatan data yang dapat disimpan dalam cache. Nilai default-nya adalah 100000 catatan. |
python.fn-execution.bundle.time | Parameter ini mengontrol waktu cache maksimum untuk data. Pemrosesan data yang di-cache dipicu ketika jumlah catatan yang di-cache mencapai ambang batas yang ditentukan oleh `python.fn-execution.bundle.size` atau waktu cache mencapai ambang batas yang ditentukan oleh `python.fn-execution.bundle.time`. Nilai default-nya adalah 1000 milidetik. |
python.fn-execution.arrow.batch.size | Saat Anda menggunakan Pandas UDF, parameter ini menentukan jumlah maksimum catatan data yang dapat dimuat dalam satu batch Arrow. Nilai default-nya adalah 10000. Catatan Nilai parameter `python.fn-execution.arrow.batch.size` tidak boleh lebih besar daripada nilai parameter `python.fn-execution.bundle.size`. |
Mengatur ketiga parameter ini ke nilai sebesar mungkin tidak selalu merupakan pendekatan terbaik. Jika parameter-parameter ini diatur terlalu besar, terlalu banyak data mungkin perlu diproses selama checkpointing, yang dapat memperpanjang durasi checkpoint atau bahkan menyebabkan kegagalan checkpoint. Untuk informasi lebih lanjut tentang parameter-parameter ini, lihat Configuration.
Referensi
Untuk informasi tentang cara mendaftarkan, memperbarui, dan menghapus UDF, lihat Mengelola user-defined functions (UDFs).
Untuk contoh cara mengembangkan dan menggunakan UDF Python, lihat Fungsi agregat yang ditentukan pengguna (UDAFs), Fungsi skalar yang ditentukan pengguna (UDSFs), dan Fungsi bernilai tabel yang ditentukan pengguna (UDTFs).
Untuk informasi tentang cara menggunakan lingkungan virtual Python kustom, paket Python pihak ketiga, paket JAR, dan file data dalam Pekerjaan Python Flink, lihat Menggunakan dependensi Python.
Untuk contoh cara mengembangkan dan menggunakan UDF Java, lihat User-defined aggregate functions (UDAFs), User-defined scalar functions (UDSFs), dan User-defined table-valued functions (UDTFs).
Untuk informasi tentang cara melakukan debug dan menyetel UDF Java, lihat Overview.