全部产品
Search
文档中心

Realtime Compute for Apache Flink:Tulis Fungsi yang Ditentukan Pengguna secara Efisien

更新时间:Jul 09, 2025

Topik ini menjelaskan cara menulis fungsi yang ditentukan pengguna (UDF) secara efisien menggunakan tipe data internal Flink, mengurangi overhead konversi tipe, menurunkan tekanan pengumpulan sampah, dan meningkatkan kinerja keseluruhan pekerjaan.

Latar belakang dan gambaran umum

Apache Flink menyediakan antarmuka SQL yang kuat, memungkinkan Anda memperluas fungsinya melalui UDF. Namun, pendekatan tradisional untuk implementasi UDF sering kali bergantung pada tipe primitif Java (seperti Map dan List). Hal ini memerlukan mesin Flink untuk sering mengonversi tipe Java ke struktur data internal Flink pada waktu proses, memengaruhi kinerja pekerjaan.

Sistem tipe data Flink

Tipe

Deskripsi

Tipe data eksternal

Tipe Java yang berorientasi pada pengguna, seperti Map, List, dan String.

Tipe data internal

Representasi biner, dioptimalkan oleh mesin Flink, seperti MapData, ArrayData, dan RowData.

Selama eksekusi UDF, tipe data eksternal dikonversi ke tipe internal sebelum data diproses. Proses ini mengonsumsi sumber daya CPU dan memori tambahan.

Prinsip inti menulis UDF yang efisien

  • Gunakan Tipe Data Internal untuk Data Input dan Output.

    Hindari menggunakan tipe Java dalam UDF. Sebagai gantinya, gunakan tipe data internal Flink untuk mengurangi overhead serialisasi dan deserialisasi.

  • Tentukan Logika Inferensi Tipe Kustom.

    Ini memastikan bahwa tipe data input dan output adalah tipe data internal, memfasilitasi optimasi.

  • Hindari Membuat Objek Sementara.

    Tipe data internal mendukung metode akses yang lebih efisien. Hindari membuat objek baru di dalam loop atau selama pemanggilan frekuensi tinggi.

Contoh

Deskripsi

Sebagai contoh, kami telah menulis UDF yang mengekstrak semua kunci dari bidang Map dan mengembalikannya sebagai array menggunakan tipe data internal:

Input

Output

SELECT mapkey(MAP['A',1,'B',2]);

[A, B]

SELECT mapkey(STR_TO_MAP('a=1,b=2'));

[a, b]

Kode contoh

Java

package com.aliyun.example;

import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.MapData;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.table.types.inference.TypeInference.newBuilder;
import org.apache.flink.table.types.inference.InputTypeStrategy;
import org.apache.flink.table.types.inference.ConstantArgumentCount;
import org.apache.flink.table.types.KeyValueDataType;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.table.api.DataTypes;

import java.util.Optional;
import java.util.List;

public class MapKeyUDF extends ScalarFunction {

    public ArrayData eval(MapData input) {
        if (input == null) return null;
        return input.keyArray();
    }

    @Override
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        return newBuilder()
            .inputTypeStrategy(MAP)
            .outputTypeStrategy(nullableIfArgs(MAP_KEYS))
            .build();
    }

    private static final InputTypeStrategy MAP = new InputTypeStrategy() {
        @Override
        public ArgumentCount getArgumentCount() {
            return ConstantArgumentCount.of(1);
        }

        @Override
        public Optional<List<DataType>> inferInputTypes(CallContext callContext, boolean throwOnFailure) {
            return Optional.of(
                callContext.getArgumentDataTypes().stream()
                    .map(DataTypeUtils::toInternalDataType)
                    .collect(Collectors.toList()));
        }

        @Override
        public List<Signature> getExpectedSignatures(FunctionDefinition definition) {
            return null;
        }
    };

    private static final TypeStrategy MAP_KEYS = callContext ->
        Optional.of(
            DataTypeUtils.toInternalDataType(DataTypes.ARRAY(
                ((KeyValueDataType) callContext.getArgumentDataTypes().get(0)).getKeyDataType()
            ))
        );

    private static final TypeStrategy nullableIfArgs(TypeStrategy strategy) {
        return callContext -> strategy.infer(callContext).map(DataType::copy);
    }
}

Dependensi Maven

    <!-- Flink Table Runtime -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-runtime</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <!-- Flink Table Common -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <!-- Flink Table API Java Bridge -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge</artifactId>
      <version>${flink.version}</version>
    </dependency>

