すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:LDAP ユーザー認証の使用

最終更新日:Mar 27, 2026

このトピックでは、E-MapReduce (EMR) Kafka における LDAP を使用した認証の設定方法を説明し、オープンソース Kafka 2.4.1 を用いたエンドツーエンドの完全な手順例を紹介します。

前提条件

開始する前に、以下の条件を満たしていることを確認してください。

  • Kafka および OpenLDAP サービスが選択された Dataflow クラスター。詳細については、「Dataflow Kafka クラスターの作成」をご参照ください。

  • EMR V3.44.0 より後のマイナーバージョン、または EMR V5.10.0 より後のマイナーバージョンで実行中の Kafka クラスター。

  • クラスター内にデプロイされた EMR OpenLDAP サービス(または外部 LDAP サービス)。本トピックでは EMR OpenLDAP サービスを使用します。

注意事項

ユーザーグループ認証を設定するには、LDAP サービスで memberOf オーバーレイ機能が有効化されているか、個々の LDAP ユーザーに対して memberOf 属性が設定可能である必要があります。

LDAP ユーザーによる認証の設定

ステップ 1:スーパーユーザーの作成

既に Kafka スーパーユーザーをお持ちの場合は、このステップはスキップしてください。

Kafka スーパーユーザーは、すべての Kafka リソースへのアクセス権限を持ちます。本設定では、ブローカーノードおよび Kafka コンポーネントへのアクセスにスーパーユーザーを使用します。このステップでは、EMR OpenLDAP サービスに Kafka スーパーユーザーを追加します。

  1. クラスターの master-1-1 ノードに SSH で接続します。詳細については、「クラスターにログオンする」をご参照ください。

  2. 以下の内容で kafka.ldif というファイルを作成します。これにより、UID が kafka、パスワードが kafka-secret の LDAP ユーザーが追加されます。

    dn: uid=kafka,ou=people,o=emr
    cn: kafka
    sn: kafka
    objectClass: inetOrgPerson
    userPassword: kafka-secret
    uid: kafka
  3. 以下のコマンドを実行して LDAP ユーザーを追加します。

    ldapadd -H ldap://master-1-1:10389 -f kafka.ldif -D ${uid} -w ${rootDnPW}

    プレースホルダーを以下のように置き換えます。

    プレースホルダー 説明 確認方法
    ${uid} admin_dn パラメーターの値 構成 タブ
    ${rootDnPW} admin_pwd パラメーターの値 [設定] タブ(EMR コンソール内の OpenLDAP サービスページ)

    10389 は OpenLDAP サービスのデフォルトリスニングポートです。

    ユーザーが正常に追加されたことを確認するには、以下のコマンドを実行します。

    ldapsearch -w ${rootDnPW} -D ${uid} -H ldap://master-1-1:10389 -b uid=kafka,ou=people,o=emr

ステップ 2:Kafka サービスの構成ページへ移動

  1. ECS 上の EMR コンソール にログインします。

  2. 上部のナビゲーションバーで、クラスターが存在するリージョンを選択し、リソースグループを選択します。

  3. ECS 上の EMR ページで、対象のクラスターを見つけ、[操作] 列の サービス をクリックします。

  4. サービス タブで Kafka を見つけ、構成 をクリックします。

ステップ 3:既存の設定項目の更新

  1. 構成 タブで、server.properties タブをクリックします。

  2. kafka.ssl.config.typeCUSTOM に設定します。

  3. authorizer.class.namekafka.security.ldap.authorizer.SimpleLdapAuthorizer に設定します。

  4. 保存 をクリックします。ダイアログボックスで 実行理由 を入力し、構成を自動的に更新 を有効化します。

