全部产品
Search
文档中心

Realtime Compute for Apache Flink:UDTFs

更新时间:Jun 19, 2025

Topik ini menjelaskan cara membuat, mendaftarkan, dan menggunakan fungsi tabel bernilai yang didefinisikan pengguna (UDTF) Python di Realtime Compute for Apache Flink.

Deskripsi

UDTF mengambil nol, satu, atau lebih nilai skalar sebagai parameter input. Parameter-parameter ini dapat berupa parameter dengan panjang variabel. UDTF dapat mengembalikan sejumlah baris alih-alih satu nilai tunggal. Baris yang dikembalikan dapat mencakup satu atau lebih kolom. Beberapa baris atau kolom dikembalikan setiap kali UDTF dipanggil. UDTF mirip dengan fungsi skalar yang didefinisikan pengguna (UDF), tetapi berbeda dalam hasil yang dikembalikan.

Batasan

Layanan yang disediakan oleh Realtime Compute for Apache Flink bergantung pada lingkungan deployment dan jaringan. Oleh karena itu, saat mengembangkan Python UDF di Realtime Compute for Apache Flink, perhatikan batasan-batasan berikut:

  • Hanya Apache Flink 1.12 dan versi lebih baru yang didukung.

  • Python sudah diinstal sebelumnya di ruang kerja Realtime Compute for Apache Flink. Oleh karena itu, Anda harus mengembangkan kode dalam Python versi yang diperlukan.

    null

    Python 3.7.9 sudah diinstal sebelumnya di Realtime Compute for Apache Flink yang menggunakan Ververica Runtime (VVR) versi lebih lama dari 8.0.11. Python 3.9.21 sudah diinstal sebelumnya di Realtime Compute for Apache Flink yang menggunakan VVR 8.0.11 atau lebih baru. Setelah memperbarui versi VVR ke 8.0.11 atau lebih baru, Anda harus menguji, menerapkan, dan menjalankan draft PyFlink dari versi VVR sebelumnya lagi.

  • JDK 8 dan JDK 11 didukung di lingkungan runtime Realtime Compute for Apache Flink. Jika deployment Python Anda bergantung pada paket JAR pihak ketiga, pastikan bahwa paket JAR tersebut kompatibel dengan JDK 8 atau JDK 11.

  • Hanya Scala 2.11 open source yang didukung. Jika deployment Python Anda bergantung pada paket JAR pihak ketiga, pastikan bahwa paket JAR tersebut kompatibel dengan Scala 2.11.

Buat UDTF

null

Flink menyediakan contoh kode ekstensi yang didefinisikan pengguna (UDX) Python untuk membantu Anda mengembangkan UDX. Contoh kode mencakup implementasi Python UDF, fungsi agregat yang didefinisikan pengguna Python (UDAF), dan Python UDTF. Bagian ini menjelaskan cara membuat UDTF di sistem operasi Windows.

  1. Unduh dan ekstrak paket python_demo-master ke mesin lokal Anda.

  2. Di bilah menu utama PyCharm, pilih File > Open untuk membuka paket python_demo-master yang telah diekstrak.

  3. Klik dua kali file udtfs.py di direktori \python_demo-master\udx. Kemudian, modifikasi isi file sesuai dengan kebutuhan bisnis Anda.

    Dalam contoh ini, split mendefinisikan kode yang dapat memisahkan satu baris string menjadi beberapa kolom string dengan tanda vertikal (|).

    from pyflink.table import DataTypes
    from pyflink.table.udf import udtf
    
    @udtf(result_types=[DataTypes.STRING(), DataTypes.STRING()])
    def split(s: str):
        splits = s.split("|")
        yield splits[0], splits[1]
  4. Pergi ke direktori \python_demo tempat folder udx berada dan jalankan perintah berikut untuk mengemas file di direktori tersebut:

    zip -r python_demo.zip udx

    Jika paket python_demo.zip muncul di direktori \python_demo\, UDTF telah dikembangkan.

Daftarkan UDTF

Untuk informasi lebih lanjut tentang cara mendaftarkan UDTF, lihat Kelola UDF.

Gunakan UDTF

Setelah mendaftarkan UDTF, Anda dapat melakukan langkah-langkah berikut untuk menggunakannya:

  1. Gunakan Flink SQL untuk membuat draft. Untuk informasi lebih lanjut, lihat Kembangkan draft SQL.

    Setelah string aa dan bidang pesan di setiap baris string dalam tabel ASI_UDTF_Source digabungkan dengan tanda vertikal (|), string gabungan tersebut dipisahkan menjadi beberapa kolom string oleh tanda vertikal (|). Kode berikut menunjukkan contohnya:

    CREATE TEMPORARY TABLE ASI_UDTF_Source (
      `message`  VARCHAR
    ) WITH (
      'connector'='datagen'
    );
    
    CREATE TEMPORARY TABLE ASI_UDTF_Sink (
      name  VARCHAR,
      place  VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO ASI_UDTF_Sink
    SELECT name,place
    FROM ASI_UDTF_Source,lateral table(split(concat_ws('|', `message`, 'aa'))) as T(name,place);
  2. Di panel navigasi sisi kiri konsol pengembangan Realtime Compute for Apache Flink, pilih O&M > Deployments. Di halaman Deployments, temukan deployment yang diinginkan dan klik Start di kolom Actions.

    Setelah deployment dimulai, dua kolom data dimasukkan ke dalam tabel ASI_UDTF_Sink. Dua kolom data tersebut berisi string gabungan yang dipisahkan oleh tanda vertikal (|).