
Realtime Compute for Apache Flink:Develop a JAR draft

Last Updated: Aug 16, 2024

JAR drafts of Realtime Compute for Apache Flink provide a flexible programming model and a rich set of APIs that let you define data conversions, operations, and operator behavior to implement complex business logic and data processing. This topic describes how to develop and debug a JAR draft of Realtime Compute for Apache Flink.

Background information

DataStream allows you to call the underlying API operations of Realtime Compute for Apache Flink so that you can use Realtime Compute for Apache Flink in a flexible manner. The DataStream API supported by Realtime Compute for Apache Flink is fully compatible with Apache Flink. For more information, see Flink DataStream API Programming Guide.

Note

Table API drafts are developed in the same way as DataStream drafts.

Limits

To avoid issues caused by differences in deployment and network environments, take note of the following limits:

  • Only DataStream JAR drafts can be published. A JAR draft can use only one main JAR file and multiple JAR dependencies.

  • Realtime Compute for Apache Flink cannot read local configurations by using the main() method in a JAR draft.

  • Java Development Kit (JDK) 1.8 is used in the runtime environment. Therefore, you must also use JDK 1.8 to develop drafts.

  • Only open source Scala V2.12 is supported.

Precautions

  • The parameters in the deployment code take precedence over the parameters that you configure in the Realtime Compute for Apache Flink console. To ensure that your deployment runs as expected, we recommend that you configure checkpoint-related parameters in the console instead of in the deployment code (see the sketch after this list).

  • If the upstream and downstream storage systems that DataStream deployments of Realtime Compute for Apache Flink access use a whitelist mechanism, you must configure whitelists. For more information, see How do I configure a whitelist?

  • To prevent conflicts between JAR package dependencies, take note of the following points:

    • Make sure that the Realtime Compute for Apache Flink version that you select on the Draft Editor page is the same as that specified in the dependency library of Apache Flink in the POM file. For more information about how to view the Realtime Compute for Apache Flink version, see How do I query the engine version of Flink that is used by a deployment?

    • Specify <scope>provided</scope> for dependencies related to Realtime Compute for Apache Flink. Dependencies related to Realtime Compute for Apache Flink mainly include non-connector dependencies whose names start with flink- in the org.apache.flink group, such as flink-streaming-java, flink-streaming-java_2.11, flink-java, and flink-runtime.

      Note

      We recommend that you configure dependencies during draft development to avoid conflicts between dependencies related to Realtime Compute for Apache Flink. For more information about how to configure dependencies, see How do I troubleshoot dependency conflicts of Flink?

    • Use the Shade plug-in to package other third-party dependencies. For more information, see Use maven-shade-plugin to package files.

    • Call only methods that are explicitly annotated with @Public or @PublicEvolving in the source code of Apache Flink. Alibaba Cloud guarantees compatibility with only these methods in Realtime Compute for Apache Flink.

    • If you use the DataStream APIs that are supported by the built-in connectors of Realtime Compute for Apache Flink, use the built-in dependencies of Realtime Compute for Apache Flink.
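
The following minimal sketch illustrates the first precaution. The class name and the checkpoint interval are hypothetical and only for illustration.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PrecedenceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Do NOT hardcode checkpoint parameters as shown in the next line.
        // A value set here takes precedence over the value configured in the console:
        // env.enableCheckpointing(60_000);

        env.fromElements(1, 2, 3).print();
        env.execute("precedence-example");
    }
}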

Develop a draft

Note

You must develop drafts in your on-premises environment before you deploy them to run in the Realtime Compute for Apache Flink console. A minimal example is shown below.
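
The following is a minimal, self-contained DataStream draft that you can use as a starting point. The class name WordCountJob and the sample input are illustrative, not part of the product.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("hello flink", "hello jar draft")
                .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                    for (String word : line.split("\\s+")) {
                        out.collect(Tuple2.of(word, 1));
                    }
                })
                // Lambdas erase generic types, so declare the output type explicitly.
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t -> t.f0)
                .sum(1)
                .print();

        env.execute("wordcount-jar-draft");
    }
}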


Read data from a file stored in OSS

You can use one of the following methods to read data from a configuration file that is stored in an Object Storage Service (OSS) bucket.

Method 1: Read the file from the local disk of the pod

  • Description: Run code in the JAR draft to read the configuration file from the local disk.

  • Scenario: Only data from the OSS bucket that is associated with your workspace can be read.

  • Procedure:

    1. In the left-side navigation pane of the development console of Realtime Compute for Apache Flink, choose Artifacts. On the Artifacts page, upload the file.

      When the deployment is running, the file that you upload on the Artifacts page is downloaded to the destination machine and loaded to the /flink/usrlib directory of the pods in which the JobManager and TaskManagers reside.

    2. Run the following code in the JAR draft to read the configuration file from the local disk. The snippet requires java.io.BufferedReader and java.io.FileReader.

      try (BufferedReader reader = new BufferedReader(new FileReader("/flink/usrlib/yourFile"))) {
          // read the file and process its content ...
      }
    3. Create a deployment and upload a dependency file. For more information, see Create a deployment.

Method 2: Read the file from OSS by using OSSClient

  • Description: Use OSSClient in the JAR draft to read the configuration file directly from OSS.

  • Scenario: Only data from OSS buckets on which Realtime Compute for Apache Flink has access permissions can be read.

  • Procedure: For more information about how to use OSSClient in a JAR draft to read data from a configuration file that is stored in OSS, see Streaming download. The following sample code shows how to read the file:

    OSS ossClient = new OSSClientBuilder().build("yourEndpoint", "yourAccessKeyId", "yourAccessKeySecret");
    try (OSSObject ossObject = ossClient.getObject("examplebucket", "exampledir/exampleobject.txt");
         BufferedReader reader = new BufferedReader(new InputStreamReader(ossObject.getObjectContent()))) {
        // read the file and process its content ...
    } finally {
        ossClient.shutdown();
    }
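
After the configuration file is available by using either method, you typically parse it when an operator is initialized. The following sketch loads a properties file from the /flink/usrlib directory in the open() method of a rich function. The file name job.properties and the key prefix are hypothetical and only for illustration.

import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class ConfigEnrichedMapper extends RichMapFunction<String, String> {
    private transient Properties config;

    @Override
    public void open(Configuration parameters) throws Exception {
        config = new Properties();
        // /flink/usrlib is the directory to which files uploaded on the Artifacts page are downloaded.
        try (InputStream in = new FileInputStream("/flink/usrlib/job.properties")) {
            config.load(in);
        }
    }

    @Override
    public String map(String value) {
        // Prepend a configurable prefix to each record. The key "prefix" is hypothetical.
        return config.getProperty("prefix", "") + value;
    }
}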

Use a connector

If you want to read and write data in DataStream mode, you must use a DataStream connector of the related type. You can develop drafts by using the Ververica Runtime (VVR) DataStream connectors that are stored in the Maven central repository. You can use a connector in one of the following ways:

Important

Interfaces and parameters may change in the future. We recommend that you use only the connectors that are documented as providing DataStream APIs in Supported connectors.

(Recommended) Upload the uber JAR file of the connector to the Realtime Compute for Apache Flink console and reference it with the provided scope in a DataStream draft

  1. Prepare the development environment for a DataStream draft.

    1. Add the following configurations to the POM file of the Maven project to reference SNAPSHOT repositories:

      <repositories>
        <repository>
          <id>oss.sonatype.org-snapshot</id>
          <name>OSS Sonatype Snapshot Repository</name>
          <url>https://oss.sonatype.org/content/repositories/snapshots</url>
          <releases>
            <enabled>false</enabled>
          </releases>
          <snapshots>
            <enabled>true</enabled>
          </snapshots>
        </repository>
        <repository>
          <id>apache.snapshots</id>
          <name>Apache Development Snapshot Repository</name>
          <url>https://repository.apache.org/content/repositories/snapshots/</url>
          <releases>
            <enabled>false</enabled>
          </releases>
          <snapshots>
            <enabled>true</enabled>
          </snapshots>
        </repository>
      </repositories>
    2. Check whether your settings.xml configuration file contains a mirror configuration such as <mirrorOf>*</mirrorOf>.

      If the mirror matches all repositories, Maven routes every request through the mirror and never contacts the two SNAPSHOT repositories that you added in the previous step. As a result, the Maven project cannot download SNAPSHOT dependencies from these repositories. To prevent this issue, exclude the two repositories from the mirror based on your current configuration:

      • If the <mirrorOf>*</mirrorOf> configuration is contained in the configuration file, change the configuration to <mirrorOf>*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>.

      • If the <mirrorOf>external:*</mirrorOf> configuration is contained in the configuration file, change the configuration to <mirrorOf>external:*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>.

      • If the <mirrorOf>external:http:*</mirrorOf> configuration is contained in the configuration file, change the configuration to <mirrorOf>external:http:*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>.

    3. Add the required connectors and public packages as project dependencies to the POM file of the Maven project for your draft.

      Different connector types may require different connector versions. We recommend that you use the latest version of the connector that you use. For the complete dependency information, see the POM file in the example of MaxCompute-Demo, DataHub-Demo, Kafka-Demo, or RocketMQ-Demo. The following example shows the project dependency code for a MaxCompute incremental source table.

      <dependency>
          <groupId>com.alibaba.ververica</groupId>
          <artifactId>ververica-connector-continuous-odps</artifactId>
          <version>${connector.version}</version>
          <scope>provided</scope>
      </dependency>

      Add the public package flink-connector-base together with the connectors as project dependencies:

      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-base</artifactId>
          <version>${flink.version}</version>
      </dependency>

      In the preceding file, ${flink.version} indicates the Apache Flink version that corresponds to the runtime environment of the draft. For example, if the engine version of your draft is 1.17-vvr-8.0.4-1, the corresponding Apache Flink version is 1.17.0.

      Important
      • Connector versions that contain the SNAPSHOT keyword are available only in the SNAPSHOT repository oss.sonatype.org. You cannot find them in the Maven central repository search.maven.org.

      • If you use multiple connectors, you must merge the files in the META-INF directory. To merge the files, add the following transformers to the configuration of maven-shade-plugin in the POM file:

        <transformers>
            <!-- The service transformer is needed to merge META-INF/services files -->
            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
            <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
                <projectName>Apache Flink</projectName>
                <encoding>UTF-8</encoding>
            </transformer>
        </transformers>
  2. Develop a DataStream draft.

    For more information about the configuration and sample code of DataStream connectors, see the DataStream connector documentation. A minimal example that wires a connector source into a draft is shown after this procedure.

  3. Package the program and publish a DataStream draft.

    Use Maven to package the program and upload the generated JAR file to the Realtime Compute for Apache Flink console. For more information, see Create a JAR deployment.

  4. Upload the uber JAR file of the connector to the Realtime Compute for Apache Flink console.

    1. Log on to the Realtime Compute for Apache Flink console.

    2. Find the workspace that you want to manage and click Console in the Actions column.

    3. In the left-side navigation pane, click Artifacts.

    4. In the upper-right corner of the Artifacts page, click Upload Artifact and select the uber JAR file of the connector that you want to use.

      You can upload the JAR file of the connector that you develop or the connector provided by Realtime Compute for Apache Flink. For more information about the download links of the official JAR files of the connectors provided by Realtime Compute for Apache Flink, see Connectors.

      Important

      The Maven repository directory structure has been updated. The latest version of the JAR file of a connector, such as vvr-8.0.4-1-flink-1.17, may not appear in the top-level directory. You can directly enter the URL of the required dependency in the address bar of your browser to download it. For example, enter a URL similar to https://repo1.maven.org/maven2/com/alibaba/ververica/ververica-connector-continuous-odps/1.17-vvr-8.0.4-1/ververica-connector-continuous-odps-1.17-vvr-8.0.4-1-uber.jar to download the required dependency.

    5. In the Create Deployment dialog box, select the uber JAR file of the desired connector from the drop-down list of Additional Dependencies.
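
The following is a minimal sketch of step 2 for a Kafka source. It uses the open source Kafka connector API (KafkaSource); the broker address, topic, and group ID are placeholders.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSourceJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("yourKafkaBrokers")   // placeholder
                .setTopics("yourTopic")                    // placeholder
                .setGroupId("yourGroupId")                 // placeholder
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
                .print();

        env.execute("kafka-datastream-draft");
    }
}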

Package the connector as a project dependency into the JAR file of your deployment

  1. Prepare the development environment for a DataStream draft.

    1. Add the following configurations to the POM file of the Maven project to reference SNAPSHOT repositories:

      <repositories>
        <repository>
          <id>oss.sonatype.org-snapshot</id>
          <name>OSS Sonatype Snapshot Repository</name>
          <url>https://oss.sonatype.org/content/repositories/snapshots</url>
          <releases>
            <enabled>false</enabled>
          </releases>
          <snapshots>
            <enabled>true</enabled>
          </snapshots>
        </repository>
        <repository>
          <id>apache.snapshots</id>
          <name>Apache Development Snapshot Repository</name>
          <url>https://repository.apache.org/content/repositories/snapshots/</url>
          <releases>
            <enabled>false</enabled>
          </releases>
          <snapshots>
            <enabled>true</enabled>
          </snapshots>
        </repository>
      </repositories>
    2. Check whether your settings.xml configuration file contains a mirror configuration such as <mirrorOf>*</mirrorOf>.

      If the mirror matches all repositories, Maven routes every request through the mirror and never contacts the two SNAPSHOT repositories that you added in the previous step. As a result, the Maven project cannot download SNAPSHOT dependencies from these repositories. To prevent this issue, exclude the two repositories from the mirror based on your current configuration:

      • If the <mirrorOf>*</mirrorOf> configuration is contained in the configuration file, change the configuration to <mirrorOf>*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>.

      • If the <mirrorOf>external:*</mirrorOf> configuration is contained in the configuration file, change the configuration to <mirrorOf>external:*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>.

      • If the <mirrorOf>external:http:*</mirrorOf> configuration is contained in the configuration file, change the configuration to <mirrorOf>external:http:*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>.

    3. Add the required connectors and public packages as project dependencies to the POM file of the Maven project for your draft.

      Different connector types may require different connector versions. We recommend that you use the latest version of the connector that you use. For the complete dependency information, see the POM file in the example of MaxCompute-Demo, DataHub-Demo, Kafka-Demo, or RocketMQ-Demo. The following example shows the project dependency code for a MaxCompute incremental source table.

      <dependency>
          <groupId>com.alibaba.ververica</groupId>
          <artifactId>ververica-connector-continuous-odps</artifactId>
          <version>${connector.version}</version>
      </dependency>

      Add the public package flink-connector-base together with the connectors as project dependencies:

      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-base</artifactId>
          <version>${flink.version}</version>
      </dependency>

      In the preceding file, ${flink.version} indicates the Apache Flink version that corresponds to the runtime environment of the draft. For example, if the engine version of your draft is 1.17-vvr-8.0.4-1, the corresponding Apache Flink version is 1.17.0.

      Important
      • Connector versions that contain the SNAPSHOT keyword are available only in the SNAPSHOT repository oss.sonatype.org. You cannot find them in the Maven central repository search.maven.org.

      • If you use multiple connectors, you must merge the files in the META-INF directory. To merge the files, add the following transformers to the configuration of maven-shade-plugin in the POM file:

        <transformers>
            <!-- The service transformer is needed to merge META-INF/services files -->
            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
            <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
                <projectName>Apache Flink</projectName>
                <encoding>UTF-8</encoding>
            </transformer>
        </transformers>
  2. Develop a DataStream draft.

    For more information about the configuration and sample code of DataStream connectors, see the DataStream connector documentation and the connector example in the preceding section.

  3. Package the program and publish a DataStream draft.

    Use Maven to package the program and upload the generated JAR file to the Realtime Compute for Apache Flink console. For more information, see Create a JAR deployment.

Use maven-shade-plugin to package files

Use maven-shade-plugin to generate the uber JAR file that contains all dependency JAR files. Sample code:

<project>
    <!-- ... Other configurations ... -->

    <build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <!-- Relocate classes to prevent conflicts with other dependencies -->
                        <relocations>
                            <relocation>
                                <pattern>com.example.oldpackage</pattern>
                                <shadedPattern>com.example.newpackage</shadedPattern>
                            </relocation>
                        </relocations>

                        <!-- Remove unnecessary dependencies -->
                        <artifactSet>
                            <excludes>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>org.apache.logging.log4j:*</exclude>
                            </excludes>
                        </artifactSet>

                        <!-- Remove signature files so that the repackaged JAR file is not rejected -->
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
 
                        <!-- Specify an executable JAR file by configuring the mainClass parameter -->
                        <transformers>
                            <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.alibaba.ververica.connector.demo.flink.flinkDemoJob</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
    </build>

    <!-- ... Other configurations ... -->
</project>
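
After you add the plug-in configuration, run mvn clean package. By default, maven-shade-plugin replaces the JAR file in the target directory with the generated uber JAR file.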

For more information about how to use the Shade plug-in to package dependencies, see Apache Maven Shade Plugin.

Debug a draft

  • You can use DataGeneratorSource to generate simulated input data and debug a deployment. For more information, see DataGeneratorSourceTest.java and the sketch after this list. You can also create an SQL deployment of Realtime Compute for Apache Flink, use the Datagen connector to generate simulated data, and then write the simulated data to the data source that is read by a JAR deployment. The JAR deployment then consumes the data.

  • You can use a print result table to debug a deployment. For more information, see WindowWordCount.java. You must configure parameters in advance to export the logs of a deployment to external storage. For more information, see Configure parameters to export logs of a deployment.
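
The following minimal sketch debugs a deployment with simulated data. It is based on the DataGeneratorSource API of Apache Flink 1.17 (in the flink-connector-datagen module); the record format and count are illustrative.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataGenDebugJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Generate 1,000 synthetic records of the form "record-<n>".
        GeneratorFunction<Long, String> generator = index -> "record-" + index;
        DataGeneratorSource<String> source =
                new DataGeneratorSource<>(generator, 1_000L, Types.STRING);

        // Print the simulated records so that they appear in the deployment logs.
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Data Generator")
                .print();

        env.execute("debug-with-datagen");
    }
}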
