This topic explains how to configure LDAP-based authentication for E-MapReduce (EMR) Kafka and walks through a complete end-to-end example using open source Kafka 2.4.1.
Prerequisites
Before you begin, ensure that you have:
- A Dataflow cluster with the Kafka and OpenLDAP services selected. See Create a Dataflow Kafka cluster.
- A Kafka cluster running a minor version later than EMR V3.44.0 or EMR V5.10.0.
- The EMR OpenLDAP service (or an external LDAP service) deployed in the cluster. This topic uses the EMR OpenLDAP service.
Usage notes
To configure user group authentication, the memberOf overlay feature must be enabled for the LDAP service, or the memberOf attribute must be configurable for individual LDAP users.
Configure an LDAP user for authentication
Step 1: Create a superuser
Skip this step if you already have a Kafka superuser.
A Kafka superuser has access to all Kafka resources. In this setup, the superuser is used to access broker nodes and Kafka components. This step adds a Kafka superuser to the EMR OpenLDAP service.
- Connect to the master-1-1 node of the cluster over SSH. See Log on to a cluster.
- Create a file named kafka.ldif with the following content. This adds an LDAP user with UID kafka and password kafka-secret.

  ```
  dn: uid=kafka,ou=people,o=emr
  cn: kafka
  sn: kafka
  objectClass: inetOrgPerson
  userPassword: kafka-secret
  uid: kafka
  ```

- Run the following command to add the LDAP user:

  ```shell
  ldapadd -H ldap://master-1-1:10389 -f kafka.ldif -D ${uid} -w ${rootDnPW}
  ```

  Replace the placeholders as follows:

  | Placeholder | Description | Where to find it |
  | --- | --- | --- |
  | ${uid} | Value of the admin_dn parameter | Configure tab of the OpenLDAP service page in the EMR console |
  | ${rootDnPW} | Value of the admin_pwd parameter | Configure tab of the OpenLDAP service page in the EMR console |

  10389 is the default listening port of the OpenLDAP service.

  To verify that the user was added, run:

  ```shell
  ldapsearch -w ${rootDnPW} -D ${uid} -H ldap://master-1-1:10389 -b uid=kafka,ou=people,o=emr
  ```
Step 2: Go to the Kafka service configuration page
- Log on to the EMR on ECS console.
- In the top navigation bar, select the region where your cluster resides and select a resource group.
- On the EMR on ECS page, find your cluster and click Services in the Actions column.
- On the Services tab, find Kafka and click Configure.
Step 3: Update existing configuration items
- On the Configure tab, click the server.properties tab.
- Set kafka.ssl.config.type to CUSTOM.
- Set authorizer.class.name to kafka.security.ldap.authorizer.SimpleLdapAuthorizer.
- Click Save. In the dialog box, set the Execution Reason and turn on Automatically Update Configurations.
Step 4: Add new configuration items
- On the server.properties tab, click Add Configuration Item.
- In the Add Configuration Item dialog box, add the following items:

  | Configuration item | Value | Description |
  | --- | --- | --- |
  | super.users | User:kafka | The name of the superuser you created in Step 1. Replace kafka with your superuser name. |
  | listener.name.${listener}.sasl.enabled.mechanisms | PLAIN | Replace ${listener} with your listener name, for example, sasl_ssl. |
  | listener.name.${listener}.plain.sasl.jaas.config | See below | Replace ${listener} with your listener name. Update the LDAP-related values based on your configuration. |
  | listener.name.${listener}.plain.sasl.server.callback.handler.class | kafka.security.ldap.authenticator.LdapAuthenticateCallbackHandler | Replace ${listener} with your listener name. |
  | sasl.mechanism.inter.broker.protocol | PLAIN | |
  | sasl.enabled.mechanisms | PLAIN | |

  For the listener.name.${listener}.plain.sasl.jaas.config item, use the following value and replace each option based on your LDAP configuration:

  ```
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="kafka"
  password="kafka-secret"
  emr.kafka.security.ldap.user.name.attribute="uid"
  emr.kafka.security.ldap.user.base.dn="ou=people,o=emr"
  emr.kafka.security.ldap.group.name.attribute="cn"
  emr.kafka.security.ldap.group.base.dn="ou=groups,o=emr"
  emr.kafka.security.ldap.admin.name.attribute="uid"
  emr.kafka.security.ldap.admin.base.dn="o=emr"
  emr.kafka.security.ldap.url="ldaps://master-1-1:10636"
  emr.kafka.security.ldap.bind.user="admin"
  emr.kafka.security.ldap.bind.user.password="WMMuhh3P**********"
  emr.kafka.security.ldap.user.member.of.attribute="memberOf"
  emr.kafka.security.ldap.group.authorization.support="true"
  ;
  ```

  The following table describes the LDAP-related options:

  | Option | Description |
  | --- | --- |
  | username | Name of the superuser. Kafka uses this account to access broker nodes. |
  | emr.kafka.security.ldap.user.name.attribute | LDAP attribute used to identify a user's name (used to look up the username). |
  | emr.kafka.security.ldap.user.base.dn | Base distinguished name (DN) for LDAP users. |
  | emr.kafka.security.ldap.group.name.attribute | LDAP attribute used to identify a group's name. |
  | emr.kafka.security.ldap.group.base.dn | Base DN for LDAP user groups. |
  | emr.kafka.security.ldap.admin.name.attribute | LDAP attribute used to identify the admin user's name. |
  | emr.kafka.security.ldap.admin.base.dn | Base DN for the LDAP admin user. |
  | emr.kafka.security.ldap.url | URL of the OpenLDAP service. |
  | emr.kafka.security.ldap.bind.user | LDAP administrator username, used for user group authentication. |
  | emr.kafka.security.ldap.bind.user.password | Password for the LDAP administrator. |
  | emr.kafka.security.ldap.user.member.of.attribute | LDAP attribute that records which groups a user belongs to. |
  | emr.kafka.security.ldap.group.authorization.support | Set to true to enable group-based authorization. When enabled, users inherit permissions from the groups they belong to. |
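When assembling the sasl.jaas.config value by hand, it is easy to drop a quote or the trailing semicolon. As a quick sanity check, the snippet below (an illustrative helper, not part of EMR or Kafka) parses a JAAS value into its option pairs so you can verify what the broker will see:

```python
import re

def parse_jaas_config(value: str) -> dict:
    """Split a JAAS config value such as
    'org.apache.kafka.common.security.plain.PlainLoginModule required k="v" ... ;'
    into a dict mapping option names to values."""
    # Drop the trailing semicolon; the login module class and control flag
    # are left in place because the regex only matches key="value" pairs.
    body = value.strip().rstrip(";")
    return dict(re.findall(r'([\w.]+)="([^"]*)"', body))

# A shortened version of the value shown above.
jaas = (
    'org.apache.kafka.common.security.plain.PlainLoginModule required '
    'username="kafka" password="kafka-secret" '
    'emr.kafka.security.ldap.user.base.dn="ou=people,o=emr" '
    'emr.kafka.security.ldap.url="ldaps://master-1-1:10636";'
)
opts = parse_jaas_config(jaas)
print(opts["username"])                              # kafka
print(opts["emr.kafka.security.ldap.user.base.dn"])  # ou=people,o=emr
```

If a key is missing from the parsed result, a quote or equals sign is likely misplaced in the value you pasted into the console.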
- Update the following configuration files based on your requirements. Replace the username and password in kafka.client.jaas.content with your actual credentials. Use the superuser credentials for broker-level access.

  | Configuration file | Parameter | Value | Note |
  | --- | --- | --- | --- |
  | kafka_client_jaas.conf | kafka.client.jaas.content | KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-secret"; }; | The value must end with a semicolon (;). |
  | schema-registry.properties | schema_registry_opts | -Djava.security.auth.login.config=/etc/taihao-apps/kafka-conf/kafka-conf/kafka_client_jaas.conf | If a value already exists, append this to the end. |
  | kafka-rest.properties | kafkarest_opts | -Djava.security.auth.login.config=/etc/taihao-apps/kafka-conf/kafka-conf/kafka_client_jaas.conf | If a value already exists, append this to the end. |
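For reference, once the kafka.client.jaas.content value above is deployed, the rendered kafka_client_jaas.conf should look like the following (a sketch using the superuser credentials from Step 1; substitute your own):

```
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="kafka"
  password="kafka-secret";
};
```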
Step 5: Restart the Kafka service
- On the Configure tab, choose More > Restart.
- In the dialog box, set the Execution Reason and click OK.
- In the Confirm dialog, click OK.
Production security considerations
The example in the next section uses SASL_PLAINTEXT, which transmits credentials in plaintext. In a production environment:
- Use SASL_SSL instead of SASL_PLAINTEXT to encrypt credentials in transit.
- Avoid storing passwords in plaintext configuration files where possible. The LDAP bind password in sasl.jaas.config requires protection.
Example
The following example uses open source Kafka 2.4.1. For other Kafka versions, see the Apache Kafka security documentation.
The example demonstrates group-based authorization using LDAP user groups and ACLs.
Step 1: Create test users and user groups
1.1 Enable the memberOf overlay (if not already enabled)
If the memberOf overlay feature is disabled for the OpenLDAP service, enable it before creating user groups.
The steps to enable the memberOf overlay vary by LDAP service. The following steps apply to the EMR OpenLDAP service.
Enable the feature on all nodes where the OpenLDAP service is deployed.
Adjust the following parameters based on your environment:
- olcModulePath: For a 32-bit operating system, set this to /usr/lib/openldap.
- dn: cn=module{0},cn=config: If cn=module{0}.ldif already exists in /etc/openldap/slapd.d/cn=config, change 0 to 1. If cn=module{1}.ldif already exists, change 1 to 2, and so on.
- dn: olcOverlay={0}memberof,olcDatabase={2}hdb,cn=config: Set olcDatabase based on the value in /etc/openldap/slapd.d/cn=config. For example, if the file is {2}hdb.ldif, set olcDatabase to hdb.
- Create memberof_config.ldif with the following content:

  ```
  dn: cn=module{0},cn=config
  cn: module{0}
  objectClass: olcModuleList
  objectclass: top
  olcModuleload: memberof.la
  olcModulePath: /usr/lib64/openldap

  dn: olcOverlay={0}memberof,olcDatabase={2}hdb,cn=config
  objectClass: olcConfig
  objectClass: olcMemberOf
  objectClass: olcOverlayConfig
  objectClass: top
  olcOverlay: memberof
  olcMemberOfDangling: ignore
  olcMemberOfRefInt: TRUE
  olcMemberOfGroupOC: groupOfNames
  olcMemberOfMemberAD: member
  olcMemberOfMemberOfAD: memberOf
  ```

- Create refint1.ldif with the following content:

  ```
  dn: cn=module{0},cn=config
  add: olcmoduleload
  olcmoduleload: refint
  ```

- Create refint2.ldif with the following content:

  ```
  dn: olcOverlay=refint,olcDatabase={2}hdb,cn=config
  objectClass: olcConfig
  objectClass: olcOverlayConfig
  objectClass: olcRefintConfig
  objectClass: top
  olcOverlay: refint
  olcRefintAttribute: memberof member manager owner
  ```

- Load all three configuration files:

  ```shell
  ldapadd -Q -Y EXTERNAL -H ldapi:/// -f memberof_config.ldif
  ldapmodify -Q -Y EXTERNAL -H ldapi:/// -f refint1.ldif
  ldapadd -Q -Y EXTERNAL -H ldapi:/// -f refint2.ldif
  ```
1.2 Create test users
Create kafka-users.ldif with the following content and run ldapadd to add the users:

```
dn: uid=kafka-user1,ou=people,o=emr
cn: kafka-user1
sn: kafka-user1
uid: kafka-user1
objectClass: inetOrgPerson
userPassword: kafka-secret

dn: uid=kafka-user2,ou=people,o=emr
cn: kafka-user2
sn: kafka-user2
uid: kafka-user2
objectClass: inetOrgPerson
userPassword: kafka-secret
```
1.3 Create test user groups
Create kafka-groups.ldif with the following content and run ldapadd to add the groups:

```
dn: cn=kafka-group1,ou=groups,o=emr
cn: kafka-group1
objectClass: groupOfNames
member: uid=kafka-user1,ou=people,o=emr
member: uid=kafka-user2,ou=people,o=emr

dn: cn=kafka-group2,ou=groups,o=emr
cn: kafka-group2
objectClass: groupOfNames
member: uid=kafka-user1,ou=people,o=emr
```
Group membership summary:
- kafka-group1: kafka-user1, kafka-user2
- kafka-group2: kafka-user1
1.4 Verify the memberOf attribute
Run the following command on the OpenLDAP server to confirm the memberOf attribute is populated:

```shell
ldapsearch -Q -Y EXTERNAL -H ldapi:/// -b ou=people,o=emr memberOf
```
Expected result:
- kafka-user1 belongs to kafka-group1 and kafka-group2.
- kafka-user2 belongs to kafka-group1.
Step 2: Create client configuration files
Each client configuration file specifies the credentials for a particular user. All examples use SASL_PLAINTEXT as the security protocol.
In a production environment, use SASL_SSL instead of SASL_PLAINTEXT to prevent credentials from being transmitted in plaintext. See the Apache Kafka security documentation for details.
Create the following three configuration files. Replace the username and password in each file based on the client.
client.properties (superuser):

```
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-secret";
```

kafka-user1.properties:

```
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka-user1" password="kafka-secret";
```

kafka-user2.properties:

```
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka-user2" password="kafka-secret";
```
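The three files differ only in the username (and, in a real deployment, the password). As an optional convenience, a short shell loop can generate all three and avoid copy-paste errors; this sketch assumes the shared test password kafka-secret used throughout this example:

```shell
# Write client.properties for the superuser and <user>.properties for each test user.
for user in kafka kafka-user1 kafka-user2; do
  # The superuser file is conventionally named client.properties in this topic.
  file=$([ "$user" = "kafka" ] && echo client.properties || echo "$user.properties")
  cat > "$file" <<EOF
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$user" password="kafka-secret";
EOF
done
```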
Step 3: Create a test topic

Run the following command with the superuser client configuration to create a topic named test:

```shell
kafka-topics.sh --bootstrap-server core-1-1:9092 --command-config client.properties --create --topic test --replication-factor 3 --partitions 2
```
Step 4: Grant permissions
Kafka access control lists (ACLs) follow the format: "Principal P is [Allowed|Denied] Operation O from Host H on Resource R." For full ACL documentation, see Apache Kafka authorization and ACLs.
If a resource has no ACLs, Kafka restricts access to that resource by default. Only superusers can access it.
Run the following commands to set up permissions:

```shell
# Allow kafka-group2 to perform all operations on the cluster and topic test.
kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --add --allow-principal User:kafka-group2 --allow-host "*" --operation All --cluster kafka-cluster
kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --add --allow-principal User:kafka-group2 --allow-host "*" --operation All --topic test

# Deny kafka-group1 all operations on topic test.
kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --add --deny-principal User:kafka-group1 --deny-host "*" --operation All --topic test

# Deny kafka-user1 read access on topic test.
kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --add --deny-principal User:kafka-user1 --deny-host "*" --operation Read --topic test

# Allow kafka-user2 read access on topic test.
kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --add --allow-principal User:kafka-user2 --allow-host "*" --operation Read --topic test
```

To view the current ACLs for the test topic:

```shell
kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --list --topic test
```
Expected output:

```
Current ACLs for resource `Topic:LITERAL:test`:
 	User:kafka-group2 has Allow permission for operations: All from hosts: *
 	User:kafka-user2 has Allow permission for operations: Read from hosts: *
 	User:kafka-user1 has Deny permission for operations: Read from hosts: *
 	User:kafka-group1 has Deny permission for operations: All from hosts: *
```
Step 5: Verify permissions
The following tests confirm that group-based permissions are applied correctly. A user's effective permissions are the combination of their individual ACLs and the ACLs of all groups they belong to.
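As an illustration only, the four outcomes verified below are consistent with an evaluation order in which user-level ACLs are checked first (with Deny taking precedence over Allow) and, if no user-level ACL matches, any matching group-level Allow grants access. The toy model below encodes that order; the actual kafka.security.ldap.authorizer.SimpleLdapAuthorizer implementation may evaluate ACLs differently:

```python
# Toy model of ACL resolution for topic "test" in this example (illustration only).
# ACLs as created in Step 4: (principal, permission, operation).
ACLS = [
    ("group:kafka-group2", "Allow", "All"),
    ("group:kafka-group1", "Deny",  "All"),
    ("user:kafka-user1",   "Deny",  "Read"),
    ("user:kafka-user2",   "Allow", "Read"),
]
# Group memberships from Step 1 of the example.
GROUPS = {
    "kafka-user1": ["kafka-group1", "kafka-group2"],
    "kafka-user2": ["kafka-group1"],
}

def matches(acl_op: str, op: str) -> bool:
    return acl_op == "All" or acl_op == op

def authorized(user: str, op: str) -> bool:
    # 1. User-level ACLs first; an explicit Deny wins, then an explicit Allow.
    user_acls = [a for a in ACLS if a[0] == f"user:{user}" and matches(a[2], op)]
    if any(a[1] == "Deny" for a in user_acls):
        return False
    if any(a[1] == "Allow" for a in user_acls):
        return True
    # 2. Otherwise, any group-level Allow from one of the user's groups grants access.
    group_principals = {f"group:{g}" for g in GROUPS[user]}
    return any(a[0] in group_principals and a[1] == "Allow" and matches(a[2], op)
               for a in ACLS)

print(authorized("kafka-user1", "Write"))  # True  (Allow All via kafka-group2)
print(authorized("kafka-user1", "Read"))   # False (explicit user-level Deny Read)
print(authorized("kafka-user2", "Write"))  # False (kafka-group1 is denied All)
print(authorized("kafka-user2", "Read"))   # True  (explicit user-level Allow Read)
```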
kafka-user1 can write to the test topic (inherits Allow All from kafka-group2):

```shell
kafka-console-producer.sh --broker-list core-1-1:9092 --producer.config ./kafka-user1.properties --topic test
```

Write a few messages to verify:

```
>a
>b
>c
>d
```
kafka-user1 cannot read from the test topic (has an explicit Deny Read):

```shell
# Grant the consumer group permission first.
kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --add --allow-principal User:kafka-user1 --allow-host "*" --operation All --group kafka-user1-consumer

kafka-console-consumer.sh --bootstrap-server core-1-1:9092 --consumer.config ./kafka-user1.properties --topic test --group kafka-user1-consumer
```

The read operation fails, confirming that the explicit Deny Read takes effect even though kafka-user1 belongs to kafka-group2.
kafka-user2 cannot write to the test topic (kafka-group1, the only group kafka-user2 belongs to, is denied all operations on the topic, and no user-level write permission is granted):

```shell
kafka-console-producer.sh --broker-list core-1-1:9092 --producer.config ./kafka-user2.properties --topic test
```

The write operation fails.
kafka-user2 can read from the test topic (has an explicit Allow Read):

```shell
# Grant the consumer group permission first.
kafka-acls.sh --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER --add --allow-principal User:kafka-user2 --allow-host "*" --operation All --group kafka-user2-consumer

kafka-console-consumer.sh --bootstrap-server core-1-1:9092 --consumer.config ./kafka-user2.properties --topic test --group kafka-user2-consumer --from-beginning
```

Data is consumed successfully.