すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Key Management Service を使用して Flink のデータベースパスワードを暗号化および復号化する

最終更新日:Oct 29, 2025

Realtime Compute for Apache Flink は Key Management Service (KMS) と統合して、Flink ワークロード用に構成された機密データ (データベースパスワードなど) を暗号化および復号し、データセキュリティを確保できます。このトピックでは、ApsaraDB RDS for MySQL データベースからデータを読み取る JAR デプロイメントで、KMS を使用してデータベースパスワードを暗号化および復号する方法について説明します。

背景情報

KMS は、簡素化され、信頼性が高く、安全で、コンプライアンスに準拠した認証情報管理とデータ暗号化をサポートするオールインワンプラットフォームです。 KMS は、簡素化された方法でデータを暗号化および復号できる暗号化 API 操作を提供し、複雑で抽象的な暗号化から解放されます。さらに、KMS は自動キーローテーションを提供し、データセキュリティを強化し、キー管理の手間を軽減します。詳細については、KMS ドキュメントの「メリット」をご参照ください。

リアルタイムコンピューティングのシナリオでは、Flink は多くの場合、データソース (Kafka、MySQL など) に接続して機密データにアクセスする必要があります。機密データをハードコーディングしたり、構成ファイルに保存したりする従来の方法は、重大なセキュリティ上の課題につながる可能性があります。 KMS と統合することにより、Flink は暗号化された情報を取得し、オンデマンドで復号して、プレーンテキストの認証情報の漏洩を防ぐことができます。

ソリューションのアーキテクチャは次のとおりです。

image

前提条件

(オプション) 手順 1: 準備を行う

ApsaraDB RDS for MySQL データソースを準備する。

  1. MySQL データベースとアカウントを作成します。

    データベース (school) と、データベースに対する読み取りおよび書き込み権限を持つ標準アカウント (flink_rds_user) を作成します。詳細については、「手順 1: ApsaraDB RDS for MySQL インスタンスを作成し、データベースを構成する」をご参照ください。

  2. ApsaraDB RDS for MySQL データソースを準備します。

    1. ターゲットインスタンスの詳細ページの右上隅にある [データベースにログオン] をクリックします。

    2. 表示されるダイアログボックスで、データベースアカウントとパスワードを入力します。次に、[ログイン] をクリックします。

    3. 正常にログオンしたら、左側のナビゲーションウィンドウで school データベースをダブルクリックします。

    4. SQL エディターで、DDL 文を書いて 3 つのビジネステーブルを作成し、データを入力します。

      CREATE TABLE `student` (
        id INT not null primary key,
        username VARCHAR(255),
        age BIGINT
      );
      
      INSERT INTO student VALUES
      (001, 'lily', 15),
      (002, 'leilei', 18),
      (003, 'xiaoming', 17),
      (004, 'huahua', 15);
      
      SELECT * FROM student;
  3. [実行(F8)] をクリックします。表示されるパネルで、[実行] をクリックします。

暗号化されていないデータで JAR デプロイメントを実行する

