Realtime Compute for Apache Flink dapat berintegrasi dengan Layanan Manajemen Kunci (KMS) untuk mengenkripsi dan mendekripsi data sensitif, seperti kata sandi basis data, yang dikonfigurasi untuk beban kerja Flink Anda, sehingga memastikan keamanan data. Topik ini menjelaskan cara mengenkripsi dan mendekripsi kata sandi basis data dengan KMS dalam penyebaran JAR yang membaca data dari instance ApsaraDB RDS untuk basis data MySQL.
Informasi latar belakang
KMS adalah platform all-in-one yang mendukung manajemen kredensial yang disederhanakan, andal, aman, dan sesuai. KMS menyediakan API operasi kriptografi yang memungkinkan Anda mengenkripsi dan mendekripsi data secara sederhana, menghilangkan kompleksitas kriptografi yang rumit. Selain itu, KMS menawarkan rotasi kunci otomatis, meningkatkan keamanan data dan mengurangi upaya manajemen kunci. Untuk informasi lebih lanjut, lihat Manfaat dalam dokumentasi KMS.
Dalam skenario komputasi waktu nyata, Flink sering kali perlu terhubung ke sumber data (Kafka, MySQL, dll.) untuk mengakses data sensitif. Praktik tradisional melakukan hardcoding data sensitif atau menyimpannya dalam file konfigurasi dapat menyebabkan tantangan keamanan yang signifikan. Dengan berintegrasi dengan KMS, Flink dapat mengambil informasi terenkripsi dan mendekripsinya sesuai permintaan untuk melindungi dari paparan kredensial teks biasa.
Arsitektur solusi tersebut adalah sebagai berikut:
Prasyarat
Lingkungan pengembangan lokal telah disiapkan.
Alat pengembangan termasuk IntelliJ IDEA telah diinstal dan dikonfigurasi dengan benar.
Maven 3.6.3 atau yang lebih baru telah diinstal.
Ruang kerja Realtime Compute for Apache Flink telah dibuat. Untuk informasi lebih lanjut, lihat Aktifkan Realtime Compute for Apache Flink.
Instance ApsaraDB RDS untuk MySQL telah dibuat. Untuk informasi lebih lanjut, lihat Langkah 1: Buat instance ApsaraDB RDS untuk MySQL dan konfigurasikan basis data.
Instans KMS telah dibuat dan diaktifkan, serta kunci KMS default telah dibuat. Untuk informasi lebih lanjut, lihat Beli dan aktifkan instans KMS dan Kelola kunci.
CatatanInstance ApsaraDB RDS untuk MySQL dan instance KMS harus berada di virtual private cloud (VPC) yang sama dengan ruang kerja Realtime Compute for Apache Flink. Jika mereka tidak berada di VPC yang sama, Anda harus membuat koneksi jaringan di antara mereka. Untuk informasi lebih lanjut, lihat Bagaimana cara Realtime Compute for Apache Flink mengakses layanan lintas VPC? dan Bagaimana cara Realtime Compute for Apache Flink mengakses Internet?.
(Opsional) Langkah 1: Persiapan
Persiapkan sumber data ApsaraDB RDS untuk MySQL.
Jalankan penyebaran JAR dengan data tidak terenkripsi
Langkah 2: Enkripsi kata sandi teks biasa dengan KMS
KMS mengenkripsi kata sandi teks biasa flink_rds_password@123. Kata sandi terenkripsi adalah a2V5LWh6ejY3YTJmZTY5ejR2NTlpOHE1MC03d3ozYWU1anlzC6NLoC0JHHEXTJ4P/4iVOe/B+eniv8EcaviQDzZWWNPedOYkoFFYWA==.
Untuk mendapatkan kunci enkripsi KMS, gunakan salah satu metode berikut:
Metode 1: Panggil operasi Enkripsi KMS di portal OpenAPI
Pergi ke Portal OpenAPI Alibaba Cloud. Pilih wilayah target.
Tetapkan KeyId dan Plaintext.
Klik Initiate Call.
Lihat ciphertext.
Untuk informasi lebih lanjut, lihat Enkripsi.
Metode 2: Panggil operasi Enkripsi KMS dari IntelliJ IDEA
Aktifkan akses Internet ke kunci KMS. Untuk informasi lebih lanjut, lihat Akses kunci instance KMS melalui Internet.
Kunci KMS hanya dapat diakses melalui jaringan VPC secara default.
Konfigurasikan variabel lingkungan
ALIBABA_CLOUD_ACCESS_KEY_IDdanALIBABA_CLOUD_ACCESS_KEY_SECRET. Untuk instruksi tentang cara mendapatkan pasangan AccessKey, lihat Bagaimana Cara Melihat Pasangan AccessKey dari Sebuah Akun?.Di proyek IntelliJ IDEA target, buat file kelas bernama EncryptFlink.
Salin dan tempel potongan kode berikut ke file kelas EncryptFlink. Ingatlah untuk memodifikasi nilai opsi konfigurasi agar sesuai dengan pengaturan spesifik Anda.
package org.example; import com.aliyun.kms20160120.models.EncryptResponse; import com.aliyun.kms20160120.models.EncryptResponseBody; import com.aliyun.tea.*; public class EncryptFlink { /** * : * <p>Gunakan pasangan AccessKey Anda untuk menginisialisasi klien.</p> * @return Client * * @throws Exception */ public static com.aliyun.kms20160120.Client createClient() throws Exception { // Jika kode proyek bocor, pasangan AccessKey mungkin bocor, membahayakan keamanan semua sumber daya dalam akun Anda. Contoh kode berikut hanya disediakan sebagai referensi. com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config() // Diperlukan. Pastikan bahwa variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dikonfigurasi. .setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")) // Diperlukan. Pastikan bahwa variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_SECRET dikonfigurasi. .setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")); // Tentukan titik akhir. Untuk informasi lebih lanjut, kunjungi https://api.aliyun.com/product/Kms. config.endpoint = "kms.cn-hangzhou.aliyuncs.com"; return new com.aliyun.kms20160120.Client(config); } public static void main(String[] args_) throws Exception { java.util.List<String> args = java.util.Arrays.asList(args_); com.aliyun.kms20160120.Client client = EncryptFlink.createClient(); com.aliyun.kms20160120.models.EncryptRequest encryptRequest = new com.aliyun.kms20160120.models.EncryptRequest() .setPlaintext("flink_rds_password@123") .setKeyId("key-hzz67ab1ff4e750h****"); com.aliyun.teautil.models.RuntimeOptions runtime = new com.aliyun.teautil.models.RuntimeOptions(); try { // Tulis kode Anda sendiri untuk menampilkan respons operasi API jika diperlukan. EncryptResponse encryptResponse = client.encryptWithOptions(encryptRequest, runtime); EncryptResponseBody body = encryptResponse.getBody(); System.out.println(body.getCiphertextBlob()); } catch (TeaException error) { // Tangani pengecualian dengan hati-hati dalam skenario bisnis aktual dan jangan abaikan pengecualian dalam proyek Anda. Dalam contoh ini, pesan kesalahan hanya dicetak untuk tujuan demonstrasi. // Pesan kesalahan. System.out.println(error.getMessage()); // URL untuk pemecahan masalah. System.out.println(error.getData().get("Recommend")); com.aliyun.teautil.Common.assertAsString(error.message); } catch (Exception _error) { TeaException error = new TeaException(_error.getMessage(), _error); // Tangani pengecualian dengan hati-hati dalam skenario bisnis aktual dan jangan abaikan pengecualian dalam proyek Anda. Dalam contoh ini, pesan kesalahan hanya dicetak untuk tujuan demonstrasi. // Pesan kesalahan. System.out.println(error.getMessage()); // URL untuk pemecahan masalah. System.out.println(error.getData().get("Recommend")); com.aliyun.teautil.Common.assertAsString(error.message); } } }Opsi
Deskripsi
Contoh
config.endpoint
Titik akhir dari instance KMS Anda.
kms.cn-hangzhou.aliyuncs.com
Teks Biasa
Kata sandi teks biasa untuk dienkripsi.
flink_rds_password@123
KeyId
ID kunci KMS.
key-hzz67ab1ff4e750h****
Tambahkan dependensi berikut ke file POM.xml.
<dependency> <groupId>com.aliyun</groupId> <artifactId>kms20160120</artifactId> <version>1.2.3</version> </dependency> <dependency> <groupId>com.aliyun</groupId> <artifactId>tea</artifactId> <version>1.3.2</version> </dependency>Jalankan file kelas EncryptFlink untuk mendapatkan ciphertext.
Langkah 3: Tambahkan kode dekripsi KMS ke program Anda
Prosedur
Buat File Kelas Utilitas Dekripsi.
Di IntelliJ IDEA, buat file kelas bernama KmsUtil di folder proyek target.
Salin dan tempel potongan kode berikut ke file kelas KmsUtil. Ingatlah untuk memodifikasi nilai opsi konfigurasi agar sesuai dengan pengaturan spesifik Anda.
package org.example; import com.aliyun.kms20160120.Client; import com.aliyun.kms20160120.models.DecryptRequest; import com.aliyun.teaopenapi.models.Config; public class KmsUtil { public static String decrypt(String ak, String sk, String ciphertext) throws Exception { Client client = new Client(new Config() .setAccessKeyId(ak) .setAccessKeySecret(sk) .setEndpoint("kst-hzz67ab1e****f7hle9ab.cryptoservice.kms.aliyuncs.com") .setCa("-----BEGIN CERTIFICATE-----\n" + "MIIDuzCCAqOgAwIBAgIJA*****--\n")); return client.decryptWithOptions( new DecryptRequest().setCiphertextBlob(ciphertext), new com.aliyun.teautil.models.RuntimeOptions() ).getBody().getPlaintext(); } }Opsi
Deskripsi
Contoh
Titik Akhir
Titik akhir VPC dari instance KMS Anda.
kst-hzz67ab1e****f7hle9ab.cryptoservice.kms.aliyuncs.com
Ca
Sertifikat CA.
Unduh sertifikat CA instance KMS ke perangkat Anda di Konsol KMS. Untuk informasi lebih lanjut, lihat Buat kredensial akses.
-----BEGIN CERTIFICATE-----\n" + "MIIDuzCCAqOgAwIBAgIJA*****--\n
Modifikasi File JavaDemo.
Tulis kode untuk mengambil pasangan AccessKey dan mendekripsi data terenkripsi.
Ganti nilai encryptedPassword dengan teks sandi yang diperoleh pada Langkah 2.
// Parsing parameter untuk mendapatkan pasangan AccessKey. final ParameterTool params = ParameterTool.fromArgs(args); String ak = params.get("akid"); String sk = params.get("aksecret"); // Mendekripsi kata sandi terenkripsi. String encryptedPassword = "a2V5LWh6ejY3YTJmZTY5ejR2NTlpOHE1MC03d3ozYWU1anlzC6NLoC0JHHEXTJ4P/4iVOe/B+eniv8EcaviQDzZWWNPedOYkoFFYWA=="; String decryptedPassword = KmsUtil.decrypt(ak, sk, encryptedPassword);Ganti kata sandi teks biasa dengan variabel baru yang ditambahkan.
Sebagai contoh,
.password("flink_rds_password@123")diubah menjadi.password(decryptedPassword).
Modifikasi File pom.xml.
Atur mainClass ke
org.example.JavaDemo.Tambahkan dependensi KMS.
<dependency> <groupId>com.aliyun</groupId> <artifactId>kms20160120</artifactId> <version>1.2.3</version> </dependency> <dependency> <groupId>com.aliyun</groupId> <artifactId>tea</artifactId> <version>1.3.2</version> </dependency>(Opsional) Ubah nilai artifactId menjadi KmsJavaDemo.
Ini akan membantu Anda membedakan kedua JAR tersebut.
Buat JAR.
File KmsJavaDemo-1.0-SNAPSHOT.jar akan muncul di bawah direktori target.
Demo kode lengkap
KmsJavaDemo
package org.example;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.api.java.utils.ParameterTool;
public class JavaDemo {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Parsing parameter untuk mendapatkan pasangan AccessKey.
final ParameterTool params = ParameterTool.fromArgs(args);
String ak = params.get("akid");
String sk = params.get("aksecret");
// Mendekripsi kata sandi terenkripsi.
String encryptedPassword = "a2V5LWh6ejY3YTJmZTY5ejR2NTlpOHE1MC03d3ozYWU1anlzC6NLoC0JHHEXTJ4P/4iVOe/B+eniv8EcaviQDzZWWNPedOYkoFFYWA==";
String decryptedPassword = KmsUtil.decrypt(ak, sk, encryptedPassword);
// Membangun deserializer.
DataType dataType =
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.INT()),
DataTypes.FIELD("username", DataTypes.STRING()),
DataTypes.FIELD("age", DataTypes.INT()));
LogicalType logicalType = TypeConversions.fromDataToLogicalType(dataType);
InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(logicalType);
RowDataDebeziumDeserializeSchema deserializer =
RowDataDebeziumDeserializeSchema.newBuilder()
.setPhysicalRowType((RowType) dataType.getLogicalType())
.setResultTypeInfo(typeInfo)
.build();
// Konfigurasikan sumber data (com.ververica.cdc.connectors.mysql.source.MySqlSource).
MySqlSource<RowData> mySqlSource =
MySqlSource.<RowData>builder()
.hostname("rm-bp****2ye09w72zjq.mysql.rds.aliyuncs.com")
.port(3306)
.databaseList("school") // Tentukan database.
.tableList("school.student") // Tentukan tabel.
.username("flink_rds_user")
.password(decryptedPassword)
// Inisialisasi data dalam struktur RowData.
.deserializer(deserializer)
.build();
// Integrasikan sumber data eksternal ke dalam program Flink DataStream
// Jangan gunakan strategi watermark.
DataStreamSource<RowData> mySQLSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "Sumber MySQL");
// Tulis ke stdout.
mySQLSource.print();
// Jalankan program.
env.execute("Uji CDC MySQL");
}
}
POM.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.aliyun</groupId>
<artifactId>KmsJavaDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<name>Demo Flink MySQL CDC</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.17.1</flink.version>
<flink-cdc.version>2.4.2</flink-cdc.version>
<log4j.version>2.17.1</log4j.version>
</properties>
<dependencies>
<! -- Dependensi inti Flink (Tetapkan ruang lingkup ke provided saat mengepak program) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Realtime Compute for Apache Flink's MySQL CDC connector -->
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mysql</artifactId>
<version>1.17-vvr-8.0.4-1</version>
<! -- Komentari baris berikut untuk eksekusi lokal -->
<!-- <scope>provided</scope> -->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>1.17.1</version> <! -- Versi harus konsisten dengan versi Flink yang Anda tentukan sebelumnya -->
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.17.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.17.1</version> <! -- Versi harus konsisten dengan versi Flink yang Anda tentukan sebelumnya -->
<scope>provided</scope>
</dependency>
<! -- Dependensi log -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.17.1</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>kms20160120</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>tea</artifactId>
<version>1.3.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<! -- Plugin kompiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.13.0</version> <! -- Versi tambalan -->
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<! -- Plugin untuk membuat fat JAR -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.5.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<! -- Pertahankan dependensi logging -->
<!-- <exclude>org.slf4j:*</exclude> -->
<!-- <exclude>org.apache.logging.log4j:*</exclude> -->
</excludes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>META-INF/MANIFEST.MF</exclude> <! -- Tambahkan filter kunci -->
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.example.JavaDemo</mainClass>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <! -- Tambahkan transformer yang diperlukan -->
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Langkah 4: Sebarkan JAR baru dan mulai penyebaran
Unggah JAR baru.
Masuk ke konsol manajemen Realtime Compute for Apache Flink.
Temukan ruang kerja target dan klik Console di kolom Actions.
Di panel navigasi di sebelah kiri, klik Artifacts.
Klik Upload Artifact dan pilih JAR untuk diunggah.
KmsJavaDemo-1.0-SNAPSHOT.jar yang dibuat pada langkah 3 diunggah dalam contoh ini.
Buat penyebaran JAR.
Di panel navigasi di sebelah kiri, pilih . Di pojok kiri atas halaman Deployments, pilih Create Deployment > JAR Deployment.
Di kotak dialog Create Deployment, konfigurasikan parameter penyebaran:
Parameter
Deskripsi
Contoh
Deployment Mode
Mode yang ingin Anda gunakan untuk menyebar JAR. Pilih Stream Mode.
Stream Mode
Deployment Name
Masukkan nama penyebaran JAR.
kmsjavademo
Engine Version
Versi engine yang akan digunakan oleh penyebaran.
Kami merekomendasikan Anda menggunakan versi engine dengan label RECOMMENDED atau STABLE karena lebih andal dan performanya lebih baik. Untuk informasi lebih lanjut, lihat Catatan rilis dan Versi engine.
vvr-8.0.11-flink-1.17
JAR URI
Pilih JAR yang telah diunggah.
CatatanDi Realtime Compute for Apache Flink yang menggunakan VVR 8.0.6 atau lebih baru, akses ke bucket Object Storage Service (OSS) dibatasi hanya pada bucket yang terikat ke ruang kerja saat pembuatannya.
KmsJavaDemo-1.0-SNAPSHOT.jar
Entry Point Class
Kelas titik masuk aplikasi JAR. Jika Anda tidak menentukan kelas utama untuk JAR, masukkan direktori standar di bidang Entry Point Class.
-
Entry Point Main Arguments
Masukkan argumen yang ingin Anda lewatkan ke metode utama.
Untuk melindungi pasangan AccessKey Anda, kami sarankan Anda mengonfigurasi pasangan AccessKey menggunakan variabel. Untuk informasi lebih lanjut, lihat Kelola variabel. Dalam contoh ini, akid dan aksecret adalah nama variabel.
Untuk instruksi tentang cara mendapatkan pasangan AccessKey, lihat Bagaimana cara melihat pasangan AccessKey dari sebuah akun?.
--akid ${secret_values.akid} --aksecret ${secret_values.aksecret}
Deployment Target
Destinasi di mana penyebaran dilakukan. Pilih queue atau session cluster yang diinginkan dari daftar drop-down. Untuk informasi lebih lanjut, lihat Kelola antrian dan bagian "Langkah 1: Buat session cluster" dari topik Debug penyebaran.
PentingJangan gunakan session cluster untuk produksi. Session cluster tidak mendukung metrik pemantauan, pemantauan dan peringatan, atau Autopilot. Untuk informasi lebih lanjut, lihat Debug draf.
default-queue
Untuk informasi lebih lanjut, lihat Buat penyebaran.
Klik Deploy.
Mulai penyebaran.
Di halaman Deployments, temukan deployment kmsjavademo, dan klik Start di kolom Actions. Di panel Start Job, pilih Initial Mode, dan klik Start.
Langkah 5: Lihat hasilnya di log TaskManager
Setelah status penyebaran menjadi RUNNING, buka halaman detail penyebaran. Pilih tab Logs dan subtab Running Task Managers. Klik item di kolom Path, ID, beralih ke subtab Log List, dan klik log dengan akhiran .out. Cari lily dan periksa hasil pemrosesan data.

