API DataStream Flink menyediakan model pemrograman fleksibel yang memungkinkan Anda menentukan transformasi data, operasi, dan operator kustom. Fleksibilitas ini sangat ideal untuk logika bisnis kompleks dan kebutuhan pemrosesan data. Topik ini menjelaskan cara mengembangkan pekerjaan JAR Flink.
Dukungan untuk Apache Flink open source
API DataStream di Realtime Compute for Apache Flink sepenuhnya kompatibel dengan versi Apache Flink open source. Untuk informasi selengkapnya, lihat Introduction to Apache Flink dan Flink DataStream API Developer Guide.
Persyaratan lingkungan pengembangan
Lingkungan pengembangan terintegrasi (IDE) seperti IntelliJ IDEA telah diinstal.
Maven 3.6.3 atau versi yang lebih baru telah diinstal.
Pengembangan pekerjaan hanya mendukung JDK 8 dan JDK 11.
Anda dapat mengembangkan pekerjaan JAR secara offline, lalu menerapkan dan menjalankannya di Konsol Realtime Compute for Apache Flink.
Prasyarat
Contoh ini menunjukkan cara menggunakan konektor sumber data. Anda harus menyiapkan sumber data yang diperlukan terlebih dahulu.
Contoh ini menggunakan Alibaba Cloud Message Queue for Kafka 2.6.2 dan ApsaraDB RDS for MySQL 8.0 sebagai sumber data.
Untuk informasi tentang cara mengakses sumber data yang dikelola sendiri melalui internet atau lintas VPC, lihat Pilih metode koneksi jaringan.
Jika Anda tidak memiliki sumber data Message Queue for Kafka, Anda harus membeli dan menerapkan instans Kafka. Untuk informasi selengkapnya, lihat Langkah 2: Membeli dan menerapkan instans. Saat menerapkan instans tersebut, pastikan instans berada dalam VPC yang sama dengan ruang kerja Realtime Compute for Apache Flink Anda.
Jika Anda tidak memiliki sumber data ApsaraDB RDS for MySQL, Anda harus membeli instans RDS for MySQL. Untuk informasi selengkapnya, lihat Langkah 1: Membuat instans ApsaraDB RDS for MySQL dan mengonfigurasi database. Saat membeli instans tersebut, pastikan instans berada dalam wilayah dan VPC yang sama dengan ruang kerja Realtime Compute for Apache Flink Anda.
Mengembangkan pekerjaan
Konfigurasikan dependensi lingkungan Flink
Untuk menghindari konflik dependensi paket JAR, perhatikan hal berikut:
${flink.version}adalah versi Flink yang sesuai dengan runtime pekerjaan. Gunakan versi Flink yang sama dengan mesin Ververica Runtime (VVR) yang Anda pilih pada halaman penerapan. Misalnya, jika Anda memilih mesinvvr-8.0.9-flink-1.17pada halaman penerapan, versi Flink yang sesuai adalah1.17.2. Untuk melihat detail versi mesin VVR, lihat Bagaimana cara melihat versi Flink pekerjaan saat ini?.Untuk dependensi terkait Flink, atur cakupan (scope) ke `provided`. Untuk melakukannya, tambahkan
<scope>provided</scope>ke dependensi tersebut. Hal ini terutama berlaku untuk dependensi non-konektor yang diawali denganflink-di bawah gruporg.apache.flink.Dalam kode sumber Flink, hanya metode yang secara eksplisit dianotasi dengan @Public atau @PublicEvolving yang dianggap publik. Realtime Compute for Apache Flink hanya menjamin kompatibilitas untuk metode-metode tersebut.
Jika konektor bawaan di Realtime Compute for Apache Flink mendukung API DataStream, Anda harus menggunakan dependensi bawaannya.
Kode berikut menunjukkan beberapa dependensi dasar Flink. Anda mungkin juga perlu menambahkan dependensi untuk file log. Untuk daftar lengkap dependensi, lihat Kode contoh lengkap di akhir topik ini.
Dependensi terkait Flink
Dependensi dan penggunaan konektor
Untuk membaca dan menulis data menggunakan DataStream, Anda harus menggunakan konektor DataStream yang sesuai. Repositori pusat Maven berisi konektor DataStream VVR yang dapat Anda gunakan langsung selama pengembangan pekerjaan.
Anda harus menggunakan konektor yang secara eksplisit mendukung API DataStream dalam Konektor yang didukung. Jangan gunakan konektor lain, karena antarmuka dan parameternya mungkin diubah di masa mendatang.
Anda dapat menggunakan salah satu metode berikut untuk menggunakan konektor:
(Direkomendasikan) Unggah paket JAR konektor sebagai file dependensi tambahan
Dalam file POM Maven pekerjaan, tambahkan konektor yang diperlukan sebagai dependensi proyek dan atur cakupannya ke `provided`. Untuk file dependensi lengkap, lihat Kode contoh lengkap di akhir topik ini.
Catatan${vvr.version}adalah versi mesin runtime pekerjaan. Misalnya, jika pekerjaan Anda berjalan di mesinvvr-8.0.9-flink-1.17, versi Flink yang sesuai adalah1.17.2. Kami menyarankan Anda menggunakan versi mesin terbaru. Untuk informasi selengkapnya tentang versi mesin, lihat Mesin.Karena paket JAR konektor diimpor sebagai dependensi tambahan, Anda tidak perlu mengemasnya ke dalam file JAR pekerjaan. Oleh karena itu, Anda harus mengatur cakupannya ke
provided.
<!-- Kafka connector dependency --> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-kafka</artifactId> <version>${vvr.version}</version> <scope>provided</scope> </dependency> <!-- MySQL connector dependency --> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-mysql</artifactId> <version>${vvr.version}</version> <scope>provided</scope> </dependency>Untuk mengembangkan konektor baru atau memperluas fitur konektor yang sudah ada, proyek juga memerlukan paket publik
flink-connector-baseatauververica-connector-common.<!-- Basic dependency for the Flink connector common interface --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-base</artifactId> <version>${flink.version}</version> </dependency> <!-- Basic dependency for the Alibaba Cloud connector common interface --> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-common</artifactId> <version>${vvr.version}</version> </dependency>Untuk informasi konfigurasi koneksi DataStream dan contoh kode, lihat dokumentasi untuk konektor DataStream yang sesuai.
Untuk daftar konektor yang dapat digunakan sebagai tipe DataStream, lihat Konektor yang didukung.
Saat menerapkan pekerjaan, tambahkan paket JAR konektor yang sesuai di bagian Additional Dependencies. Untuk informasi selengkapnya, lihat Deploy a JAR job. Anda dapat mengunggah konektor yang Anda kembangkan atau konektor yang disediakan oleh Realtime Compute for Apache Flink. Untuk tautan unduhan, lihat Daftar konektor. Gambar berikut menunjukkan contohnya.

