The Flink DataStream API provides a flexible programming model for customizing data transformations, operations, and operators. It is well-suited for applications with complex business logic and data processing needs. This article explains how to develop a Flink job in Java and run it with Realtime Compute for Apache Flink.
Apache Flink compatibility
The DataStream API supported by Realtime Compute for Apache Flink is fully compatible with that of the corresponding open source Apache Flink version. For more information, see What is Apache Flink? and Flink DataStream API Programming Guide.
Environment requirements
An IDE, such as IntelliJ IDEA, is installed.
Maven 3.6.3 or later is installed.
Job development requires JDK 8 or JDK 11.
You must develop your DataStream program locally and run it on Realtime Compute for Apache Flink.
Preparations
Before you start, prepare data sources for this example.
By default, Realtime Compute for Apache Flink cannot access the internet. This example uses Message Queue for Apache Kafka (2.6.2) and ApsaraDB RDS for MySQL (8.0) as data sources.
Ensure these systems reside in the same Virtual Private Cloud (VPC) as your Flink workspace. If they are not in the same VPC, see How does Realtime Compute for Apache Flink access a service across VPCs?
If you use self-managed data sources, ensure Flink can access them. For more information, see How does Realtime Compute for Apache Flink access the Internet? and How do I configure a whitelist?
To create a Message Queue for Apache Kafka instance, see Purchase and deploy an instance. Ensure it is in the same VPC as your Flink workspace.
To create an ApsaraDB RDS for MySQL instance, see Create an ApsaraDB RDS for MySQL instance, a database, and an account. Ensure it is in the same VPC as your Flink workspace.
Set up the Java project
Environment dependencies
To avoid Maven dependency conflicts, follow these rules:
Use consistent Flink versions: ${flink.version} must match the Flink version of the Ververica Runtime (VVR) engine. For example, if you select vvr-8.0.9-flink-1.17, set ${flink.version} to 1.17.2. For more information, see How do I view the engine version of a job?
Use provided scope for Flink-related dependencies: This rule primarily applies to non-connector dependencies that start with flink- under the org.apache.flink group.
Use public API methods: These methods are explicitly annotated with @Public or @PublicEvolving in the Flink source code. Compatibility is guaranteed only for them.
When using the built-in connectors of Realtime Compute for Apache Flink, prefer their supported APIs over Apache Flink connector methods.
The following are some basic Flink dependencies. You may also need to add logging dependencies. For a complete list of dependencies, see the Complete code example at the end of this topic.
Sample dependencies
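The following basic Flink dependencies are taken from the complete pom.xml at the end of this topic. They are declared with provided scope so that they are not packaged into the job JAR.

<!-- Apache Flink dependencies, provided by the runtime and excluded from the job JAR. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>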
Set connector dependencies and use connectors
To read and write data with the DataStream API, use the DataStream connectors, available in the VVR Maven central repository.
For a list of reliable DataStream connectors, see Supported connectors. Don't use a connector if it's not explicitly marked for DataStream API support, because its interfaces and parameters may change without notice.
Use a connector in either of the following ways:
(Recommended) Upload the connector uber JAR as an additional dependency
In your project's pom.xml file, add the required connector as a dependency and set its scope to provided. For a complete dependency file, see the Complete code example at the end of this topic.
Note:
Version: Ensure ${vvr.version} matches the VVR version you selected in the console. For instance, if you select vvr-8.0.9-flink-1.17, your JAR deployment must use that VVR, whose Flink version is 1.17.2. For specific versions, see Engine updates.
Connector scope: Use <scope>provided</scope> for the connector. The connector's uber JAR is uploaded as an independent dependency, so it should not be included in your job's JAR file.

<!-- Kafka connector dependency -->
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-kafka</artifactId>
    <version>${vvr.version}</version>
    <scope>provided</scope>
</dependency>
<!-- MySQL connector dependency -->
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mysql</artifactId>
    <version>${vvr.version}</version>
    <scope>provided</scope>
</dependency>

To use a new connector or extend an existing one, add the flink-connector-base or ververica-connector-common dependencies.

<!-- Basic dependency of the public interface of Flink connectors -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Basic dependency of the public interface of Alibaba Cloud connectors -->
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-common</artifactId>
    <version>${vvr.version}</version>
</dependency>

Set DataStream connection configurations.
Refer to the corresponding connector documents for details and code examples.
In the console, create a JAR deployment and add the connector uber JAR(s) in the Additional Dependencies field. You can upload a custom connector or a built-in connector JAR, available for download at Connectors.
Package the connector directly into the job JAR
Add a connector as a project dependency in your job's pom.xml file. Example:
Note:
Version: Ensure ${vvr.version} matches the VVR version you selected in the console. For instance, if you select vvr-8.0.9-flink-1.17, your job's JAR deployment must use that VVR, whose Flink version is 1.17.2. For specific versions, see Engine updates.
Connector scope: Use <scope>compile</scope> for the connector.

<!-- Kafka connector dependency -->
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-kafka</artifactId>
    <version>${vvr.version}</version>
</dependency>
<!-- MySQL connector dependency -->
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mysql</artifactId>
    <version>${vvr.version}</version>
</dependency>

To use a custom connector or extend an existing one, add the flink-connector-base or ververica-connector-common dependencies.

<!-- Basic dependency of the public interface of Flink connectors -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Basic dependency of the public interface of Alibaba Cloud connectors -->
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-common</artifactId>
    <version>${vvr.version}</version>
</dependency>

Set DataStream connection configurations.
Refer to the corresponding connector documents for details and code examples.
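For example, the Kafka source used in this topic can be configured as follows. This snippet is condensed from the complete code example at the end of this topic; the connection values are read from the config.properties file described in the next section.

// Build the Kafka source from values loaded out of config.properties.
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
        .setBootstrapServers(configMap.get("bootstrapServers")) // Kafka broker addresses
        .setTopics(configMap.get("inputTopic"))                  // topic to read from
        .setStartingOffsets(OffsetsInitializer.latest())         // start from the latest offsets
        .setGroupId(configMap.get("groupId"))                    // consumer group ID
        .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
        .build();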
Prepare the config file
Flink Java jobs do not support reading a local config file directly from the main method. To work around this, upload your config file to an Object Storage Service (OSS) bucket and access it as an additional dependency during job deployment.
Create a config.properties file to avoid hardcoding plaintext values.

# Kafka
bootstrapServers=host1:9092,host2:9092,host3:9092
inputTopic=topic
groupId=groupId
# MySQL
database.url=jdbc:mysql://localhost:3306/my_database
database.username=username
database.password=password

In your code, read the config.properties file stored in the OSS bucket.
Method 1: Read from the OSS bucket associated with your workspace
In the left navigation menu of the console, navigate to Artifacts and upload the config.properties file.
When the job runs, the Flink runtime loads files added to the Additional Dependencies field into the /flink/usrlib directory of the job's pod.
The following code shows how to read this config file.

Properties properties = new Properties();
Map<String, String> configMap = new HashMap<>();
try (InputStream input = new FileInputStream("/flink/usrlib/config.properties")) {
    // Load the property file.
    properties.load(input);
    // Obtain the property values.
    configMap.put("bootstrapServers", properties.getProperty("bootstrapServers"));
    configMap.put("inputTopic", properties.getProperty("inputTopic"));
    configMap.put("groupId", properties.getProperty("groupId"));
    configMap.put("url", properties.getProperty("database.url"));
    configMap.put("username", properties.getProperty("database.username"));
    configMap.put("password", properties.getProperty("database.password"));
} catch (IOException ex) {
    ex.printStackTrace();
}
Method 2: Read from an OSS bucket accessible to the workspace
Upload the config file to the target OSS bucket.
Use OSSClient to read the file directly from OSS. For more information, see Perform streaming download by using OSS SDK for Java and Manage access credentials. Example:

OSS ossClient = new OSSClientBuilder().build("Endpoint", "AccessKeyId", "AccessKeySecret");
try (OSSObject ossObject = ossClient.getObject("examplebucket", "exampledir/config.properties");
     BufferedReader reader = new BufferedReader(new InputStreamReader(ossObject.getObjectContent()))) {
    // read file and process ...
} finally {
    if (ossClient != null) {
        ossClient.shutdown();
    }
}
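One way to process the downloaded stream, consistent with this example, is to load it into a java.util.Properties object. The following is a minimal sketch that reuses the placeholder endpoint, credentials, bucket, and object names from the example above.

// Sketch: parse the downloaded object as a standard Java properties file.
OSS ossClient = new OSSClientBuilder().build("Endpoint", "AccessKeyId", "AccessKeySecret");
Properties properties = new Properties();
try (OSSObject ossObject = ossClient.getObject("examplebucket", "exampledir/config.properties")) {
    // The object content is an InputStream that Properties can read directly.
    properties.load(ossObject.getObjectContent());
} finally {
    ossClient.shutdown();
}
String bootstrapServers = properties.getProperty("bootstrapServers");
String url = properties.getProperty("database.url");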
Write code
Integrate an external data source. Since event time processing is not required for this example, WatermarkStrategy.noWatermarks() is used. For more information, see Generating Watermarks.

// Integrate the external data source into the data stream program.
// No watermark strategy is used.
DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka Source");

Transform data. The example converts a DataStream<String> to a DataStream<Student> and then filters the results. For more information, see Operators.

// Map each comma-separated record to a Student object.
DataStream<Student> source = stream
        .map(new MapFunction<String, Student>() {
            @Override
            public Student map(String s) throws Exception {
                // Separate fields with commas (,).
                String[] data = s.split(",");
                return new Student(Integer.parseInt(data[0]), data[1], Integer.parseInt(data[2]));
            }
        }).filter(student -> student.score >= 60); // Obtain data records whose score is greater than or equal to 60.
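In the complete example at the end of this topic, the filtered results are then written to MySQL with JdbcSink, and the job is started with env.execute(). A condensed sketch of that sink step follows; the connection values come from the configMap loaded earlier.

// Write the filtered Student records to MySQL (condensed from the complete example).
source.addSink(JdbcSink.sink(
        "INSERT IGNORE INTO student (id, username, score) VALUES (?, ?, ?)",
        (PreparedStatement ps, Student data) -> {
            // Fill one PreparedStatement per record.
            ps.setInt(1, data.id);
            ps.setString(2, data.name);
            ps.setInt(3, data.score);
        },
        new JdbcExecutionOptions.Builder()
                .withBatchSize(5)
                .withBatchIntervalMs(2000)
                .build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl(configMap.get("url"))
                .withDriverName("com.mysql.cj.jdbc.Driver")
                .withUsername(configMap.get("username"))
                .withPassword(configMap.get("password"))
                .build()))
        .name("Sink MySQL");
env.execute("Flink Demo");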
Package your job into a JAR
Package the code and dependencies into a fat JAR using maven-shade-plugin.
If you upload connector JARs as additional dependency files, ensure you set <scope>provided</scope> for the connector dependencies.
If you package connectors into your job JAR, use <scope>compile</scope>.
Test and deploy the job
Conduct unit tests. These are recommended because a full end-to-end test is often not feasible locally: your development environment typically cannot access cloud data sources within your VPC. For details, see Run and debug connectors locally. A minimal test sketch follows these steps.
Deploy the job. For details, see Create a JAR deployment.
Note: If you add connectors as additional dependency files, upload the connector uber JARs.
To read the config file from OSS, upload it and add it as an additional dependency.
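For illustration only, the following sketch shows a unit test of the record-parsing logic. It assumes JUnit 5 is on the test classpath; the parsing code is duplicated here in a small helper class, whereas in a real project you would extract the anonymous MapFunction from main() into such a reusable class and test that class directly.

package com.aliyun;

import org.apache.flink.api.common.functions.MapFunction;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;

public class StudentParserTest {
    // Hypothetical reusable version of the anonymous MapFunction used in FlinkDemo.
    static class StudentParser implements MapFunction<String, FlinkDemo.Student> {
        @Override
        public FlinkDemo.Student map(String s) {
            String[] data = s.split(",");
            return new FlinkDemo.Student(Integer.parseInt(data[0]), data[1], Integer.parseInt(data[2]));
        }
    }

    @Test
    public void parsesCommaSeparatedRecord() throws Exception {
        FlinkDemo.Student student = new StudentParser().map("1,Alice,75");
        assertEquals(1, student.id);
        assertEquals("Alice", student.name);
        assertEquals(75, student.score);
    }
}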

Complete code example
This example reads data from a Kafka source, processes it, and writes the results to MySQL. This code is for reference only. For more information on code style and quality, see the Apache Flink Code Style and Quality Guide.
This example does not include configurations for runtime parameters such as checkpointing, TTL, and restart strategy. Set them on the Configuration tab of the job deployment's details page for easier modification and management. For more information, see Configure a job deployment.
FlinkDemo.java
package com.aliyun;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class FlinkDemo {
// Define the data structure.
public static class Student {
public int id;
public String name;
public int score;
public Student(int id, String name, int score) {
this.id = id;
this.name = name;
this.score = score;
}
}
public static void main(String[] args) throws Exception {
// Create a Flink execution environment.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
Map<String,String> configMap = new HashMap<>();
try (InputStream input = new FileInputStream("/flink/usrlib/config.properties")) {
// Load the config file.
properties.load(input);
// Obtain the property values.
configMap.put("bootstrapServers",properties.getProperty("bootstrapServers")) ;
configMap.put("inputTopic",properties.getProperty("inputTopic"));
configMap.put("groupId",properties.getProperty("groupId"));
configMap.put("url",properties.getProperty("database.url")) ;
configMap.put("username",properties.getProperty("database.username"));
configMap.put("password",properties.getProperty("database.password"));
} catch (IOException ex) {
ex.printStackTrace();
}
// Build Kafka source
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers(configMap.get("bootstrapServers"))
.setTopics(configMap.get("inputTopic"))
.setStartingOffsets(OffsetsInitializer.latest())
.setGroupId(configMap.get("groupId"))
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.build();
// Integrate external data sources
// Specify WatermarkStrategy.noWatermarks(), which indicates that no watermark strategy is used.
DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka Source");
// Obtain data records whose score is greater than or equal to 60.
DataStream<Student> source = stream
.map(new MapFunction<String, Student>() {
@Override
public Student map(String s) throws Exception {
String[] data = s.split(",");
return new Student(Integer.parseInt(data[0]), data[1], Integer.parseInt(data[2]));
}
}).filter(student -> student.score >= 60);
source.addSink(JdbcSink.sink("INSERT IGNORE INTO student (id, username, score) VALUES (?, ?, ?)",
new JdbcStatementBuilder<Student>() {
public void accept(PreparedStatement ps, Student data) {
try {
ps.setInt(1, data.id);
ps.setString(2, data.name);
ps.setInt(3, data.score);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
},
new JdbcExecutionOptions.Builder()
.withBatchSize(5) // The number of records to be written in each batch.
.withBatchIntervalMs(2000) // The maximum interval before a batch is flushed, even if the batch size is not reached. Unit: milliseconds.
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(configMap.get("url"))
.withDriverName("com.mysql.cj.jdbc.Driver")
.withUsername(configMap.get("username"))
.withPassword(configMap.get("password"))
.build()
)).name("Sink MySQL");
env.execute("Flink Demo");
}
}
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.aliyun</groupId>
<artifactId>FlinkDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<name>FlinkDemo</name>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.17.1</flink.version>
<vvr.version>1.17-vvr-8.0.4-1</vvr.version>
<target.java.version>1.8</target.java.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<log4j.version>2.14.1</log4j.version>
</properties>
<dependencies>
<!-- Apache Flink dependencies -->
<!-- Provide the dependencies to prevent them from being packaged in a JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Add the connector dependencies and specify <scope>compile</scope> for the dependencies. -->
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-kafka</artifactId>
<version>${vvr.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mysql</artifactId>
<version>${vvr.version}</version>
</dependency>
<!-- Add a logging framework to generate output data in the development console of Realtime Compute for Apache Flink when you run the deployment. -->
<!-- By default, these dependencies are excluded from the JAR file in the program. -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Java compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
</configuration>
</plugin>
<!-- Use maven-shade-plugin to create a fat JAR file that contains all the necessary dependencies. -->
<!-- Modify the value of <mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<!-- Remove unnecessary dependencies. -->
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signature in the META-INF folder. Otherwise, a security error may occur when you use the JAR file. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.aliyun.FlinkDemo</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
References
Develop SQL/PyFlink jobs: Job development map and PyFlink jobs