暗号化されていない情報を含む JAR デプロイメントが正しく実行できることを確認します。

  1. Flink プログラムをローカルで開発します。

    1. IntelliJ IDEA で新しいプロジェクトを作成します。

    2. 次のコードスニペットを JavaDemo クラスファイルと POM.xml ファイルにコピーして貼り付けます。構成オプションの値を特定のセットアップに合わせて変更することを忘れないでください。

      JavaDemo

      package org.example;
      
      import com.ververica.cdc.connectors.mysql.source.MySqlSource;
      import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
      
      import org.apache.flink.api.common.eventtime.WatermarkStrategy;
      import org.apache.flink.streaming.api.datastream.DataStreamSource;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.table.api.DataTypes;
      import org.apache.flink.table.data.RowData;
      import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
      import org.apache.flink.table.types.DataType;
      import org.apache.flink.table.types.logical.LogicalType;
      import org.apache.flink.table.types.logical.RowType;
      import org.apache.flink.table.types.utils.TypeConversions;
      
      
      public class JavaDemo {
      
          public static void main(String[] args) throws Exception {
              final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
              // デシリアライザーを構築します。
              DataType dataType =
                      DataTypes.ROW(
                              DataTypes.FIELD("id", DataTypes.INT()),
                              DataTypes.FIELD("username", DataTypes.STRING()),
                              DataTypes.FIELD("age", DataTypes.INT()));
      
              LogicalType logicalType = TypeConversions.fromDataToLogicalType(dataType);
              InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(logicalType);
              RowDataDebeziumDeserializeSchema deserializer =
                      RowDataDebeziumDeserializeSchema.newBuilder()
                              .setPhysicalRowType((RowType) dataType.getLogicalType())
                              .setResultTypeInfo(typeInfo)
                              .build();
      
              // データソース (com.ververica.cdc.connectors.mysql.source.MySqlSource) を構成します。
              MySqlSource<RowData> mySqlSource =
                      MySqlSource.<RowData>builder()
                              .hostname("rm-bp****2ye09w72zjq.mysql.rds.aliyuncs.com")
                              .port(3306)
                              .databaseList("school") // データベースを指定します。
                              .tableList("school.student") // テーブルを指定します。
                              .username("flink_rds_user")
                              .password("flink_rds_password@123")
                              // RowData 構造のデータを初期化します。
                              .deserializer(deserializer)
                              .build();
      
              // 外部データソースを Flink DataStream プログラムに統合します
              // ウォーターマーク戦略を使用しないでください。
              DataStreamSource<RowData> mySQLSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
      
              // 標準出力に書き込みます。
              mySQLSource.print();
      
              // プログラムを実行します。
              env.execute("MySQL CDC Test");
          }
      }
      

      POM.xml

      <?xml version="1.0" encoding="UTF-8"?>
      <project xmlns="http://maven.apache.org/POM/4.0.0"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
      
        <groupId>com.aliyun</groupId>
        <artifactId>JavaDemo</artifactId>
        <version>1.0-SNAPSHOT</version>
        <name>Flink MySQL CDC Demo</name>
      
        <properties>
          <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
          <maven.compiler.source>1.8</maven.compiler.source>
          <maven.compiler.target>1.8</maven.compiler.target>
          <flink.version>1.17.1</flink.version>
          <flink-cdc.version>2.4.2</flink-cdc.version>
          <log4j.version>2.17.1</log4j.version>
        </properties>
      
        <dependencies>
          <! -- Flink コア依存関係 (プログラムをパッケージ化する際にスコープを provided に設定します) -->
          <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
          </dependency>
          <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
          </dependency>
          <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
          </dependency>
      
          <!-- Realtime Compute for Apache Flink の MySQL CDC コネクタ -->
          <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>ververica-connector-mysql</artifactId>
            <version>1.17-vvr-8.0.4-1</version>
            <! -- ローカル実行の場合は次の行をコメントアウトします -->
            <!-- <scope>provided</scope> -->
          </dependency>
      
          <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>1.17.1</version> <! -- バージョンは、以前に指定した Flink バージョンと一致している必要があります -->
            <scope>provided</scope>
          </dependency>
      
          <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.17.1</version>
            <scope>provided</scope>
          </dependency>
          <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.17.1</version> <! -- バージョンは、以前に指定した Flink バージョンと一致している必要があります -->
            <scope>provided</scope>
          </dependency>
      
      
          <! -- ログの依存関係 -->
          <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.17.1</version>
            <scope>runtime</scope>
          </dependency>
          
        </dependencies>
      
        <build>
          <plugins>
            <! -- コンパイラー プラグイン -->
            <plugin>
              <groupId>org.apache.maven.plugins</groupId>
              <artifactId>maven-compiler-plugin</artifactId>
              <version>3.13.0</version> <! -- パッチバージョン -->
              <configuration>
                <source>${maven.compiler.source}</source>
                <target>${maven.compiler.target}</target>
                <encoding>UTF-8</encoding>
              </configuration>
            </plugin>
      
            <! -- fat JAR を構築するためのプラグイン -->
            <plugin>
              <groupId>org.apache.maven.plugins</groupId>
              <artifactId>maven-shade-plugin</artifactId>
              <version>3.5.1</version>
              <executions>
                <execution>
                  <phase>package</phase>
                  <goals>
                    <goal>shade</goal>
                  </goals>
                  <configuration>
                    <artifactSet>
                      <excludes>
                        <exclude>org.apache.flink:force-shading</exclude>
                        <exclude>com.google.code.findbugs:jsr305</exclude>
                        <! -- ログの依存関係を保持します -->
                        <!-- <exclude>org.slf4j:*</exclude> -->
                        <!-- <exclude>org.apache.logging.log4j:*</exclude> -->
                      </excludes>
                    </artifactSet>
                    <filters>
                      <filter>
                        <artifact>*:*</artifact>
                        <excludes>
                          <exclude>META-INF/*.SF</exclude>
                          <exclude>META-INF/*.DSA</exclude>
                          <exclude>META-INF/*.RSA</exclude>
                          <exclude>META-INF/MANIFEST.MF</exclude> <! -- キーフィルターを追加します -->
                        </excludes>
                      </filter>
                    </filters>
                    <transformers>
                      <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>org.example.JavaDemo</mainClass>
                      </transformer>
                      <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <! -- 必要なトランスフォーマーを追加します -->
                    </transformers>
                  </configuration>
      
                </execution>
              </executions>
            </plugin>
          </plugins>
        </build>
      </project>
      
    3. ApsaraDB RDS for MySQL インスタンスの JavaDemo で接続構成を変更します。

      オプション

      説明

      hostname

      MySQL のエンドポイント。

      rm-bp****2ye09w72zjq.mysql.rds.aliyuncs.com

      port

      MySQL インスタンスのポート。

      3306

      databaseList

      MySQL データベース名。

      school

      tableList

      MySQL テーブル名。

      school.student

      username

      MySQL データベースにアクセスするためのアカウント。

      flink_rds_user

      password

      MySQL データベースにアクセスするためのパスワード。

      flink_rds_password@123

    4. JAR をビルドします。

      JavaDemo-1.0-SNAPSHOT.jar ファイルがプロジェクトの target ディレクトリに表示されます。

  2. JAR をデプロイし、デプロイメントを開始し、コンソールでデータ処理結果を確認します。

    1. 開発コンソールで、[アーティファクト] に移動し、JavaDemo-1.0-SNAPSHOT.jar ファイルをアップロードします。

    2. JAR デプロイメントを作成します。

      1. 左側のナビゲーションウィンドウで、[O&M] > [デプロイメント] を選択します。 [デプロイメント] ページの左上隅で、[デプロイメントの作成] > [JAR デプロイメント] を選択します。

      2. [デプロイメントの作成] ダイアログボックスで、デプロイメントのパラメーターを構成します。

        パラメーター

        説明

        デプロイメントモード

        JAR デプロイメントのデプロイに使用するモード。 [ストリームモード] を選択します。

        ストリームモード

        デプロイメント名

        JAR デプロイメントの名前を入力します。

        javademo

        エンジンバージョン

        デプロイメントで使用されるエンジンバージョン。

        [推奨] または [安定] のラベルが付いたエンジンバージョンを使用することをお勧めします。これらはより信頼性が高く、パフォーマンスが優れているためです。詳細については、「リリースノート」および「エンジンバージョン」をご参照ください。

        vvr-8.0.11-flink-1.17

        JAR URI

        アップロードされた JAR を選択します。

        説明

        VVR 8.0.6 以降を使用する Realtime Compute for Apache Flink では、Object Storage Service (OSS) バケットへのアクセスは、作成時にワークスペースにバインドされたバケットに制限されます。

        JavaDemo-1.0-SNAPSHOT.jar

        エントリポイントクラス

        JAR アプリケーションのエントリポイントクラス。JAR のメインクラスを指定しない場合は、[エントリポイントクラス] フィールドに標準ディレクトリを入力します。

        -

        エントリポイントのメイン引数

        main メソッドに渡す引数を入力します。

        -

        デプロイメントターゲット

        デプロイメントがデプロイされる宛先。ドロップダウンリストから目的の [キュー] または [セッションクラスタ] を選択します。詳細については、「キューを管理する」および「デプロイメントをデバッグする」トピックの「手順 1: セッションクラスタを作成する」セクションをご参照ください。

        重要

        本番環境ではセッションクラスタを使用しないでください。セッションクラスタは、モニタリングメトリック、モニタリングとアラート、または Autopilot をサポートしていません。詳細については、「ドラフトをデバッグする」をご参照ください。

        default-queue

        詳細については、「デプロイメントを作成する」をご参照ください。

      3. [デプロイ] をクリックします。

    3. [デプロイメント] ページで、javademo デプロイメントを探し、[アクション] 列の [開始] をクリックします。[ジョブの開始] パネルで、[初期モード] を選択し、[開始] をクリックします。

    4. データ処理結果を確認します。

      デプロイメントの状態が [実行中] になったら、デプロイメントの詳細ページに移動します。[ログ] タブと [実行中のタスクマネージャー] サブタブを選択します。[パス, ID] 列の項目をクリックし、[ログリスト] サブタブに切り替えて、.out サフィックスが付いたログをクリックします。lily を検索して、データ処理の結果を確認します。

      image

手順 2: KMS でプレーンテキストパスワードを暗号化する

KMS はプレーンテキストパスワード flink_rds_password@123 を暗号化します。暗号化されたパスワードは a2V5LWh6ejY3YTJmZTY5ejR2NTlpOHE1MC03d3ozYWU1anlzC6NLoC0JHHEXTJ4P/4iVOe/B+eniv8EcaviQDzZWWNPedOYkoFFYWA== です。

KMS 暗号鍵を取得するには、次のいずれかの方法を使用します。

方法 1: OpenAPI ポータルで KMS Encrypt 操作を呼び出す

  1. Alibaba Cloud OpenAPI ポータル にアクセスします。ターゲットリージョンを選択します。

  2. KeyId と Plaintext を設定します。

  3. [呼び出しを開始] をクリックします。

  4. 暗号文を表示します。

詳細については、「暗号化する」をご参照ください。

方法 2: IntelliJ IDEA から KMS Encrypt 操作を呼び出す

  1. KMS キーへのインターネットアクセスを有効にします。詳細については、「インターネット経由で KMS インスタンスキーにアクセスする。」をご参照ください。

    KMS キーは、デフォルトでは VPC ネットワークを介してのみアクセスできます。

  2. ALIBABA_CLOUD_ACCESS_KEY_ID および ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境変数を構成します。AccessKey ペアの取得方法については、「アカウントの AccessKey ペアを表示するにはどうすればよいですか?」をご参照ください。

  3. ターゲットの IntelliJ IDEA プロジェクトで、EncryptFlink という名前のクラスファイルを作成します。

  4. 次のコードスニペットを EncryptFlink クラスファイルにコピーして貼り付けます。構成オプションの値を特定のセットアップに合わせて変更することを忘れないでください。

    package org.example;
    
    import com.aliyun.kms20160120.models.EncryptResponse;
    import com.aliyun.kms20160120.models.EncryptResponseBody;
    import com.aliyun.tea.*;
    
    public class EncryptFlink {
    
        /**
         * <b>説明</b> :
         * <p>AccessKey ペアを使用してクライアントを初期化します。</p>
         * @return Client
         *
         * @throws Exception
         */
        public static com.aliyun.kms20160120.Client createClient() throws Exception {
            // プロジェクトコードが漏洩した場合、AccessKey ペアが漏洩し、アカウント内のすべてのリソースのセキュリティが損なわれる可能性があります。次のサンプルコードは参考用としてのみ提供されています。
            com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config()
                    // 必須。 ALIBABA_CLOUD_ACCESS_KEY_ID 環境変数が構成されていることを確認してください。
                    .setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"))
                    // 必須。 ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境変数が構成されていることを確認してください。
                    .setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
            // エンドポイントを指定します。詳細については、https://api.aliyun.com/product/Kms をご覧ください。
            config.endpoint = "kms.cn-hangzhou.aliyuncs.com";
            return new com.aliyun.kms20160120.Client(config);
        }
    
        public static void main(String[] args_) throws Exception {
            java.util.List<String> args = java.util.Arrays.asList(args_);
            com.aliyun.kms20160120.Client client = EncryptFlink.createClient();
            com.aliyun.kms20160120.models.EncryptRequest encryptRequest = new com.aliyun.kms20160120.models.EncryptRequest()
                    .setPlaintext("flink_rds_password@123")
                    .setKeyId("key-hzz67ab1ff4e750h****");
            com.aliyun.teautil.models.RuntimeOptions runtime = new com.aliyun.teautil.models.RuntimeOptions();
            try {
                // 必要に応じて、API 操作のレスポンスを表示するための独自のコードを記述してください。
                EncryptResponse encryptResponse = client.encryptWithOptions(encryptRequest, runtime);
                EncryptResponseBody body = encryptResponse.getBody();
                System.out.println(body.getCiphertextBlob());
            } catch (TeaException error) {
                // 実際のビジネスシナリオでは、例外を慎重に処理し、プロジェクトで例外を無視しないでください。この例では、デモンストレーションのためにエラーメッセージが単に印刷されます。
                // エラーメッセージ。
                System.out.println(error.getMessage());
                // トラブルシューティング用の URL。
                System.out.println(error.getData().get("Recommend"));
                com.aliyun.teautil.Common.assertAsString(error.message);
            } catch (Exception _error) {
                TeaException error = new TeaException(_error.getMessage(), _error);
                // 実際のビジネスシナリオでは、例外を慎重に処理し、プロジェクトで例外を無視しないでください。この例では、デモンストレーションのためにエラーメッセージが単に印刷されます。
                // エラーメッセージ。
                System.out.println(error.getMessage());
                // トラブルシューティング用の URL。
                System.out.println(error.getData().get("Recommend"));
                com.aliyun.teautil.Common.assertAsString(error.message);
            }
        }
    }

    オプション

    説明

    config.endpoint

    KMS インスタンスのエンドポイント。

    kms.cn-hangzhou.aliyuncs.com

    Plaintext

    暗号化するプレーンテキストパスワード。

    flink_rds_password@123

    KeyId

    KMS キー ID。

    key-hzz67ab1ff4e750h****

  5. POM.xml ファイルに次の依存関係を追加します。

        <dependency>
          <groupId>com.aliyun</groupId>
          <artifactId>kms20160120</artifactId>
          <version>1.2.3</version>
        </dependency>
    
        <dependency>
          <groupId>com.aliyun</groupId>
          <artifactId>tea</artifactId>
          <version>1.3.2</version>
        </dependency>
  6. EncryptFlink クラスファイルを実行して、暗号文を取得します。

