全部产品
Search
文档中心

Realtime Compute for Apache Flink:Enkripsi dan Dekripsi Kata Sandi Database di Flink dengan Key Management Service

更新时间:Oct 29, 2025

Realtime Compute for Apache Flink dapat berintegrasi dengan Layanan Manajemen Kunci (KMS) untuk mengenkripsi dan mendekripsi data sensitif, seperti kata sandi basis data, yang dikonfigurasi untuk beban kerja Flink Anda, sehingga memastikan keamanan data. Topik ini menjelaskan cara mengenkripsi dan mendekripsi kata sandi basis data dengan KMS dalam penyebaran JAR yang membaca data dari instance ApsaraDB RDS untuk basis data MySQL.

Informasi latar belakang

KMS adalah platform all-in-one yang mendukung manajemen kredensial yang disederhanakan, andal, aman, dan sesuai. KMS menyediakan API operasi kriptografi yang memungkinkan Anda mengenkripsi dan mendekripsi data secara sederhana, menghilangkan kompleksitas kriptografi yang rumit. Selain itu, KMS menawarkan rotasi kunci otomatis, meningkatkan keamanan data dan mengurangi upaya manajemen kunci. Untuk informasi lebih lanjut, lihat Manfaat dalam dokumentasi KMS.

Dalam skenario komputasi waktu nyata, Flink sering kali perlu terhubung ke sumber data (Kafka, MySQL, dll.) untuk mengakses data sensitif. Praktik tradisional melakukan hardcoding data sensitif atau menyimpannya dalam file konfigurasi dapat menyebabkan tantangan keamanan yang signifikan. Dengan berintegrasi dengan KMS, Flink dapat mengambil informasi terenkripsi dan mendekripsinya sesuai permintaan untuk melindungi dari paparan kredensial teks biasa.

Arsitektur solusi tersebut adalah sebagai berikut:

Prasyarat

(Opsional) Langkah 1: Persiapan

Persiapkan sumber data ApsaraDB RDS untuk MySQL.

  1. Buat basis data MySQL dan akun.

    Buat basis data (school) dan akun standar (flink_rds_user) yang memiliki izin baca dan tulis pada basis data. Untuk informasi lebih lanjut, lihat Langkah 1: Buat instance ApsaraDB RDS untuk MySQL dan konfigurasikan basis data.

  2. Persiapkan sumber data ApsaraDB RDS untuk MySQL.

    1. Di pojok kanan atas halaman detail instance target, klik Log On to Database.

    2. Di kotak dialog yang muncul, masukkan akun basis data dan kata sandi. Lalu, klik Login.

    3. Setelah berhasil masuk, klik dua kali basis data school di panel navigasi sebelah kiri.

    4. Di editor SQL, tulis pernyataan DDL untuk membuat tiga tabel bisnis dan masukkan data ke dalamnya:

      CREATE TABLE `student` (
        id INT not null primary key,
        username VARCHAR(255),
        age BIGINT
      );
        
      INSERT INTO student VALUES
      (001, 'lily', 15),
      (002, 'leilei', 18),
      (003, 'xiaoming', 17),
      (004, 'huahua', 15);
      
      SELECT * FROM student;
  3. Klik Execute(F8). Pada panel yang muncul, klik Execute.

Jalankan penyebaran JAR dengan data tidak terenkripsi

