すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:KafkaコネクタでKerberos認証を使用する

最終更新日:Jan 07, 2025

Kerberosは、通信のセキュリティを確保するためにID認証に使用されるコンピューターネットワーク認証プロトコルです。 Realtime Compute for Apache Flinkデプロイメントは、Realtime Compute for Apache Flinkの開発コンソールで有効なKafkaとKerberosの情報を構成した場合にのみ、Kerberos認証が有効になっているKafkaクラスターにアクセスできます。 このトピックでは、KafkaクライアントでKerberos認証を有効にする方法について説明します。

制限事項

ApsaraMQ for Kafkaは、Kerberos認証をサポートしていません。

手順 1:準備を行う

Kerberos認証を有効にする前に、次の操作を実行します。

  • Kerberos構成ファイルを取得します。

    • krb5.confは、Kerberos認証環境の構成ファイルです。 krb5.conf構成ファイルは、Kerberos認証を有効にするクラスターと、Kerberosクライアントとサーバー間の接続に使用されるパラメーターを指定するために使用されます。 構成項目の詳細については、MIT Kerberosドキュメントを参照してください。

    • keytabは、ユーザーのID認証資格情報を含む構成ファイルであり、Kerberosサーバーでの認証中にユーザーを識別するために使用されます。

  • ドメイン名解決を構成します。

    Kerberos認証は、ドメイン名解決に依存しています。 Realtime Compute for Apache Flinkの開発コンソールで、Key Distribution Center(KDC)サーバーとKafkaのドメイン名とIPアドレスを構成する必要があります。

    • KDCサーバー: KDCサーバーのドメイン名とIPアドレスは、krb5.conf構成ファイルで定義されています。

    • Kafkaブローカー: Kafkaブローカーのドメイン名とIPアドレスは、server.properties構成ファイルで定義されています

      重要

      Kafkaクライアントは、Kerberosに登録されているKafkaブローカーのドメイン名を使用して、Kafkaブローカーに接続する必要があります。 そうしないと、KafkaクライアントはKerberos認証に失敗し、Kafkaブローカーに接続できません。 principalを確認して、Kafkaブローカーのドメイン名がKerberosに登録されているかどうかを確認できます。 詳細については、「Kerberosの基本操作」を参照してください。 Kafkaクラスター内のすべてのKafkaブローカーに対してドメイン名解決を構成する必要があります。

      たとえば、Kerberosのbroker1という名前のKafkaブローカーのプリンシパルがkafka/broker1.example.com@EXAMPLE.comの場合、broker1.example.comのドメイン名解決を構成する必要があります。

  • アクセスルールを作成します。

    Realtime Compute for Apache FlinkがKDCサーバーのポート 88 経由でKDCサーバーにアクセスできるように、アクセスルールを作成する必要があります。

手順 2:Kerberos構成ファイルをアップロードする

  1. Realtime Compute for Apache Flinkコンソールにログインします。

  2. 管理するワークスペースを見つけ、[コンソール] 列の [アクション] をクリックします。

  3. 左側のナビゲーションペインで、[アーティファクト]をクリックします。

  4. [アーティファクト] ページの左上隅にある [アーティファクトのアップロード] をクリックします。 アップロードする krb5.conf および keytab 構成ファイルを選択します。

    デプロイを構成するときに、アップロードするファイルの名前がkrb5.confkeytabである場合、パス/flink/usrlib/krb5.conf/flink/usrlib/keytabを使用してファイルを参照できます。

    説明

    Realtime Compute for Apache Flinkの開発コンソールで、Kafkaクライアントごとに異なるKerberosプリンシパルを指定できます。 デプロイに複数のKerberosプリンシパルが関係している場合は、すべてのプリンシパルのkeytabファイルをアップロードします。

  5. [開く] をクリックしてファイルをアップロードします。

手順 3:Kerberos認証を有効にする