手順 3: プログラムに KMS 復号コードを追加する

手順

  1. 復号ユーティリティクラスファイルを作成します。

    1. IntelliJ IDEA で、ターゲットプロジェクトフォルダーの下に KmsUtil という名前のクラスファイルを作成します。

    2. 次のコードスニペットを KmsUtil クラスファイルにコピーして貼り付けます。構成オプションの値を特定のセットアップに合わせて変更することを忘れないでください。

      package org.example;
      
      import com.aliyun.kms20160120.Client;
      import com.aliyun.kms20160120.models.DecryptRequest;
      import com.aliyun.teaopenapi.models.Config;
      
      public class KmsUtil {
          public static String decrypt(String ak, String sk, String ciphertext) throws Exception {
              Client client = new Client(new Config()
                      .setAccessKeyId(ak)
                      .setAccessKeySecret(sk)
                      .setEndpoint("kst-hzz67ab1e****f7hle9ab.cryptoservice.kms.aliyuncs.com")
                      .setCa("-----BEGIN CERTIFICATE-----\n" +
                              "MIIDuzCCAqOgAwIBAgIJA*****--\n"));
              return client.decryptWithOptions(
                      new DecryptRequest().setCiphertextBlob(ciphertext),
                      new com.aliyun.teautil.models.RuntimeOptions()
              ).getBody().getPlaintext();
          }
      }
      

      オプション

      説明

      Endpoint

      KMS インスタンスの VPC エンドポイント。

      kst-hzz67ab1e****f7hle9ab.cryptoservice.kms.aliyuncs.com

      Ca

      CA 証明書。

      KMS コンソールで、KMS インスタンスの CA 証明書をデバイスにダウンロードします。詳細については、「アクセス認証情報を作成する」をご参照ください。

      -----BEGIN CERTIFICATE-----\n" + "MIIDuzCCAqOgAwIBAgIJA*****--\n

  2. JavaDemo ファイルを変更します。

    1. AccessKey ペアを取得し、暗号化されたデータを復号するコードを記述します。

      encryptedPassword の値を、手順 2 で取得した暗号文に置き換えます。

      // パラメーターを解析して AccessKey ペアを取得します。
       final ParameterTool params = ParameterTool.fromArgs(args);
       String ak = params.get("akid");
       String sk = params.get("aksecret");
       
       // 暗号化されたパスワードを復号します。
       String encryptedPassword = "a2V5LWh6ejY3YTJmZTY5ejR2NTlpOHE1MC03d3ozYWU1anlzC6NLoC0JHHEXTJ4P/4iVOe/B+eniv8EcaviQDzZWWNPedOYkoFFYWA==";
       String decryptedPassword = KmsUtil.decrypt(ak, sk, encryptedPassword);
    2. プレーンテキストパスワードを新しく追加された変数に変更します。

      たとえば、.password("flink_rds_password@123").password(decryptedPassword) に変更されます。

  3. pom.xml ファイルを変更します

    1. mainClass を org.example.JavaDemo に設定します。

    2. KMS 依存関係を追加します。

          <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>kms20160120</artifactId>
            <version>1.2.3</version>
          </dependency>
      
          <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>tea</artifactId>
            <version>1.3.2</version>
          </dependency>
    3. (オプション) artifactId の値を KmsJavaDemo に変更します。

      これは、2 つの JAR を区別するのに役立ちます。

  4. JAR をビルドします。

    KmsJavaDemo-1.0-SNAPSHOT.jar ファイルが target ディレクトリに表示されます。