Pastikan bahwa penyebaran JAR dengan informasi tidak terenkripsi dapat berjalan dengan baik.

  1. Kembangkan program Flink secara lokal.

    1. Buat proyek baru di IntelliJ IDEA.

    2. Salin dan tempel potongan kode berikut ke file kelas JavaDemo dan file POM.xml. Ingatlah untuk memodifikasi nilai opsi konfigurasi agar sesuai dengan pengaturan spesifik Anda.

      JavaDemo

      package org.example;
      
      import com.ververica.cdc.connectors.mysql.source.MySqlSource;
      import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
      
      import org.apache.flink.api.common.eventtime.WatermarkStrategy;
      import org.apache.flink.streaming.api.datastream.DataStreamSource;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.table.api.DataTypes;
      import org.apache.flink.table.data.RowData;
      import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
      import org.apache.flink.table.types.DataType;
      import org.apache.flink.table.types.logical.LogicalType;
      import org.apache.flink.table.types.logical.RowType;
      import org.apache.flink.table.types.utils.TypeConversions;
      
      
      public class JavaDemo {
      
          public static void main(String[] args) throws Exception {
              final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
              // Bangun deserializer.
              DataType dataType =
                      DataTypes.ROW(
                              DataTypes.FIELD("id", DataTypes.INT()),
                              DataTypes.FIELD("username", DataTypes.STRING()),
                              DataTypes.FIELD("age", DataTypes.INT()));
      
              LogicalType logicalType = TypeConversions.fromDataToLogicalType(dataType);
              InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(logicalType);
              RowDataDebeziumDeserializeSchema deserializer =
                      RowDataDebeziumDeserializeSchema.newBuilder()
                              .setPhysicalRowType((RowType) dataType.getLogicalType())
                              .setResultTypeInfo(typeInfo)
                              .build();
      
              // Konfigurasikan sumber data (com.ververica.cdc.connectors.mysql.source.MySqlSource).
              MySqlSource<RowData> mySqlSource =
                      MySqlSource.<RowData>builder()
                              .hostname("rm-bp****2ye09w72zjq.mysql.rds.aliyuncs.com")
                              .port(3306)
                              .databaseList("school") // Tentukan basis data.
                              .tableList("school.student") // Tentukan tabel.
                              .username("flink_rds_user")
                              .password("flink_rds_password@123")
                              // Inisialisasi data dalam struktur RowData.
                              .deserializer(deserializer)
                              .build();
      
              // Integrasikan sumber data eksternal ke dalam program Flink DataStream
              // Jangan gunakan strategi watermark.
              DataStreamSource<RowData> mySQLSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "Sumber MySQL");
      
              // Tulis ke stdout.
              mySQLSource.print();
      
              // Eksekusi program.
              env.execute("Uji CDC MySQL");
          }
      }
      

      POM.xml

      <?xml version="1.0" encoding="UTF-8"?>
      <project xmlns="http://maven.apache.org/POM/4.0.0"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
      
        <groupId>com.aliyun</groupId>
        <artifactId>JavaDemo</artifactId>
        <version>1.0-SNAPSHOT</version>
        <name>Demo Flink MySQL CDC</name>
      
        <properties>
          <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
          <maven.compiler.source>1.8</maven.compiler.source>
          <maven.compiler.target>1.8</maven.compiler.target>
          <flink.version>1.17.1</flink.version>
          <flink-cdc.version>2.4.2</flink-cdc.version>
          <log4j.version>2.17.1</log4j.version>
        </properties>
      
        <dependencies>
          <! -- Dependensi inti Flink (Atur cakupan menjadi provided saat mengemas program) -->
          <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
          </dependency>
          <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
          </dependency>
          <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
          </dependency>
      
          <!-- Konektor MySQL CDC Realtime Compute for Apache Flink -->
          <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>ververica-connector-mysql</artifactId>
            <version>1.17-vvr-8.0.4-1</version>
            <! -- Komentari baris berikut untuk eksekusi lokal -->
            <!-- <scope>provided</scope> -->
          </dependency>
      
          <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>1.17.1</version> <! -- Versi harus sesuai dengan versi Flink yang Anda tentukan sebelumnya -->
            <scope>provided</scope>
          </dependency>
      
          <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.17.1</version>
            <scope>provided</scope>
          </dependency>
          <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.17.1</version> <! -- Versi harus sesuai dengan versi Flink yang Anda tentukan sebelumnya -->
            <scope>provided</scope>
          </dependency>
      
      
          <! -- Dependensi log -->
          <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.17.1</version>
            <scope>runtime</scope>
          </dependency>
          
        </dependencies>
      
        <build>
          <plugins>
            <! -- Plugin kompiler -->
            <plugin>
              <groupId>org.apache.maven.plugins</groupId>
              <artifactId>maven-compiler-plugin</artifactId>
              <version>3.13.0</version> <! -- Versi patch -->
              <configuration>
                <source>${maven.compiler.source}</source>
                <target>${maven.compiler.target}</target>
                <encoding>UTF-8</encoding>
              </configuration>
            </plugin>
      
            <! -- Plugin untuk membangun fat JAR -->
            <plugin>
              <groupId>org.apache.maven.plugins</groupId>
              <artifactId>maven-shade-plugin</artifactId>
              <version>3.5.1</version>
              <executions>
                <execution>
                  <phase>package</phase>
                  <goals>
                    <goal>shade</goal>
                  </goals>
                  <configuration>
                    <artifactSet>
                      <excludes>
                        <exclude>org.apache.flink:force-shading</exclude>
                        <exclude>com.google.code.findbugs:jsr305</exclude>
                        <! -- Pertahankan dependensi log -->
                        <!-- <exclude>org.slf4j:*</exclude> -->
                        <!-- <exclude>org.apache.logging.log4j:*</exclude> -->
                      </excludes>
                    </artifactSet>
                    <filters>
                      <filter>
                        <artifact>*:*</artifact>
                        <excludes>
                          <exclude>META-INF/*.SF</exclude>
                          <exclude>META-INF/*.DSA</exclude>
                          <exclude>META-INF/*.RSA</exclude>
                          <exclude>META-INF/MANIFEST.MF</exclude> <! -- Tambahkan filter kunci -->
                        </excludes>
                      </filter>
                    </filters>
                    <transformers>
                      <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>org.example.JavaDemo</mainClass>
                      </transformer>
                      <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <! -- Tambahkan transformer yang diperlukan -->
                    </transformers>
                  </configuration>
      
                </execution>
              </executions>
            </plugin>
          </plugins>
        </build>
      </project>
      
    3. Modifikasi konfigurasi koneksi di JavaDemo untuk instance ApsaraDB RDS untuk MySQL.

      Opsi

      Deskripsi

      Contoh

      hostname

      Titik akhir MySQL.

      rm-bp****2ye09w72zjq.mysql.rds.aliyuncs.com

      port

      Port instance MySQL.

      3306

      databaseList

      Nama basis data MySQL.

      school

      tableList

      Nama tabel MySQL.

      school.student

      username

      Akun untuk mengakses basis data MySQL.

      flink_rds_user

      password

      Kata sandi untuk mengakses basis data MySQL.

      flink_rds_password@123

    4. Bangun JAR.

      File JavaDemo-1.0-SNAPSHOT.jar akan muncul di direktori target proyek.

  2. Sebarkan JAR, mulai penyebaran, dan periksa hasil pemrosesan data di konsol.

    1. Di konsol pengembangan, buka Artifacts dan unggah file JavaDemo-1.0-SNAPSHOT.jar.

    2. Buat penyebaran JAR.

      1. Di panel navigasi sebelah kiri, pilih O&M > Deployments. Di pojok kiri atas halaman Deployments, pilih Create Deployment > JAR Deployment.

      2. Di kotak dialog Create Deployment, konfigurasikan parameter penyebaran:

        Parameter

        Deskripsi

        Contoh

        Deployment Mode

        Mode yang ingin Anda gunakan untuk menyebar JAR. Pilih Mode Aliran.

        Mode Aliran

        Deployment Name

        Masukkan nama penyebaran JAR.

        javademo

        Engine Version

        Versi mesin yang akan digunakan oleh penyebaran.

        Kami merekomendasikan Anda menggunakan versi mesin berlabel RECOMMENDED atau STABLE karena lebih andal dan performanya lebih baik. Untuk informasi lebih lanjut, lihat Catatan rilis dan Versi mesin.

        vvr-8.0.11-flink-1.17

        JAR URI

        Pilih JAR yang diunggah.

        Catatan

        Di Realtime Compute for Apache Flink yang menggunakan VVR 8.0.6 atau yang lebih baru, akses ke bucket Object Storage Service (OSS) dibatasi pada bucket yang terikat ke ruang kerja saat pembuatannya.

        JavaDemo-1.0-SNAPSHOT.jar

        Entry Point Class

        Kelas titik masuk aplikasi JAR. Jika Anda tidak menentukan kelas utama untuk JAR, masukkan direktori standar di bidang Entry Point Class.

        -

        Entry Point Main Arguments

        Masukkan argumen yang ingin Anda lewatkan ke metode utama.

        -

        Deployment Target

        Destinasi di mana penyebaran dilakukan. Pilih queue atau session cluster yang diinginkan dari daftar drop-down. Untuk informasi lebih lanjut, lihat Kelola antrian dan bagian "Langkah 1: Buat klaster sesi" dari topik Debug penyebaran.

        Penting

        Jangan gunakan klaster sesi untuk produksi. Klaster sesi tidak mendukung metrik pemantauan, pemantauan dan peringatan, atau Autopilot. Untuk informasi lebih lanjut, lihat Debug draf.

        antrian-default

        Untuk informasi lebih lanjut, lihat Buat penyebaran.

      3. Klik Deploy.

    3. Di halaman Deployments, cari deployment javademo, lalu klik Start di kolom Actions. Pada panel Start Job, pilih Initial Mode, kemudian klik Start.

    4. Periksa hasil pemrosesan data.

      Setelah status penyebaran menjadi RUNNING, buka halaman detail penyebaran. Pilih tab Logs dan subtab Running Task Managers. Klik item di kolom Path, ID, beralih ke subtab Log List, dan klik log dengan akhiran .out. Cari lily untuk memeriksa hasil pemrosesan data.

      image

Langkah 2: Enkripsi kata sandi teks biasa dengan KMS

KMS mengenkripsi kata sandi teks biasa flink_rds_password@123. Kata sandi terenkripsi adalah a2V5LWh6ejY3YTJmZTY5ejR2NTlpOHE1MC03d3ozYWU1anlzC6NLoC0JHHEXTJ4P/4iVOe/B+eniv8EcaviQDzZWWNPedOYkoFFYWA==.

Untuk mendapatkan kunci enkripsi KMS, gunakan salah satu metode berikut:

Metode 1: Panggil operasi Enkripsi KMS di portal OpenAPI

  1. Pergi ke Portal OpenAPI Alibaba Cloud. Pilih wilayah target.

  2. Tetapkan KeyId dan Plaintext.

  3. Klik Initiate Call.

  4. Lihat ciphertext.

Untuk informasi lebih lanjut, lihat Enkripsi.

Metode 2: Panggil operasi Enkripsi KMS dari IntelliJ IDEA

  1. Aktifkan akses Internet ke kunci KMS. Untuk informasi lebih lanjut, lihat Akses kunci instance KMS melalui Internet.

    Kunci KMS hanya dapat diakses melalui jaringan VPC secara default.

  2. Konfigurasikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET. Untuk instruksi tentang cara mendapatkan pasangan AccessKey, lihat Bagaimana Cara Melihat Pasangan AccessKey dari Sebuah Akun?.

  3. Di proyek IntelliJ IDEA target, buat file kelas bernama EncryptFlink.

  4. Salin dan tempel potongan kode berikut ke file kelas EncryptFlink. Ingatlah untuk memodifikasi nilai opsi konfigurasi agar sesuai dengan pengaturan spesifik Anda.

    package org.example;
    
    import com.aliyun.kms20160120.models.EncryptResponse;
    import com.aliyun.kms20160120.models.EncryptResponseBody;
    import com.aliyun.tea.*;
    
    public class EncryptFlink {
    
        /**
         *  :
         * <p>Gunakan pasangan AccessKey Anda untuk menginisialisasi klien.</p>
         * @return Client
         *
         * @throws Exception
         */
        public static com.aliyun.kms20160120.Client createClient() throws Exception {
            // Jika kode proyek bocor, pasangan AccessKey mungkin bocor, membahayakan keamanan semua sumber daya dalam akun Anda. Contoh kode berikut hanya disediakan sebagai referensi. 
            com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config()
                    // Diperlukan. Pastikan bahwa variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dikonfigurasi. 
                    .setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"))
                    // Diperlukan. Pastikan bahwa variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_SECRET dikonfigurasi. 
                    .setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
            // Tentukan titik akhir. Untuk informasi lebih lanjut, kunjungi https://api.aliyun.com/product/Kms.
            config.endpoint = "kms.cn-hangzhou.aliyuncs.com";
            return new com.aliyun.kms20160120.Client(config);
        }
    
        public static void main(String[] args_) throws Exception {
            java.util.List<String> args = java.util.Arrays.asList(args_);
            com.aliyun.kms20160120.Client client = EncryptFlink.createClient();
            com.aliyun.kms20160120.models.EncryptRequest encryptRequest = new com.aliyun.kms20160120.models.EncryptRequest()
                    .setPlaintext("flink_rds_password@123")
                    .setKeyId("key-hzz67ab1ff4e750h****");
            com.aliyun.teautil.models.RuntimeOptions runtime = new com.aliyun.teautil.models.RuntimeOptions();
            try {
                // Tulis kode Anda sendiri untuk menampilkan respons operasi API jika diperlukan.
                EncryptResponse encryptResponse = client.encryptWithOptions(encryptRequest, runtime);
                EncryptResponseBody body = encryptResponse.getBody();
                System.out.println(body.getCiphertextBlob());
            } catch (TeaException error) {
                // Tangani pengecualian dengan hati-hati dalam skenario bisnis aktual dan jangan abaikan pengecualian dalam proyek Anda. Dalam contoh ini, pesan kesalahan hanya dicetak untuk tujuan demonstrasi. 
                // Pesan kesalahan.
                System.out.println(error.getMessage());
                // URL untuk pemecahan masalah.
                System.out.println(error.getData().get("Recommend"));
                com.aliyun.teautil.Common.assertAsString(error.message);
            } catch (Exception _error) {
                TeaException error = new TeaException(_error.getMessage(), _error);
                // Tangani pengecualian dengan hati-hati dalam skenario bisnis aktual dan jangan abaikan pengecualian dalam proyek Anda. Dalam contoh ini, pesan kesalahan hanya dicetak untuk tujuan demonstrasi. 
                // Pesan kesalahan.
                System.out.println(error.getMessage());
                // URL untuk pemecahan masalah.
                System.out.println(error.getData().get("Recommend"));
                com.aliyun.teautil.Common.assertAsString(error.message);
            }
        }
    }

    Opsi

    Deskripsi

    Contoh

    config.endpoint

    Titik akhir dari instance KMS Anda.

    kms.cn-hangzhou.aliyuncs.com

    Teks Biasa

    Kata sandi teks biasa untuk dienkripsi.

    flink_rds_password@123

    KeyId

    ID kunci KMS.

    key-hzz67ab1ff4e750h****

  5. Tambahkan dependensi berikut ke file POM.xml.

        <dependency>
          <groupId>com.aliyun</groupId>
          <artifactId>kms20160120</artifactId>
          <version>1.2.3</version>
        </dependency>
    
        <dependency>
          <groupId>com.aliyun</groupId>
          <artifactId>tea</artifactId>
          <version>1.3.2</version>
        </dependency>
  6. Jalankan file kelas EncryptFlink untuk mendapatkan ciphertext.

