
Realtime Compute for Apache Flink: Encrypt and decrypt database passwords in Flink with Key Management Service

Last Updated: Oct 28, 2025

Realtime Compute for Apache Flink can integrate with Key Management Service (KMS) to encrypt and decrypt sensitive data (such as database passwords) configured for your Flink workloads, thereby ensuring data security. This topic describes how to encrypt and decrypt database passwords with KMS in a JAR deployment that reads data from an ApsaraDB RDS for MySQL database.

Background information

KMS is an all-in-one platform for credential management and data encryption that is designed to be simple, reliable, secure, and compliant. KMS provides cryptographic API operations for encrypting and decrypting data, so you do not need to implement cryptographic algorithms yourself. KMS also offers automatic key rotation, which enhances data security and reduces the effort of key management. For more information, see Benefits in the KMS documentation.

In real-time computing scenarios, Flink often needs to connect to data sources such as Kafka and MySQL, and these connections require sensitive data such as account passwords. Hardcoding this data or storing it in plaintext configuration files exposes it to anyone who can read the code or the configuration. By integrating with KMS, your Flink program can hold only the encrypted value and decrypt it on demand, which protects against plaintext credential exposure.
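The "decrypt on demand" idea can be sketched in plain Java as follows. This is a minimal illustration, not part of the KMS SDK: the class name LazySecret is hypothetical, and the decryptor function is a stand-in for whatever call performs the real KMS decryption. The holder keeps only the ciphertext until the plaintext is first requested, and it never prints the plaintext.

```java
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical holder that keeps a ciphertext and decrypts it only on first use. */
public final class LazySecret implements Supplier<String> {
    private final String ciphertext;
    // Stand-in for a real KMS Decrypt call (assumption for illustration).
    private final Function<String, String> decryptor;
    // Cached plaintext; stays null until get() is called for the first time.
    private String plaintext;

    public LazySecret(String ciphertext, Function<String, String> decryptor) {
        this.ciphertext = ciphertext;
        this.decryptor = decryptor;
    }

    /** Decrypts on the first call and returns the cached value afterwards. */
    @Override
    public synchronized String get() {
        if (plaintext == null) {
            plaintext = decryptor.apply(ciphertext);
        }
        return plaintext;
    }

    /** Masks the value so that accidental logging does not leak the secret. */
    @Override
    public String toString() {
        return "LazySecret[****]";
    }

    public static void main(String[] args) {
        // Placeholder decryptor for illustration only; it performs no real cryptography.
        LazySecret password = new LazySecret("<ciphertext>", c -> "decrypted(" + c + ")");
        System.out.println(password);              // masked form, plaintext not yet computed
        System.out.println(password.get().length());
    }
}
```

With this shape, only the ciphertext appears in code or configuration, and the plaintext exists in memory only after the first call to get().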

The solution's architecture is as follows:

(Figure: solution architecture)

Prerequisites

(Optional) Step 1: Make preparations

Prepare an ApsaraDB RDS for MySQL data source.

  1. Create a MySQL database and an account.

    Create a database (school) and a standard account (flink_rds_user) that has the read and write permissions on the database. For more information, see Step 1: Create an ApsaraDB RDS for MySQL instance and configure databases.

  2. Create a table and insert test data.

    1. In the upper-right corner of the details page of the target instance, click Log On to Database.

    2. In the dialog box that appears, enter the database account and password. Then, click Login.

    3. After you successfully log on, double-click the school database in the left-side navigation pane.

    4. In the SQL editor, write statements to create a business table named student, insert data into it, and query the result:

      CREATE TABLE `student` (
        id INT not null primary key,
        username VARCHAR(255),
        age BIGINT
      );
        
      INSERT INTO student VALUES
      (001, 'lily', 15),
      (002, 'leilei', 18),
      (003, 'xiaoming', 17),
      (004, 'huahua', 15);
      
      SELECT * FROM student;
  3. Click Execute (F8). On the panel that appears, click Execute.

Run a JAR deployment with unencrypted data

Ensure that the JAR deployment with unencrypted information can run properly.

  1. Develop a Flink program locally.

    1. Create a new project in IntelliJ IDEA.

    2. Copy and paste the following code snippets into the JavaDemo class file and the pom.xml file. Modify the values of the configuration options to match your setup.

      JavaDemo

      package org.example;
      
      import com.ververica.cdc.connectors.mysql.source.MySqlSource;
      import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
      
      import org.apache.flink.api.common.eventtime.WatermarkStrategy;
      import org.apache.flink.streaming.api.datastream.DataStreamSource;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.table.api.DataTypes;
      import org.apache.flink.table.data.RowData;
      import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
      import org.apache.flink.table.types.DataType;
      import org.apache.flink.table.types.logical.LogicalType;
      import org.apache.flink.table.types.logical.RowType;
      import org.apache.flink.table.types.utils.TypeConversions;
      
      
      public class JavaDemo {
      
          public static void main(String[] args) throws Exception {
              final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
              // Build a deserializer.
              DataType dataType =
                      DataTypes.ROW(
                              DataTypes.FIELD("id", DataTypes.INT()),
                              DataTypes.FIELD("username", DataTypes.STRING()),
                              DataTypes.FIELD("age", DataTypes.INT()));
      
              LogicalType logicalType = TypeConversions.fromDataToLogicalType(dataType);
              InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(logicalType);
              RowDataDebeziumDeserializeSchema deserializer =
                      RowDataDebeziumDeserializeSchema.newBuilder()
                              .setPhysicalRowType((RowType) dataType.getLogicalType())
                              .setResultTypeInfo(typeInfo)
                              .build();
      
              // Configure the data source (com.ververica.cdc.connectors.mysql.source.MySqlSource).
              MySqlSource<RowData> mySqlSource =
                      MySqlSource.<RowData>builder()
                              .hostname("rm-bp****2ye09w72zjq.mysql.rds.aliyuncs.com")
                              .port(3306)
                              .databaseList("school") // Specify the database.
                              .tableList("school.student") // Specify the table.
                              .username("flink_rds_user")
                              .password("flink_rds_password@123")
                              // Initialize data in the RowData structure.
                              .deserializer(deserializer)
                              .build();
      
              // Integrate the external data source into the Flink DataStream program
              // Do not use a watermark strategy.
              DataStreamSource<RowData> mySQLSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
      
              // Write to stdout.
              mySQLSource.print();
      
              // Execute the program.
              env.execute("MySQL CDC Test");
          }
      }
      

      pom.xml

      <?xml version="1.0" encoding="UTF-8"?>
      <project xmlns="http://maven.apache.org/POM/4.0.0"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
      
        <groupId>com.aliyun</groupId>
        <artifactId>JavaDemo</artifactId>
        <version>1.0-SNAPSHOT</version>
        <name>Flink MySQL CDC Demo</name>
      
        <properties>
          <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
          <maven.compiler.source>1.8</maven.compiler.source>
          <maven.compiler.target>1.8</maven.compiler.target>
          <flink.version>1.17.1</flink.version>
          <flink-cdc.version>2.4.2</flink-cdc.version>
          <log4j.version>2.17.1</log4j.version>
        </properties>
      
        <dependencies>
          <!-- Flink core dependencies (Set the scope to provided when packaging the program) -->
          <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
          </dependency>
          <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
          </dependency>
          <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
          </dependency>
      
          <!-- Realtime Compute for Apache Flink's MySQL CDC connector -->
          <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>ververica-connector-mysql</artifactId>
            <version>1.17-vvr-8.0.4-1</version>
            <!-- Comment out the following line for local execution -->
            <!-- <scope>provided</scope> -->
          </dependency>
      
          <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>1.17.1</version> <!-- The version should be consistent with the Flink version you specified earlier -->
            <scope>provided</scope>
          </dependency>
      
          <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.17.1</version>
            <scope>provided</scope>
          </dependency>
          <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.17.1</version> <!-- The version should be consistent with the Flink version you specified earlier -->
            <scope>provided</scope>
          </dependency>
      
      
          <!-- Log dependencies -->
          <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.17.1</version>
            <scope>runtime</scope>
          </dependency>
          
        </dependencies>
      
        <build>
          <plugins>
            <!-- Compiler plug-in -->
            <plugin>
              <groupId>org.apache.maven.plugins</groupId>
              <artifactId>maven-compiler-plugin</artifactId>
              <version>3.13.0</version> <!-- Patch version -->
              <configuration>
                <source>${maven.compiler.source}</source>
                <target>${maven.compiler.target}</target>
                <encoding>UTF-8</encoding>
              </configuration>
            </plugin>
      
            <!-- Plug-in for building a fat JAR -->
            <plugin>
              <groupId>org.apache.maven.plugins</groupId>
              <artifactId>maven-shade-plugin</artifactId>
              <version>3.5.1</version>
              <executions>
                <execution>
                  <phase>package</phase>
                  <goals>
                    <goal>shade</goal>
                  </goals>
                  <configuration>
                    <artifactSet>
                      <excludes>
                        <exclude>org.apache.flink:force-shading</exclude>
                        <exclude>com.google.code.findbugs:jsr305</exclude>
                        <!-- Retain logging dependencies -->
                        <!-- <exclude>org.slf4j:*</exclude> -->
                        <!-- <exclude>org.apache.logging.log4j:*</exclude> -->
                      </excludes>
                    </artifactSet>
                    <filters>
                      <filter>
                        <artifact>*:*</artifact>
                        <excludes>
                          <exclude>META-INF/*.SF</exclude>
                          <exclude>META-INF/*.DSA</exclude>
                          <exclude>META-INF/*.RSA</exclude>
                          <exclude>META-INF/MANIFEST.MF</exclude> <!-- Add a key filter -->
                        </excludes>
                      </filter>
                    </filters>
                    <transformers>
                      <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>org.example.JavaDemo</mainClass>
                      </transformer>
                      <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <!-- Add a necessary transformer -->
                    </transformers>
                  </configuration>
      
                </execution>
              </executions>
            </plugin>
          </plugins>
        </build>
      </project>
      
    3. Modify connection configurations in JavaDemo for the ApsaraDB RDS for MySQL instance.

      | Option | Description | Example |
      | --- | --- | --- |
      | hostname | The endpoint of the MySQL instance. | rm-bp****2ye09w72zjq.mysql.rds.aliyuncs.com |
      | port | The port of the MySQL instance. | 3306 |
      | databaseList | The MySQL database name. | school |
      | tableList | The MySQL table name, in database.table format. | school.student |
      | username | The account for accessing the MySQL database. | flink_rds_user |
      | password | The password for accessing the MySQL database. | flink_rds_password@123 |

    4. Build a JAR.

      The JavaDemo-1.0-SNAPSHOT.jar file will appear under the project's target directory.

  2. Deploy the JAR, start the deployment, and check the data processing results in the console.

    1. In the development console, go to Artifacts and upload the JavaDemo-1.0-SNAPSHOT.jar file.

    2. Create a JAR deployment.

      1. In the left-side navigation pane, choose O&M > Deployments. In the upper-left corner of the Deployments page, choose Create Deployment > JAR Deployment.

      2. In the Create Deployment dialog box, configure the parameters of the deployment:

        | Parameter | Description | Example |
        | --- | --- | --- |
        | Deployment Mode | The mode used to run the JAR deployment. Select Stream Mode. | Stream Mode |
        | Deployment Name | Enter the name of the JAR deployment. | javademo |
        | Engine Version | The engine version that will be used by the deployment. We recommend that you use an engine version labelled RECOMMENDED or STABLE because they are more reliable and performant. For more information, see Release notes and Engine version. | vvr-8.0.11-flink-1.17 |
        | JAR URI | Choose the uploaded JAR. Note: In Realtime Compute for Apache Flink that uses VVR 8.0.6 or later, access to Object Storage Service (OSS) buckets is restricted to the bucket bound to the workspace at its creation. | JavaDemo-1.0-SNAPSHOT.jar |
        | Entry Point Class | The entry point class of the JAR application. If you do not specify a main class for the JAR, enter the fully qualified name of the main class in the Entry Point Class field. | - |
        | Entry Point Main Arguments | Enter the arguments you want to pass to the main method. | - |
        | Deployment Target | The destination in which the deployment is deployed. Select the desired queue or session cluster from the drop-down list. For more information, see Manage queues and the "Step 1: Create a session cluster" section of the Debug a deployment topic. Important: Do not use session clusters for production. Session clusters do not support monitoring metrics, monitoring and alerting, or Autopilot. For more information, see Debug a draft. | default-queue |

        For more information, see Create a deployment.

      3. Click Deploy.

    3. On the Deployments page, find the javademo deployment, and click Start in the Actions column. In the Start Job panel, select Initial Mode, and click Start.

    4. Check the data processing results.

      After the deployment's state becomes RUNNING, go to the deployment details page. Select the Logs tab and the Running Task Managers subtab. Click an item in the Path, ID column, switch to the Log List subtab, and click the log with the .out suffix. Search for lily to check the data processing results.

      (Figure: data processing results in the .out log)

Step 2: Encrypt a plaintext password with KMS

In this step, you use KMS to encrypt the plaintext password flink_rds_password@123. In this example, the resulting ciphertext is a2V5LWh6ejY3YTJmZTY5ejR2NTlpOHE1MC03d3ozYWU1anlzC6NLoC0JHHEXTJ4P/4iVOe/B+eniv8EcaviQDzZWWNPedOYkoFFYWA==.

To obtain the ciphertext, use either of the following methods:

Method 1: Call the KMS Encrypt operation on OpenAPI portal

  1. Go to Alibaba Cloud OpenAPI Portal. Choose the target region.

  2. Set KeyId and Plaintext.

  3. Click Initiate Call.

  4. View the ciphertext.

For more information, see Encrypt.

Method 2: Call the KMS Encrypt operation from IntelliJ IDEA

  1. Enable Internet access to KMS keys. For more information, see Access KMS instance keys over the Internet.

    KMS keys are only accessible through the VPC network by default.

  2. Configure the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables. For instructions on how to obtain the AccessKey pair, see How do I view the AccessKey pair of an account?.

  3. In the target IntelliJ IDEA project, create a class file named EncryptFlink.

  4. Copy and paste the following code snippet to the EncryptFlink class file. Remember to modify the values of the configuration options to match your specific setup.

    package org.example;
    
    import com.aliyun.kms20160120.models.EncryptResponse;
    import com.aliyun.kms20160120.models.EncryptResponseBody;
    import com.aliyun.tea.*;
    
    public class EncryptFlink {
    
        /**
         * <b>description</b> :
         * <p>Use your AccessKey pair to initialize the client.</p>
         * @return Client
         *
         * @throws Exception
         */
        public static com.aliyun.kms20160120.Client createClient() throws Exception {
            // If the project code is leaked, the AccessKey pair may be leaked, compromising the security of all resources within your account. The following sample code is provided only for reference. 
            com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config()
                    // Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is configured. 
                    .setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"))
                    // Required. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is configured. 
                    .setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
            // Specify an endpoint. For more information, visit https://api.aliyun.com/product/Kms.
            config.endpoint = "kms.cn-hangzhou.aliyuncs.com";
            return new com.aliyun.kms20160120.Client(config);
        }
    
        public static void main(String[] args_) throws Exception {
            java.util.List<String> args = java.util.Arrays.asList(args_);
            com.aliyun.kms20160120.Client client = EncryptFlink.createClient();
            com.aliyun.kms20160120.models.EncryptRequest encryptRequest = new com.aliyun.kms20160120.models.EncryptRequest()
                    .setPlaintext("flink_rds_password@123")
                    .setKeyId("key-hzz67ab1ff4e750h****");
            com.aliyun.teautil.models.RuntimeOptions runtime = new com.aliyun.teautil.models.RuntimeOptions();
            try {
                // Write your own code to display the response of the API operation if necessary.
                EncryptResponse encryptResponse = client.encryptWithOptions(encryptRequest, runtime);
                EncryptResponseBody body = encryptResponse.getBody();
                System.out.println(body.getCiphertextBlob());
            } catch (TeaException error) {
                // Handle exceptions with caution in actual business scenarios and do not ignore the exceptions in your project. In this example, error messages are simply printed for demonstration purposes. 
                // The error message.
                System.out.println(error.getMessage());
                // The URL for troubleshooting.
                System.out.println(error.getData().get("Recommend"));
                com.aliyun.teautil.Common.assertAsString(error.message);
            } catch (Exception _error) {
                // A non-SDK failure (for example, a network error) carries no troubleshooting data,
                // so calling getData().get("Recommend") here could throw a NullPointerException.
                // Handle exceptions with caution in actual business scenarios. In this example, only the error message is printed.
                System.out.println(_error.getMessage());
            }
        }
    }

    | Option | Description | Example |
    | --- | --- | --- |
    | config.endpoint | The endpoint of your KMS instance. | kms.cn-hangzhou.aliyuncs.com |
    | Plaintext | The plaintext password to encrypt. | flink_rds_password@123 |
    | KeyId | The KMS key ID. | key-hzz67ab1ff4e750h**** |

  5. Add the following dependencies to the pom.xml file.

        <dependency>
          <groupId>com.aliyun</groupId>
          <artifactId>kms20160120</artifactId>
          <version>1.2.3</version>
        </dependency>
    
        <dependency>
          <groupId>com.aliyun</groupId>
          <artifactId>tea</artifactId>
          <version>1.3.2</version>
        </dependency>
  6. Run the EncryptFlink class file to obtain the ciphertext.
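The EncryptFlink sample passes the plaintext password to setPlaintext as a string literal, which leaves the password in the source file. One way to avoid that, sketched below, is to read the plaintext from an environment variable in the same way the sample already reads the AccessKey pair. The class name PlaintextInput and the variable name FLINK_RDS_PASSWORD are hypothetical and are not part of the KMS SDK.

```java
import java.util.Map;

/** Hypothetical helper that reads the password to encrypt from the environment. */
public final class PlaintextInput {
    // Assumed variable name for illustration; choose any name that fits your setup.
    public static final String ENV_NAME = "FLINK_RDS_PASSWORD";

    /** Returns the value of the named variable, or fails fast if it is missing or empty. */
    public static String fromEnv(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException("Environment variable " + name + " is not set");
        }
        return value;
    }

    public static void main(String[] args) {
        try {
            String plaintext = fromEnv(System.getenv(), ENV_NAME);
            // Pass this value to EncryptRequest.setPlaintext(...) instead of a string literal.
            // Only the length is printed so that the password never reaches the console.
            System.out.println("Read a password of length " + plaintext.length());
        } catch (IllegalStateException e) {
            System.err.println(e.getMessage());
        }
    }
}
```

In EncryptFlink, the call would then become .setPlaintext(PlaintextInput.fromEnv(System.getenv(), PlaintextInput.ENV_NAME)), assuming both classes are in the same package.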

Step 3: Add the KMS decryption code to your program

Procedure

  1. Create a decryption utility class file.

    1. In IntelliJ IDEA, create a class file named KmsUtil under the target project folder.

    2. Copy and paste the following code snippet to the KmsUtil class file. Remember to modify the values of the configuration options to match your specific setup.

      package org.example;
      
      import com.aliyun.kms20160120.Client;
      import com.aliyun.kms20160120.models.DecryptRequest;
      import com.aliyun.teaopenapi.models.Config;
      
      public class KmsUtil {
          public static String decrypt(String ak, String sk, String ciphertext) throws Exception {
              Client client = new Client(new Config()
                      .setAccessKeyId(ak)
                      .setAccessKeySecret(sk)
                      .setEndpoint("kst-hzz67ab1e****f7hle9ab.cryptoservice.kms.aliyuncs.com")
                      .setCa("-----BEGIN CERTIFICATE-----\n" +
                              "MIIDuzCCAqOgAwIBAgIJA*****--\n"));
              return client.decryptWithOptions(
                      new DecryptRequest().setCiphertextBlob(ciphertext),
                      new com.aliyun.teautil.models.RuntimeOptions()
              ).getBody().getPlaintext();
          }
      }
      

      | Option | Description | Example |
      | --- | --- | --- |
      | Endpoint | The VPC endpoint of your KMS instance. | kst-hzz67ab1e****f7hle9ab.cryptoservice.kms.aliyuncs.com |
      | Ca | The CA certificate. Download the KMS instance's CA certificate to your device in the KMS console. For more information, see Create access credentials. | -----BEGIN CERTIFICATE-----\n" + "MIIDuzCCAqOgAwIBAgIJA*****--\n |

  2. Modify the JavaDemo file.

    1. Write code to retrieve an AccessKey pair and decrypt encrypted data.

      Replace the value of encryptedPassword with the ciphertext you obtained in Step 2.

      // Requires: import org.apache.flink.api.java.utils.ParameterTool;
      // Parse parameters to get the AccessKey pair.
      final ParameterTool params = ParameterTool.fromArgs(args);
      String ak = params.get("akid");
      String sk = params.get("aksecret");

      // Decrypt the encrypted password.
      String encryptedPassword = "a2V5LWh6ejY3YTJmZTY5ejR2NTlpOHE1MC03d3ozYWU1anlzC6NLoC0JHHEXTJ4P/4iVOe/B+eniv8EcaviQDzZWWNPedOYkoFFYWA==";
      String decryptedPassword = KmsUtil.decrypt(ak, sk, encryptedPassword);
    2. Change the plaintext password to the newly added variable.

      For example, .password("flink_rds_password@123") is changed to .password(decryptedPassword).

  3. Modify the pom.xml file.

    1. Set mainClass to org.example.JavaDemo.

    2. Add KMS dependencies.

          <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>kms20160120</artifactId>
            <version>1.2.3</version>
          </dependency>
      
          <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>tea</artifactId>
            <version>1.3.2</version>
          </dependency>
    3. (Optional) Change the value of artifactId to KmsJavaDemo.

      This will help you distinguish the two JARs.

  4. Build the JAR.

    The KmsJavaDemo-1.0-SNAPSHOT.jar file will appear under the target directory.
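The modified JavaDemo reads the AccessKey pair from the akid and aksecret arguments with ParameterTool.get, which returns null when a key is absent, so a missing argument would only surface later as a failed KMS call. The following plain-Java sketch shows the same "--key value" convention with a fail-fast check. It is a simplified stand-in and not Flink's parser: the class LaunchArgs and its methods are hypothetical, and it handles only the "--key value" form. Inside a Flink program, ParameterTool.getRequired offers a comparable fail-fast lookup.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified parser for arguments of the form "--key value". */
public final class LaunchArgs {

    /** Collects each "--key value" pair into a map and ignores anything else. */
    public static Map<String, String> parse(String[] args) {
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i + 1 < args.length; i++) {
            if (args[i].startsWith("--")) {
                params.put(args[i].substring(2), args[i + 1]);
                i++; // skip the value that was just consumed
            }
        }
        return params;
    }

    /** Returns the value for the key, or fails with a message that names the missing argument. */
    public static String require(Map<String, String> params, String key) {
        String value = params.get(key);
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("Missing required argument --" + key);
        }
        return value;
    }

    public static void main(String[] args) {
        // Placeholder values for illustration; never commit a real AccessKey pair.
        String[] sample = {"--akid", "exampleId", "--aksecret", "exampleSecret"};
        Map<String, String> params = parse(sample);
        String ak = require(params, "akid");
        String sk = require(params, "aksecret");
        // Print only non-sensitive facts about the parsed values.
        System.out.println("akid present: " + !ak.isEmpty() + ", aksecret present: " + !sk.isEmpty());
    }
}
```

Failing at startup with the name of the missing argument makes a misconfigured deployment easier to diagnose than a null credential passed to the decryption call.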

Complete code demo

KmsJavaDemo

package org.example;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.api.java.utils.ParameterTool;


public class JavaDemo {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       
        // Parse parameters to get the AccessKey pair.
        final ParameterTool params = ParameterTool.fromArgs(args);
        String ak = params.get("akid");
        String sk = params.get("aksecret");
        
        // Decrypt the encrypted password.
        String encryptedPassword = "a2V5LWh6ejY3YTJmZTY5ejR2NTlpOHE1MC03d3ozYWU1anlzC6NLoC0JHHEXTJ4P/4iVOe/B+eniv8EcaviQDzZWWNPedOYkoFFYWA==";
        String decryptedPassword = KmsUtil.decrypt(ak, sk, encryptedPassword);

        // Build a deserializer.
        DataType dataType =
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.INT()),
                        DataTypes.FIELD("username", DataTypes.STRING()),
                        DataTypes.FIELD("age", DataTypes.INT()));

        LogicalType logicalType = TypeConversions.fromDataToLogicalType(dataType);
        InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(logicalType);
        RowDataDebeziumDeserializeSchema deserializer =
                RowDataDebeziumDeserializeSchema.newBuilder()
                        .setPhysicalRowType((RowType) dataType.getLogicalType())
                        .setResultTypeInfo(typeInfo)
                        .build();

        // Configure the data source (com.ververica.cdc.connectors.mysql.source.MySqlSource).
        MySqlSource<RowData> mySqlSource =
                MySqlSource.<RowData>builder()
                        .hostname("rm-bp****2ye09w72zjq.mysql.rds.aliyuncs.com")
                        .port(3306)
                        .databaseList("school") // Specify the database.
                        .tableList("school.student") // Specify the table.
                        .username("flink_rds_user")
                        .password(decryptedPassword)
                        // Initialize data in the RowData structure.
                        .deserializer(deserializer)
                        .build();

        // Integrate the external data source into the Flink DataStream program
        // Do not use a watermark strategy.
        DataStreamSource<RowData> mySQLSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");

        // Write to stdout.
        mySQLSource.print();

        // Execute the program.
        env.execute("MySQL CDC Test");
    }
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.aliyun</groupId>
    <artifactId>KmsJavaDemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>Flink MySQL CDC Demo</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.1</flink.version>
        <flink-cdc.version>2.4.2</flink-cdc.version>
        <log4j.version>2.17.1</log4j.version>
    </properties>
    <dependencies>
        <!-- Flink core dependencies (Set the scope to provided when packaging the program) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Realtime Compute for Apache Flink's MySQL CDC connector -->
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>ververica-connector-mysql</artifactId>
            <version>1.17-vvr-8.0.4-1</version>
            <!-- Comment out the following line for local execution -->
            <!-- <scope>provided</scope> -->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>1.17.1</version> <!-- The version should be consistent with the Flink version you specified earlier -->
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.17.1</version>
            <scope>provided</scope>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.17.1</version> <!-- The version should be consistent with the Flink version you specified earlier -->
            <scope>provided</scope>
        </dependency>


        <!-- Log dependencies -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.17.1</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>kms20160120</artifactId>
            <version>1.2.3</version>
        </dependency>

        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>tea</artifactId>
            <version>1.3.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Compiler plug-in -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.13.0</version> <!-- Patch version -->
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>

            <!-- Plug-in for building a fat JAR -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.5.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <!-- Retain logging dependencies -->
                                    <!-- <exclude>org.slf4j:*</exclude> -->
                                    <!-- <exclude>org.apache.logging.log4j:*</exclude> -->
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                        <exclude>META-INF/MANIFEST.MF</exclude> <!-- Add a key filter -->
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.example.JavaDemo</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <!-- Add a necessary transformer -->
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
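
The shade configuration above excludes signature files (META-INF/*.SF, *.DSA, *.RSA) and sets org.example.JavaDemo as the main class in the manifest. As a minimal sketch, assuming you want to confirm both properties on the built JAR before uploading it, the following standalone check inspects the archive with the JDK's java.util.jar API. The class name ShadedJarCheck and its methods are hypothetical helpers for illustration and are not part of the original project.

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Locale;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

// Hypothetical helper for illustration; not part of the original project.
public class ShadedJarCheck {

    // True for signature files directly under META-INF/, mirroring the
    // META-INF/*.SF, *.DSA, and *.RSA exclusions in the shade filter.
    static boolean isSignatureFile(String entryName) {
        String upper = entryName.toUpperCase(Locale.ROOT);
        String prefix = "META-INF/";
        if (!upper.startsWith(prefix) || upper.indexOf('/', prefix.length()) >= 0) {
            return false;
        }
        return upper.endsWith(".SF") || upper.endsWith(".DSA") || upper.endsWith(".RSA");
    }

    // Returns human-readable problems; an empty list means both checks passed.
    static List<String> findProblems(File jarFile, String expectedMainClass) throws IOException {
        List<String> problems = new ArrayList<>();
        // verify=false: only entry names and the manifest are inspected.
        try (JarFile jar = new JarFile(jarFile, false)) {
            Manifest manifest = jar.getManifest();
            String mainClass = manifest == null
                    ? null
                    : manifest.getMainAttributes().getValue("Main-Class");
            if (!expectedMainClass.equals(mainClass)) {
                problems.add("Main-Class is " + mainClass + ", expected " + expectedMainClass);
            }
            Enumeration<JarEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                String name = entries.nextElement().getName();
                if (isSignatureFile(name)) {
                    problems.add("Signature file present: " + name);
                }
            }
        }
        return problems;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.out.println("Usage: java ShadedJarCheck <path-to-jar>");
            return;
        }
        List<String> problems = findProblems(new File(args[0]), "org.example.JavaDemo");
        if (problems.isEmpty()) {
            System.out.println("JAR looks consistent with the shade configuration.");
        } else {
            problems.forEach(System.out::println);
        }
    }
}
```

With JDK 11 or later, the file can be run directly, for example java ShadedJarCheck.java target/KmsJavaDemo-1.0-SNAPSHOT.jar, assuming the JAR is in Maven's default target directory.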

Step 4: Deploy the new JAR and start the deployment

  1. Upload the new JAR.

    1. Log on to the Realtime Compute for Apache Flink management console.

    2. Find the target workspace and click Console in the Actions column.

    3. In the left-side navigation pane, click Artifacts.

    4. Click Upload Artifact and select the JAR to upload.

      In this example, the KmsJavaDemo-1.0-SNAPSHOT.jar file built in Step 3 is uploaded.

  2. Create a JAR deployment.

    1. In the left-side navigation pane, choose O&M > Deployments. In the upper-left corner of the Deployments page, choose Create Deployment > JAR Deployment.

    2. In the Create Deployment dialog box, configure the parameters of the deployment:

      Parameter: Deployment Mode
      Description: The mode in which the JAR deployment runs. Select Stream Mode.
      Example: Stream Mode

      Parameter: Deployment Name
      Description: The name of the JAR deployment.
      Example: kmsjavademo

      Parameter: Engine Version
      Description: The engine version that the deployment uses. We recommend an engine version labeled RECOMMENDED or STABLE because these versions are more reliable and performant. For more information, see Release notes and Engine version.
      Example: vvr-8.0.11-flink-1.17

      Parameter: JAR URI
      Description: Select the uploaded JAR.
      Note: In Realtime Compute for Apache Flink that uses VVR 8.0.6 or later, access to Object Storage Service (OSS) buckets is restricted to the bucket that was bound to the workspace when the workspace was created.
      Example: KmsJavaDemo-1.0-SNAPSHOT.jar

      Parameter: Entry Point Class
      Description: The entry point class of the JAR application. If the JAR does not specify a main class, enter the fully qualified name of the main class in the Entry Point Class field.
      Example: -

      Parameter: Entry Point Main Arguments
      Description: The arguments to pass to the main method. To protect your AccessKey pair, we recommend that you pass it by using variables. For more information, see Manage variables. In this example, akid and aksecret are variable names. For instructions on how to obtain the AccessKey pair, see How do I view the AccessKey pair of an account?.
      Example: --akid ${secret_values.akid} --aksecret ${secret_values.aksecret}

      Parameter: Deployment Target
      Description: The queue or session cluster on which the deployment runs. Select the desired queue or session cluster from the drop-down list. For more information, see Manage queues and the "Step 1: Create a session cluster" section of the Debug a deployment topic.
      Important: Do not use session clusters for production. Session clusters do not support monitoring metrics, monitoring and alerting, or Autopilot. For more information, see Debug a draft.
      Example: default-queue

      For more information, see Create a deployment.

    3. Click Deploy.

  3. Start the deployment.

    On the Deployments page, find the kmsjavademo deployment, and click Start in the Actions column. In the Start Job panel, select Initial Mode, and click Start.
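
The Entry Point Main Arguments value configured above (--akid ${secret_values.akid} --aksecret ${secret_values.aksecret}) reaches the main method as an ordinary string array, presumably after the variables are resolved. The following is a minimal sketch of how a main method might read such "--key value" pairs and fail fast when one is missing. MainArgsSketch, parseArgs, and require are hypothetical names for illustration; the sample's actual JavaDemo class may parse its arguments differently.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; the real entry point class may parse arguments differently.
public class MainArgsSketch {

    // Parses "--key value" pairs into a map and rejects keys that have no value.
    static Map<String, String> parseArgs(String[] args) {
        Map<String, String> parsed = new HashMap<>();
        for (int i = 0; i < args.length; i++) {
            String token = args[i];
            if (!token.startsWith("--") || token.length() == 2) {
                throw new IllegalArgumentException("Expected --key, got: " + token);
            }
            // A following token that starts with "--" is treated as the next key,
            // so values themselves must not begin with "--".
            if (i + 1 >= args.length || args[i + 1].startsWith("--")) {
                throw new IllegalArgumentException("Missing value for " + token);
            }
            parsed.put(token.substring(2), args[++i]);
        }
        return parsed;
    }

    // Returns the value for a key or fails with a message that names the key only.
    static String require(Map<String, String> parsed, String key) {
        String value = parsed.get(key);
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("Required argument --" + key + " is not set");
        }
        return value;
    }

    public static void main(String[] args) {
        // With no arguments, fall back to placeholder values so the sketch runs standalone.
        String[] effective = args.length > 0
                ? args
                : new String[] {"--akid", "placeholder-id", "--aksecret", "placeholder-secret"};
        Map<String, String> parsed = parseArgs(effective);
        String akid = require(parsed, "akid");
        String aksecret = require(parsed, "aksecret");
        // Print lengths only; never write the AccessKey pair itself to the logs.
        System.out.println("akid length: " + akid.length()
                + ", aksecret length: " + aksecret.length());
    }
}
```

In a Flink job, ParameterTool.fromArgs(args) together with getRequired("akid") offers comparable parsing if that utility is on the classpath. Either way, avoid printing the resolved values, because standard output ends up in the TaskManager or JobManager logs.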

Step 5: View the results in the TaskManager logs

After the deployment enters the RUNNING state, go to the deployment details page. On the Logs tab, click the Running Task Managers subtab. Click an item in the Path, ID column, switch to the Log List subtab, and open the log file with the .out suffix. Search for lily to check the data processing results.

image

References