Kemas konektor sebagai dependensi proyek ke dalam file JAR pekerjaan
Dalam file POM Maven pekerjaan, tambahkan konektor yang diperlukan sebagai dependensi proyek. Misalnya, Anda dapat mengimpor konektor Kafka dan konektor MySQL.
Catatan${vvr.version}adalah versi mesin runtime pekerjaan. Misalnya, jika pekerjaan Anda berjalan di mesinvvr-8.0.9-flink-1.17, versi Flink yang sesuai adalah1.17.2. Kami menyarankan Anda menggunakan versi mesin terbaru. Untuk informasi selengkapnya tentang versi mesin, lihat Mesin.Karena konektor dikemas langsung ke dalam file JAR sebagai dependensi proyek, Anda harus menggunakan cakupan default `compile`.
<!-- Kafka connector dependency --> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-kafka</artifactId> <version>${vvr.version}</version> </dependency> <!-- MySQL connector dependency --> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-mysql</artifactId> <version>${vvr.version}</version> </dependency>Untuk mengembangkan konektor baru atau memperluas fitur konektor yang sudah ada, proyek juga memerlukan paket publik
flink-connector-baseatauververica-connector-common.<!-- Basic dependency for the Flink connector common interface --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-base</artifactId> <version>${flink.version}</version> </dependency> <!-- Basic dependency for the Alibaba Cloud connector common interface --> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-common</artifactId> <version>${vvr.version}</version> </dependency>Untuk informasi konfigurasi koneksi DataStream dan contoh kode, lihat dokumentasi untuk konektor DataStream yang sesuai.
Untuk daftar konektor yang dapat digunakan sebagai tipe DataStream, lihat Konektor yang didukung.
Baca file dependensi tambahan dari OSS
Pekerjaan JAR Flink tidak mendukung pembacaan file konfigurasi lokal dari dalam fungsi `main`. Sebagai gantinya, Anda dapat mengunggah file konfigurasi ke bucket OSS yang terkait dengan ruang kerja Flink Anda. Saat menerapkan pekerjaan, Anda kemudian dapat membaca file tersebut dengan menambahkannya sebagai dependensi tambahan. Bagian berikut memberikan contohnya.
Buat file konfigurasi bernama `config.properties` untuk menghindari penggunaan password teks biasa dalam kode Anda.
# Kafka bootstrapServers=host1:9092,host2:9092,host3:9092 inputTopic=topic groupId=groupId # MySQL database.url=jdbc:mysql://localhost:3306/my_database database.username=username database.password=passwordGunakan kode dalam pekerjaan JAR untuk membaca file `config.properties` yang disimpan di bucket OSS.
Metode 1: Baca dari Bucket OSS yang dilampirkan ke ruang kerja
Di Konsol pengembangan Realtime Compute for Apache Flink, buka halaman Resource Management di panel navigasi sebelah kiri dan unggah file tersebut.
Saat pekerjaan berjalan, file dependensi tambahan yang Anda tambahkan selama penerapan dimuat ke direktori /flink/usrlib pod tempat pekerjaan berjalan.
Kode berikut menunjukkan contoh cara membaca file konfigurasi.
Properties properties = new Properties(); Map<String,String> configMap = new HashMap<>(); try (InputStream input = new FileInputStream("/flink/usrlib/config.properties")) { // Load the properties file. properties.load(input); // Get property values. configMap.put("bootstrapServers",properties.getProperty("bootstrapServers")) ; configMap.put("inputTopic",properties.getProperty("inputTopic")); configMap.put("groupId",properties.getProperty("groupId")); configMap.put("url",properties.getProperty("database.url")) ; configMap.put("username",properties.getProperty("database.username")); configMap.put("password",properties.getProperty("database.password")); } catch (IOException ex) { ex.printStackTrace(); }
Metode 2: Baca dari Bucket OSS yang memiliki izin akses oleh ruang kerja
Unggah file konfigurasi ke bucket OSS tujuan.
Anda dapat menggunakan `OSSClient` untuk langsung membaca file yang disimpan di OSS. Untuk informasi selengkapnya, lihat Stream dan Manage access credentials. Berikut adalah contoh kodenya.
OSS ossClient = new OSSClientBuilder().build("Endpoint", "AccessKeyId", "AccessKeySecret"); try (OSSObject ossObject = ossClient.getObject("examplebucket", "exampledir/config.properties"); BufferedReader reader = new BufferedReader(new InputStreamReader(ossObject.getObjectContent()))) { // read file and process ... } finally { if (ossClient != null) { ossClient.shutdown(); } }
Tulis kode bisnis
Kode berikut menunjukkan cara mengintegrasikan sumber data eksternal ke dalam program aliran data Flink.
watermarkadalah mekanisme dalam Flink yang mengukur kemajuan dalam event time dan sering digunakan bersama timestamp. Contoh ini tidak menggunakan kebijakan watermark. Untuk informasi selengkapnya, lihat Watermark Strategies.// Integrate an external data source into the Flink data stream program. // WatermarkStrategy.noWatermarks() indicates that no watermark policy is used. DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka Source");Kode berikut menunjukkan transformasi operator. Contoh ini mengonversi
DataStream<String>menjadiDataStream<Student>. Untuk informasi selengkapnya tentang transformasi operator kompleks dan metode pemrosesan, lihat Flink Operators.// An operator that transforms the data structure to student. DataStream<student> source = stream .map(new MapFunction<String, student>() { @Override public student map(String s) throws Exception { // The data is separated by commas. String[] data = s.split(","); return new student(Integer.parseInt(data[0]), data[1], Integer.parseInt(data[2])); } }).filter(student -> student.score >=60); // Filter data to find scores greater than or equal to 60.
Kemas pekerjaan
Anda dapat mengemas pekerjaan menggunakan plugin `maven-shade-plugin`.
Jika Anda memilih untuk mengimpor konektor sebagai file dependensi tambahan, pastikan cakupan dependensi terkait konektor diatur ke
providedsaat mengemas pekerjaan.Jika Anda memilih untuk mengemas konektor sebagai dependensi, Anda dapat menggunakan cakupan default `compile`.
Uji dan terapkan pekerjaan
Realtime Compute for Apache Flink tidak memiliki akses internet secara default. Oleh karena itu, Anda mungkin tidak dapat menguji kode Anda secara lokal. Kami menyarankan Anda melakukan pengujian unit secara terpisah. Untuk informasi selengkapnya, lihat Run and debug a job that contains connectors locally.
Untuk informasi tentang cara menerapkan pekerjaan JAR, lihat Deploy a JAR job.
CatatanSaat menerapkan pekerjaan, jika Anda memilih untuk mengunggah konektor sebagai file dependensi tambahan, Anda harus mengunggah paket JAR konektor terkait.
Untuk membaca file konfigurasi, Anda juga harus mengunggahnya sebagai file dependensi tambahan.

Kode contoh lengkap
Kode contoh ini menunjukkan bagaimana data dari sumber data Kafka diproses lalu ditulis ke MySQL. Contoh ini hanya untuk referensi. Untuk informasi selengkapnya tentang panduan gaya dan kualitas kode, lihat Code Style and Quality Guide.
Contoh ini tidak mencakup konfigurasi untuk parameter runtime seperti checkpoint, Time to Live (TTL), dan kebijakan restart. Anda dapat menyesuaikan konfigurasi ini di halaman Deployment Details setelah pekerjaan diterapkan. Konfigurasi yang ditetapkan dalam kode memiliki prioritas lebih tinggi daripada yang ditetapkan di halaman. Kami menyarankan Anda menyesuaikan konfigurasi ini di halaman setelah penerapan. Pendekatan ini menyederhanakan modifikasi dan penggunaan ulang di masa mendatang. Untuk informasi selengkapnya, lihat Configure job deployment information.
FlinkDemo.java
package com.aliyun;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class FlinkDemo {
// Define the data structure.
public static class Student {
public int id;
public String name;
public int score;
public Student(int id, String name, int score) {
this.id = id;
this.name = name;
this.score = score;
}
}
public static void main(String[] args) throws Exception {
// Create a Flink execution environment.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
Map<String,String> configMap = new HashMap<>();
try (InputStream input = new FileInputStream("/flink/usrlib/config.properties")) {
// Load the properties file.
properties.load(input);
// Get property values.
configMap.put("bootstrapServers",properties.getProperty("bootstrapServers")) ;
configMap.put("inputTopic",properties.getProperty("inputTopic"));
configMap.put("groupId",properties.getProperty("groupId"));
configMap.put("url",properties.getProperty("database.url")) ;
configMap.put("username",properties.getProperty("database.username"));
configMap.put("password",properties.getProperty("database.password"));
} catch (IOException ex) {
ex.printStackTrace();
}
// Build Kafka source
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers(configMap.get("bootstrapServers"))
.setTopics(configMap.get("inputTopic"))
.setStartingOffsets(OffsetsInitializer.latest())
.setGroupId(configMap.get("groupId"))
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.build();
// Integrate an external data source into the Flink data stream program.
// WatermarkStrategy.noWatermarks() indicates that no watermark policy is used.
DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka Source");
// Filter data to find scores greater than or equal to 60.
DataStream<Student> source = stream
.map(new MapFunction<String, Student>() {
@Override
public Student map(String s) throws Exception {
String[] data = s.split(",");
return new Student(Integer.parseInt(data[0]), data[1], Integer.parseInt(data[2]));
}
}).filter(Student -> Student.score >=60);
source.addSink(JdbcSink.sink("INSERT IGNORE INTO student (id, username, score) VALUES (?, ?, ?)",
new JdbcStatementBuilder<Student>() {
public void accept(PreparedStatement ps, Student data) {
try {
ps.setInt(1, data.id);
ps.setString(2, data.name);
ps.setInt(3, data.score);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
},
new JdbcExecutionOptions.Builder()
.withBatchSize(5) // The number of records written in each batch.
.withBatchIntervalMs(2000) // The maximum delay for retries in milliseconds.
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(configMap.get("url"))
.withDriverName("com.mysql.cj.jdbc.Driver")
.withUsername(configMap.get("username"))
.withPassword(configMap.get("password"))
.build()
)).name("Sink MySQL");
env.execute("Flink Demo");
}
}
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.aliyun</groupId>
<artifactId>FlinkDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<name>FlinkDemo</name>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.17.1</flink.version>
<vvr.version>1.17-vvr-8.0.4-1</vvr.version>
<target.java.version>1.8</target.java.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<log4j.version>2.14.1</log4j.version>
</properties>
<dependencies>
<!-- Apache Flink Dependencies -->
<!-- These dependencies are provided because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Add connector dependencies here. They must be in the default scope (compile). -->
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-kafka</artifactId>
<version>${vvr.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mysql</artifactId>
<version>${vvr.version}</version>
</dependency>
<!-- Add a logging framework to generate console output at runtime. -->
<!-- By default, these dependencies are excluded from the application JAR. -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
</configuration>
</plugin>
<!-- We use the maven-shade-plugin to create a fat jar that contains all required dependencies. -->
<!-- Modify the value of <mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<!-- Remove some unnecessary dependencies. -->
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder. Otherwise, this may cause a security exception when you use the JAR file. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.aliyun.FlinkDemo</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>Referensi
Untuk daftar konektor yang dapat digunakan sebagai tipe DataStream, lihat Konektor yang didukung.
Untuk contoh lengkap proses pengembangan pekerjaan JAR Flink, lihat Quick Start for Flink JAR jobs.
Realtime Compute for Apache Flink juga mendukung pekerjaan SQL dan Python. Untuk informasi tentang cara mengembangkan pekerjaan tersebut, lihat Job development map dan Develop Python jobs.