Langkah 3: Tambahkan kode dekripsi KMS ke program Anda

Prosedur

  1. Buat File Kelas Utilitas Dekripsi.

    1. Di IntelliJ IDEA, buat file kelas bernama KmsUtil di folder proyek target.

    2. Salin dan tempel potongan kode berikut ke file kelas KmsUtil. Ingatlah untuk memodifikasi nilai opsi konfigurasi agar sesuai dengan pengaturan spesifik Anda.

      package org.example;
      
      import com.aliyun.kms20160120.Client;
      import com.aliyun.kms20160120.models.DecryptRequest;
      import com.aliyun.teaopenapi.models.Config;
      
      public class KmsUtil {
          public static String decrypt(String ak, String sk, String ciphertext) throws Exception {
              Client client = new Client(new Config()
                      .setAccessKeyId(ak)
                      .setAccessKeySecret(sk)
                      .setEndpoint("kst-hzz67ab1e****f7hle9ab.cryptoservice.kms.aliyuncs.com")
                      .setCa("-----BEGIN CERTIFICATE-----\n" +
                              "MIIDuzCCAqOgAwIBAgIJA*****--\n"));
              return client.decryptWithOptions(
                      new DecryptRequest().setCiphertextBlob(ciphertext),
                      new com.aliyun.teautil.models.RuntimeOptions()
              ).getBody().getPlaintext();
          }
      }
      

      Opsi

      Deskripsi

      Contoh

      Titik Akhir

      Titik akhir VPC dari instance KMS Anda.

      kst-hzz67ab1e****f7hle9ab.cryptoservice.kms.aliyuncs.com

      Ca

      Sertifikat CA.

      Unduh sertifikat CA instance KMS ke perangkat Anda di Konsol KMS. Untuk informasi lebih lanjut, lihat Buat kredensial akses.

      -----BEGIN CERTIFICATE-----\n" + "MIIDuzCCAqOgAwIBAgIJA*****--\n

  2. Modifikasi File JavaDemo.

    1. Tulis kode untuk mengambil pasangan AccessKey dan mendekripsi data terenkripsi.

      Ganti nilai encryptedPassword dengan teks sandi yang diperoleh pada Langkah 2.

      // Parsing parameter untuk mendapatkan pasangan AccessKey.
       final ParameterTool params = ParameterTool.fromArgs(args);
       String ak = params.get("akid");
       String sk = params.get("aksecret");
       
       // Mendekripsi kata sandi terenkripsi.
       String encryptedPassword = "a2V5LWh6ejY3YTJmZTY5ejR2NTlpOHE1MC03d3ozYWU1anlzC6NLoC0JHHEXTJ4P/4iVOe/B+eniv8EcaviQDzZWWNPedOYkoFFYWA==";
       String decryptedPassword = KmsUtil.decrypt(ak, sk, encryptedPassword);
    2. Ganti kata sandi teks biasa dengan variabel baru yang ditambahkan.

      Sebagai contoh, .password("flink_rds_password@123") diubah menjadi .password(decryptedPassword).

  3. Modifikasi File pom.xml.

    1. Atur mainClass ke org.example.JavaDemo.

    2. Tambahkan dependensi KMS.

          <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>kms20160120</artifactId>
            <version>1.2.3</version>
          </dependency>
      
          <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>tea</artifactId>
            <version>1.3.2</version>
          </dependency>
    3. (Opsional) Ubah nilai artifactId menjadi KmsJavaDemo.

      Ini akan membantu Anda membedakan kedua JAR tersebut.

  4. Buat JAR.

    File KmsJavaDemo-1.0-SNAPSHOT.jar akan muncul di bawah direktori target.