ステップ 4:新しい設定項目の追加

  1. server.properties タブで、設定項目の追加 をクリックします。

  2. 設定項目の追加 ダイアログボックスで、以下の項目を追加します。

    設定項目 説明
    super.users User:kafka ステップ 1 で作成したスーパーユーザー名。ここにある kafka を実際のスーパーユーザー名に置き換えてください。
    listener.name.${listener}.sasl.enabled.mechanisms PLAIN ${listener} を実際のリスナー名(例: sasl_ssl)に置き換えてください。
    listener.name.${listener}.plain.sasl.jaas.config 下記を参照 ${listener} を実際のリスナー名に置き換えてください。LDAP 関連の値は、ご自身の構成に基づいて更新してください。
    listener.name.${listener}.plain.sasl.server.callback.handler.class kafka.security.ldap.authenticator.LdapAuthenticateCallbackHandler ${listener} を実際のリスナー名に置き換えてください。
    sasl.mechanism.inter.broker.protocol PLAIN
    sasl.enabled.mechanisms PLAIN

    listener.name.${listener}.plain.sasl.jaas.config の値として、以下の内容を使用し、各オプションを LDAP 構成に基づいて適宜置き換えてください。

    org.apache.kafka.common.security.plain.PlainLoginModule required
      username="kafka"
      password="kafka-secret"
      emr.kafka.security.ldap.user.name.attribute="uid"
      emr.kafka.security.ldap.user.base.dn="ou=people,o=emr"
      emr.kafka.security.ldap.group.name.attribute="cn"
      emr.kafka.security.ldap.group.base.dn="ou=groups,o=emr"
      emr.kafka.security.ldap.admin.name.attribute="uid"
      emr.kafka.security.ldap.admin.base.dn="o=emr"
      emr.kafka.security.ldap.url="ldaps://master-1-1:10636"
      emr.kafka.security.ldap.bind.user="admin"
      emr.kafka.security.ldap.bind.user.password="WMMuhh3P**********"
      emr.kafka.security.ldap.user.member.of.attribute="memberOf"
      emr.kafka.security.ldap.group.authorization.support="true" ;

    以下の表では、LDAP 関連のオプションについて説明します。

    オプション 説明
    username スーパーユーザー名。Kafka はこのアカウントを使用してブローカーノードにアクセスします。
    emr.kafka.security.ldap.user.name.attribute ユーザー名を識別するための LDAP 属性(ユーザー名の検索に使用)。
    emr.kafka.security.ldap.user.base.dn LDAP ユーザーのベース識別名(DN)。
    emr.kafka.security.ldap.group.name.attribute グループ名を識別するための LDAP 属性。
    emr.kafka.security.ldap.group.base.dn LDAP ユーザーグループのベース DN。
    emr.kafka.security.ldap.admin.name.attribute 管理者ユーザー名を識別するための LDAP 属性。
    emr.kafka.security.ldap.admin.base.dn LDAP 管理者ユーザーのベース DN。
    emr.kafka.security.ldap.url OpenLDAP サービスの URL。
    emr.kafka.security.ldap.bind.user ユーザーグループ認証に使用される LDAP 管理者ユーザー名。
    emr.kafka.security.ldap.bind.user.password LDAP 管理者のパスワード。
    emr.kafka.security.ldap.user.member.of.attribute ユーザーが所属するグループを記録する LDAP 属性。
    emr.kafka.security.ldap.group.authorization.support true を設定すると、グループベースの権限付与が有効化されます。有効化すると、ユーザーは所属するグループから権限を継承します。
  3. 要件に応じて、以下の構成ファイルを更新します。

    kafka.client.jaas.content 内のユーザー名およびパスワードを、実際の認証情報に置き換えてください。ブローカーレベルのアクセスにはスーパーユーザーの認証情報を使用します。
    構成ファイル パラメーター 備考
    kafka_client_jaas.conf kafka.client.jaas.content KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-secret"; }; 値はセミコロン (;) で終了する必要があります。
    schema-registry.properties schema_registry_opts -Djava.security.auth.login.config=/etc/taihao-apps/kafka-conf/kafka-conf/kafka_client_jaas.conf 既に値が存在する場合は、末尾に追加してください。
    kafka-rest.properties kafkarest_opts -Djava.security.auth.login.config=/etc/taihao-apps/kafka-conf/kafka-conf/kafka_client_jaas.conf 既に値が存在する場合は、末尾に追加してください。

ステップ 5:Kafka サービスの再起動

  1. 構成 タブで、その他再起動 を選択します。

  2. ダイアログボックスで 実行理由 を入力し、OK をクリックします。

  3. 確認 ダイアログで、OK をクリックします。

本番環境に適用する際のセキュリティ上の考慮事項

次のセクションの手順例では、認証情報をプレーンテキストで送信する SASL_PLAINTEXT を使用しています。本番環境では、以下の点に注意してください。

  • 通信中の認証情報の暗号化を行うため、SASL_SSL を使用し、SASL_PLAINTEXT を避けてください。

  • 可能な限り、平文の構成ファイルにパスワードを格納しないでください。sasl.jaas.config 内の LDAP bind パスワードは、適切な保護が必要です。

次の例では、オープンソースの Kafka 2.4.1 を使用しています。他の Kafka バージョンについては、「Apache Kafka セキュリティドキュメント」をご参照ください。