完全なコードデモ

KmsJavaDemo

package org.example;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.api.java.utils.ParameterTool;


public class JavaDemo {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // パラメーターを解析して AccessKey ペアを取得します。
        final ParameterTool params = ParameterTool.fromArgs(args);
        String ak = params.get("akid");
        String sk = params.get("aksecret");

        // 暗号化されたパスワードを復号します。
        String encryptedPassword = "a2V5LWh6ejY3YTJmZTY5ejR2NTlpOHE1MC03d3ozYWU1anlzC6NLoC0JHHEXTJ4P/4iVOe/B+eniv8EcaviQDzZWWNPedOYkoFFYWA==";
        String decryptedPassword = KmsUtil.decrypt(ak, sk, encryptedPassword);

        // デシリアライザーを構築します。
        DataType dataType =
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.INT()),
                        DataTypes.FIELD("username", DataTypes.STRING()),
                        DataTypes.FIELD("age", DataTypes.INT()));

        LogicalType logicalType = TypeConversions.fromDataToLogicalType(dataType);
        InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(logicalType);
        RowDataDebeziumDeserializeSchema deserializer =
                RowDataDebeziumDeserializeSchema.newBuilder()
                        .setPhysicalRowType((RowType) dataType.getLogicalType())
                        .setResultTypeInfo(typeInfo)
                        .build();

        // データソース (com.ververica.cdc.connectors.mysql.source.MySqlSource) を構成します。
        MySqlSource<RowData> mySqlSource =
                MySqlSource.<RowData>builder()
                        .hostname("rm-bp****2ye09w72zjq.mysql.rds.aliyuncs.com")
                        .port(3306)
                        .databaseList("school") // データベースを指定します。
                        .tableList("school.student") // テーブルを指定します。
                        .username("flink_rds_user")
                        .password(decryptedPassword)
                        // RowData 構造のデータを初期化します。
                        .deserializer(deserializer)
                        .build();

        // 外部データソースを Flink DataStream プログラムに統合します
        // ウォーターマーク戦略を使用しないでください。
        DataStreamSource<RowData> mySQLSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");

        // 標準出力に書き込みます。
        mySQLSource.print();

        // プログラムを実行します。
        env.execute("MySQL CDC Test");
    }
}

