このトピックでは、E-MapReduce (EMR) Kafka における LDAP を使用した認証の設定方法を説明し、オープンソース Kafka 2.4.1 を用いたエンドツーエンドの完全な手順例を紹介します。
前提条件
開始する前に、以下の条件を満たしていることを確認してください。
-
Kafka および OpenLDAP サービスが選択された Dataflow クラスター。詳細については、「Dataflow Kafka クラスターの作成」をご参照ください。
-
EMR V3.44.0 より後のマイナーバージョン、または EMR V5.10.0 より後のマイナーバージョンで実行中の Kafka クラスター。
-
クラスター内にデプロイされた EMR OpenLDAP サービス(または外部 LDAP サービス)。本トピックでは EMR OpenLDAP サービスを使用します。
注意事項
ユーザーグループ認証を設定するには、LDAP サービスで memberOf オーバーレイ機能が有効化されているか、個々の LDAP ユーザーに対して memberOf 属性が設定可能である必要があります。
LDAP ユーザーによる認証の設定
ステップ 1:スーパーユーザーの作成
既に Kafka スーパーユーザーをお持ちの場合は、このステップはスキップしてください。
Kafka スーパーユーザーは、すべての Kafka リソースへのアクセス権限を持ちます。本設定では、ブローカーノードおよび Kafka コンポーネントへのアクセスにスーパーユーザーを使用します。このステップでは、EMR OpenLDAP サービスに Kafka スーパーユーザーを追加します。
-
クラスターの master-1-1 ノードに SSH で接続します。詳細については、「クラスターにログオンする」をご参照ください。
-
以下の内容で
kafka.ldifというファイルを作成します。これにより、UID がkafka、パスワードがkafka-secretの LDAP ユーザーが追加されます。dn: uid=kafka,ou=people,o=emr cn: kafka sn: kafka objectClass: inetOrgPerson userPassword: kafka-secret uid: kafka -
以下のコマンドを実行して LDAP ユーザーを追加します。
ldapadd -H ldap://master-1-1:10389 -f kafka.ldif -D ${uid} -w ${rootDnPW}プレースホルダーを以下のように置き換えます。
プレースホルダー 説明 確認方法 ${uid}admin_dn パラメーターの値 構成 タブ ${rootDnPW}admin_pwd パラメーターの値 [設定] タブ(EMR コンソール内の OpenLDAP サービスページ) 10389は OpenLDAP サービスのデフォルトリスニングポートです。ユーザーが正常に追加されたことを確認するには、以下のコマンドを実行します。
ldapsearch -w ${rootDnPW} -D ${uid} -H ldap://master-1-1:10389 -b uid=kafka,ou=people,o=emr
ステップ 2:Kafka サービスの構成ページへ移動
-
ECS 上の EMR コンソール にログインします。
-
上部のナビゲーションバーで、クラスターが存在するリージョンを選択し、リソースグループを選択します。
-
ECS 上の EMR ページで、対象のクラスターを見つけ、[操作] 列の サービス をクリックします。
-
サービス タブで Kafka を見つけ、構成 をクリックします。
ステップ 3:既存の設定項目の更新
-
構成 タブで、server.properties タブをクリックします。
-
kafka.ssl.config.type を CUSTOM に設定します。
-
authorizer.class.name を kafka.security.ldap.authorizer.SimpleLdapAuthorizer に設定します。
-
保存 をクリックします。ダイアログボックスで 実行理由 を入力し、構成を自動的に更新 を有効化します。
ステップ 4:新しい設定項目の追加
-
server.properties タブで、設定項目の追加 をクリックします。
-
設定項目の追加 ダイアログボックスで、以下の項目を追加します。
設定項目 値 説明 super.usersUser:kafkaステップ 1 で作成したスーパーユーザー名。ここにある kafkaを実際のスーパーユーザー名に置き換えてください。listener.name.${listener}.sasl.enabled.mechanismsPLAIN${listener}を実際のリスナー名(例:sasl_ssl)に置き換えてください。listener.name.${listener}.plain.sasl.jaas.config下記を参照 ${listener}を実際のリスナー名に置き換えてください。LDAP 関連の値は、ご自身の構成に基づいて更新してください。listener.name.${listener}.plain.sasl.server.callback.handler.classkafka.security.ldap.authenticator.LdapAuthenticateCallbackHandler${listener}を実際のリスナー名に置き換えてください。sasl.mechanism.inter.broker.protocolPLAINsasl.enabled.mechanismsPLAINlistener.name.${listener}.plain.sasl.jaas.configの値として、以下の内容を使用し、各オプションを LDAP 構成に基づいて適宜置き換えてください。org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-secret" emr.kafka.security.ldap.user.name.attribute="uid" emr.kafka.security.ldap.user.base.dn="ou=people,o=emr" emr.kafka.security.ldap.group.name.attribute="cn" emr.kafka.security.ldap.group.base.dn="ou=groups,o=emr" emr.kafka.security.ldap.admin.name.attribute="uid" emr.kafka.security.ldap.admin.base.dn="o=emr" emr.kafka.security.ldap.url="ldaps://master-1-1:10636" emr.kafka.security.ldap.bind.user="admin" emr.kafka.security.ldap.bind.user.password="WMMuhh3P**********" emr.kafka.security.ldap.user.member.of.attribute="memberOf" emr.kafka.security.ldap.group.authorization.support="true" ;以下の表では、LDAP 関連のオプションについて説明します。
オプション 説明 usernameスーパーユーザー名。Kafka はこのアカウントを使用してブローカーノードにアクセスします。 emr.kafka.security.ldap.user.name.attributeユーザー名を識別するための LDAP 属性(ユーザー名の検索に使用)。 emr.kafka.security.ldap.user.base.dnLDAP ユーザーのベース識別名(DN)。 emr.kafka.security.ldap.group.name.attributeグループ名を識別するための LDAP 属性。 emr.kafka.security.ldap.group.base.dnLDAP ユーザーグループのベース DN。 emr.kafka.security.ldap.admin.name.attribute管理者ユーザー名を識別するための LDAP 属性。 emr.kafka.security.ldap.admin.base.dnLDAP 管理者ユーザーのベース DN。 emr.kafka.security.ldap.urlOpenLDAP サービスの URL。 emr.kafka.security.ldap.bind.userユーザーグループ認証に使用される LDAP 管理者ユーザー名。 emr.kafka.security.ldap.bind.user.passwordLDAP 管理者のパスワード。 emr.kafka.security.ldap.user.member.of.attributeユーザーが所属するグループを記録する LDAP 属性。 emr.kafka.security.ldap.group.authorization.supporttrueを設定すると、グループベースの権限付与が有効化されます。有効化すると、ユーザーは所属するグループから権限を継承します。 -
要件に応じて、以下の構成ファイルを更新します。
kafka.client.jaas.content内のユーザー名およびパスワードを、実際の認証情報に置き換えてください。ブローカーレベルのアクセスにはスーパーユーザーの認証情報を使用します。構成ファイル パラメーター 値 備考 kafka_client_jaas.conf kafka.client.jaas.contentKafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-secret"; };値はセミコロン ( ;) で終了する必要があります。schema-registry.properties schema_registry_opts-Djava.security.auth.login.config=/etc/taihao-apps/kafka-conf/kafka-conf/kafka_client_jaas.conf既に値が存在する場合は、末尾に追加してください。 kafka-rest.properties kafkarest_opts-Djava.security.auth.login.config=/etc/taihao-apps/kafka-conf/kafka-conf/kafka_client_jaas.conf既に値が存在する場合は、末尾に追加してください。
ステップ 5:Kafka サービスの再起動
-
構成 タブで、その他 > 再起動 を選択します。
-
ダイアログボックスで 実行理由 を入力し、OK をクリックします。
-
確認 ダイアログで、OK をクリックします。
本番環境に適用する際のセキュリティ上の考慮事項
次のセクションの手順例では、認証情報をプレーンテキストで送信する SASL_PLAINTEXT を使用しています。本番環境では、以下の点に注意してください。
-
通信中の認証情報の暗号化を行うため、
SASL_SSLを使用し、SASL_PLAINTEXTを避けてください。 -
可能な限り、平文の構成ファイルにパスワードを格納しないでください。
sasl.jaas.config内の LDAP bind パスワードは、適切な保護が必要です。
例
次の例では、オープンソースの Kafka 2.4.1 を使用しています。他の Kafka バージョンについては、「Apache Kafka セキュリティドキュメント」をご参照ください。
この手順例では、LDAP ユーザーグループおよび ACL を使用したグループベースの権限付与を実証します。
ステップ 1:テスト用ユーザーおよびユーザーグループの作成
1.1 memberOf オーバーレイの有効化(未有効の場合)
OpenLDAP サービスで memberOf オーバーレイ機能が無効になっている場合、ユーザーグループの作成前に有効化する必要があります。
memberOf オーバーレイの有効化手順は、LDAP サービスによって異なります。以下の手順は EMR OpenLDAP サービスに適用されます。
OpenLDAP サービスがデプロイされている すべてのノード で、この機能を有効化してください。
以下のパラメーターは、ご自身の環境に応じて調整してください。
olcModulepath:32 ビット OS の場合、/usr/lib/openldapに設定します。
dn: cn=module{0},cn=config:cn=module{0}.ldifが/etc/openldap/slapd.d/cn=configにすでに存在する場合、0を1に変更します。同様に、cn=module{1}.ldifが存在する場合は、1を2に変更します。
dn: olcOverlay={0}memberof,olcDatabase={2}hdb,cn=config:olcDatabaseは/etc/openldap/slapd.d/cn=configの値に基づいて設定します。たとえば、ファイル名が{2}hdb.ldifの場合、olcDatabaseをhdbに設定します。
-
以下の内容で
memberof_config.ldifを作成します。dn: cn=module{0},cn=config cn: module{0} objectClass: olcModuleList objectclass: top olcModuleload: memberof.la olcModulePath: /usr/lib64/openldap dn: olcOverlay={0}memberof,olcDatabase={2}hdb,cn=config objectClass: olcConfig objectClass: olcMemberOf objectClass: olcOverlayConfig objectClass: top olcOverlay: memberof olcMemberOfDangling: ignore olcMemberOfRefInt: TRUE olcMemberOfGroupOC: groupOfNames olcMemberOfMemberAD: member olcMemberOfMemberOfAD: memberOf -
以下の内容で
refint1.ldifを作成します。dn: cn=module{0},cn=config add: olcmoduleload olcmoduleload: refint -
以下の内容で
refint2.ldifを作成します。dn: olcOverlay=refint,olcDatabase={2}hdb,cn=config objectClass: olcConfig objectClass: olcOverlayConfig objectClass: olcRefintConfig objectClass: top olcOverlay: refint olcRefintAttribute: memberof member manager owner -
3 つの構成ファイルをすべて読み込みます。
ldapadd -Q -Y EXTERNAL -H ldapi:/// -f memberof_config.ldif ldapmodify -Q -Y EXTERNAL -H ldapi:/// -f refint1.ldif ldapadd -Q -Y EXTERNAL -H ldapi:/// -f refint2.ldif
1.2 テスト用ユーザーの作成
以下の内容で kafka-users.ldif を作成し、ldapadd を実行してユーザーを追加します。
dn: uid=kafka-user1,ou=people,o=emr
cn: kafka-user1
sn: kafka-user1
uid: kafka-user1
objectClass: inetOrgPerson
userPassword: kafka-secret
dn: uid=kafka-user2,ou=people,o=emr
cn: kafka-user2
sn: kafka-user2
uid: kafka-user2
objectClass: inetOrgPerson
userPassword: kafka-secret
1.3 テスト用ユーザーグループの作成
以下の内容で kafka-groups.ldif を作成し、ldapadd を実行してグループを追加します。
dn: cn=kafka-group1,ou=groups,o=emr
cn: kafka-group1
objectClass: groupOfNames
member: uid=kafka-user1,ou=people,o=emr
member: uid=kafka-user2,ou=people,o=emr
dn: cn=kafka-group2,ou=groups,o=emr
cn: kafka-group2
objectClass: groupOfNames
member: uid=kafka-user1,ou=people,o=emr
グループメンバーシップのまとめ:
-
kafka-group1:kafka-user1、kafka-user2
-
kafka-group2:kafka-user1
1.4 memberOf 属性の確認
OpenLDAP サーバー上で以下のコマンドを実行し、memberOf 属性が正しく設定されていることを確認します。
ldapsearch -Q -Y EXTERNAL -H ldapi:/// -b ou=people,o=emr memberOf
期待される結果:
-
kafka-user1は kafka-group1 および kafka-group2 に所属しています。 -
kafka-user2は kafka-group2 に所属しています。
ステップ 2:クライアント構成ファイルの作成
各クライアント構成ファイルは、特定のユーザーの認証情報を指定します。すべての例では、セキュリティプロトコルとして SASL_PLAINTEXT を使用します。
本番環境では、認証情報をプレーンテキストで送信されないようにするために、SASL_SSLをSASL_PLAINTEXTの代わりに使用します。詳細については、「Apache Kafka セキュリティドキュメント」をご参照ください。
以下の 3 つの構成ファイルを作成します。各ファイル内のユーザー名およびパスワードは、クライアントに応じて置き換えてください。
client.properties(スーパーユーザー):
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-secret";
kafka-user1.properties:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka-user1" password="kafka-secret";
kafka-user2.properties:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka-user2" password="kafka-secret";
ステップ 3:テスト用トピックの作成
kafka-topics.sh --bootstrap-server core-1-1:9092 --command-config client.properties --create --topic test --replication-factor 3 --partitions 2
ステップ 4:権限の付与
Kafka アクセス制御リスト (ACL) は、次の形式に従います。「Principal P が [許可|拒否] Operation O を Host H から Resource R に対して実行する」。ACL の完全なドキュメントについては、「Apache Kafka の認可および ACL」をご参照ください。
リソースに ACL が設定されていない場合、Kafka はデフォルトでそのリソースへのアクセスを制限します。スーパーユーザーのみがアクセスできます。
以下のコマンドを実行して権限を設定します。
# kafka-group2 がクラスターおよびトピック test に対してすべての操作を実行できるように許可します。
kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --add --allow-principal User:kafka-group2 --allow-host "*" --operation All --cluster kafka-cluster
kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --add --allow-principal User:kafka-group2 --allow-host "*" --operation All --topic test
# kafka-group1 がトピック test に対してすべての操作を実行できないように拒否します。
kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --add --deny-principal User:kafka-group1 --deny-host "*" --operation All --topic test
# kafka-user1 がトピック test に対して読み取りアクセスできないように拒否します。
kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --add --deny-principal User:kafka-user1 --deny-host "*" --operation Read --topic test
# kafka-user2 がトピック test に対して読み取りアクセスできるように許可します。
kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --add --allow-principal User:kafka-user2 --allow-host "*" --operation Read --topic test
テストトピックの現在のACLを表示するには:
kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --list --topic test
期待される出力:
Current ACLs for resource `Topic:LITERAL:test`:
User:kafka-group2 has Allow permission for operations: All from hosts: *
User:kafka-user2 has Allow permission for operations: Read from hosts: *
User:kafka-user1 has Deny permission for operations: Read from hosts: *
User:kafka-group1 has Deny permission for operations: All from hosts: *
ステップ 5:権限の検証
以下のテストにより、グループベースの権限が正しく適用されていることを確認します。ユーザーの有効な権限は、個別の ACL と所属するすべてのグループの ACL を組み合わせたものです。
kafka-user1 はテスト用トピックに書き込み可能です(kafka-group2 から「Allow All」を継承):
kafka-console-producer.sh --broker-list core-1-1:9092 --producer.config ./kafka-user1.properties --topic test
メッセージを数件書き込んで検証します。
>a
>b
>c
>d
kafka-user1 はテスト用トピックから読み取りできません(明示的な「Deny Read」が設定済み):
# まずコンシューマーグループへの権限を付与します。
kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --add --allow-principal User:kafka-user1 --allow-host "*" --operation All --group kka-user1-consumer
kafka-console-consumer.sh --bootstrap-server core-1-1:9092 --consumer.config ./kafka-user1.properties --topic test --group kafka-user1-consumer
読み取り操作は失敗し、kafka-user1 が kafka-group2 に所属しているにもかかわらず、明示的な「Deny Read」が優先されることを確認できます。
kafka-user2 はテスト用トピックに書き込みできません(書き込み権限が付与されていません):
kafka-console-producer.sh --broker-list core-1-1:9092 --producer.config ./kafka-user2.properties --topic test
書き込み操作は失敗します。
kafka-user2 はテスト用トピックから読み取り可能です(「Allow Read」が設定済み):
# まずコンシューマーグループへの権限を付与します。
kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --add --allow-principal User:kafka-user2 --allow-host "*" --operation All --group kafka-user2-consumer
kafka-console-consumer.sh --bootstrap-server core-1-1:9092 --consumer.config ./kafka-user2.properties --topic test --group kafka-user2-consumer --from-beginning
データが正常に消費されます。