All Products
Search
Document Center

Realtime Compute for Apache Flink:Develop JAR jobs

Last Updated:Mar 26, 2026

The Flink DataStream API lets you define custom data transformations, operators, and processing logic for complex streaming workloads. This topic walks you through developing a JAR job using the DataStream API, from setting up Maven dependencies to packaging and deploying the job.

Prerequisites

Before you begin, make sure you have:

  • An integrated development environment (IDE) such as IntelliJ IDEA

  • Maven 3.6.3 or later

  • JDK 8 or JDK 11 (no other versions are supported)

  • The data sources required for your job. This example uses Message Queue for Apache Kafka 2.6.2 and ApsaraDB RDS for MySQL 8.0.

Kafka and MySQL instances must be in the same virtual private cloud (VPC) as your Realtime Compute for Apache Flink workspace. MySQL instances must also be in the same region. If you don't have these data sources, set them up before proceeding: Create a Message Queue for Apache Kafka instance and Create an ApsaraDB RDS for MySQL instance. For cross-VPC or internet access, see Select a network connection method.

JAR jobs are developed offline and deployed to the Realtime Compute for Apache Flink console. The DataStream API in Realtime Compute for Apache Flink is fully compatible with open source Apache Flink. For background, see the Apache Flink architecture overview and the Flink DataStream API developer guide.

Step 1: Configure Flink dependencies

Add the core Flink dependencies to your Maven POM file. These dependencies are needed at compile time but must not be packaged into the job JAR — they are already provided by the runtime environment. If you include them, the best case is a bloated JAR; the worst case is classloading conflicts between your bundled versions and the runtime's.

Set the scope to provided for all org.apache.flink dependencies that start with flink-:

<!-- Core Flink dependencies — set to provided to avoid JAR conflicts -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

${flink.version} must match the Flink version of the Ververica Runtime (VVR) engine you select when deploying the job. For example, the vvr-8.0.9-flink-1.17 engine corresponds to Flink 1.17.2. Only methods annotated @Public or @PublicEvolving in the Flink source code are considered stable — Realtime Compute for Apache Flink only guarantees compatibility for these methods. To look up VVR engine version details, see How do I view the Flink version of the current job?

For a full list of dependencies including logging, see the complete sample code at the end of this topic.

Step 2: Add connector dependencies

To read from and write to external systems with the DataStream API, add the corresponding VVR DataStream connector. Only use connectors explicitly listed as supporting the DataStream API in Supported connectors — other connectors may change interfaces without notice.

Choose an import strategy

Strategy Dependency scope When to use
Upload connector as an additional dependency (recommended) provided Keep your job JAR lean and manage connector versions independently. The connector JAR is loaded separately at runtime.
Package connector into the job JAR compile (default) Ship everything in one artifact. Use this when you need tight control over transitive dependencies or when a separate upload step isn't practical.

