Realtime Compute for Apache Flink mendukung penggunaan fungsi yang ditentukan pengguna (UDF) Python dalam penyebaran SQL Flink. Topik ini menjelaskan klasifikasi dan metode optimalisasi Python UDF serta cara menggunakan dependensi Python.
Klasifikasi UDF
Klasifikasi UDF | Deskripsi |
Fungsi skalar yang ditentukan pengguna (UDSF) | UDSF memetakan nol, satu, atau lebih nilai skalar ke nilai skalar baru. Pemetaan satu-satu dibuat antara input dan output UDSF. Setiap kali UDSF membaca satu baris data, UDSF menulis satu nilai output. Untuk informasi lebih lanjut, lihat UDSFs. |
Fungsi agregat yang ditentukan pengguna (UDAF) | UDAF menggabungkan beberapa nilai menjadi satu nilai. Pemetaan banyak-ke-satu dibuat antara input dan output UDAF. Beberapa catatan input digabungkan untuk menghasilkan satu nilai output. Untuk informasi lebih lanjut, lihat UDAFs. |
Fungsi bernilai tabel yang ditentukan pengguna (UDTF) | UDTF mengambil nol, satu, atau lebih nilai skalar sebagai parameter input. Parameter ini dapat berupa parameter panjang variabel. UDTF mirip dengan UDSF kecuali bahwa UDTF dapat mengembalikan sejumlah baris daripada satu nilai tunggal. Baris yang dikembalikan terdiri dari satu atau lebih kolom. Beberapa baris atau kolom dikembalikan setiap kali UDTF dipanggil. Untuk informasi lebih lanjut, lihat UDTFs. |
Menggunakan dependensi Python
Paket Python umum seperti pandas, NumPy, dan PyArrow telah diinstal sebelumnya di ruang kerja Realtime Compute for Apache Flink. Untuk informasi lebih lanjut tentang paket pihak ketiga Python yang diinstal sebelumnya di ruang kerja Realtime Compute for Apache Flink, lihat Mengembangkan Draf API Python. Paket Python yang diinstal sebelumnya harus diimpor dalam Python UDF. Contoh kode:
@udf(result_type=DataTypes.FLOAT())
def percentile(values: List[float], percentile: float):
import numpy as np
return np.percentile(values, percentile)Anda juga dapat menggunakan jenis lain dari paket Python pihak ketiga dalam Python UDF. Jika Anda menggunakan paket Python pihak ketiga yang tidak diinstal sebelumnya di Flink yang sepenuhnya dikelola, Anda harus mengunggah paket tersebut sebagai file dependensi ke Flink yang sepenuhnya dikelola. Untuk informasi lebih lanjut, lihat Kelola UDF dan Gunakan Dependensi Python.
Debugging kode
Dalam implementasi kode Python UDF, Anda dapat menggunakan metode logging untuk menghasilkan log dan menemukan kesalahan berdasarkan log tersebut. Berikut adalah contoh kode:
@udf(result_type=DataTypes.BIGINT())
def add(i, j):
logging.info("hello world")
return i + jSetelah log dihasilkan, Anda dapat melihat log tersebut di file log Pengelola Tugas. Untuk informasi lebih lanjut, lihat Lihat Log Operasional Penyebaran.
Optimalisasi kinerja
Gunakan fitur pra-muat sumber daya
Jika Anda menggunakan fitur pra-muat sumber daya, sumber daya dapat dimuat terlebih dahulu selama inisialisasi UDF. Dengan cara ini, sumber daya tidak perlu dimuat ulang setiap kali metode eval() digunakan untuk menghitung data. Sebagai contoh, jika Anda hanya ingin memuat model pembelajaran mendalam besar sekali dan kemudian membuat prediksi pada model tersebut berkali-kali, Anda dapat menggunakan kode berikut:
from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf
class Predict(ScalarFunction):
def open(self, function_context):
import pickle
with open("resources.zip/resources/model.pkl", "rb") as f:
self.model = pickle.load(f)
def eval(self, x):
return self.model.predict(x)
predict = udf(Predict(), result_type=DataTypes.DOUBLE(), func_type="pandas")Untuk informasi lebih lanjut tentang cara mengunggah file Python, lihat Gunakan Dependensi Python.
Gunakan pustaka pandas
Realtime Compute for Apache Flink mendukung penggunaan UDF pandas selain UDF Python yang umum digunakan. Tipe data input UDF pandas menggunakan struktur data yang didefinisikan dalam pandas, seperti pandas.Series dan pandas.DataFrame. Anda dapat menggunakan pustaka Python berperforma tinggi seperti pandas dan NumPy dalam UDF pandas untuk mengembangkan UDF Python berperforma tinggi. Untuk informasi lebih lanjut, lihat Fungsi Pengguna Vektorisasi.
Konfigurasikan parameter yang diperlukan
Kinerja Python UDF terutama bergantung pada implementasinya. Jika Anda mengalami masalah kinerja terkait Python UDF, Anda perlu mengoptimalkan implementasinya. Kinerja Python UDF juga dipengaruhi oleh nilai parameter yang dijelaskan dalam tabel berikut.
Parameter | Deskripsi |
python.fn-execution.bundle.size | Python UDF dihitung secara asinkron. Selama perhitungan, operator Java secara asinkron mengirimkan data ke proses Python untuk diproses. Operator Java menyimpan data dalam cache dan kemudian mengirimkan data ke proses Python ketika jumlah catatan data dalam cache mencapai ambang tertentu. Jumlah maksimum catatan data yang dapat disimpan dalam cache. Nilai default: 100000. |
python.fn-execution.bundle.time | Durasi maksimum untuk penyimpanan data dalam cache. Jika jumlah data yang disimpan dalam cache mencapai nilai yang ditentukan oleh parameter python.fn-execution.bundle.size atau durasi penyimpanan data dalam cache mencapai nilai yang ditentukan oleh parameter python.fn-execution.bundle.time, perhitungan pada data yang disimpan dalam cache akan dipicu. Nilai default: 1000. Unit: milidetik. |
python.fn-execution.arrow.batch.size | Jumlah maksimum catatan data yang diizinkan dalam batch panah saat Anda menggunakan UDF pandas. Nilai default: 10000. Catatan Nilai parameter python.fn-execution.arrow.batch.size tidak boleh lebih besar dari nilai parameter python.fn-execution.bundle.size. |
Jika nilai parameter ini terlalu besar, data yang berlebihan perlu diproses selama checkpointing. Ini memperpanjang checkpointing atau bahkan menyebabkan kegagalan checkpointing. Untuk informasi lebih lanjut tentang parameter ini, lihat Konfigurasi.
Referensi
Untuk informasi lebih lanjut tentang cara mendaftarkan, memperbarui, dan menghapus UDF, lihat Kelola UDF.
Untuk informasi lebih lanjut tentang cara mengembangkan dan menggunakan Python UDF, lihat UDAFs, UDSFs, dan UDTFs.
Untuk informasi lebih lanjut tentang cara menggunakan lingkungan virtual Python kustom, paket Python pihak ketiga, paket JAR, dan file data dalam penyebaran Python Flink, lihat Gunakan Dependensi Python.
Untuk informasi lebih lanjut tentang cara mengembangkan dan menggunakan Java UDF, lihat UDAFs, UDSFs, dan UDTFs.
Untuk informasi lebih lanjut tentang cara debug dan optimalkan Java UDF, lihat Ikhtisar.
Untuk informasi lebih lanjut tentang cara menggunakan UDAF untuk mengurutkan dan menggabungkan data, lihat Gunakan UDAF untuk Mengurutkan dan Menggabungkan Data.