
Realtime Compute for Apache Flink:Develop a JAR draft

Last Updated:Feb 27, 2024

Flink JAR drafts provide a flexible programming model and APIs that allow you to define data transformations, operations, and operator configurations to meet the requirements of complex business logic and data processing. This topic describes how to develop and debug a Flink JAR draft.

Background information

Flink DataStream allows you to call the underlying API operations of fully managed Flink so that you can use fully managed Flink in a flexible manner. The DataStream API supported by Realtime Compute for Apache Flink is fully compatible with Apache Flink. For more information, see Flink DataStream API Programming Guide.

Note

Table API drafts are developed in the same way as DataStream drafts.

Limits

To prevent issues caused by factors such as the deployment and network environments, take note of the following limits:

  • Only DataStream JAR drafts can be published. A JAR draft can use only one main JAR file and multiple JAR dependencies.

  • Fully managed Flink cannot read local configurations by using the main() method in a JAR draft.

  • Java Development Kit (JDK) 1.8 is used in the runtime environment. Therefore, you must also use JDK 1.8 to develop drafts.

  • Only open source Scala V2.12 is supported.
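The JDK 1.8 requirement above can be enforced in the Maven build so that the draft is always compiled for the runtime environment. The following POM fragment is a minimal sketch; the plug-in version is omitted and inherited from your parent POM or Maven defaults:

```xml
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <configuration>
        <!-- Compile for JDK 1.8, which is used in the runtime environment -->
        <source>1.8</source>
        <target>1.8</target>
    </configuration>
</plugin>
```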

Precautions

  • The parameters in the deployment code take precedence over the parameters that you configured in the console of fully managed Flink. To ensure that your deployment runs as expected, we recommend that you configure checkpoint-related parameters in the console of fully managed Flink instead of in the deployment code.
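As a sketch of the recommendation above, checkpoint-related parameters can be set in the console of fully managed Flink by using standard Flink configuration keys instead of in the deployment code. The keys below are standard Flink options; the values are illustrative only:

```
execution.checkpointing.interval: 10min
execution.checkpointing.mode: EXACTLY_ONCE
```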

  • If the upstream and downstream storage systems that Flink DataStream deployments access use a whitelist mechanism, you must configure whitelists. For more information, see How do I configure a whitelist?

  • To prevent conflicts between JAR file dependencies, take note of the following points:

    • Make sure that the Flink version that you select on the SQL Editor page is the same as the Flink version that is specified in the dependency library of Apache Flink in the POM file. For more information about how to view the Flink version, see How do I query the engine version of Flink that is used by a deployment?

    • Specify <scope>provided</scope> for Flink-related dependencies. Flink-related dependencies mainly include non-connector dependencies whose names start with flink- in the org.apache.flink group, such as flink-streaming-java, flink-streaming-java_2.11, flink-java, and flink-runtime.

      Note

      We recommend that you configure dependencies during draft development to avoid Flink dependency conflicts. For more information about how to configure dependencies, see How do I troubleshoot dependency conflicts of Flink?

    • Use the Shade plug-in to package other third-party dependencies. For more information, see Apache Maven Shade plug-in.

    • Call only methods that are explicitly annotated with @Public or @PublicEvolving in the source code of Apache Flink. Alibaba Cloud Realtime Compute for Apache Flink guarantees compatibility only with these methods.
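The preceding recommendations about the provided scope and the Shade plug-in can be combined in the POM file. The following fragment is a minimal sketch, not a complete build configuration; the artifact flink-streaming-java and the version placeholder are examples:

```xml
<dependencies>
    <!-- Flink core dependency: provided by the runtime and excluded from the JAR file -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <!-- Shade plug-in: packages third-party dependencies into the JAR file -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```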

Develop a draft

Note

You must develop drafts in your on-premises environment before you deploy the drafts for running in the console of fully managed Flink.

Read data from a file stored in OSS

The following methods allow you to read data from a configuration file that is stored in Object Storage Service (OSS).

Method 1: Read the configuration file from the local disk

Run code in the JAR draft to read the configuration file after it is downloaded to the local disk.

  1. In the left-side navigation pane, choose Artifacts. On the Artifacts page, click Upload Artifact to upload the file.

    The file that you upload is stored in the oss://ossBucketName/artifacts/namespaces/namespaceName/<file> directory and can be downloaded to the destination machine. When the deployment is running, the file is loaded to the /flink/usrlib directory of the pods in which the JobManager and TaskManagers reside.

  2. Run the following code in the JAR draft to read the configuration file from the local disk:

    try (BufferedReader reader = new BufferedReader(new FileReader("/flink/usrlib/yourFile"))) {
        // read file and process ...
    }
  3. Create a deployment and upload the dependency file. For more information, see Create a deployment.

Method 2: Read the configuration file from OSS by using OSSClient

Use OSSClient in the JAR draft to read the configuration file directly from OSS. For more information, see Streaming download. The following sample code shows how to read the file:

OSS ossClient = new OSSClientBuilder().build("yourEndpoint", "yourAccessKeyId", "yourAccessKeySecret");
try (OSSObject ossObject = ossClient.getObject("examplebucket", "exampledir/exampleobject.txt");
     BufferedReader reader = new BufferedReader(new InputStreamReader(ossObject.getObjectContent()))) {
    // read file and process ...
} finally {
    ossClient.shutdown();
}
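If the uploaded file is a Java .properties file, it can be parsed with java.util.Properties. The following self-contained sketch reads key-value pairs from a file on the local disk; the file name and the keys source.topic and sink.parallelism are hypothetical, and a temporary file stands in for /flink/usrlib/yourFile so that the example runs anywhere:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

public class ConfigLoader {

    // Load a .properties file from the given path. In a running deployment,
    // the uploaded artifact would be read from /flink/usrlib/<file>; the path
    // is a parameter here so that the sketch can be tested locally.
    static Properties load(Path path) {
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(path)) {
            props.load(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) throws IOException {
        // Write a temporary file so the example is self-contained.
        Path tmp = Files.createTempFile("app-config", ".properties");
        Files.write(tmp, "source.topic=orders\nsink.parallelism=4\n".getBytes(StandardCharsets.UTF_8));

        Properties props = load(tmp);
        System.out.println(props.getProperty("source.topic"));     // orders
        System.out.println(props.getProperty("sink.parallelism")); // 4
    }
}
```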

Use a connector

If you want to read and write data in DataStream mode, you must use the related type of DataStream connector to connect to fully managed Flink. You can use the Ververica Runtime (VVR) DataStream connectors that are stored in the Maven central repository to develop drafts. You can use a connector in one of the following ways:

Important

Interfaces and parameters may change in future versions. We recommend that you use only the connectors that are documented as supporting the DataStream API in Supported connectors.

(Recommended) Upload the uber JAR file of the connector to the console of fully managed Flink and declare the connector with the provided scope in the DataStream draft

  1. Prepare the development environment for a DataStream draft.

    1. Add the following configurations to the POM file of the Maven project to reference SNAPSHOT repositories:

      <repositories>
        <repository>
          <id>oss.sonatype.org-snapshot</id>
          <name>OSS Sonatype Snapshot Repository</name>
          <url>http://oss.sonatype.org/content/repositories/snapshots</url>
          <releases>
            <enabled>false</enabled>
          </releases>
          <snapshots>
            <enabled>true</enabled>
          </snapshots>
        </repository>
        <repository>
          <id>apache.snapshots</id>
          <name>Apache Development Snapshot Repository</name>
          <url>https://repository.apache.org/content/repositories/snapshots/</url>
          <releases>
            <enabled>false</enabled>
          </releases>
          <snapshots>
            <enabled>true</enabled>
          </snapshots>
        </repository>
      </repositories>
    2. Check whether the <mirrorOf>*</mirrorOf> configuration is contained in your settings.xml configuration file.

      If the <mirrorOf>*</mirrorOf> configuration is present, the mirror intercepts requests to all repositories, and the Maven project cannot download SNAPSHOT dependencies from the two repositories that you added in the previous step. To prevent this issue, modify the configuration based on your scenario:

      • If the <mirrorOf>*</mirrorOf> configuration is contained in the configuration file, change the configuration to <mirrorOf>*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>.

      • If the <mirrorOf>external:*</mirrorOf> configuration is contained in the configuration file, change the configuration to <mirrorOf>external:*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>.

      • If the <mirrorOf>external:http:*</mirrorOf> configuration is contained in the configuration file, change the configuration to <mirrorOf>external:http:*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>.
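For example, after the first change above, a mirror entry in settings.xml might look like the following sketch; the mirror id, name, and URL are placeholders for your own mirror:

```xml
<mirror>
    <id>central-mirror</id>
    <name>Mirror of all repositories except the two SNAPSHOT repositories</name>
    <mirrorOf>*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>
    <url>https://repo1.maven.org/maven2</url>
</mirror>
```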

    3. Add the required connectors and public packages as project dependencies to the POM file of the Maven project for your draft.

      Connector versions vary by connector type. We recommend that you use the latest version of the connector type that you use. For the complete dependency information, see the POM file in the example of MaxCompute-Demo, DataHub-Demo, Kafka-Demo, or RocketMQ-Demo. The following example shows the project dependency for a MaxCompute incremental source table.

      <dependency>
          <groupId>com.alibaba.ververica</groupId>
          <artifactId>ververica-connector-continuous-odps</artifactId>
          <version>${connector.version}</version>
          <scope>provided</scope>
      </dependency>

      Add the public package flink-connector-base together with the connectors as project dependencies:

      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-base</artifactId>
          <version>${flink.version}</version>
      </dependency>

      In the preceding file, ${flink.version} indicates the Flink version that corresponds to the runtime environment of the draft. If the engine version of your draft is 1.17-vvr-8.0.4-1, the Flink version is 1.17.0.

      Important
      • You must search for the connector versions that contain the SNAPSHOT keyword in the SNAPSHOT repository oss.sonatype.org. You cannot find the versions in the Maven central repository search.maven.org.

      • If you use multiple connectors, you must merge the files in the META-INF directory. To merge the files, add the following code to the POM file:

        <transformers>
            <!-- The service transformer is needed to merge META-INF/services files -->
            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
            <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
                <projectName>Apache Flink</projectName>
                <encoding>UTF-8</encoding>
            </transformer>
        </transformers>
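The ${flink.version} and ${connector.version} placeholders used above can be pinned in the <properties> section of the POM file. For example, for engine version 1.17-vvr-8.0.4-1:

```xml
<properties>
    <!-- Flink version that corresponds to the runtime environment of the draft -->
    <flink.version>1.17.0</flink.version>
    <!-- VVR connector version that matches the engine version -->
    <connector.version>1.17-vvr-8.0.4-1</connector.version>
</properties>
```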
  2. Develop a DataStream draft.

    For more information about the configuration information and sample code of DataStream connectors, see the following topics in the DataStream connector documentation:

  3. Package the program and publish a DataStream draft.

    Use Maven to package the program and upload the generated JAR file to the console of fully managed Flink. For more information, see Create a JAR deployment.

  4. Upload the uber JAR file of the connector to the console of fully managed Flink.

    1. Log on to the Realtime Compute for Apache Flink console.

    2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.

    3. In the left-side navigation pane, click Artifacts.

    4. In the upper-right corner of the Artifacts page, click Upload Artifact and select the uber JAR file of the connector that you want to use.

      You can upload the JAR file of the connector that you develop or the connector provided by fully managed Flink. For more information about the download links of the official JAR files of the connectors provided by fully managed Flink, see Connectors.

      Important

      The Maven repository directory is updated. The latest version of the JAR file of a connector, such as Flink 1.17-vvr-8.0.4-1, may not be provided in the level-1 directory. You can directly enter the URL of the required dependencies in the address bar of your browser to download the dependencies. For example, you can enter a URL similar to https://repo1.maven.org/maven2/com/alibaba/ververica/ververica-connector-continuous-odps/1.17-vvr-8.0.4-1/ververica-connector-continuous-odps-1.17-vvr-8.0.4-1-uber.jar to download the required dependencies.

    5. In the Create Deployment dialog box, select the uber JAR file of the desired connector from the drop-down list of Additional Dependencies.

Package the connector as a project dependency into the JAR file of your deployment

  1. Prepare the development environment for a DataStream API draft.

    1. Add the following configurations to the POM file of the Maven project to reference SNAPSHOT repositories:

      <repositories>
        <repository>
          <id>oss.sonatype.org-snapshot</id>
          <name>OSS Sonatype Snapshot Repository</name>
          <url>http://oss.sonatype.org/content/repositories/snapshots</url>
          <releases>
            <enabled>false</enabled>
          </releases>
          <snapshots>
            <enabled>true</enabled>
          </snapshots>
        </repository>
        <repository>
          <id>apache.snapshots</id>
          <name>Apache Development Snapshot Repository</name>
          <url>https://repository.apache.org/content/repositories/snapshots/</url>
          <releases>
            <enabled>false</enabled>
          </releases>
          <snapshots>
            <enabled>true</enabled>
          </snapshots>
        </repository>
      </repositories>
    2. Check whether the <mirrorOf>*</mirrorOf> configuration is contained in your settings.xml configuration file.

      If the <mirrorOf>*</mirrorOf> configuration is present, the mirror intercepts requests to all repositories, and the Maven project cannot download SNAPSHOT dependencies from the two repositories that you added in the previous step. To prevent this issue, modify the configuration based on your scenario:

      • If the <mirrorOf>*</mirrorOf> configuration is contained in the configuration file, change the configuration to <mirrorOf>*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>.

      • If the <mirrorOf>external:*</mirrorOf> configuration is contained in the configuration file, change the configuration to <mirrorOf>external:*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>.

      • If the <mirrorOf>external:http:*</mirrorOf> configuration is contained in the configuration file, change the configuration to <mirrorOf>external:http:*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>.

    3. Add the required connectors and public packages as project dependencies to the POM file of the Maven project for your draft.

      Connector versions vary by connector type. We recommend that you use the latest version of the connector type that you use. For the complete dependency information, see the pom.xml file in the example of MaxCompute-Demo, DataHub-Demo, Kafka-Demo, or RocketMQ-Demo. The following example shows the project dependency for a MaxCompute incremental source table.

      <dependency>
          <groupId>com.alibaba.ververica</groupId>
          <artifactId>ververica-connector-continuous-odps</artifactId>
          <version>${connector.version}</version>
      </dependency>

      Add the public package flink-connector-base together with the connectors as project dependencies:

      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-base</artifactId>
          <version>${flink.version}</version>
      </dependency>

      In the preceding file, ${flink.version} indicates the Flink version that corresponds to the runtime environment of the draft. If the engine version of your draft is 1.17-vvr-8.0.4-1, the Flink version is 1.17.0.

      Important
      • You must search for the connector versions that contain the SNAPSHOT keyword in the SNAPSHOT repository oss.sonatype.org. You cannot find the versions in the Maven central repository search.maven.org.

      • If you use multiple connectors, you must merge the files in the META-INF directory. To merge the files, add the following code to the POM file:

        <transformers>
            <!-- The service transformer is needed to merge META-INF/services files -->
            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
            <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
                <projectName>Apache Flink</projectName>
                <encoding>UTF-8</encoding>
            </transformer>
        </transformers>
  2. Develop a DataStream draft.

    For more information about the configuration information and sample code of DataStream connectors, see the following topics in the DataStream connector documentation:

  3. Package the program and publish a DataStream draft.

    Use Maven to package the program and upload the generated JAR file to the console of fully managed Flink. For more information, see Create a JAR deployment.

Debug a draft

  • You can use DataGeneratorSource to generate simulated data for debugging a deployment. For more information, see DataGeneratorSourceTest.java. You can also create a Flink SQL deployment that uses the Datagen connector to generate simulated data and write the data to the source that the JAR deployment reads. The JAR deployment then consumes the data.

  • You can use a print result table to debug a deployment. For more information, see WindowWordCount.java. You must configure parameters in advance to export the logs of the deployment to external storage. For more information, see Configure parameters to export logs of a deployment.
