Datastream development packaging issues

Flink's core dependencies and the application's own dependencies

Every Flink application depends on a series of related libraries, which should include at least Flink's API. Many applications also depend on connector-related libraries (such as Kafka, Cassandra, etc.). When running a Flink application, whether it is Whether running in a distributed environment or testing in a local IDE, Flink's runtime-related dependencies are required.

As with most systems running user-defined applications, there are two broad categories of dependencies in Flink:

Flink core dependencies: Flink itself consists of a set of classes and dependencies necessary to run the system, such as coordinator, network, checkpoint, fault tolerance, API, operators (such as windows), resource management, etc. The collection of all these classes and dependencies forms the core of the Flink runtime and must be present when a Flink application starts. These core classes and dependencies are packaged in the flink-dist jar. They are part of Flink's lib folder and part of the Flink base container image. These dependencies are to Flink what the core libraries (rt.jar, charsets.jar, etc.) containing classes such as String and List required for Java to run are to Java. Flink's core dependencies do not contain any connectors or extension libraries (CEP, SQL, ML, etc.), which makes Flink's core dependencies as small as possible to avoid excessive dependencies in the classpath by default and reduce dependency conflicts .

User Application Dependencies: Refers to all connectors, formats, or extension libraries required by a particular user application. User applications are typically packaged as a jar file, which contains the application code as well as required connector and library dependencies. User application dependencies should not include Flink DataStream API and runtime dependencies, as these are already included in Flink's core dependencies.

Dependency configuration steps

1. Add basic dependencies

The development of each Flink application needs to add at least a basic dependency on the relevant API.

When manually configuring the project, you need to add a dependency on the Java/Scala API (here Maven is used as an example, the same dependency can be used in other build tools (Gradle, SBT, etc.)).

Important note: Note that all of these dependencies have their scope set to "provided". This means that they need to be compiled, but they should not be packaged into the application jar file generated by the project - these dependencies are Flink core dependencies, which are already loaded at actual runtime.

It is highly recommended to set the dependencies to the "provided" scope, failing to set them to "provided" will at best result in a bloated jar as it also includes all Flink core dependencies. And in the worst case, the Flink core dependencies added to the application jar file will have version conflicts with some of your own dependencies (usually avoided by Flink's reverse class loading mechanism).

Note on IntelliJ: In order for the application to run in IntelliJ IDEA, it is necessary to check the "Include dependencies with "Provided" scope" option box in the run configuration. If you don't have that option (probably due to using an older IntelliJ IDEA version), then a simple workaround is to create a test case that calls the application's main() method.

2. Add connector and library dependencies

Most applications require specific connectors or libraries to run, such as connectors for Kafka, Cassandra, etc. These connectors are not part of Flink's core dependencies and must be added to the application as additional dependencies.

We recommend packaging the application code and all its dependencies into an application jar in the form of jar-with-dependencies. The application jar package can be submitted to an existing Flink cluster, or added to the container image of the Flink application.

For a project created from a Maven job template (see the Maven job template section below), the dependencies will be automatically added to the application's jar package through the mvn clean package command. For the case where no template is used for configuration, it is recommended to use Maven Shade Plugin (configuration as shown in the appendix) to build a jar package containing dependencies.

Important note: For Maven (and other build tools) to properly package dependencies into the application jar, those application dependencies must be scoped to "compile" (unlike core dependencies, whose scope must be specified as "provided").

Scala version

Different versions of Scala (2.11, 2.12, etc.) are not compatible with each other. Therefore, the Flink version corresponding to Scala 2.11 cannot be used for applications using Scala 2.12.

All Flink dependencies that depend (or transitively) on Scala are suffixed with the Scala version they were built with, e.g. flink-streaming-scala_2.11 .

You can choose any Scala version when you only use Java for development, and you need to choose the Flink dependency version that matches the Scala version of your application when developing with Scala.

Note: Scala versions after 2.12.8 are not compatible with previous 2.12.x versions, so Flink projects cannot upgrade their 2.12.x versions to versions after 2.12.8. Users can compile Flink corresponding to the Scala version locally. In order for this to work, -Djapicmp.skip needs to be added to skip binary compatibility checks at build time.

Hadoop dependencies

General rule: Never add Hadoop-related dependencies directly to your application. (The only exception is when using an existing Hadoop input/output format with Flink's Hadoop compatibility package)

If you want to use Flink with Hadoop, you need to include the Flink startup item on which Hadoop depends, instead of adding Hadoop as an application dependency. Flink will use the Hadoop dependencies specified by the HADOOP_CLASSPATH environment variable, which can be set via:

export HADOOP_CLASSPATH**=**hadoop classpath``

There are two main reasons for this design:

Some interactions with Hadoop may take place in Flink's core modules and before the user application starts, such as setting up HDFS for checkpointing, authenticating via Hadoop's Kerberos tokens, or deploying on YARN, etc.
Flink's reverse classloading mechanism hides many transitive dependencies from core dependencies. This applies not only to Flink's own core dependencies, but also to Hadoop's dependencies. This way, applications can use different versions of the same dependencies without dependency conflicts (trust us, this is a big deal because Hadoop dependency trees are huge.)
If you need Hadoop dependencies (such as HDFS access) during testing or development inside the IDE, please configure the scope of these dependencies as test or provided.

Mechanism to load a table's connector/format factory with a specific identifier. Since the SPI resource file named org.apache.flink.table.factories.Factory of each table's connector/format is located in the same directory: under META-INF/services, when building a project that uses multiple table connectors/formats uber jar, these resource files will overwrite each other, which will cause Flink to fail to load the factory class correctly.

In this case, the recommended method is to transform these resource files under the META-INF/services directory by the ServicesResourceTransformer of the maven shade plugin. The pom.xml file content for the given example is as follows, which contains the connectors flink-sql-connector-hive-3.1.2 and flink-parquet format.

After the ServicesResourceTransformer is configured, when the project builds uber-jar, these resource files in the META-INF/services directory will be integrated instead of overwriting each other.

Maven job template
It is strongly recommended to use this method for configuration, which can reduce a lot of repeated configuration work.

The only environmental requirements are Maven 3.0.4 (or higher) and Java 8.x installed.

create project
Create a project using one of the following two methods.

This allows you to name the newly created project. It will interactively ask you for the groupId, artifactId and package name.

run the quickstart script

$ curl | bash -s 1.12.3
We recommend that you import this project into your IDE to develop and test it. IntelliJ IDEA natively supports Maven projects. If you use Eclipse, you can use the m2e plugin to import Maven projects. Some Eclipse bundles include the plugin by default, otherwise you need to install it manually.

Please note: The default Java JVM heap size may be too small for Flink. You have to increase it manually. In Eclipse, select RunConfigurations->Arguments and write to the VM Arguments box: -Xmx800m. In IntelliJ IDEA, the recommended way to change JVM options is to use the Help | Edit Custom VM Options options menu. See this article for details.

build project
If you want to build/package the project, go to the project directory and run "mvn clean package" command. Execution will result in a JAR file: target/-.jar containing your application, along with the connectors and libraries added to the application as dependencies.

Note: If you are using a different class than StreamingJob as the main class/entry point of your application, we recommend that you change the mainClass setting in your pom.xml file accordingly. This way, Flink can run the application directly from the JAR file without additionally specifying the main class.

Appendix: Template for building jar packages with dependencies
To build an application JAR containing all dependencies required by the linker and library, the following shade plugin definition can be used.

Related Articles

Explore More Special Offers

  1. Short Message Service(SMS) & Mail Service

    50,000 email package starts as low as USD 1.99, 120 short messages start at only USD 1.00

phone Contact Us