方法 1:SQLデプロイメントでKerberos認証を有効にする

  1. Kafkaテーブルを作成します。

    1. SQLドラフトを作成するか、既存のSQLドラフトを開きます。 詳細については、「SQLドラフトを作成する」を参照してください。

    2. KafkaテーブルのWITH句に次の構成を追加して、KafkaクライアントのKerberos認証を有効にします。

      CREATE TEMPORARY TABLE kafka_kerberos (
           ...
      )WITH (
          'connector' = 'kafka',
          // Kerberosに登録されているKafkaブローカーのドメイン名を指定します。
          'properties.bootstrap.servers' = 'broker1.example.com:9092',
          // この例ではSASL_PLAINTEXTを使用します。
          'properties.security.protocol' = 'SASL_PLAINTEXT',
          'properties.sasl.mechanism' = 'GSSAPI',
          // ビジネス要件に基づいて、構成のプリンシパルのユーザー名とレルムを変更します。
          'properties.sasl.jaas.config' = 'com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/flink/usrlib/keytab" principal="user@EXAMPLE.COM";',
          // Kerberosに登録されているプリンシパルのサービス名を指定します。
          'properties.sasl.kerberos.service.name' = 'kafka',
          // その他の構成を追加します。
          ...
      )

      パラメーターの詳細については、CONFIGURATIONを参照してください。

    3. ドラフトを[デプロイ]します。 詳細については、「デプロイを作成する」を参照してください。

  2. デプロイメントのランタイムパラメーターを構成します。

    1. [O&M] > [デプロイメント] ページで、目的のデプロイメントの名前をクリックします。

    2. [構成] タブの [パラメーター] セクションで、[その他の構成] フィールドに次の構成を追加して、Java仮想マシン(JVM)のKerberos構成ファイルを指定します。

      env.java.opts: '-Djava.security.krb5.conf=/flink/usrlib/krb5.conf'
    3. [パラメーター] セクションの右上隅にある [保存] をクリックします。

  3. [デプロイメント] ページの右上隅にある [開始] をクリックします。

方法 2:DataStreamデプロイメントでKerberos認証を有効にする

  1. KafkaソースとKafkaシンクを作成します。

    1. DataStreamドラフトを作成します。 詳細については、「JARドラフトを作成する」を参照してください。

    2. コードに次の構成を追加して、KafkaクライアントのKerberos認証を有効にします。

      KafkaSource.builder()
          ...
          // Kerberosに登録されているKafkaブローカーのドメイン名を指定します。
          .setBootstrapServers("broker1.example.com:9092")
          // この例ではSASL_PLAINTEXTを使用します。
          .setProperty("security.protocol", "SASL_PLAINTEXT")
          .setProperty("sasl.mechanism", "GSSAPI")
          // Kerberosに登録されている情報に基づいて、構成のプリンシパルのユーザー名とレルムを変更します。
          .setProperty("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/flink/usrlib/keytab\" principal=\"user@EXAMPLE.COM\";")
           // Kerberosに登録されているプリンシパルのサービス名を指定します。
          .setProperty("sasl.kerberos.service.name", "kafka")
      
      KafkaSink.builder()
          ...
          // Kerberosに登録されているKafkaブローカーのドメイン名を指定します。
          .setBootstrapServers("broker1.example.com:9092")
          // この例ではSASL_PLAINTEXTを使用します。
          .setProperty("security.protocol", "SASL_PLAINTEXT")
          .setProperty("sasl.mechanism", "GSSAPI")
          // Kerberosに登録されている情報に基づいて、構成のプリンシパルのユーザー名とレルムを変更します。
          .setProperty("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/flink/usrlib/keytab\" principal=\"user@EXAMPLE.COM\";")
          // Kerberosに登録されているプリンシパルのサービス名を指定します。
          .setProperty("sasl.kerberos.service.name", "kafka")

      パラメーターの詳細については、CONFIGURATIONを参照してください。

  2. デプロイメントのランタイムパラメーターを構成します。

    1. DataStreamドラフトを開発した後、プログラムをパッケージ化します。

    2. 生成されたJARファイルをRealtime Compute for Apache Flinkの開発コンソールにアップロードしてデプロイします。 詳細については、「JARデプロイを作成する」を参照してください。

    3. [デプロイメント] ページの [構成] タブの [パラメーター] セクションで、[その他の構成] フィールドに次の構成を追加して、JVMのKerberos構成ファイルを指定します。

      env.java.opts: '-Djava.security.krb5.conf=/flink/usrlib/krb5.conf'
    4. [パラメーター] セクションの右上隅にある [保存] をクリックします。

  3. [デプロイメント] ページの右上隅にある [開始] をクリックします。