Topik ini menjelaskan cara membuat, mendaftarkan, dan menggunakan fungsi agregat yang didefinisikan pengguna (UDAF) di Realtime Compute for Apache Flink.
Definisi
UDAF menggabungkan beberapa nilai menjadi satu nilai. Pemetaan banyak-ke-satu dibentuk antara input dan output dari UDAF. Beberapa nilai input digabungkan untuk menghasilkan satu nilai output.
Batasan
Layanan yang disediakan oleh Realtime Compute for Apache Flink bergantung pada lingkungan deployment dan jaringan. Oleh karena itu, saat mengembangkan Python UDF di Realtime Compute for Apache Flink, perhatikan batasan berikut:
Hanya mendukung Apache Flink 1.12 dan versi lebih baru.
Python sudah terinstal sebelumnya di workspace Realtime Compute for Apache Flink. Oleh karena itu, Anda harus mengembangkan kode dalam Python versi yang diminta.
nullPython 3.7.9 sudah terinstal sebelumnya di Realtime Compute for Apache Flink yang menggunakan Ververica Runtime (VVR) versi lebih lama dari 8.0.11. Python 3.9.21 sudah terinstal sebelumnya di Realtime Compute for Apache Flink yang menggunakan VVR 8.0.11 atau lebih baru. Setelah meningkatkan versi VVR ke 8.0.11 atau lebih baru, Anda harus menguji, menerapkan, dan menjalankan draft PyFlink dari versi VVR sebelumnya lagi.
JDK 8 dan JDK 11 didukung di lingkungan runtime Realtime Compute for Apache Flink. Jika deployment Python Anda bergantung pada paket JAR pihak ketiga, pastikan bahwa paket JAR tersebut kompatibel dengan JDK 8 atau JDK 11.
Hanya mendukung Scala 2.11 open source. Jika deployment Python Anda bergantung pada paket JAR pihak ketiga, pastikan bahwa paket JAR tersebut kompatibel dengan Scala 2.11.
Membuat UDAF
Flink menyediakan contoh kode ekstensi yang didefinisikan pengguna (UDX) dalam Python untuk membantu Anda mengembangkan UDX. Contoh kode mencakup implementasi Python UDF, Python user-defined aggregate functions (UDAF), dan Python UDTF. Topik ini menjelaskan cara membuat UDAF di sistem operasi Windows.
Unduh dan ekstrak paket python_demo-master ke mesin lokal Anda.
Di bilah menu utama PyCharm, pilih untuk membuka paket python_demo-master yang telah diekstrak.
Klik dua kali file udfs.py di direktori \python_demo-master\udx. Kemudian, ubah isi file sesuai dengan kebutuhan bisnis Anda.
Dalam contoh ini, weighted_avg mendefinisikan kode untuk rata-rata tertimbang dari data saat ini dan data historis.
from pyflink.common import Row from pyflink.table import AggregateFunction, DataTypes from pyflink.table.udf import udaf class WeightedAvg(AggregateFunction): def create_accumulator(self): # Row(sum, count) return Row(0, 0) def get_value(self, accumulator: Row) -> float: if accumulator[1] == 0: return 0 else: return accumulator[0] / accumulator[1] def accumulate(self, accumulator: Row, value, weight): accumulator[0] += value * weight accumulator[1] += weight def retract(self, accumulator: Row, value, weight): accumulator[0] -= value * weight accumulator[1] -= weight weighted_avg = udaf(f=WeightedAvg(), result_type=DataTypes.DOUBLE(), accumulator_type=DataTypes.ROW([ DataTypes.FIELD("f0", DataTypes.BIGINT()), DataTypes.FIELD("f1", DataTypes.BIGINT())]))Pergi ke direktori \python_demo-master tempat folder udx berada dan jalankan perintah berikut untuk mengemas file di direktori tersebut:
zip -r python_demo.zip udxJika paket python_demo.zip muncul di direktori \python_demo-master\, UDAF berhasil dibuat.
Mendaftarkan UDAF
Untuk informasi lebih lanjut tentang cara mendaftarkan UDAF, lihat Kelola UDF.
Menggunakan UDAF
Setelah UDAF didaftarkan, Anda dapat melakukan langkah-langkah berikut untuk menggunakan UDAF:
Gunakan Flink SQL untuk membuat draft. Untuk informasi lebih lanjut, lihat Kembangkan draft SQL.
Dapatkan nilai dari kolom a di tabel ASI_UDAF_Source dengan kolom b sebagai bobot. Contoh kode:
CREATE TEMPORARY TABLE ASI_UDAF_Source ( a BIGINT, b BIGINT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE ASI_UDAF_Sink ( avg_value DOUBLE ) WITH ( 'connector' = 'blackhole' ); INSERT INTO ASI_UDAF_Sink SELECT weighted_avg(a, b) FROM ASI_UDAF_Source;Di panel navigasi kiri konsol pengembangan Realtime Compute for Apache Flink, pilih . Di halaman Deployments, temukan deployment yang diinginkan dan klik Start di kolom Actions.
Setelah deployment dimulai, rata-rata dari data saat ini dan data historis kolom a di tabel ASI_UDAF_Source akan dimasukkan ke setiap baris di tabel ASI_UDAF_Sink dengan kolom b sebagai bobot.