POM.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.aliyun</groupId>
    <artifactId>KmsJavaDemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>Flink MySQL CDC Demo</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.1</flink.version>
        <flink-cdc.version>2.4.2</flink-cdc.version>
        <log4j.version>2.17.1</log4j.version>
    </properties>
    <dependencies>
        <! -- Flink コア依存関係 (プログラムをパッケージングする際、スコープを provided に設定します) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Apache Flink 向けリアルタイムコンピューティングの MySQL CDC コネクタ -->
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>ververica-connector-mysql</artifactId>
            <version>1.17-vvr-8.0.4-1</version>
            <! -- ローカルで実行する場合は、次の行をコメントアウトします -->
            <!-- <scope>provided</scope> -->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>1.17.1</version> <! -- バージョンは、先に指定した Flink のバージョンと一致させる必要があります -->
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.17.1</version>
            <scope>provided</scope>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.17.1</version> <! -- バージョンは、先に指定した Flink のバージョンと一致させる必要があります -->
            <scope>provided</scope>
        </dependency>


        <! -- ログの依存関係 -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.17.1</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>kms20160120</artifactId>
            <version>1.2.3</version>
        </dependency>

        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>tea</artifactId>
            <version>1.3.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <! -- コンパイラプラグイン -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.13.0</version> <! -- パッチバージョン -->
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>

            <! -- fat JAR をビルドするためのプラグイン -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.5.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <! -- ログの依存関係を保持 -->
                                    <!-- <exclude>org.slf4j:*</exclude> -->
                                    <!-- <exclude>org.apache.logging.log4j:*</exclude> -->
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                        <exclude>META-INF/MANIFEST.MF</exclude> <! -- キーフィルターを追加 -->
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.example.JavaDemo</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <! -- 必要なトランスフォーマーを追加 -->
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