Demo kode lengkap

KmsJavaDemo

package org.example;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.api.java.utils.ParameterTool;


public class JavaDemo {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       
        // Parsing parameter untuk mendapatkan pasangan AccessKey.
        final ParameterTool params = ParameterTool.fromArgs(args);
        String ak = params.get("akid");
        String sk = params.get("aksecret");
        
        // Mendekripsi kata sandi terenkripsi.
        String encryptedPassword = "a2V5LWh6ejY3YTJmZTY5ejR2NTlpOHE1MC03d3ozYWU1anlzC6NLoC0JHHEXTJ4P/4iVOe/B+eniv8EcaviQDzZWWNPedOYkoFFYWA==";
        String decryptedPassword = KmsUtil.decrypt(ak, sk, encryptedPassword);

        // Membangun deserializer.
        DataType dataType =
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.INT()),
                        DataTypes.FIELD("username", DataTypes.STRING()),
                        DataTypes.FIELD("age", DataTypes.INT()));

        LogicalType logicalType = TypeConversions.fromDataToLogicalType(dataType);
        InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(logicalType);
        RowDataDebeziumDeserializeSchema deserializer =
                RowDataDebeziumDeserializeSchema.newBuilder()
                        .setPhysicalRowType((RowType) dataType.getLogicalType())
                        .setResultTypeInfo(typeInfo)
                        .build();

        // Konfigurasikan sumber data (com.ververica.cdc.connectors.mysql.source.MySqlSource).
        MySqlSource<RowData> mySqlSource =
                MySqlSource.<RowData>builder()
                        .hostname("rm-bp****2ye09w72zjq.mysql.rds.aliyuncs.com")
                        .port(3306)
                        .databaseList("school") // Tentukan database.
                        .tableList("school.student") // Tentukan tabel.
                        .username("flink_rds_user")
                        .password(decryptedPassword)
                        // Inisialisasi data dalam struktur RowData.
                        .deserializer(deserializer)
                        .build();

        // Integrasikan sumber data eksternal ke dalam program Flink DataStream
        // Jangan gunakan strategi watermark.
        DataStreamSource<RowData> mySQLSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "Sumber MySQL");

        // Tulis ke stdout.
        mySQLSource.print();

        // Jalankan program.
        env.execute("Uji CDC MySQL");
    }
}