Option 1: Upload as an additional dependency (recommended)

  1. Add the connector as a provided dependency in the POM file. Because the connector JAR is loaded separately at runtime, setting provided prevents it from being bundled into the job JAR.

    <!-- Kafka connector — loaded as an additional dependency at runtime -->
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-kafka</artifactId>
        <version>${vvr.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- MySQL connector — loaded as an additional dependency at runtime -->
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-mysql</artifactId>
        <version>${vvr.version}</version>
        <scope>provided</scope>
    </dependency>

    ${vvr.version} is the version of the VVR runtime engine. For example, if your job runs on vvr-8.0.9-flink-1.17, the corresponding Flink version is 1.17.2. Use the latest engine version where possible — see Engines.

  2. If you're developing a custom connector or extending an existing one, also add these base packages:

    <!-- Base interface for Flink connectors -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Base interface for Alibaba Cloud connectors -->
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-common</artifactId>
        <version>${vvr.version}</version>
    </dependency>
  3. For DataStream connection configuration and code samples, see the documentation for each connector. A full list of DataStream-compatible connectors is in Supported connectors.

  4. When deploying the job, upload the connector JAR file in the Additional Dependencies section. Download Alibaba Cloud-provided connectors from the connector list. For deployment steps, see Deploy a JAR job.

    image

Option 2: Package connector into the job JAR

  1. Add connectors with the default compile scope. They will be bundled directly into the job JAR.

    <!-- Kafka connector — bundled into the job JAR -->
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-kafka</artifactId>
        <version>${vvr.version}</version>
    </dependency>
    <!-- MySQL connector — bundled into the job JAR -->
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-mysql</artifactId>
        <version>${vvr.version}</version>
    </dependency>
  2. If you're developing or extending a connector, add the same base packages as in Option 1.

  3. For DataStream connection configuration, see the documentation for each connector in Supported connectors.

Step 3: Read configuration from OSS

JAR jobs cannot read local configuration files from within the main function. Instead, upload your configuration file to an Object Storage Service (OSS) bucket associated with your Flink workspace and read it at runtime.

Storing credentials in a config file rather than hardcoding them in your source code is a security best practice.

  1. Create a config.properties file:

    # Kafka
    bootstrapServers=host1:9092,host2:9092,host3:9092
    inputTopic=topic
    groupId=groupId
    # MySQL
    database.url=jdbc:mysql://localhost:3306/my_database
    database.username=username
    database.password=password
  2. Load this file in your job code using one of the following methods.

Method 1: Read from the workspace OSS bucket

Upload the file through the Resource Management page in the Realtime Compute for Apache Flink console. When the job runs, additional dependency files are loaded into the /flink/usrlib/ directory of the job pod.

Properties properties = new Properties();
Map<String, String> configMap = new HashMap<>();

try (InputStream input = new FileInputStream("/flink/usrlib/config.properties")) {
    // Load the properties file.
    properties.load(input);
    // Read property values.
    configMap.put("bootstrapServers", properties.getProperty("bootstrapServers"));
    configMap.put("inputTopic", properties.getProperty("inputTopic"));
    configMap.put("groupId", properties.getProperty("groupId"));
    configMap.put("url", properties.getProperty("database.url"));
    configMap.put("username", properties.getProperty("database.username"));
    configMap.put("password", properties.getProperty("database.password"));
} catch (IOException ex) {
    ex.printStackTrace();
}

Method 2: Read from an OSS bucket directly

Upload the file to any OSS bucket that the workspace has permission to access, then read it using OSSClient. For more information, see Stream download and Manage access credentials.

OSS ossClient = new OSSClientBuilder().build("Endpoint", "AccessKeyId", "AccessKeySecret");
try (OSSObject ossObject = ossClient.getObject("examplebucket", "exampledir/config.properties");
     BufferedReader reader = new BufferedReader(new InputStreamReader(ossObject.getObjectContent()))) {
    // Read file and process...
} finally {
    if (ossClient != null) {
        ossClient.shutdown();
    }
}

Step 4: Write business logic

The following snippets show the two main DataStream patterns used in the complete sample: integrating a source and transforming data.

Integrate a Kafka source. A watermark measures event-time progress and is often used with timestamps. This example uses no watermark policy.

// Attach a Kafka source to the execution environment.
// WatermarkStrategy.noWatermarks() skips event-time watermark processing.
DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka Source");

For watermark options, see Watermark strategies.

Transform the stream. This example converts DataStream<String> to DataStream<Student> and filters records with a score below 60.

// Map each comma-separated string to a Student object, then filter by score.
DataStream<Student> source = stream
    .map(new MapFunction<String, Student>() {
        @Override
        public Student map(String s) throws Exception {
            String[] data = s.split(",");
            return new Student(Integer.parseInt(data[0]), data[1], Integer.parseInt(data[2]));
        }
    }).filter(student -> student.score >= 60);

For more operator types, see Flink operators.

Step 5: Package the job

Use maven-shade-plugin to create a fat JAR that bundles all required dependencies.

Scope rules when packaging:

  • Connectors uploaded as additional dependencies: keep scope as provided — do not bundle them.

  • Connectors packaged into the JAR: use the default compile scope.

<build>
    <plugins>
        <!-- Java compiler plugin -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.11.0</version>
            <configuration>
                <source>${target.java.version}</source>
                <target>${target.java.version}</target>
            </configuration>
        </plugin>

        <!-- Shade plugin: creates a fat JAR with all required dependencies.
             Update <mainClass> if your program entry point changes. -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <!-- Exclude unnecessary dependencies -->
                                <exclude>org.apache.flink:force-shading</exclude>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>org.apache.logging.log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Exclude META-INF signatures to prevent security exceptions. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.aliyun.FlinkDemo</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Step 6: Test and deploy

Realtime Compute for Apache Flink has no internet access by default, which means end-to-end local testing with live connectors is not possible in most setups. Run unit tests for your business logic independently before deployment. For guidance on running connector-dependent tests locally, see Run and debug a job with connectors locally.

To deploy the job:

  1. Follow the steps in Deploy a JAR job.

  2. If you're using the additional dependency approach, upload the connector JAR file and the config.properties file in the Additional Dependencies section.

    image

  3. Configure runtime parameters — checkpoints, Time to Live (TTL), and restart strategies — on the Deployment Details page after deployment. Settings configured on the page are easier to update without modifying or repackaging the job code. Note that code-level settings take priority over page-level settings. For details, see Configure job deployment information.

Complete sample code

This example reads student records from Kafka, filters out scores below 60, and writes passing records to a MySQL table. For code style guidelines, see the Flink code style and quality guide.

Runtime parameters such as checkpoints, TTL, and restart strategies are not included in this sample. Configure them on the Deployment Details page after deployment to keep future updates simple. For details, see Configure job deployment information.

FlinkDemo.java

package com.aliyun;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class FlinkDemo {
    // Data model for student records.
    public static class Student {
        public int id;
        public String name;
        public int score;

        public Student(int id, String name, int score) {
            this.id = id;
            this.name = name;
            this.score = score;
        }
    }

    public static void main(String[] args) throws Exception {
        // Create a Flink execution environment.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        Map<String, String> configMap = new HashMap<>();

        // Load connection configuration from the file uploaded as an additional dependency.
        // The file is available at /flink/usrlib/ when the job runs.
        try (InputStream input = new FileInputStream("/flink/usrlib/config.properties")) {
            properties.load(input);
            configMap.put("bootstrapServers", properties.getProperty("bootstrapServers"));
            configMap.put("inputTopic", properties.getProperty("inputTopic"));
            configMap.put("groupId", properties.getProperty("groupId"));
            configMap.put("url", properties.getProperty("database.url"));
            configMap.put("username", properties.getProperty("database.username"));
            configMap.put("password", properties.getProperty("database.password"));
        } catch (IOException ex) {
            ex.printStackTrace();
        }

        // Build the Kafka source starting from the latest offset.
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers(configMap.get("bootstrapServers"))
                .setTopics(configMap.get("inputTopic"))
                .setStartingOffsets(OffsetsInitializer.latest())
                .setGroupId(configMap.get("groupId"))
                .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                .build();

        // Attach the Kafka source. No watermark policy is applied in this example.
        DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka Source");

        // Parse each record into a Student object and keep only scores >= 60.
        DataStream<Student> source = stream
                .map(new MapFunction<String, Student>() {
                    @Override
                    public Student map(String s) throws Exception {
                        String[] data = s.split(",");
                        return new Student(Integer.parseInt(data[0]), data[1], Integer.parseInt(data[2]));
                    }
                }).filter(Student -> Student.score >= 60);

        // Write passing records to MySQL in batches of 5, flushing every 2 seconds.
        source.addSink(JdbcSink.sink(
                "INSERT IGNORE INTO student (id, username, score) VALUES (?, ?, ?)",
                new JdbcStatementBuilder<Student>() {
                    public void accept(PreparedStatement ps, Student data) {
                        try {
                            ps.setInt(1, data.id);
                            ps.setString(2, data.name);
                            ps.setInt(3, data.score);
                        } catch (SQLException e) {
                            throw new RuntimeException(e);
                        }
                    }
                },
                new JdbcExecutionOptions.Builder()
                        .withBatchSize(5)           // Records per batch.
                        .withBatchIntervalMs(2000)  // Max flush interval in milliseconds.
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(configMap.get("url"))
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername(configMap.get("username"))
                        .withPassword(configMap.get("password"))
                        .build()
        )).name("Sink MySQL");

        env.execute("Flink Demo");
    }
}

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.aliyun</groupId>
    <artifactId>FlinkDemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>FlinkDemo</name>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.17.1</flink.version>
        <vvr.version>1.17-vvr-8.0.4-1</vvr.version>
        <target.java.version>1.8</target.java.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <log4j.version>2.14.1</log4j.version>
    </properties>

    <dependencies>
        <!-- Core Flink dependencies — provided scope keeps them out of the job JAR. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Connector dependencies — use compile scope to bundle them in the job JAR. -->
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>ververica-connector-kafka</artifactId>
            <version>${vvr.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>ververica-connector-mysql</artifactId>
            <version>${vvr.version}</version>
        </dependency>

        <!-- Logging framework — runtime scope excludes it from the application JAR. -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.11.0</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.aliyun.FlinkDemo</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

What's next