JAR drafts of Realtime Compute for Apache Flink provide a flexible programming model and APIs that allow you to specify data conversions, perform operations, and configure operators to meet the requirements of complex business logic and data processing. This topic describes how to develop and debug a JAR draft of Realtime Compute for Apache Flink.
Background information
DataStream allows you to call the underlying API operations of Realtime Compute for Apache Flink so that you can use Realtime Compute for Apache Flink in a flexible manner. The DataStream API supported by Realtime Compute for Apache Flink is fully compatible with Apache Flink. For more information, see Flink DataStream API Programming Guide.
Table API drafts are developed in the same way as DataStream drafts.
Limits
To prevent issues caused by factors such as the deployment and network environments, take note of the following limits:
Only DataStream JAR drafts can be published. A JAR draft can use only one main JAR file and multiple JAR dependencies.
Realtime Compute for Apache Flink cannot read local configurations by using the main() method in a JAR draft.
Java Development Kit (JDK) 1.8 is used in the runtime environment. Therefore, you must also use JDK 1.8 to develop drafts. A sketch of the related compiler settings is shown after this list.
Only open source Scala V2.12 is supported.
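For example, a minimal sketch of the compiler settings that pin JDK 1.8 in the POM file of a Maven project:
<properties>
    <!-- Compile the draft with JDK 1.8 to match the runtime environment. -->
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
</properties>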
Precautions
The parameters in the deployment code take precedence over the parameters that you configured in the Realtime Compute for Apache Flink console. To ensure that your deployment runs as expected, we recommend that you configure checkpoint-related parameters in the Realtime Compute for Apache Flink console instead of in the deployment code.
If the upstream and downstream storage systems that DataStream deployments of Realtime Compute for Apache Flink access use a whitelist mechanism, you must configure whitelists. For more information, see How do I configure a whitelist?
To prevent conflicts between JAR package dependencies, take note of the following points:
Make sure that the Realtime Compute for Apache Flink version that you select on the Draft Editor page is the same as that specified in the dependency library of Apache Flink in the POM file. For more information about how to view the Realtime Compute for Apache Flink version, see How do I query the engine version of Flink that is used by a deployment?
Specify <scope>provided</scope> for dependencies related to Realtime Compute for Apache Flink. These dependencies mainly include non-connector dependencies whose names start with flink- in the org.apache.flink group, such as flink-streaming-java, flink-streaming-java_2.11, flink-java, and flink-runtime. A sketch of the provided scope is shown at the end of this section.
Note: We recommend that you configure dependencies during draft development to avoid conflicts between dependencies related to Realtime Compute for Apache Flink. For more information about how to configure dependencies, see How do I troubleshoot dependency conflicts of Flink?
Use the Shade plug-in to package other third-party dependencies. For more information, see Use maven-shade-plugin to package files.
Call only methods that are explicitly marked with @Public or @PublicEvolving in the source code of Apache Flink. Alibaba Cloud Realtime Compute for Apache Flink guarantees compatibility only for these methods.
If you use the DataStream APIs supported by the built-in connectors of Realtime Compute for Apache Flink, use the built-in dependencies of Realtime Compute for Apache Flink.
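As the sketch referenced above, a minimal example of a dependency declaration with the provided scope; the flink-streaming-java artifact and the ${flink.version} property follow the conventions used later in this topic:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <!-- provided: the runtime of Realtime Compute for Apache Flink supplies this dependency. -->
    <scope>provided</scope>
</dependency>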
Develop a draft
You must develop drafts in your on-premises environment before you deploy and run them in the Realtime Compute for Apache Flink console.
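For reference, the following is a minimal sketch of a DataStream draft; the class name FlinkDemoJob and the pipeline logic are illustrative assumptions, and the Flink dependencies must be declared as described in the Precautions section:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkDemoJob {
    public static void main(String[] args) throws Exception {
        // Obtain the execution environment that is provided by the runtime.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Build a trivial pipeline: emit a few elements, transform them, and print the results.
        env.fromElements("hello", "flink")
           .map(new MapFunction<String, String>() {
               @Override
               public String map(String value) {
                   return value.toUpperCase();
               }
           })
           .print();

        // Submit the pipeline for execution.
        env.execute("flink-demo-job");
    }
}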
References
For more information about the description, architecture, applications, and features of Apache Flink, see Introduction to Apache Flink.
For more information about how to develop business code of Apache Flink 1.17, see Flink DataStream API Programming Guide and Flink Table API and SQL.
For more information about the coding guide, Java and Scala language guides, and component and formatting guides, see Apache Flink Code Style and Quality Guide.
For more information about the configuration of the source code of Apache Flink deployments, see Project Configuration.
For more information about DataStream connector dependencies of Realtime Compute for Apache Flink, see Connector dependencies.
Issues may occur when you develop code in Apache Flink. For more information about the issues and fixes, see FAQ.
Read data from a file stored in OSS
The following table describes the methods that you can use to read data from a configuration file stored in an Object Storage Service (OSS) bucket.
Method | Description | Scenario | Procedure |
Method 1 | Run code in a JAR draft to read data from a configuration file that is stored on a local disk. | Only data from the OSS bucket that is associated with your workspace can be read. | |
Method 2 | Use OSSClient in a JAR draft to read data from a configuration file that is stored in OSS. | Only data from an OSS bucket on which Realtime Compute for Apache Flink has access permissions can be read. | Use OSSClient to download the configuration file as a stream. For more information, see Streaming download. Sample code is shown after this table. |
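For reference, the following is a minimal sketch of Method 2, based on the streaming download API of the OSS Java SDK; the endpoint, credentials, bucket name, and object name are placeholder assumptions that you must replace:
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import com.aliyun.oss.model.OSSObject;

import java.io.BufferedReader;
import java.io.InputStreamReader;

public class OssConfigReader {
    public static void main(String[] args) throws Exception {
        // Placeholder values: replace them with your own endpoint, credentials, and object location.
        String endpoint = "oss-cn-hangzhou.aliyuncs.com";
        OSS ossClient = new OSSClientBuilder().build(endpoint, "<yourAccessKeyId>", "<yourAccessKeySecret>");
        try {
            // Stream the configuration file from the bucket and read it line by line.
            OSSObject ossObject = ossClient.getObject("<yourBucketName>", "config/app.properties");
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(ossObject.getObjectContent()))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(line);
                }
            }
        } finally {
            ossClient.shutdown();
        }
    }
}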
Use a connector
If you want to read and write data in DataStream mode, you must use a DataStream connector of the related type to connect to Realtime Compute for Apache Flink. You can use the Ververica Runtime (VVR) DataStream connectors that are stored in the Maven central repository to develop drafts. Interfaces and parameters may change in future versions. We recommend that you use only the connectors that are specified as providing DataStream APIs in Supported connectors. You can use a connector in one of the following ways:
(Recommended) Upload the uber JAR file of the connector to the Realtime Compute for Apache Flink console and reference the connector with the provided scope in a DataStream draft
Prepare the development environment for a DataStream draft.
Add the following configurations to the POM file of the Maven project to reference SNAPSHOT repositories:
<repositories>
    <repository>
        <id>oss.sonatype.org-snapshot</id>
        <name>OSS Sonatype Snapshot Repository</name>
        <url>http://oss.sonatype.org/content/repositories/snapshots</url>
        <releases>
            <enabled>false</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
    <repository>
        <id>apache.snapshots</id>
        <name>Apache Development Snapshot Repository</name>
        <url>https://repository.apache.org/content/repositories/snapshots/</url>
        <releases>
            <enabled>false</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>
Check whether the <mirrorOf>*</mirrorOf> configuration is contained in your settings.xml configuration file.
If the <mirrorOf>*</mirrorOf> configuration is contained in the configuration file, the current mirror covers all repositories, so the Maven project does not download dependencies from the two preceding SNAPSHOT repositories. As a result, the Maven project cannot download SNAPSHOT dependencies from these repositories. To prevent this issue, perform the following operations based on the actual scenario:
If the configuration file contains <mirrorOf>*</mirrorOf>, change the configuration to <mirrorOf>*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>.
If the configuration file contains <mirrorOf>external:*</mirrorOf>, change the configuration to <mirrorOf>external:*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>.
If the configuration file contains <mirrorOf>external:http:*</mirrorOf>, change the configuration to <mirrorOf>external:http:*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>.
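For example, a mirror entry in settings.xml might look like the following after the change; the mirror ID, name, and URL are placeholder assumptions:
<mirrors>
    <mirror>
        <id>my-mirror</id>
        <name>Example mirror</name>
        <!-- Route all repositories through the mirror except the two SNAPSHOT repositories. -->
        <mirrorOf>*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>
        <url>https://maven.example.com/repository/public</url>
    </mirror>
</mirrors>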
Add the required connectors and public packages as project dependencies to the POM file of the Maven project for your draft.
Different connector versions may correspond to different connector types. We recommend that you use the latest version of the specified type of connector that you use. For the complete dependency information, see the POM file in the example of MaxCompute-Demo, DataHub-Demo, Kafka-Demo, or RocketMQ-Demo. The following example shows the project dependency code for a MaxCompute incremental source table.
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-continuous-odps</artifactId>
    <version>${connector.version}</version>
    <scope>provided</scope>
</dependency>
Add the public package flink-connector-base of the connectors together with the connectors as project dependencies:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
</dependency>
In the preceding file, ${flink.version} indicates the Apache Flink version that corresponds to the runtime environment of the draft. For example, if the engine version of your draft is 1.17-vvr-8.0.4-1, the corresponding Apache Flink version is 1.17.0.
Important: You must search for the connector versions that contain the SNAPSHOT keyword in the SNAPSHOT repository oss.sonatype.org. You cannot find these versions in the Maven central repository search.maven.org.
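For example, a minimal sketch of the corresponding version properties in the POM file, assuming the engine version 1.17-vvr-8.0.4-1 mentioned above:
<properties>
    <!-- The Apache Flink version that corresponds to the engine version of the draft. -->
    <flink.version>1.17.0</flink.version>
    <!-- The VVR connector version that matches the engine version. -->
    <connector.version>1.17-vvr-8.0.4-1</connector.version>
</properties>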
If you use multiple connectors, you must merge the files in the META-INF directory. To merge the files, add the following code to the POM file:
<transformers>
    <!-- The services transformer is needed to merge META-INF/services files. -->
    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
    <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
        <projectName>Apache Flink</projectName>
        <encoding>UTF-8</encoding>
    </transformer>
</transformers>
Develop a DataStream draft.
For more information about the configuration information and sample code of DataStream connectors, see the following topics in the DataStream connector documentation:
Package the program and publish a DataStream draft.
Use Maven to package the program and upload the generated JAR file to the Realtime Compute for Apache Flink console. For more information, see Create a JAR deployment.
Upload the uber JAR file of the connector to the Realtime Compute for Apache Flink console.
Log on to the Realtime Compute for Apache Flink console.
Find the workspace that you want to manage and click Console in the Actions column.
In the left-side navigation pane, click Artifacts.
In the upper-right corner of the Artifacts page, click Upload Artifact and select the uber JAR file of the connector that you want to use.
You can upload the JAR file of the connector that you develop or the connector provided by Realtime Compute for Apache Flink. For more information about the download links of the official JAR files of the connectors provided by Realtime Compute for Apache Flink, see Connectors.
Important: The Maven repository directory structure has been updated. The latest version of the JAR file of a connector, such as vvr-8.0.4-1-flink-1.17, may not appear in the level-1 directory. You can directly enter the URL of the required dependency in the address bar of your browser to download it. For example, you can enter a URL similar to https://repo1.maven.org/maven2/com/alibaba/ververica/ververica-connector-continuous-odps/1.17-vvr-8.0.4-1/ververica-connector-continuous-odps-1.17-vvr-8.0.4-1-uber.jar to download the required dependency.
In the Create Deployment dialog box, select the uber JAR file of the desired connector from the drop-down list of Additional Dependencies.
Package the connector as a project dependency into the JAR file of your deployment
Prepare the development environment for a DataStream API draft.
Add the following configurations to the POM file of the Maven project to reference SNAPSHOT repositories:
<repositories>
    <repository>
        <id>oss.sonatype.org-snapshot</id>
        <name>OSS Sonatype Snapshot Repository</name>
        <url>http://oss.sonatype.org/content/repositories/snapshots</url>
        <releases>
            <enabled>false</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
    <repository>
        <id>apache.snapshots</id>
        <name>Apache Development Snapshot Repository</name>
        <url>https://repository.apache.org/content/repositories/snapshots/</url>
        <releases>
            <enabled>false</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>
Check whether the <mirrorOf>*</mirrorOf> configuration is contained in your settings.xml configuration file.
If the <mirrorOf>*</mirrorOf> configuration is contained in the configuration file, the current mirror covers all repositories, so the Maven project does not download dependencies from the two preceding SNAPSHOT repositories. As a result, the Maven project cannot download SNAPSHOT dependencies from these repositories. To prevent this issue, perform the following operations based on the actual scenario:
If the configuration file contains <mirrorOf>*</mirrorOf>, change the configuration to <mirrorOf>*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>.
If the configuration file contains <mirrorOf>external:*</mirrorOf>, change the configuration to <mirrorOf>external:*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>.
If the configuration file contains <mirrorOf>external:http:*</mirrorOf>, change the configuration to <mirrorOf>external:http:*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>.
Add the required connectors and public packages as project dependencies to the POM file of the Maven project for your draft.
Different connector versions may correspond to different connector types. We recommend that you use the latest version of the specified type of connector that you use. For the complete dependency information, see the POM file in the example of MaxCompute-Demo, DataHub-Demo, Kafka-Demo, or RocketMQ-Demo. The following example shows the project dependency code for a MaxCompute incremental source table.
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-continuous-odps</artifactId>
    <version>${connector.version}</version>
</dependency>
Add the public package flink-connector-base of the connectors together with the connectors as project dependencies:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
</dependency>
In the preceding file, ${flink.version} indicates the Apache Flink version that corresponds to the runtime environment of the draft. For example, if the engine version of your draft is 1.17-vvr-8.0.4-1, the corresponding Apache Flink version is 1.17.0.
Important: You must search for the connector versions that contain the SNAPSHOT keyword in the SNAPSHOT repository oss.sonatype.org. You cannot find these versions in the Maven central repository search.maven.org.
If you use multiple connectors, you must merge the files in the META-INF directory. To merge the files, add the following code to the POM file:
<transformers>
    <!-- The services transformer is needed to merge META-INF/services files. -->
    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
    <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
        <projectName>Apache Flink</projectName>
        <encoding>UTF-8</encoding>
    </transformer>
</transformers>
Develop a DataStream draft.
For more information about the configuration information and sample code of DataStream connectors, see the following topics in the DataStream connector documentation:
Package the program and publish a DataStream draft.
Use Maven to package the program and upload the generated JAR file to the Realtime Compute for Apache Flink console. For more information, see Create a JAR deployment.
Use maven-shade-plugin to package files
Use maven-shade-plugin to generate the uber JAR file that contains all dependency JAR files. Sample code:
<project>
<!-- ... Other configurations ... -->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
                        <!-- Relocate classes to a new package to avoid dependency conflicts -->
<relocations>
<relocation>
<pattern>com.example.oldpackage</pattern>
<shadedPattern>com.example.newpackage</shadedPattern>
</relocation>
</relocations>
<!-- Remove unnecessary dependencies -->
<artifactSet>
<excludes>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
<!-- Filter and remove unnecessary files -->
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
</excludes>
</filter>
</filters>
<!-- Specify an executable JAR file by configuring the mainClass parameter -->
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.alibaba.ververica.connector.demo.flink.flinkDemoJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<!-- ... Other configurations ... -->
</project>
For more information about how to use the Shade plug-in to package dependencies, see Apache Maven Shade Plugin.
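After you add this plug-in configuration, you can run mvn clean package to build the project. Because the shade goal is bound to the package phase, the uber JAR file is generated in the target directory of the Maven project and can then be uploaded to the Realtime Compute for Apache Flink console.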
Debug a draft
You can use DataGeneratorSource to generate simulated data and debug a deployment. For more information, see DataGeneratorSourceTest.java. You can also create a SQL deployment of Realtime Compute for Apache Flink, use the Datagen connector to generate simulated data, and then write the simulated data to the data source that is read by a JAR deployment. The JAR deployment then consumes the data.
You can use a print result table to debug a deployment. For more information, see WindowWordCount.java. Before you do this, you must configure parameters to export the logs of a deployment to external storage. For more information, see Configure parameters to export logs of a deployment.
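As a reference, the following is a minimal sketch that combines both debugging approaches in a JAR deployment: it uses DataGeneratorSource to generate simulated records and prints them so that they appear in the deployment logs. It assumes Apache Flink 1.17 with the flink-connector-datagen and flink-connector-base dependencies; the record format and rates are illustrative:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DebugWithDataGenerator {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Generate 100 simulated string records at a rate of 10 records per second.
        GeneratorFunction<Long, String> generator = index -> "simulated-record-" + index;
        DataGeneratorSource<String> source = new DataGeneratorSource<>(
                generator, 100, RateLimiterStrategy.perSecond(10), Types.STRING);

        // Print the simulated records so that they appear in the deployment logs.
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "datagen")
           .print();

        env.execute("debug-with-datagen");
    }
}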
References
For more information about DataStream connectors, see Supported connectors.
For more information about how to run and debug the MaxCompute connector in an on-premises environment, see Run or debug a Flink deployment that includes a connector in an on-premises environment.
For more information about how to create a JAR deployment, see Getting started with a JAR deployment of Realtime Compute for Apache Flink.
For more information about how to develop an SQL draft or a Python API draft, see Develop an SQL draft and Develop a Python API draft.