Realtime Compute for Apache Flink は Key Management Service (KMS) と統合して、Flink ワークロード用に構成された機密データ (データベースパスワードなど) を暗号化および復号し、データセキュリティを確保できます。このトピックでは、ApsaraDB RDS for MySQL データベースからデータを読み取る JAR デプロイメントで、KMS を使用してデータベースパスワードを暗号化および復号する方法について説明します。
背景情報
KMS は、簡素化され、信頼性が高く、安全で、コンプライアンスに準拠した認証情報管理とデータ暗号化をサポートするオールインワンプラットフォームです。 KMS は、簡素化された方法でデータを暗号化および復号できる暗号化 API 操作を提供し、複雑で抽象的な暗号化から解放されます。さらに、KMS は自動キーローテーションを提供し、データセキュリティを強化し、キー管理の手間を軽減します。詳細については、KMS ドキュメントの「メリット」をご参照ください。
リアルタイムコンピューティングのシナリオでは、Flink は多くの場合、データソース (Kafka、MySQL など) に接続して機密データにアクセスする必要があります。機密データをハードコーディングしたり、構成ファイルに保存したりする従来の方法は、重大なセキュリティ上の課題につながる可能性があります。 KMS と統合することにより、Flink は暗号化された情報を取得し、オンデマンドで復号して、プレーンテキストの認証情報の漏洩を防ぐことができます。
ソリューションのアーキテクチャは次のとおりです。
前提条件
オンプレミスの開発環境がセットアップされていること。
IntelliJ IDEA などの開発ツールがインストールされ、適切に構成されていること。
Maven 3.6.3 以降がインストールされていること。
Realtime Compute for Apache Flink ワークスペースが作成されていること。詳細については、「Realtime Compute for Apache Flink をアクティブ化する」をご参照ください。
ApsaraDB RDS for MySQL インスタンスが作成されていること。詳細については、「手順 1: ApsaraDB RDS for MySQL インスタンスを作成し、データベースを構成する」をご参照ください。
KMS インスタンスが作成されて有効になっており、デフォルトの KMS キーが作成されていること。詳細については、「KMS インスタンスを購入して有効にする」および「キーを管理する」をご参照ください。
説明ApsaraDB RDS for MySQL インスタンスと KMS インスタンスは、Realtime Compute for Apache Flink ワークスペースと同じ VPC (Virtual Private Cloud) に存在する必要があります。同じ VPC に存在しない場合は、それらの間にネットワーク接続を確立する必要があります。詳細については、「Realtime Compute for Apache Flink は VPC 間でサービスにどのようにアクセスしますか?」および「Realtime Compute for Apache Flink はインターネットにどのようにアクセスしますか?」をご参照ください。
(オプション) 手順 1: 準備を行う
ApsaraDB RDS for MySQL データソースを準備する。
暗号化されていないデータで JAR デプロイメントを実行する
手順 2: KMS でプレーンテキストパスワードを暗号化する
KMS はプレーンテキストパスワード flink_rds_password@123 を暗号化します。暗号化されたパスワードは a2V5LWh6ejY3YTJmZTY5ejR2NTlpOHE1MC03d3ozYWU1anlzC6NLoC0JHHEXTJ4P/4iVOe/B+eniv8EcaviQDzZWWNPedOYkoFFYWA== です。
KMS 暗号鍵を取得するには、次のいずれかの方法を使用します。
方法 1: OpenAPI ポータルで KMS Encrypt 操作を呼び出す
Alibaba Cloud OpenAPI ポータル にアクセスします。ターゲットリージョンを選択します。
KeyId と Plaintext を設定します。
[呼び出しを開始] をクリックします。
暗号文を表示します。
詳細については、「暗号化する」をご参照ください。
方法 2: IntelliJ IDEA から KMS Encrypt 操作を呼び出す
KMS キーへのインターネットアクセスを有効にします。詳細については、「インターネット経由で KMS インスタンスキーにアクセスする。」をご参照ください。
KMS キーは、デフォルトでは VPC ネットワークを介してのみアクセスできます。
ALIBABA_CLOUD_ACCESS_KEY_IDおよびALIBABA_CLOUD_ACCESS_KEY_SECRET環境変数を構成します。AccessKey ペアの取得方法については、「アカウントの AccessKey ペアを表示するにはどうすればよいですか?」をご参照ください。ターゲットの IntelliJ IDEA プロジェクトで、EncryptFlink という名前のクラスファイルを作成します。
次のコードスニペットを EncryptFlink クラスファイルにコピーして貼り付けます。構成オプションの値を特定のセットアップに合わせて変更することを忘れないでください。
package org.example; import com.aliyun.kms20160120.models.EncryptResponse; import com.aliyun.kms20160120.models.EncryptResponseBody; import com.aliyun.tea.*; public class EncryptFlink { /** * <b>説明</b> : * <p>AccessKey ペアを使用してクライアントを初期化します。</p> * @return Client * * @throws Exception */ public static com.aliyun.kms20160120.Client createClient() throws Exception { // プロジェクトコードが漏洩した場合、AccessKey ペアが漏洩し、アカウント内のすべてのリソースのセキュリティが損なわれる可能性があります。次のサンプルコードは参考用としてのみ提供されています。 com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config() // 必須。 ALIBABA_CLOUD_ACCESS_KEY_ID 環境変数が構成されていることを確認してください。 .setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")) // 必須。 ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境変数が構成されていることを確認してください。 .setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")); // エンドポイントを指定します。詳細については、https://api.aliyun.com/product/Kms をご覧ください。 config.endpoint = "kms.cn-hangzhou.aliyuncs.com"; return new com.aliyun.kms20160120.Client(config); } public static void main(String[] args_) throws Exception { java.util.List<String> args = java.util.Arrays.asList(args_); com.aliyun.kms20160120.Client client = EncryptFlink.createClient(); com.aliyun.kms20160120.models.EncryptRequest encryptRequest = new com.aliyun.kms20160120.models.EncryptRequest() .setPlaintext("flink_rds_password@123") .setKeyId("key-hzz67ab1ff4e750h****"); com.aliyun.teautil.models.RuntimeOptions runtime = new com.aliyun.teautil.models.RuntimeOptions(); try { // 必要に応じて、API 操作のレスポンスを表示するための独自のコードを記述してください。 EncryptResponse encryptResponse = client.encryptWithOptions(encryptRequest, runtime); EncryptResponseBody body = encryptResponse.getBody(); System.out.println(body.getCiphertextBlob()); } catch (TeaException error) { // 実際のビジネスシナリオでは、例外を慎重に処理し、プロジェクトで例外を無視しないでください。この例では、デモンストレーションのためにエラーメッセージが単に印刷されます。 // エラーメッセージ。 System.out.println(error.getMessage()); // トラブルシューティング用の URL。 System.out.println(error.getData().get("Recommend")); com.aliyun.teautil.Common.assertAsString(error.message); } catch (Exception _error) { TeaException error = new TeaException(_error.getMessage(), _error); // 実際のビジネスシナリオでは、例外を慎重に処理し、プロジェクトで例外を無視しないでください。この例では、デモンストレーションのためにエラーメッセージが単に印刷されます。 // エラーメッセージ。 System.out.println(error.getMessage()); // トラブルシューティング用の URL。 System.out.println(error.getData().get("Recommend")); com.aliyun.teautil.Common.assertAsString(error.message); } } }オプション
説明
例
config.endpoint
KMS インスタンスのエンドポイント。
kms.cn-hangzhou.aliyuncs.com
Plaintext
暗号化するプレーンテキストパスワード。
flink_rds_password@123
KeyId
KMS キー ID。
key-hzz67ab1ff4e750h****
POM.xml ファイルに次の依存関係を追加します。
<dependency> <groupId>com.aliyun</groupId> <artifactId>kms20160120</artifactId> <version>1.2.3</version> </dependency> <dependency> <groupId>com.aliyun</groupId> <artifactId>tea</artifactId> <version>1.3.2</version> </dependency>EncryptFlink クラスファイルを実行して、暗号文を取得します。
手順 3: プログラムに KMS 復号コードを追加する
手順
復号ユーティリティクラスファイルを作成します。
IntelliJ IDEA で、ターゲットプロジェクトフォルダーの下に KmsUtil という名前のクラスファイルを作成します。
次のコードスニペットを KmsUtil クラスファイルにコピーして貼り付けます。構成オプションの値を特定のセットアップに合わせて変更することを忘れないでください。
package org.example; import com.aliyun.kms20160120.Client; import com.aliyun.kms20160120.models.DecryptRequest; import com.aliyun.teaopenapi.models.Config; public class KmsUtil { public static String decrypt(String ak, String sk, String ciphertext) throws Exception { Client client = new Client(new Config() .setAccessKeyId(ak) .setAccessKeySecret(sk) .setEndpoint("kst-hzz67ab1e****f7hle9ab.cryptoservice.kms.aliyuncs.com") .setCa("-----BEGIN CERTIFICATE-----\n" + "MIIDuzCCAqOgAwIBAgIJA*****--\n")); return client.decryptWithOptions( new DecryptRequest().setCiphertextBlob(ciphertext), new com.aliyun.teautil.models.RuntimeOptions() ).getBody().getPlaintext(); } }オプション
説明
例
Endpoint
KMS インスタンスの VPC エンドポイント。
kst-hzz67ab1e****f7hle9ab.cryptoservice.kms.aliyuncs.com
Ca
CA 証明書。
KMS コンソールで、KMS インスタンスの CA 証明書をデバイスにダウンロードします。詳細については、「アクセス認証情報を作成する」をご参照ください。
-----BEGIN CERTIFICATE-----\n" + "MIIDuzCCAqOgAwIBAgIJA*****--\n
JavaDemo ファイルを変更します。
AccessKey ペアを取得し、暗号化されたデータを復号するコードを記述します。
encryptedPassword の値を、手順 2 で取得した暗号文に置き換えます。
// パラメーターを解析して AccessKey ペアを取得します。 final ParameterTool params = ParameterTool.fromArgs(args); String ak = params.get("akid"); String sk = params.get("aksecret"); // 暗号化されたパスワードを復号します。 String encryptedPassword = "a2V5LWh6ejY3YTJmZTY5ejR2NTlpOHE1MC03d3ozYWU1anlzC6NLoC0JHHEXTJ4P/4iVOe/B+eniv8EcaviQDzZWWNPedOYkoFFYWA=="; String decryptedPassword = KmsUtil.decrypt(ak, sk, encryptedPassword);プレーンテキストパスワードを新しく追加された変数に変更します。
たとえば、
.password("flink_rds_password@123")は.password(decryptedPassword)に変更されます。
pom.xml ファイルを変更します
mainClass を
org.example.JavaDemoに設定します。KMS 依存関係を追加します。
<dependency> <groupId>com.aliyun</groupId> <artifactId>kms20160120</artifactId> <version>1.2.3</version> </dependency> <dependency> <groupId>com.aliyun</groupId> <artifactId>tea</artifactId> <version>1.3.2</version> </dependency>(オプション) artifactId の値を KmsJavaDemo に変更します。
これは、2 つの JAR を区別するのに役立ちます。
JAR をビルドします。
KmsJavaDemo-1.0-SNAPSHOT.jar ファイルが target ディレクトリに表示されます。
完全なコードデモ
KmsJavaDemo
package org.example;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.api.java.utils.ParameterTool;
public class JavaDemo {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// パラメーターを解析して AccessKey ペアを取得します。
final ParameterTool params = ParameterTool.fromArgs(args);
String ak = params.get("akid");
String sk = params.get("aksecret");
// 暗号化されたパスワードを復号します。
String encryptedPassword = "a2V5LWh6ejY3YTJmZTY5ejR2NTlpOHE1MC03d3ozYWU1anlzC6NLoC0JHHEXTJ4P/4iVOe/B+eniv8EcaviQDzZWWNPedOYkoFFYWA==";
String decryptedPassword = KmsUtil.decrypt(ak, sk, encryptedPassword);
// デシリアライザーを構築します。
DataType dataType =
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.INT()),
DataTypes.FIELD("username", DataTypes.STRING()),
DataTypes.FIELD("age", DataTypes.INT()));
LogicalType logicalType = TypeConversions.fromDataToLogicalType(dataType);
InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(logicalType);
RowDataDebeziumDeserializeSchema deserializer =
RowDataDebeziumDeserializeSchema.newBuilder()
.setPhysicalRowType((RowType) dataType.getLogicalType())
.setResultTypeInfo(typeInfo)
.build();
// データソース (com.ververica.cdc.connectors.mysql.source.MySqlSource) を構成します。
MySqlSource<RowData> mySqlSource =
MySqlSource.<RowData>builder()
.hostname("rm-bp****2ye09w72zjq.mysql.rds.aliyuncs.com")
.port(3306)
.databaseList("school") // データベースを指定します。
.tableList("school.student") // テーブルを指定します。
.username("flink_rds_user")
.password(decryptedPassword)
// RowData 構造のデータを初期化します。
.deserializer(deserializer)
.build();
// 外部データソースを Flink DataStream プログラムに統合します
// ウォーターマーク戦略を使用しないでください。
DataStreamSource<RowData> mySQLSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
// 標準出力に書き込みます。
mySQLSource.print();
// プログラムを実行します。
env.execute("MySQL CDC Test");
}
}
POM.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.aliyun</groupId>
<artifactId>KmsJavaDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<name>Flink MySQL CDC Demo</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.17.1</flink.version>
<flink-cdc.version>2.4.2</flink-cdc.version>
<log4j.version>2.17.1</log4j.version>
</properties>
<dependencies>
<! -- Flink コア依存関係 (プログラムをパッケージングする際、スコープを provided に設定します) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Apache Flink 向けリアルタイムコンピューティングの MySQL CDC コネクタ -->
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mysql</artifactId>
<version>1.17-vvr-8.0.4-1</version>
<! -- ローカルで実行する場合は、次の行をコメントアウトします -->
<!-- <scope>provided</scope> -->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>1.17.1</version> <! -- バージョンは、先に指定した Flink のバージョンと一致させる必要があります -->
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.17.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.17.1</version> <! -- バージョンは、先に指定した Flink のバージョンと一致させる必要があります -->
<scope>provided</scope>
</dependency>
<! -- ログの依存関係 -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.17.1</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>kms20160120</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>tea</artifactId>
<version>1.3.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<! -- コンパイラプラグイン -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.13.0</version> <! -- パッチバージョン -->
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<! -- fat JAR をビルドするためのプラグイン -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.5.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<! -- ログの依存関係を保持 -->
<!-- <exclude>org.slf4j:*</exclude> -->
<!-- <exclude>org.apache.logging.log4j:*</exclude> -->
</excludes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>META-INF/MANIFEST.MF</exclude> <! -- キーフィルターを追加 -->
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.example.JavaDemo</mainClass>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <! -- 必要なトランスフォーマーを追加 -->
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
手順 4: 新しい JAR をデプロイし、デプロイメントを開始する
新しい JAR をアップロードします。
ターゲットワークスペースを見つけ、[アクション] 列の [コンソール] をクリックします。
左側のナビゲーションウィンドウで、[アーティファクト] をクリックします。
[アーティファクトのアップロード] をクリックし、アップロードする JAR を選択します。
この例では、手順 3 でビルドされた KmsJavaDemo-1.0-SNAPSHOT.jar がアップロードされます。
JAR デプロイメントを作成します。
左側のナビゲーションウィンドウで、 を選択します。 [デプロイメント] ページの左上隅で、[デプロイメントの作成] > [JAR デプロイメント] を選択します。
[デプロイメントの作成] ダイアログボックスで、デプロイメントのパラメーターを構成します。
パラメーター
説明
例
デプロイメントモード
JAR デプロイメントのデプロイに使用するモード。 [ストリームモード] を選択します。
ストリームモード
デプロイメント名
JAR デプロイメントの名前を入力します。
kmsjavademo
エンジンバージョン
デプロイメントで使用されるエンジンバージョン。
[推奨] または [安定] のラベルが付いたエンジンバージョンを使用することをお勧めします。これらはより信頼性が高く、パフォーマンスが優れているためです。詳細については、「リリースノート」および「エンジンバージョン」をご参照ください。
vvr-8.0.11-flink-1.17
JAR URI
アップロードされた JAR を選択します。
説明VVR 8.0.6 以降を使用する Realtime Compute for Apache Flink では、Object Storage Service (OSS) バケットへのアクセスは、作成時にワークスペースにバインドされたバケットに制限されます。
KmsJavaDemo-1.0-SNAPSHOT.jar
エントリポイントクラス
JAR アプリケーションのエントリポイントクラス。JAR のメインクラスを指定しない場合は、[エントリポイントクラス] フィールドに標準ディレクトリを入力します。
-
エントリポイントのメイン引数
main メソッドに渡す引数を入力します。
AccessKey ペアを保護するために、変数を使用して AccessKey ペアを構成することをお勧めします。詳細については、「変数を管理する」をご参照ください。この例では、akid と aksecret は変数名です。
AccessKey ペアの取得方法については、「アカウントの AccessKey ペアを表示するにはどうすればよいですか?」をご参照ください。
--akid ${secret_values.akid} --aksecret ${secret_values.aksecret}
デプロイメントターゲット
デプロイメントがデプロイされる宛先。ドロップダウンリストから目的の [キュー] または [セッションクラスタ] を選択します。詳細については、「キューを管理する」および「デプロイメントをデバッグする」トピックの「手順 1: セッションクラスタを作成する」セクションをご参照ください。
重要本番環境ではセッションクラスタを使用しないでください。セッションクラスタは、モニタリングメトリック、モニタリングとアラート、または Autopilot をサポートしていません。詳細については、「ドラフトをデバッグする」をご参照ください。
default-queue
詳細については、「デプロイメントを作成する」をご参照ください。
[デプロイ] をクリックします。
デプロイメントを開始します。
[デプロイメント] ページで、kmsjavademo デプロイメントを見つけ、[アクション] 列の [開始] をクリックします。 [ジョブの開始] パネルで、[初期モード] を選択し、[開始] をクリックします。
手順 5: TaskManager ログで結果を表示する
デプロイメントの状態が [実行中] になったら、デプロイメントの詳細ページに移動します。[ログ] タブと [実行中のタスクマネージャー] サブタブを選択します。[パス, ID] 列の項目をクリックし、[ログリスト] サブタブに切り替えて、.out サフィックスが付いたログをクリックします。lily を検索して、データ処理の結果を確認します。