POM.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.aliyun</groupId>
    <artifactId>KmsJavaDemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>Demo Flink MySQL CDC</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.1</flink.version>
        <flink-cdc.version>2.4.2</flink-cdc.version>
        <log4j.version>2.17.1</log4j.version>
    </properties>
    <dependencies>
        <! -- Dependensi inti Flink (Tetapkan ruang lingkup ke provided saat mengepak program) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Realtime Compute for Apache Flink's MySQL CDC connector -->
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>ververica-connector-mysql</artifactId>
            <version>1.17-vvr-8.0.4-1</version>
            <! -- Komentari baris berikut untuk eksekusi lokal -->
            <!-- <scope>provided</scope> -->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>1.17.1</version> <! -- Versi harus konsisten dengan versi Flink yang Anda tentukan sebelumnya -->
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.17.1</version>
            <scope>provided</scope>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.17.1</version> <! -- Versi harus konsisten dengan versi Flink yang Anda tentukan sebelumnya -->
            <scope>provided</scope>
        </dependency>


        <! -- Dependensi log -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.17.1</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>kms20160120</artifactId>
            <version>1.2.3</version>
        </dependency>

        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>tea</artifactId>
            <version>1.3.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <! -- Plugin kompiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.13.0</version> <! -- Versi tambalan -->
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>

            <! -- Plugin untuk membuat fat JAR -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.5.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <! -- Pertahankan dependensi logging -->
                                    <!-- <exclude>org.slf4j:*</exclude> -->
                                    <!-- <exclude>org.apache.logging.log4j:*</exclude> -->
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                        <exclude>META-INF/MANIFEST.MF</exclude> <! -- Tambahkan filter kunci -->
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.example.JavaDemo</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <! -- Tambahkan transformer yang diperlukan -->
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Langkah 4: Sebarkan JAR baru dan mulai penyebaran

  1. Unggah JAR baru.

    1. Masuk ke konsol manajemen Realtime Compute for Apache Flink.

    2. Temukan ruang kerja target dan klik Console di kolom Actions.

    3. Di panel navigasi di sebelah kiri, klik Artifacts.

    4. Klik Upload Artifact dan pilih JAR untuk diunggah.

      KmsJavaDemo-1.0-SNAPSHOT.jar yang dibuat pada langkah 3 diunggah dalam contoh ini.

  2. Buat penyebaran JAR.

    1. Di panel navigasi di sebelah kiri, pilih O&M. Di pojok kiri atas halaman Deployments, pilih Create Deployment > JAR Deployment.

    2. Di kotak dialog Create Deployment, konfigurasikan parameter penyebaran:

      Parameter

      Deskripsi

      Contoh

      Deployment Mode

      Mode yang ingin Anda gunakan untuk menyebar JAR. Pilih Stream Mode.

      Stream Mode

      Deployment Name

      Masukkan nama penyebaran JAR.

      kmsjavademo

      Engine Version

      Versi engine yang akan digunakan oleh penyebaran.

      Kami merekomendasikan Anda menggunakan versi engine dengan label RECOMMENDED atau STABLE karena lebih andal dan performanya lebih baik. Untuk informasi lebih lanjut, lihat Catatan rilis dan Versi engine.

      vvr-8.0.11-flink-1.17

      JAR URI

      Pilih JAR yang telah diunggah.

      Catatan

      Di Realtime Compute for Apache Flink yang menggunakan VVR 8.0.6 atau lebih baru, akses ke bucket Object Storage Service (OSS) dibatasi hanya pada bucket yang terikat ke ruang kerja saat pembuatannya.

      KmsJavaDemo-1.0-SNAPSHOT.jar

      Entry Point Class

      Kelas titik masuk aplikasi JAR. Jika Anda tidak menentukan kelas utama untuk JAR, masukkan direktori standar di bidang Entry Point Class.

      -

      Entry Point Main Arguments

      Masukkan argumen yang ingin Anda lewatkan ke metode utama.

      Untuk melindungi pasangan AccessKey Anda, kami sarankan Anda mengonfigurasi pasangan AccessKey menggunakan variabel. Untuk informasi lebih lanjut, lihat Kelola variabel. Dalam contoh ini, akid dan aksecret adalah nama variabel.

      Untuk instruksi tentang cara mendapatkan pasangan AccessKey, lihat Bagaimana cara melihat pasangan AccessKey dari sebuah akun?.

      --akid ${secret_values.akid} --aksecret ${secret_values.aksecret}

      Deployment Target

      Destinasi di mana penyebaran dilakukan. Pilih queue atau session cluster yang diinginkan dari daftar drop-down. Untuk informasi lebih lanjut, lihat Kelola antrian dan bagian "Langkah 1: Buat session cluster" dari topik Debug penyebaran.

      Penting

      Jangan gunakan session cluster untuk produksi. Session cluster tidak mendukung metrik pemantauan, pemantauan dan peringatan, atau Autopilot. Untuk informasi lebih lanjut, lihat Debug draf.

      default-queue

      Untuk informasi lebih lanjut, lihat Buat penyebaran.

    3. Klik Deploy.

  3. Mulai penyebaran.

    Di halaman Deployments, temukan deployment kmsjavademo, dan klik Start di kolom Actions. Di panel Start Job, pilih Initial Mode, dan klik Start.

Langkah 5: Lihat hasilnya di log TaskManager

Setelah status penyebaran menjadi RUNNING, buka halaman detail penyebaran. Pilih tab Logs dan subtab Running Task Managers. Klik item di kolom Path, ID, beralih ke subtab Log List, dan klik log dengan akhiran .out. Cari lily dan periksa hasil pemrosesan data.

image

Referensi