Topik ini menjelaskan cara membuat, mendaftarkan, dan menggunakan fungsi skalar yang didefinisikan pengguna (UDSF) dalam Python.
Definisi
Sebuah UDSF memetakan nol, satu, atau lebih nilai skalar ke nilai skalar baru. Data input dan output dari sebuah UDSF memiliki hubungan satu-satu. Setiap kali UDSF membaca baris data, ia menulis nilai keluaran.
Batasan
Layanan yang disediakan oleh Realtime Compute for Apache Flink bergantung pada lingkungan deployment dan jaringan. Oleh karena itu, saat mengembangkan Python UDF di Realtime Compute for Apache Flink, perhatikan batasan berikut:
Hanya Apache Flink 1.12 dan versi lebih baru yang didukung.
Python sudah diinstal sebelumnya di workspace Realtime Compute for Apache Flink. Oleh karena itu, Anda harus mengembangkan kode dalam versi Python yang diminta.
nullPython 3.7.9 sudah diinstal sebelumnya di Realtime Compute for Apache Flink yang menggunakan Ververica Runtime (VVR) versi lebih lama dari 8.0.11. Python 3.9.21 sudah diinstal sebelumnya di Realtime Compute for Apache Flink yang menggunakan VVR 8.0.11 atau lebih baru. Setelah Anda meningkatkan versi VVR ke 8.0.11 atau lebih baru, Anda harus menguji, menerapkan, dan menjalankan draft PyFlink dari versi VVR sebelumnya lagi.
JDK 8 dan JDK 11 didukung dalam lingkungan runtime Realtime Compute for Apache Flink. Jika deployment Python Anda bergantung pada paket JAR pihak ketiga, pastikan bahwa paket JAR tersebut kompatibel dengan JDK 8 atau JDK 11.
Hanya Scala 2.11 open source yang didukung. Jika deployment Python Anda bergantung pada paket JAR pihak ketiga, pastikan bahwa paket JAR tersebut kompatibel dengan Scala 2.11.
Membuat UDSF
Flink menyediakan contoh kode fungsi yang didefinisikan pengguna (UDF) dalam Python untuk membantu Anda mengembangkan UDF. Contoh mencakup implementasi Python UDSF, fungsi agregat yang didefinisikan pengguna (UDAF), dan fungsi bernilai tabel yang didefinisikan pengguna (UDTF). Bagian ini menjelaskan cara membuat UDSF di sistem operasi Windows.
Unduh dan ekstrak paket python_demo-master ke mesin lokal Anda.
nullpython_demo-master disediakan di situs web pihak ketiga. Saat mengakses situs web tersebut, akses mungkin gagal atau tertunda.
Di bilah menu utama PyCharm, pilih untuk membuka paket python_demo-master yang telah diekstrak.
Klik dua kali file udfs.py di jalur \python_demo-master\udx. Kemudian, ubah isi file sesuai kebutuhan bisnis Anda.
Dalam contoh ini, sub_string mendefinisikan kode untuk mendapatkan karakter dari posisi awal hingga posisi akhir di setiap rekaman data.
from pyflink.table import DataTypes from pyflink.table.udf import udf @udf(result_type=DataTypes.STRING()) def sub_string(s: str, begin: int, end: int): return s[begin:end]Pergi ke jalur \python_demo-master tempat folder udx berada dan jalankan perintah berikut untuk mengemas file di direktori tersebut:
zip -r python_demo.zip udxJika paket python_demo.zip muncul di jalur \python_demo-master\, maka Python UDSF telah berhasil dibuat.
Mendaftarkan UDSF
Untuk informasi lebih lanjut tentang cara mendaftarkan UDSF, lihat Kelola UDF.
Menggunakan UDSF
Setelah mendaftarkan UDSF, Anda dapat mulai menggunakannya. Untuk menggunakan UDSF, ikuti langkah-langkah berikut:
Gunakan Flink SQL untuk membuat draft. Untuk informasi lebih lanjut, lihat Kembangkan draft SQL.
Kode berikut memberikan contoh cara mendapatkan karakter dari karakter kedua hingga keempat dari string di setiap baris bidang a dalam tabel ASI_UDSF_Source:
CREATE TEMPORARY TABLE ASI_UDSF_Source ( a VARCHAR, b INT, c INT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE ASI_UDSF_Sink ( a VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO ASI_UDSF_Sink SELECT ASI_UDSF(a,2,4) FROM ASI_UDSF_Source;Di panel navigasi sisi kiri konsol pengembangan Realtime Compute for Apache Flink, pilih . Di halaman Deployments, temukan deployment yang diinginkan dan klik Start di kolom Actions.
Setelah deployment dimulai, karakter kedua hingga keempat dari string di setiap baris bidang a dalam tabel ASI_UDSF_Source akan dimasukkan ke setiap baris tabel ASI_UDSF_Sink.