Poin implementasi utama

  • Gunakan Tipe Data Internal sebagai Tipe Input dan Output.

    • Gunakan MapData untuk input data alih-alih Java Map untuk menghindari konversi tipe.

    • Kembalikan ArrayData untuk output data alih-alih Java List, lebih lanjut mengurangi overhead memori.

    public ArrayData eval(MapData input) {
        if (input == null) {
            return null;
        }
        return input.keyArray();
    }
  • Strategi Tipe Input:

    • Pastikan parameter input dikonversi ke tipe data internal.

    • Batasi fungsi untuk hanya menerima satu parameter.

    private static final InputTypeStrategy MAP = new InputTypeStrategy() {
        @Override
        public ArgumentCount getArgumentCount() {
            // Batasi fungsi untuk hanya menerima satu parameter.
            return ConstantArgumentCount.of(1);
        }
    
        @Override
        public Optional<List<DataType>> inferInputTypes(CallContext callContext, boolean throwOnFailure) {
            // Pastikan parameter input dikonversi ke tipe data internal.
            return Optional.of(
                    callContext.getArgumentDataTypes().stream()
                            .map(DataTypeUtils::toInternalDataType)
                            .collect(Collectors.toList()));
        }
    
        @Override
        public List<Signature> getExpectedSignatures(FunctionDefinition definition) {
            // Tidak ada tanda tangan yang dikembalikan, umumnya terlihat secara internal
            return null;
        }
    };
  • Strategi Tipe Output:

    • Tentukan secara eksplisit tipe output sebagai array, dengan tipe elemennya sesuai dengan tipe kunci dalam input Map.

    • Gunakan tipe data internal.

        private static final TypeStrategy MAP_KEYS = callContext ->
            Optional.of(
                DataTypeUtils.toInternalDataType(DataTypes.ARRAY(
                    ((KeyValueDataType) callContext.getArgumentDataTypes().get(0)).getKeyDataType()
                ))
            );

Keuntungan

UDF yang menggunakan tipe data internal menawarkan keuntungan performa berikut:

  • Mengurangi Overhead Konversi Tipe: Konversi sering antara tipe Java dan tipe data internal Flink dihindari.

  • Menurunkan Tekanan Pengumpulan Sampah: Lebih sedikit objek yang dibuat, menurunkan frekuensi pengumpulan sampah.

  • Meningkatkan Efisiensi Pemrosesan Data: Memanfaatkan optimasi khusus mesin Flink untuk tipe data internal.

  • Mengurangi Penggunaan Memori: Tipe data internal biasanya menggunakan lebih sedikit memori dibandingkan tipe Java.

Catatan penggunaan

Walaupun tipe data internal merupakan pendorong performa, pertimbangkan hal-hal berikut saat menulis UDF Anda:

  • Operasi Terbatas: Tipe data internal mendukung lebih sedikit operasi dibandingkan tipe Java.

  • Kurva Pembelajaran Curam: Diperlukan keakraban dengan API tipe data internal Flink.

  • Keterbacaan Kode: Kode kurang intuitif dibandingkan menggunakan tipe Java standar.

  • Kesulitan Debugging: Debugging tipe data internal bisa lebih kompleks.

Praktik terbaik

Ketika performa menjadi perhatian utama, seperti memproses sejumlah besar data, menggunakan UDF dengan tipe data internal Flink membantu Anda secara signifikan meningkatkan performa pekerjaan Anda. Namun, sebelum menulis UDF menggunakan tipe data internal, lakukan hal berikut:

  • Evaluasi Persyaratan Performa: Prioritaskan tipe data internal untuk UDF dengan persyaratan performa tinggi.

  • Akrab dengan API Tipe Internal: seperti ArrayData, MapData, dan RowData.

  • Implementasikan Inferensi Tipe dengan Benar: Timpa getTypeInference dan tentukan tipe data input dan output.

  • Uji dan Verifikasi: Bandingkan efek dari metode implementasi yang berbeda melalui pengujian performa.

  • Komentar Kode: Tambahkan komentar yang cukup saat menggunakan tipe data internal untuk pemeliharaan yang lebih baik.

Referensi

  • JavaDoc

  • Kode Sumber Flink

    • Jalur Utama: flink-table/flink-table-common/src/main/java/org/apache/flink/table/data

    • Contoh Tipe Data Internal: flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/

  • Kelola UDF