手順 4: 新しい JAR をデプロイし、デプロイメントを開始する

  1. 新しい JAR をアップロードします。

    1. Realtime Compute for Apache Flink の管理コンソール にログオンします。

    2. ターゲットワークスペースを見つけ、[アクション] 列の [コンソール] をクリックします。

    3. 左側のナビゲーションウィンドウで、[アーティファクト] をクリックします。

    4. [アーティファクトのアップロード] をクリックし、アップロードする JAR を選択します。

      この例では、手順 3 でビルドされた KmsJavaDemo-1.0-SNAPSHOT.jar がアップロードされます。

  2. JAR デプロイメントを作成します。

    1. 左側のナビゲーションウィンドウで、[O&M] を選択します。 [デプロイメント] ページの左上隅で、[デプロイメントの作成] > [JAR デプロイメント] を選択します。

    2. [デプロイメントの作成] ダイアログボックスで、デプロイメントのパラメーターを構成します。

      パラメーター

      説明

      デプロイメントモード

      JAR デプロイメントのデプロイに使用するモード。 [ストリームモード] を選択します。

      ストリームモード

      デプロイメント名

      JAR デプロイメントの名前を入力します。

      kmsjavademo

      エンジンバージョン

      デプロイメントで使用されるエンジンバージョン。

      [推奨] または [安定] のラベルが付いたエンジンバージョンを使用することをお勧めします。これらはより信頼性が高く、パフォーマンスが優れているためです。詳細については、「リリースノート」および「エンジンバージョン」をご参照ください。

      vvr-8.0.11-flink-1.17

      JAR URI

      アップロードされた JAR を選択します。

      説明

      VVR 8.0.6 以降を使用する Realtime Compute for Apache Flink では、Object Storage Service (OSS) バケットへのアクセスは、作成時にワークスペースにバインドされたバケットに制限されます。

      KmsJavaDemo-1.0-SNAPSHOT.jar

      エントリポイントクラス

      JAR アプリケーションのエントリポイントクラス。JAR のメインクラスを指定しない場合は、[エントリポイントクラス] フィールドに標準ディレクトリを入力します。

      -

      エントリポイントのメイン引数

      main メソッドに渡す引数を入力します。

      AccessKey ペアを保護するために、変数を使用して AccessKey ペアを構成することをお勧めします。詳細については、「変数を管理する」をご参照ください。この例では、akid と aksecret は変数名です。

      AccessKey ペアの取得方法については、「アカウントの AccessKey ペアを表示するにはどうすればよいですか?」をご参照ください。

      --akid ${secret_values.akid} --aksecret ${secret_values.aksecret}

      デプロイメントターゲット

      デプロイメントがデプロイされる宛先。ドロップダウンリストから目的の [キュー] または [セッションクラスタ] を選択します。詳細については、「キューを管理する」および「デプロイメントをデバッグする」トピックの「手順 1: セッションクラスタを作成する」セクションをご参照ください。

      重要

      本番環境ではセッションクラスタを使用しないでください。セッションクラスタは、モニタリングメトリック、モニタリングとアラート、または Autopilot をサポートしていません。詳細については、「ドラフトをデバッグする」をご参照ください。

      default-queue

      詳細については、「デプロイメントを作成する」をご参照ください。

    3. [デプロイ] をクリックします。

  3. デプロイメントを開始します。

    [デプロイメント] ページで、kmsjavademo デプロイメントを見つけ、[アクション] 列の [開始] をクリックします。 [ジョブの開始] パネルで、[初期モード] を選択し、[開始] をクリックします。

手順 5: TaskManager ログで結果を表示する

デプロイメントの状態が [実行中] になったら、デプロイメントの詳細ページに移動します。[ログ] タブと [実行中のタスクマネージャー] サブタブを選択します。[パス, ID] 列の項目をクリックし、[ログリスト] サブタブに切り替えて、.out サフィックスが付いたログをクリックします。lily を検索して、データ処理の結果を確認します。

image

参考資料