Topik ini menjelaskan cara menulis fungsi yang ditentukan pengguna (UDF) secara efisien menggunakan tipe data internal Flink, mengurangi overhead konversi tipe, menurunkan tekanan pengumpulan sampah, dan meningkatkan kinerja keseluruhan pekerjaan.
Latar belakang dan gambaran umum
Apache Flink menyediakan antarmuka SQL yang kuat, memungkinkan Anda memperluas fungsinya melalui UDF. Namun, pendekatan tradisional untuk implementasi UDF sering kali bergantung pada tipe primitif Java (seperti Map dan List). Hal ini memerlukan mesin Flink untuk sering mengonversi tipe Java ke struktur data internal Flink pada waktu proses, memengaruhi kinerja pekerjaan.
Sistem tipe data Flink
Tipe | Deskripsi |
Tipe data eksternal | Tipe Java yang berorientasi pada pengguna, seperti Map, List, dan String. |
Tipe data internal | Representasi biner, dioptimalkan oleh mesin Flink, seperti MapData, ArrayData, dan RowData. |
Selama eksekusi UDF, tipe data eksternal dikonversi ke tipe internal sebelum data diproses. Proses ini mengonsumsi sumber daya CPU dan memori tambahan.
Prinsip inti menulis UDF yang efisien
Gunakan Tipe Data Internal untuk Data Input dan Output.
Hindari menggunakan tipe Java dalam UDF. Sebagai gantinya, gunakan tipe data internal Flink untuk mengurangi overhead serialisasi dan deserialisasi.
Tentukan Logika Inferensi Tipe Kustom.
Ini memastikan bahwa tipe data input dan output adalah tipe data internal, memfasilitasi optimasi.
Hindari Membuat Objek Sementara.
Tipe data internal mendukung metode akses yang lebih efisien. Hindari membuat objek baru di dalam loop atau selama pemanggilan frekuensi tinggi.
Contoh
Deskripsi
Sebagai contoh, kami telah menulis UDF yang mengekstrak semua kunci dari bidang Map dan mengembalikannya sebagai array menggunakan tipe data internal:
Input | Output |
SELECT mapkey(MAP['A',1,'B',2]); | [A, B] |
SELECT mapkey(STR_TO_MAP('a=1,b=2')); | [a, b] |
Kode contoh
Java
package com.aliyun.example;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.MapData;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.table.types.inference.TypeInference.newBuilder;
import org.apache.flink.table.types.inference.InputTypeStrategy;
import org.apache.flink.table.types.inference.ConstantArgumentCount;
import org.apache.flink.table.types.KeyValueDataType;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.table.api.DataTypes;
import java.util.Optional;
import java.util.List;
public class MapKeyUDF extends ScalarFunction {
public ArrayData eval(MapData input) {
if (input == null) return null;
return input.keyArray();
}
@Override
public TypeInference getTypeInference(DataTypeFactory typeFactory) {
return newBuilder()
.inputTypeStrategy(MAP)
.outputTypeStrategy(nullableIfArgs(MAP_KEYS))
.build();
}
private static final InputTypeStrategy MAP = new InputTypeStrategy() {
@Override
public ArgumentCount getArgumentCount() {
return ConstantArgumentCount.of(1);
}
@Override
public Optional<List<DataType>> inferInputTypes(CallContext callContext, boolean throwOnFailure) {
return Optional.of(
callContext.getArgumentDataTypes().stream()
.map(DataTypeUtils::toInternalDataType)
.collect(Collectors.toList()));
}
@Override
public List<Signature> getExpectedSignatures(FunctionDefinition definition) {
return null;
}
};
private static final TypeStrategy MAP_KEYS = callContext ->
Optional.of(
DataTypeUtils.toInternalDataType(DataTypes.ARRAY(
((KeyValueDataType) callContext.getArgumentDataTypes().get(0)).getKeyDataType()
))
);
private static final TypeStrategy nullableIfArgs(TypeStrategy strategy) {
return callContext -> strategy.infer(callContext).map(DataType::copy);
}
}Dependensi Maven
<!-- Flink Table Runtime -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Table Common -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Table API Java Bridge -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
</dependency>Poin implementasi utama
Gunakan Tipe Data Internal sebagai Tipe Input dan Output.
Gunakan
MapDatauntuk input data alih-alih JavaMapuntuk menghindari konversi tipe.Kembalikan
ArrayDatauntuk output data alih-alih JavaList, lebih lanjut mengurangi overhead memori.
public ArrayData eval(MapData input) { if (input == null) { return null; } return input.keyArray(); }Strategi Tipe Input:
Pastikan parameter input dikonversi ke tipe data internal.
Batasi fungsi untuk hanya menerima satu parameter.
private static final InputTypeStrategy MAP = new InputTypeStrategy() { @Override public ArgumentCount getArgumentCount() { // Batasi fungsi untuk hanya menerima satu parameter. return ConstantArgumentCount.of(1); } @Override public Optional<List<DataType>> inferInputTypes(CallContext callContext, boolean throwOnFailure) { // Pastikan parameter input dikonversi ke tipe data internal. return Optional.of( callContext.getArgumentDataTypes().stream() .map(DataTypeUtils::toInternalDataType) .collect(Collectors.toList())); } @Override public List<Signature> getExpectedSignatures(FunctionDefinition definition) { // Tidak ada tanda tangan yang dikembalikan, umumnya terlihat secara internal return null; } };Strategi Tipe Output:
Tentukan secara eksplisit tipe output sebagai array, dengan tipe elemennya sesuai dengan tipe kunci dalam input
Map.Gunakan tipe data internal.
private static final TypeStrategy MAP_KEYS = callContext -> Optional.of( DataTypeUtils.toInternalDataType(DataTypes.ARRAY( ((KeyValueDataType) callContext.getArgumentDataTypes().get(0)).getKeyDataType() )) );
Keuntungan
UDF yang menggunakan tipe data internal menawarkan keuntungan performa berikut:
Mengurangi Overhead Konversi Tipe: Konversi sering antara tipe Java dan tipe data internal Flink dihindari.
Menurunkan Tekanan Pengumpulan Sampah: Lebih sedikit objek yang dibuat, menurunkan frekuensi pengumpulan sampah.
Meningkatkan Efisiensi Pemrosesan Data: Memanfaatkan optimasi khusus mesin Flink untuk tipe data internal.
Mengurangi Penggunaan Memori: Tipe data internal biasanya menggunakan lebih sedikit memori dibandingkan tipe Java.
Catatan penggunaan
Walaupun tipe data internal merupakan pendorong performa, pertimbangkan hal-hal berikut saat menulis UDF Anda:
Operasi Terbatas: Tipe data internal mendukung lebih sedikit operasi dibandingkan tipe Java.
Kurva Pembelajaran Curam: Diperlukan keakraban dengan API tipe data internal Flink.
Keterbacaan Kode: Kode kurang intuitif dibandingkan menggunakan tipe Java standar.
Kesulitan Debugging: Debugging tipe data internal bisa lebih kompleks.
Praktik terbaik
Ketika performa menjadi perhatian utama, seperti memproses sejumlah besar data, menggunakan UDF dengan tipe data internal Flink membantu Anda secara signifikan meningkatkan performa pekerjaan Anda. Namun, sebelum menulis UDF menggunakan tipe data internal, lakukan hal berikut:
Evaluasi Persyaratan Performa: Prioritaskan tipe data internal untuk UDF dengan persyaratan performa tinggi.
Akrab dengan API Tipe Internal: seperti
ArrayData,MapData, danRowData.Implementasikan Inferensi Tipe dengan Benar: Timpa
getTypeInferencedan tentukan tipe data input dan output.Uji dan Verifikasi: Bandingkan efek dari metode implementasi yang berbeda melalui pengujian performa.
Komentar Kode: Tambahkan komentar yang cukup saat menggunakan tipe data internal untuk pemeliharaan yang lebih baik.
Referensi
Jalur Utama:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataContoh Tipe Data Internal:
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/