この手順例では、LDAP ユーザーグループおよび ACL を使用したグループベースの権限付与を実証します。

ステップ 1:テスト用ユーザーおよびユーザーグループの作成

1.1 memberOf オーバーレイの有効化(未有効の場合)

OpenLDAP サービスで memberOf オーバーレイ機能が無効になっている場合、ユーザーグループの作成前に有効化する必要があります。

memberOf オーバーレイの有効化手順は、LDAP サービスによって異なります。以下の手順は EMR OpenLDAP サービスに適用されます。
OpenLDAP サービスがデプロイされている すべてのノード で、この機能を有効化してください。
以下のパラメーターは、ご自身の環境に応じて調整してください。
olcModulepath:32 ビット OS の場合、/usr/lib/openldap に設定します。
dn: cn=module{0},cn=configcn=module{0}.ldif/etc/openldap/slapd.d/cn=config にすでに存在する場合、01 に変更します。同様に、cn=module{1}.ldif が存在する場合は、12 に変更します。
dn: olcOverlay={0}memberof,olcDatabase={2}hdb,cn=configolcDatabase/etc/openldap/slapd.d/cn=config の値に基づいて設定します。たとえば、ファイル名が {2}hdb.ldif の場合、olcDatabasehdb に設定します。
  1. 以下の内容で memberof_config.ldif を作成します。

    dn: cn=module{0},cn=config
    cn: module{0}
    objectClass: olcModuleList
    objectclass: top
    olcModuleload: memberof.la
    olcModulePath: /usr/lib64/openldap
    
    dn: olcOverlay={0}memberof,olcDatabase={2}hdb,cn=config
    objectClass: olcConfig
    objectClass: olcMemberOf
    objectClass: olcOverlayConfig
    objectClass: top
    olcOverlay: memberof
    olcMemberOfDangling: ignore
    olcMemberOfRefInt: TRUE
    olcMemberOfGroupOC: groupOfNames
    olcMemberOfMemberAD: member
    olcMemberOfMemberOfAD: memberOf
  2. 以下の内容で refint1.ldif を作成します。

    dn: cn=module{0},cn=config
    add: olcmoduleload
    olcmoduleload: refint
  3. 以下の内容で refint2.ldif を作成します。

    dn: olcOverlay=refint,olcDatabase={2}hdb,cn=config
    objectClass: olcConfig
    objectClass: olcOverlayConfig
    objectClass: olcRefintConfig
    objectClass: top
    olcOverlay: refint
    olcRefintAttribute: memberof member manager owner
  4. 3 つの構成ファイルをすべて読み込みます。

    ldapadd -Q -Y EXTERNAL -H ldapi:/// -f memberof_config.ldif
    ldapmodify -Q -Y EXTERNAL -H ldapi:/// -f refint1.ldif
    ldapadd -Q -Y EXTERNAL -H ldapi:/// -f refint2.ldif

1.2 テスト用ユーザーの作成

以下の内容で kafka-users.ldif を作成し、ldapadd を実行してユーザーを追加します。

dn: uid=kafka-user1,ou=people,o=emr
cn: kafka-user1
sn: kafka-user1
uid: kafka-user1
objectClass: inetOrgPerson
userPassword: kafka-secret

dn: uid=kafka-user2,ou=people,o=emr
cn: kafka-user2
sn: kafka-user2
uid: kafka-user2
objectClass: inetOrgPerson
userPassword: kafka-secret

1.3 テスト用ユーザーグループの作成

以下の内容で kafka-groups.ldif を作成し、ldapadd を実行してグループを追加します。

dn: cn=kafka-group1,ou=groups,o=emr
cn: kafka-group1
objectClass: groupOfNames
member: uid=kafka-user1,ou=people,o=emr
member: uid=kafka-user2,ou=people,o=emr

dn: cn=kafka-group2,ou=groups,o=emr
cn: kafka-group2
objectClass: groupOfNames
member: uid=kafka-user1,ou=people,o=emr

グループメンバーシップのまとめ:

  • kafka-group1:kafka-user1、kafka-user2

  • kafka-group2:kafka-user1

1.4 memberOf 属性の確認

OpenLDAP サーバー上で以下のコマンドを実行し、memberOf 属性が正しく設定されていることを確認します。

ldapsearch -Q -Y EXTERNAL -H ldapi:/// -b ou=people,o=emr memberOf

期待される結果:

  • kafka-user1 は kafka-group1 および kafka-group2 に所属しています。

  • kafka-user2 は kafka-group2 に所属しています。

