Realtime Compute for Apache Flink: Develop a Flink Java job

Last Updated: Dec 23, 2025

The Flink DataStream API provides a flexible programming model for customizing data transformations, operations, and operators. It is well-suited for applications with complex business logic and data processing needs. This article explains how to develop a Flink job in Java and run it with Realtime Compute for Apache Flink.

Apache Flink compatibility

The DataStream API supported by Realtime Compute for Apache Flink is fully compatible with the corresponding Apache Flink version. For more information, see What is Apache Flink? and Flink DataStream API Programming Guide.

Environment requirements

  • An IDE, such as IntelliJ IDEA, is installed.

  • Maven 3.6.3 or later is installed.

  • Job development requires JDK 8 or JDK 11.

  • You must develop your DataStream program locally and run it on Realtime Compute for Apache Flink.

Preparations

Before you start, prepare the data sources used in this example: a Kafka topic to read from and a MySQL table to write the results to.

Set up the Java project

Environment dependencies

Note

To avoid Maven dependency conflicts, follow these rules:

  • Use consistent Flink versions: ${flink.version} must match the Flink version of the Ververica Runtime (VVR) engine. For example, if you select vvr-8.0.9-flink-1.17, set ${flink.version} to 1.17.2. For more information, see How do I view the engine version of a job?

  • Use provided scope for Flink-related dependencies: This rule primarily applies to non-connector dependencies that start with flink- under the org.apache.flink group.

  • Use public API methods: These methods are explicitly annotated with @Public or @PublicEvolving in the Flink source code. Compatibility is guaranteed only for such methods.

  • Prefer built-in connector APIs: When you use the built-in connectors of Realtime Compute for Apache Flink, call the APIs that they support rather than the corresponding Apache Flink connector APIs.

The following are some basic Flink dependencies. You may also need to add logging dependencies. For a complete list of dependencies, see the Complete code example at the end of this topic.

Sample dependencies
         <!-- Apache Flink dependencies -->
        <!-- Use the provided scope so that these dependencies are not packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

Set connector dependencies and use connectors

To read and write data with the DataStream API, use the DataStream connectors provided by VVR, which are available in the Maven central repository.

Important

For a list of reliable DataStream connectors, see Supported connectors. Don't use a connector if it's not explicitly marked for DataStream API support, because its interfaces and parameters may change without notice.

Use a connector in either of the following ways:

(Recommended) Upload the connector uber JAR as an additional dependency

  1. In your project's pom.xml file, add the required connector as a dependency and set its scope to provided. For a complete dependency file, see the Complete code example at the end of this topic.

    Note
    • Version: Ensure that ${vvr.version} matches the VVR version that you select for the deployment in the console. For example, if the deployment uses the vvr-8.0.9-flink-1.17 engine, which is based on Flink 1.17.2, use the matching connector version. For specific versions, see Engine updates.

    • Connector scope: Use <scope>provided</scope> for the connector. This is because the connector's fat JAR is uploaded as an independent dependency, meaning it should not be included in your job's JAR file.

            <!-- Kafka connector dependency -->
            <dependency>
                <groupId>com.alibaba.ververica</groupId>
                <artifactId>ververica-connector-kafka</artifactId>
                <version>${vvr.version}</version>
                <scope>provided</scope>
            </dependency>
            <!-- MySQL connector dependency -->
            <dependency>
                <groupId>com.alibaba.ververica</groupId>
                <artifactId>ververica-connector-mysql</artifactId>
                <version>${vvr.version}</version>
                <scope>provided</scope>
            </dependency>
  2. To use a new connector or extend an existing one, add the flink-connector-base or ververica-connector-common dependencies.

            <!-- Basic dependency of the public interface of Flink connectors -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-base</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <!-- Basic dependency of the public interface of Alibaba Cloud connectors -->
            <dependency>
                <groupId>com.alibaba.ververica</groupId>
                <artifactId>ververica-connector-common</artifactId>
                <version>${vvr.version}</version>
            </dependency>
  3. Set DataStream connection configurations.

    Refer to the corresponding connector documentation for details and code examples. A minimal Kafka source sketch is also shown after this procedure.

  4. In the console, create a JAR deployment and add the connector uber JAR(s) in the Additional Dependencies field. You can upload a custom connector JAR or a built-in connector JAR, which is available for download at Connectors.

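For reference, the following is a minimal sketch that uses the Kafka connector configured above to read a topic with the DataStream API. The broker addresses, topic name, and consumer group are placeholder values, and the shaded StringDeserializer import comes from the VVR Kafka connector, as in the complete code example at the end of this topic. See the Kafka connector documentation for the full list of options.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder connection settings; replace them with your own values.
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("host1:9092,host2:9092,host3:9092")
                .setTopics("topic")
                .setGroupId("groupId")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                .build();

        // Read the topic as a stream of strings; no watermark strategy is needed here.
        DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");
        stream.print();

        env.execute("Kafka source sketch");
    }
}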

Package the connector directly into the job JAR

  1. Add a connector as a project dependency in your job's pom.xml file. Example:

    Note
    • Version: Ensure that ${vvr.version} matches the VVR version that you select for the deployment in the console. For example, if the deployment uses the vvr-8.0.9-flink-1.17 engine, which is based on Flink 1.17.2, use the matching connector version. For specific versions, see Engine updates.

    • Connector scope: Use <scope>compile</scope> for the connector.

            <!-- Kafka connector dependency -->
            <dependency>
                <groupId>com.alibaba.ververica</groupId>
                <artifactId>ververica-connector-kafka</artifactId>
                <version>${vvr.version}</version>
            </dependency>
            <!-- MySQL connector dependency -->
            <dependency>
                <groupId>com.alibaba.ververica</groupId>
                <artifactId>ververica-connector-mysql</artifactId>
                <version>${vvr.version}</version>
            </dependency>
  2. To use a custom connector or extend an existing one, add the flink-connector-base or ververica-connector-common dependencies.

            <!-- Basic dependency of the public interface of Flink connectors -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-base</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <!-- Basic dependency of the public interface of Alibaba Cloud connectors -->
            <dependency>
                <groupId>com.alibaba.ververica</groupId>
                <artifactId>ververica-connector-common</artifactId>
                <version>${vvr.version}</version>
            </dependency>
  3. Set DataStream connection configurations.

    Refer to the corresponding connector documents for details and code examples.

Prepare the config file

Flink Java jobs do not support reading a local config file directly from the main method. To work around this, upload your config file to an Object Storage Service (OSS) bucket and access it as an additional dependency during job deployment.

  1. Create a config.properties file to avoid hardcoding plaintext values.

    # Kafka 
    bootstrapServers=host1:9092,host2:9092,host3:9092
    inputTopic=topic
    groupId=groupId
    # MySQL
    database.url=jdbc:mysql://localhost:3306/my_database
    database.username=username
    database.password=password
  2. In your code, read the config.properties file stored in the OSS bucket.

    Method 1: Read from the OSS bucket associated with your workspace

    1. In the left navigation menu of the console, navigate to Artifacts and upload the config.properties file.

    2. When the job runs, the Flink runtime loads files added to the Additional Dependencies field into the /flink/usrlib directory of the job's pod.

    3. The following code shows how to read this config file.

                  Properties properties = new Properties();
                  Map<String,String> configMap = new HashMap<>();
      
                  try (InputStream input = new FileInputStream("/flink/usrlib/config.properties")) {
                      // Load the property file.
                      properties.load(input);
                      // Obtain the property values.
                      configMap.put("bootstrapServers",properties.getProperty("bootstrapServers")) ;
                      configMap.put("inputTopic",properties.getProperty("inputTopic"));
                      configMap.put("groupId",properties.getProperty("groupId"));
                      configMap.put("url",properties.getProperty("database.url")) ;
                      configMap.put("username",properties.getProperty("database.username"));
                      configMap.put("password",properties.getProperty("database.password"));
                  } catch (IOException ex) {
                      ex.printStackTrace();
                  }

    Method 2: Read from an OSS bucket accessible to the workspace

    1. Upload the config file to the target OSS bucket.

    2. Use OSSClient to read the file directly from OSS. For more information, see Perform streaming download by using OSS SDK for Java and Manage access credentials. Example:

      OSS ossClient = new OSSClientBuilder().build("Endpoint", "AccessKeyId", "AccessKeySecret");
      try (OSSObject ossObject = ossClient.getObject("examplebucket", "exampledir/config.properties");
           BufferedReader reader = new BufferedReader(new InputStreamReader(ossObject.getObjectContent()))) {
          // Load the config file from the OSS object stream.
          Properties properties = new Properties();
          properties.load(reader);
      } finally {
          if (ossClient != null) {
              ossClient.shutdown();
          }
      }

Write code

  1. Integrate an external data source. Since event time processing is not required for this example, WatermarkStrategy.noWatermarks() is used. For more information, see Generating Watermarks.

             // Integrate external data sources into data streaming programs
            // No watermark strategy is used
            DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka Source");
  2. Transform data. The example converts a DataStream<String> to a DataStream<Student> and then filters the results. For more information, see Operators. A sketch of the MySQL sink follows this procedure.

              // Convert each record to a Student object.
              DataStream<Student> source = stream
                    .map(new MapFunction<String, Student>() {
                        @Override
                        public Student map(String s) throws Exception {
                            // Fields are separated by commas (,).
                            String[] data = s.split(",");
                            return new Student(Integer.parseInt(data[0]), data[1], Integer.parseInt(data[2]));
                        }
                    }).filter(student -> student.score >= 60); // Keep records whose score is at least 60.
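After the transformation, write the processed stream to a sink. The following is a minimal sketch of the MySQL sink used in this example, followed by the call that submits the job. It continues from the stream created in the previous steps, and the connection values come from the config file, as shown in the complete code example at the end of this topic.

            // Write the filtered Student records to MySQL. The configMap values are
            // loaded from config.properties, as shown in the complete code example.
            source.addSink(JdbcSink.sink(
                    "INSERT IGNORE INTO student (id, username, score) VALUES (?, ?, ?)",
                    new JdbcStatementBuilder<Student>() {
                        public void accept(PreparedStatement ps, Student data) {
                            try {
                                ps.setInt(1, data.id);
                                ps.setString(2, data.name);
                                ps.setInt(3, data.score);
                            } catch (SQLException e) {
                                throw new RuntimeException(e);
                            }
                        }
                    },
                    new JdbcExecutionOptions.Builder()
                            .withBatchSize(5) // The number of records to be written in each batch.
                            .withBatchIntervalMs(2000) // The maximum interval between two batch writes, in milliseconds.
                            .build(),
                    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                            .withUrl(configMap.get("url"))
                            .withDriverName("com.mysql.cj.jdbc.Driver")
                            .withUsername(configMap.get("username"))
                            .withPassword(configMap.get("password"))
                            .build()
            )).name("Sink MySQL");

            // Submit the job for execution.
            env.execute("Flink Demo");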

Package your job into a JAR

Package the code and dependencies into a fat JAR using maven-shade-plugin.

Important
  • If you upload connector JARs as additional dependency files, ensure you set <scope>provided</scope> for the connector dependencies.

  • If you package connectors into your job JAR, use <scope>compile</scope>.

Example

<build>
        <plugins>
            <!-- Java compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.11.0</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>

            <!-- Use maven-shade-plugin to create a fat JAR file that contains all the necessary dependencies. -->
            <!-- Modify the value of <mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <!-- Remove some unnecessary dependencies. -->
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signature in the META-INF folder. Otherwise, a security error may occur when you use the JAR file. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.aliyun.FlinkDemo</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

Test and deploy the job

  • Conduct unit tests. They are recommended because a full end-to-end test is often not feasible locally: your development environment typically cannot access cloud data sources within your VPC. For details, see Run and debug connectors locally. A minimal unit test sketch is provided after this list.

  • Deploy the job. For details, see Create a JAR deployment.

    Note
    • If you add connectors as additional dependency files, upload the connector uber JARs.

    • To read the config file from OSS, upload it and add it as an additional dependency.

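For example, the record parsing and filtering logic can be covered by a plain unit test that does not require access to cloud data sources. The following is a minimal sketch under these assumptions: it reuses the Student class from the complete code example below, relies on a JUnit 4 test dependency that is not included in the pom.xml of this topic, and uses parseStudent, a hypothetical helper that mirrors the map logic of the job.

package com.aliyun;

import org.junit.Assert;
import org.junit.Test;

public class StudentParsingTest {

    // Hypothetical helper that mirrors the MapFunction in the job:
    // it splits a comma-separated record into the Student fields.
    private static FlinkDemo.Student parseStudent(String line) {
        String[] data = line.split(",");
        return new FlinkDemo.Student(Integer.parseInt(data[0]), data[1], Integer.parseInt(data[2]));
    }

    @Test
    public void testParseAndFilter() {
        FlinkDemo.Student student = parseStudent("1,Alice,75");
        Assert.assertEquals(1, student.id);
        Assert.assertEquals("Alice", student.name);
        Assert.assertEquals(75, student.score);
        // The job keeps only records with a score of at least 60.
        Assert.assertTrue(student.score >= 60);
    }
}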

Complete code example

This example reads data from a Kafka source, processes it, and writes the results to MySQL. This code is for reference only. For more information on code style and quality, see the Apache Flink Code Style and Quality Guide.

Note

This example does not include configurations for runtime parameters such as checkpointing, TTL, and restart strategy. Set them on the Configuration tab of the job deployment's details page for easier modification and management. For more information, see Configure a job deployment.

FlinkDemo.java

package com.aliyun;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class FlinkDemo {
    // Define the data structure.
    public static class Student {
        public int id;
        public String name;
        public int score;

        public Student(int id, String name, int score) {
            this.id = id;
            this.name = name;
            this.score = score;
        }
    }

    public static void main(String[] args) throws Exception {
        // Create a Flink execution environment.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        Map<String,String> configMap = new HashMap<>();

        try (InputStream input = new FileInputStream("/flink/usrlib/config.properties")) {
            // Load the config file.
            properties.load(input);
            // Obtain the property values.
            configMap.put("bootstrapServers",properties.getProperty("bootstrapServers")) ;
            configMap.put("inputTopic",properties.getProperty("inputTopic"));
            configMap.put("groupId",properties.getProperty("groupId"));
            configMap.put("url",properties.getProperty("database.url")) ;
            configMap.put("username",properties.getProperty("database.username"));
            configMap.put("password",properties.getProperty("database.password"));
        } catch (IOException ex) {
            ex.printStackTrace();
        }

        // Build Kafka source
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                        .setBootstrapServers(configMap.get("bootstrapServers"))
                        .setTopics(configMap.get("inputTopic"))
                        .setStartingOffsets(OffsetsInitializer.latest())
                        .setGroupId(configMap.get("groupId"))
                        .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                        .build();

        // Integrate external data sources
        // Specify WatermarkStrategy.noWatermarks(), which indicates that no watermark strategy is used.
        DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka Source");

        // Keep records whose score is at least 60.
        DataStream<Student> source = stream
                .map(new MapFunction<String, Student>() {
                    @Override
                    public Student map(String s) throws Exception {
                        // Fields are separated by commas (,).
                        String[] data = s.split(",");
                        return new Student(Integer.parseInt(data[0]), data[1], Integer.parseInt(data[2]));
                    }
                }).filter(student -> student.score >= 60);

        source.addSink(JdbcSink.sink("INSERT IGNORE INTO student (id, username, score) VALUES (?, ?, ?)",
                new JdbcStatementBuilder<Student>() {
                    public void accept(PreparedStatement ps, Student data) {
                        try {
                            ps.setInt(1, data.id);
                            ps.setString(2, data.name);
                            ps.setInt(3, data.score);
                        } catch (SQLException e) {
                            throw new RuntimeException(e);
                        }
                    }
                },
                new JdbcExecutionOptions.Builder()
                        .withBatchSize(5) // The number of records to be written in each batch.
                        .withBatchIntervalMs(2000) // The maximum interval between two batch writes. Unit: milliseconds.
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(configMap.get("url"))
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername(configMap.get("username"))
                        .withPassword(configMap.get("password"))
                        .build()
        )).name("Sink MySQL");

        env.execute("Flink Demo");
    }
}

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.aliyun</groupId>
    <artifactId>FlinkDemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>FlinkDemo</name>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.17.1</flink.version>
        <vvr.version>1.17-vvr-8.0.4-1</vvr.version>
        <target.java.version>1.8</target.java.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <log4j.version>2.14.1</log4j.version>
    </properties>
    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- Use the provided scope so that these dependencies are not packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Add the connector dependencies and specify <scope>compile</scope> for the dependencies.   -->
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>ververica-connector-kafka</artifactId>
            <version>${vvr.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>ververica-connector-mysql</artifactId>
            <version>${vvr.version}</version>
        </dependency>

        <!-- Add a logging framework to generate output data in the development console of Realtime Compute for Apache Flink when you run the deployment. -->
        <!-- By default, these dependencies are excluded from the JAR file in the program. -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.11.0</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>

            <!-- Use maven-shade-plugin to create a fat JAR file that contains all the necessary dependencies. -->
            <!-- Modify the value of <mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <!-- Remove unnecessary dependencies. -->
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signature in the META-INF folder. Otherwise, a security error may occur when you use the JAR file. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.aliyun.FlinkDemo</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
