Realtime Compute for Apache Flink: Develop a JAR draft

Last Updated: Oct 12, 2023

This topic describes the limits that apply when you develop DataStream API drafts in fully managed Flink and the methods that you can use to develop these drafts.

Background information

Flink DataStream allows you to call the underlying API operations of fully managed Flink so that you can use fully managed Flink in a flexible manner. The DataStream API supported by Realtime Compute for Apache Flink is fully compatible with Apache Flink. For more information, see Flink DataStream API Programming Guide.

Note

Table API drafts are developed in the same way as DataStream drafts.

Limits

Services that are provided by fully managed Flink are subject to the deployment and network environments in which they run. Therefore, when you develop DataStream API drafts in fully managed Flink, take note of the following points:

  • You can publish drafts and run deployments only in the JAR format.

  • You can use one main JAR file and multiple JAR dependencies in a JAR draft.

  • Fully managed Flink cannot read local configurations by using the main() method in a JAR draft. (See the sketch after this list for one way to pass custom settings as deployment arguments instead.)

  • The parameters in the deployment code take precedence over the parameters that you configured in the console of fully managed Flink. To ensure that your deployment runs as expected, we recommend that you configure checkpoint-related parameters in the console of fully managed Flink instead of in the deployment code.

  • Java Development Kit (JDK) 1.8 is used in the runtime environment. Therefore, you must also use JDK 1.8 to develop drafts.

  • Only open source Scala 2.11 is supported.
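
Because the main() method cannot read local configuration files, a common workaround is to pass custom settings as deployment arguments and parse them with Flink's ParameterTool utility. The following is a minimal sketch; the parameter name sourceTopic is only an illustration:

    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ArgsExample {
        public static void main(String[] args) throws Exception {
            // Parse deployment arguments such as --sourceTopic my-topic.
            ParameterTool params = ParameterTool.fromArgs(args);
            String sourceTopic = params.get("sourceTopic", "default-topic"); // illustrative parameter

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Make the parameters visible in the Flink web UI and to user functions.
            env.getConfig().setGlobalJobParameters(params);

            env.fromElements(sourceTopic).print();
            env.execute("args-example");
        }
    }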

Precautions

  • If the upstream and downstream storage systems that Flink DataStream deployments access use a whitelist mechanism, you must configure whitelists. For more information, see How do I configure a whitelist?

  • To prevent conflicts between JAR file dependencies, take note of the following points:

    • Make sure that the Flink version that you select in the Create Deployment dialog box is the same as the Flink version that is specified in the dependency library of Apache Flink in the POM file. For more information about how to view the Flink version, see How do I query the engine version of Flink that is used by a deployment?

    • Specify <scope>provided</scope> for Flink-related dependencies. Flink-related dependencies mainly include non-connector dependencies whose names start with flink- in the org.apache.flink group, such as flink-streaming-java, flink-streaming-java_2.11, flink-java, and flink-runtime. A sample dependency is provided after this list.

      Note

      We recommend that you configure dependencies during draft development to avoid Flink dependency conflicts. For more information about how to configure dependencies, see How do I troubleshoot dependency conflicts of Flink?

    • Use the Shade plug-in to package other third-party dependencies. For more information, see Apache Maven Shade plug-in.

    • Call only methods that are explicitly marked with @Public or @PublicEvolving in the source code of Apache Flink. Alibaba Cloud ensures that Realtime Compute for Apache Flink is compatible only with these methods.
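
The following POM snippet sketches the provided scope for a Flink dependency. The Scala 2.11 artifact name follows the limits above, and the ${flink.version} property is a placeholder that you set to the Flink version of your deployment:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>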

Develop a draft

Before you publish a draft to a cluster and run it in the console of fully managed Flink, develop the draft in your on-premises environment. When you write business code for fully managed Flink, you can refer to the following references:

  • For more information about the configuration of the source code of Apache Flink deployments, see Project Configuration.

  • For more information about Flink DataStream connector dependencies, see Connector dependencies.

  • For more information about how to use connectors, see Use a connector.

  • For more information about the description, architecture, applications, and features of Apache Flink, see Introduction to Apache Flink.

  • For more information about how to develop business code in Apache Flink 1.10, see Flink DataStream API Programming Guide and Flink Table API and SQL.

  • For more information about the coding guide, Java and Scala language guides, and component and formatting guides, see Apache Flink Code Style and Quality Guide.

  • Issues may occur when you develop code in Apache Flink. For more information about the issues and fixes, see FAQ.

  • You can use one of the following methods to read a configuration file that is stored in Object Storage Service (OSS).

    Method 1: Run code in a JAR deployment to read a configuration file that is stored on a local disk.

    1. In the left-side navigation pane, choose Artifacts. On the Artifacts page, click Upload Artifact to upload the file.

      The file that you upload is stored in the oss://ossBucketName/artifacts/namespaces/namespaceName/<file> directory and can be downloaded to the destination machine. When the deployment is running, the file is loaded to the /flink/usrlib directory of the pod in which the JobManager and TaskManager reside.

    2. Run the following code in the JAR deployment to read the configuration file that is stored on the local disk.

      try (BufferedReader reader = new BufferedReader(new FileReader("/flink/usrlib/yourFile"))) {
          // read file and process ...
      }
    3. Create a deployment and upload a dependency file. For more information, see Create a deployment.

    Method 2: Use OSSClient in a JAR deployment to read a configuration file that is stored in OSS.

    For more information, see Streaming download. The following sample code shows how to read a configuration file that is stored in OSS.

    OSS ossClient = new OSSClientBuilder().build("yourEndpoint", "yourAccessKeyId", "yourAccessKeySecret");
    try (OSSObject ossObject = ossClient.getObject("examplebucket", "exampledir/exampleobject.txt");
         BufferedReader reader = new BufferedReader(new InputStreamReader(ossObject.getObjectContent()))) {
        // read file and process ...
    } finally {
        if (ossClient != null) {
            ossClient.shutdown();
        }
    }

Debug a deployment

  • You can use DataGeneratorSource to generate simulated data and debug a deployment. For more information, see DataGeneratorSourceTest.java. You can also create a Flink SQL deployment, use the Datagen connector to generate simulated data, and then write the simulated data to the data source that is read by the JAR deployment, so that the JAR deployment consumes the data. (A sketch follows this list.)

  • You can use a print result table to debug a deployment. For more information, see WindowWordCount.java. You must configure parameters to export the logs of a deployment to external storage in advance. For more information, see Configure parameters to export logs of a deployment.
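
The following sketch shows one way to generate simulated data with DataGeneratorSource and print the results for debugging. It assumes a Flink version that ships the datagen utility classes in flink-streaming-java (Flink 1.11 or later); the generation rate and record count are illustrative.

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
    import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator;

    public class DataGenDebug {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Generate 100 random 16-character strings at a rate of 10 records per second.
            DataGeneratorSource<String> source =
                    new DataGeneratorSource<>(RandomGenerator.stringGenerator(16), 10, 100L);

            env.addSource(source)
               .returns(Types.STRING)
               .print(); // The results appear in the TaskManager logs.

            env.execute("datagen-debug");
        }
    }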

Use a connector

The Ververica Runtime (VVR) connectors are placed in the Maven central repository for you to use when you develop a draft. You can use a connector in one of the following ways:

Note

The central repository does not provide the ApsaraMQ for RocketMQ connector. You can directly download the JAR file of the ApsaraMQ for RocketMQ connector version that meets your business requirements.

  • (Recommended) Package the connector as a project dependency into the JAR file of your deployment.

    1. Add the following configurations to the POM file of the Maven project to reference SNAPSHOT repositories:

      <repositories>
        <repository>
          <id>oss.sonatype.org-snapshot</id>
          <name>OSS Sonatype Snapshot Repository</name>
          <url>http://oss.sonatype.org/content/repositories/snapshots</url>
          <releases>
            <enabled>false</enabled>
          </releases>
          <snapshots>
            <enabled>true</enabled>
          </snapshots>
        </repository>
        <repository>
          <id>apache.snapshots</id>
          <name>Apache Development Snapshot Repository</name>
          <url>https://repository.apache.org/content/repositories/snapshots/</url>
          <releases>
            <enabled>false</enabled>
          </releases>
          <snapshots>
            <enabled>true</enabled>
          </snapshots>
        </repository>
      </repositories>
    2. Check whether the <mirrorOf>*</mirrorOf> configuration is contained in your settings.xml configuration file.

      If the <mirrorOf>*</mirrorOf> configuration is contained in the configuration file, change the configuration to <mirrorOf>*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>. This change prevents the two SNAPSHOT repositories that you configured in Step 1 from being overwritten. If only an asterisk (*) is enclosed in the mirrorOf element, the SNAPSHOT repositories are overwritten.
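
      For reference, a mirror entry in settings.xml might look like the following after the change. The mirror ID and URL are placeholders for your own mirror:

      <mirror>
          <id>your-mirror</id>
          <name>Your mirror</name>
          <mirrorOf>*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>
          <url>https://your.mirror.example.com/repository/maven-public/</url>
      </mirror>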

    3. Add the connector that you want to use to the Maven POM file as a project dependency. The following example shows the sample code:

      <dependencies>
          <dependency>
              <groupId>com.alibaba.ververica</groupId>
              <artifactId>${connector.type}</artifactId>
              <version>${connector.version}</version>
          </dependency>
      </dependencies>

      Different connector versions may correspond to different connector types. We recommend that you use the latest version for the type of the connector that you use.

      Important
      • You must search for the connector versions that contain the SNAPSHOT keyword in the SNAPSHOT repository oss.sonatype.org. You cannot find the versions in the Maven central repository search.maven.org.

      • If you use multiple connectors, you must merge the files in the META-INF directory. To merge the files, add the following code to the POM file:

        <transformers>
            <!-- The service transformer is needed to merge META-INF/services files -->
            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
            <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
                <projectName>Apache Flink</projectName>
                <encoding>UTF-8</encoding>
            </transformer>
        </transformers>
  • Upload the JAR file of the connector in the console of fully managed Flink and specify it as an additional dependency of your deployment.

    1. Log on to the Realtime Compute for Apache Flink console.

    2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.

    3. In the left-side navigation pane, click Artifacts.

    4. Click Upload Artifact and select the JAR file that you want to upload.

      You can upload the JAR file of your self-managed connector or the JAR file of a connector that is provided by fully managed Flink.

    5. In the Create Deployment dialog box, select the JAR file of the desired connector from the drop-down list of Additional Dependencies.

References

Getting started for a Flink JAR deployment