All Products
Search
Document Center

Realtime Compute for Apache Flink:Python

Last Updated:Mar 10, 2026

Realtime Compute for Apache Flink memungkinkan Anda menggunakan user-defined function (UDF) Python dalam pekerjaan Flink SQL. Topik ini menjelaskan jenis-jenis UDF Python, cara menggunakan dependensi Python, serta cara menyetel kinerjanya.

Jenis user-defined function

Type

Description

User-defined scalar function (UDSF)

UDSF memetakan nol, satu, atau beberapa nilai skalar menjadi nilai skalar baru. Fungsi ini memiliki hubungan satu-ke-satu antara input dan output, membaca satu baris data, dan mengembalikan satu nilai output. Untuk informasi selengkapnya, lihat User-defined scalar functions (UDSFs).

User-defined aggregate function (UDAF)

UDAF menggabungkan beberapa catatan menjadi satu catatan. Fungsi ini memiliki hubungan banyak-ke-satu antara input dan output, mengagregasi beberapa catatan input menjadi satu nilai output. Untuk informasi selengkapnya, lihat User-defined aggregate functions (UDAFs).

User-defined table-valued function (UDTF)

UDTF menerima nol, satu, atau beberapa nilai skalar sebagai parameter input yang dapat memiliki panjang variabel. Fungsi ini mirip dengan UDSF, tetapi dapat mengembalikan sejumlah baris apa pun sebagai output, bukan hanya satu nilai. Baris yang dikembalikan dapat terdiri dari satu atau beberapa kolom. Satu pemanggilan fungsi dapat menghasilkan beberapa baris atau kolom. Untuk informasi selengkapnya, lihat User-defined table-valued functions (UDTFs).

Gunakan dependensi Python

Kluster Realtime Compute for Apache Flink telah dilengkapi paket-paket Python umum seperti Pandas, NumPy, dan PyArrow secara pra-instal. Untuk daftar lengkap paket Python pihak ketiga yang tersedia di Realtime Compute for Apache Flink, lihat Develop a Python job. Untuk menggunakan paket Python yang telah dipra-instal, Anda dapat mengimpornya ke dalam fungsi Python Anda. Contoh berikut menunjukkan cara melakukannya.

@udf(result_type=DataTypes.FLOAT())
def percentile(values: List[float], percentile: float):
    import numpy as np
    return np.percentile(values, percentile)

Anda juga dapat menggunakan paket Python pihak ketiga lainnya dalam UDF Python Anda. Jika paket tersebut tidak tersedia secara pra-instal, Anda harus mengunggahnya sebagai file dependensi saat mendaftarkan UDF Python tersebut. Untuk informasi selengkapnya, lihat Manage user-defined functions (UDFs) dan Use Python dependencies.

Debug kode

Anda dapat menerapkan logging dalam UDF Python Anda untuk menghasilkan informasi yang membantu mengidentifikasi masalah. Contoh berikut menunjukkan cara melakukannya.

@udf(result_type=DataTypes.BIGINT())
def add(i, j):    
  logging.info("hello world")    
  return i + j

Log yang dihasilkan dapat dilihat dalam file log TaskManager. Untuk informasi selengkapnya, lihat View operational logs.

Penyetelan kinerja

Pre-load resources

Pre-loading resources memungkinkan Anda memuatnya selama inisialisasi UDF, sehingga menghindari pemuatan ulang setiap kali metode `eval` dieksekusi. Sebagai contoh, Anda dapat memuat model pembelajaran mendalam yang besar hanya sekali, lalu menjalankan prediksi batch berulang kali di atasnya. Kode berikut memberikan contohnya.

from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf

class Predict(ScalarFunction):
    def open(self, function_context):
        import pickle

        with open("resources.zip/resources/model.pkl", "rb") as f:
            self.model = pickle.load(f)

    def eval(self, x):
        return self.model.predict(x)

predict = udf(Predict(), result_type=DataTypes.DOUBLE(), func_type="pandas")
Catatan

Untuk informasi tentang cara mengunggah file dependensi Python, lihat Use Python dependencies.

Gunakan library Pandas

Selain UDF Python biasa, Realtime Compute for Apache Flink juga mendukung Pandas UDF. Tipe data input untuk Pandas UDF adalah struktur data yang didefinisikan dalam Pandas, seperti `pandas.Series` dan `pandas.DataFrame`. Anda dapat memanfaatkan library Python berkinerja tinggi seperti Pandas dan NumPy dalam Pandas UDF untuk menghasilkan UDF Python berkinerja tinggi. Untuk informasi selengkapnya, lihat Vectorized User-defined Functions.

Konfigurasikan parameter

Kinerja UDF Python sangat bergantung pada implementasinya. Jika Anda mengalami masalah kinerja, optimalkan implementasi UDF tersebut. Selain itu, kinerja UDF Python juga dipengaruhi oleh nilai parameter berikut.

Parameter

Description

python.fn-execution.bundle.size

UDF Python dihitung secara asinkron. Selama eksekusi, operator Java mengirim data ke proses Python untuk diproses secara asinkron. Sebelum mengirim data, operator Java menyimpannya dalam cache. Ketika cache mencapai ambang batas tertentu, data dikirim ke proses Python. Parameter `python.fn-execution.bundle.size` mengontrol jumlah maksimum catatan data yang dapat disimpan dalam cache.

Nilai default-nya adalah 100000 catatan.

python.fn-execution.bundle.time

Parameter ini mengontrol waktu cache maksimum untuk data. Pemrosesan data yang di-cache dipicu ketika jumlah catatan yang di-cache mencapai ambang batas yang ditentukan oleh `python.fn-execution.bundle.size` atau waktu cache mencapai ambang batas yang ditentukan oleh `python.fn-execution.bundle.time`.

Nilai default-nya adalah 1000 milidetik.

python.fn-execution.arrow.batch.size

Saat Anda menggunakan Pandas UDF, parameter ini menentukan jumlah maksimum catatan data yang dapat dimuat dalam satu batch Arrow. Nilai default-nya adalah 10000.

Catatan

Nilai parameter `python.fn-execution.arrow.batch.size` tidak boleh lebih besar daripada nilai parameter `python.fn-execution.bundle.size`.

Catatan

Mengatur ketiga parameter ini ke nilai sebesar mungkin tidak selalu merupakan pendekatan terbaik. Jika parameter-parameter ini diatur terlalu besar, terlalu banyak data mungkin perlu diproses selama checkpointing, yang dapat memperpanjang durasi checkpoint atau bahkan menyebabkan kegagalan checkpoint. Untuk informasi lebih lanjut tentang parameter-parameter ini, lihat Configuration.

Referensi