ステップ 2:クライアント構成ファイルの作成

各クライアント構成ファイルは、特定のユーザーの認証情報を指定します。すべての例では、セキュリティプロトコルとして SASL_PLAINTEXT を使用します。

本番環境では、認証情報をプレーンテキストで送信されないようにするために、SASL_SSLSASL_PLAINTEXT の代わりに使用します。詳細については、「Apache Kafka セキュリティドキュメント」をご参照ください。

以下の 3 つの構成ファイルを作成します。各ファイル内のユーザー名およびパスワードは、クライアントに応じて置き換えてください。

client.properties(スーパーユーザー):

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-secret";

kafka-user1.properties

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka-user1" password="kafka-secret";

kafka-user2.properties

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka-user2" password="kafka-secret";

ステップ 3:テスト用トピックの作成

kafka-topics.sh --bootstrap-server core-1-1:9092 --command-config client.properties --create --topic test --replication-factor 3 --partitions 2

ステップ 4:権限の付与

Kafka アクセス制御リスト (ACL) は、次の形式に従います。「Principal P が [許可|拒否] Operation O を Host H から Resource R に対して実行する」。ACL の完全なドキュメントについては、「Apache Kafka の認可および ACL」をご参照ください。

リソースに ACL が設定されていない場合、Kafka はデフォルトでそのリソースへのアクセスを制限します。スーパーユーザーのみがアクセスできます。

以下のコマンドを実行して権限を設定します。

# kafka-group2 がクラスターおよびトピック test に対してすべての操作を実行できるように許可します。
kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --add --allow-principal User:kafka-group2 --allow-host "*" --operation All --cluster kafka-cluster
kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --add --allow-principal User:kafka-group2 --allow-host "*" --operation All --topic test
# kafka-group1 がトピック test に対してすべての操作を実行できないように拒否します。
kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --add --deny-principal User:kafka-group1 --deny-host "*" --operation All --topic test
# kafka-user1 がトピック test に対して読み取りアクセスできないように拒否します。
kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --add --deny-principal User:kafka-user1 --deny-host "*" --operation Read --topic test
# kafka-user2 がトピック test に対して読み取りアクセスできるように許可します。
kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --add --allow-principal User:kafka-user2 --allow-host "*" --operation Read --topic test

テストトピックの現在のACLを表示するには:

kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --list --topic test

期待される出力:

Current ACLs for resource `Topic:LITERAL:test`:
        User:kafka-group2 has Allow permission for operations: All from hosts: *
        User:kafka-user2 has Allow permission for operations: Read from hosts: *
        User:kafka-user1 has Deny permission for operations: Read from hosts: *
        User:kafka-group1 has Deny permission for operations: All from hosts: *

ステップ 5:権限の検証

以下のテストにより、グループベースの権限が正しく適用されていることを確認します。ユーザーの有効な権限は、個別の ACL と所属するすべてのグループの ACL を組み合わせたものです。

kafka-user1 はテスト用トピックに書き込み可能です(kafka-group2 から「Allow All」を継承):

kafka-console-producer.sh --broker-list core-1-1:9092 --producer.config ./kafka-user1.properties --topic test

メッセージを数件書き込んで検証します。

>a
>b
>c
>d

kafka-user1 はテスト用トピックから読み取りできません(明示的な「Deny Read」が設定済み):

# まずコンシューマーグループへの権限を付与します。
kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --add --allow-principal User:kafka-user1 --allow-host "*" --operation All --group kka-user1-consumer
kafka-console-consumer.sh --bootstrap-server core-1-1:9092 --consumer.config ./kafka-user1.properties --topic test --group kafka-user1-consumer

読み取り操作は失敗し、kafka-user1 が kafka-group2 に所属しているにもかかわらず、明示的な「Deny Read」が優先されることを確認できます。

kafka-user2 はテスト用トピックに書き込みできません(書き込み権限が付与されていません):

kafka-console-producer.sh --broker-list core-1-1:9092 --producer.config ./kafka-user2.properties --topic test

書き込み操作は失敗します。

kafka-user2 はテスト用トピックから読み取り可能です(「Allow Read」が設定済み):

# まずコンシューマーグループへの権限を付与します。
kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --add --allow-principal User:kafka-user2 --allow-host "*" --operation All --group kafka-user2-consumer
kafka-console-consumer.sh --bootstrap-server core-1-1:9092 --consumer.config ./kafka-user2.properties --topic test --group kafka-user2-consumer --from-beginning

データが正常に消費されます。