If Kafka authentication (for example, Kerberos authentication or another simple authorization based on a user name and password) is disabled, users can access services with forged identities, even if Kafka authorization is enabled. Therefore, we recommend that you create a high-security Kafka cluster. For more information, see Introduction to Kerberos.

Note The permission configurations detailed in this section are for high-security E-MapReduce clusters only (Kafka is started in Kerberos).

Add configurations

  1. On the Cluster Management page, click View Details next to the Kafka cluster.
  2. In the navigation pane on the left, click the Clusters and Services tab, and click Kafka in the service list.
  3. At the top of the page, click the Configuration tab.
  4. In the upper-right corner of the Service Configuration list, click Custom Configuration and add the following parameters:
Key Value Description
authorizer.class.name kafka.security.auth.SimpleAclAuthorizer N/A
super.users User:kafka User:kafka is required. Other users can be added and separated by semicolons (;).
Note zookeeper.set.acl is used to set the permissions for Kafka to operate data in ZooKeeper. It is already set to true in the E-MapReduce cluster, so you do not need to add this configuration here. With the configuration set to true, only users named Kafka who have passed the Kerberos authentication can run the kafka-topics.sh command in the Kerberos environment. Kafka-topics.sh can read, write, and modify data in ZooKeeper.

Restart a Kafka cluster

  1. On the Cluster Management page, click View Details next to the Kafka cluster you want to operate in the Operation column.
  2. In the navigation pane on the left, click the Clusters and Services tab, and click Actions to the right of Kafka on the service list.
  3. In the drop-down menu, select RESTART All Components. Enter the record information and click OK.

Authorization (ACL)

  • Basic concepts

    Definition in official Kafka documentation:

    Kafka ACLs are defined in the general format of "Principal P is [Allowed/Denied] Operation O From Host H On Resource R"

    This indicates that the ACL process relates to Principal, Allowed/Denied, Operation Host, and Resource.

    • Principal: username
      Security protocol Value
      PLAINTEXT ANONYMOUS
      SSL ANONYMOUS
      SASL_PLAINTEXT If the mechanism is PLAIN, the user name is specified by client_jaas.conf. If the mechanism is GSSAPI, the user name is principal specified by client_jaas.conf.
      SASL_SSL
    • Allowed/Denied
    • Operation: Operations include Read, Write, Create, DeleteAlter, Describe, ClusterAction, AlterConfigs, DescribeConfigs, IdempotentWrite, and All.
    • Host: The target machine.
    • Resource: Resource objects, including Topic, Group, Cluster, and TransactionalId.

    For detailed mapping relationships between operations and resources, such as the supporting relationships between resources and the authorization of operations, see KIP-11 - Authorization Interface.

  • Authorization command

    Perform authorization using the kafka-acls.sh script (/usr/lib/kafka-current/bin/kafka-acls.sh). For more information about how to use this script to authorize Kafka, run the kafka-acls.sh --help command.

Procedure

Complete the following operations on the master node of the high-security Kafka cluster you created in E-MapReduce.

  1. Create a user named test.

    useradd test

  2. Create a topic.

    zookeeper.set.acl is set to true, and kafka-topics.sh must be run under a Kafka account. The Kafka account must pass Kerberos authentication.

    #The Kerberos authentication information related to the kafka account is set in kafka_client_jaas.conf.
    export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf" 
    # Change the ZooKeeper address to the actual address (run hostnamed to acquire) of your Kafka cluster.
    kafka-topics.sh --create --zookeeper emr-header-1:2181/kafka-1.0.0 --replication-factor 3 --partitions 1 --topic test
  3. Run kafka-console-producer.sh with the test user.
    1. Create a keytab file for the test user to authenticate ZooKeeper and Kafka.
      su root
      sh /usr/lib/has-current/bin/hadmin-local.sh /etc/ecm/has-conf -k /etc/ecm/has-conf/admin.keytab
      HadminLocalTool.local: # Press Enter to display the usage instructions on some commands.
      HadminLocalTool.local: addprinc # Enter a command and press Enter to display the usage instructions on the command.
      HadminLocalTool.local: addprinc -pw 123456 test # Add a principal for the test user and set the password to 123456.
      HadminLocalTool.local: ktadd -k /home/test/test.keytab test # Export the keytab file for later use.
    2. Add a kafka_client_test.conf file.

      Put the file in /home/test/kafka_client_test.conf. The content of the file is as follows:

      KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      storeKey=true
      serviceName="kafka"
      keyTab="/home/test/test.keytab"
      principal="test";
      };
      // Zookeeper client authentication
      Client {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      useTicketCache=false
      serviceName="zookeeper"
      keyTab="/home/test/test.keytab"
      principal="test";
      };
    3. Add producer.conf.

      Put the file in /home/test/producer.conf. The content of the file is as follows:

      security.protocol=SASL_PLAINTEXT
      sasl.mechanism=GSSAPI
    4. Run kafka-console-producer.sh.
      su test
      export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/home/test/kafka_client_test.conf"
      kafka-console-producer.sh --producer.config /home/test/producer.conf --topic test --broker-list emr-worker-1:9092

      Because no ACL is set, an error is reported after the preceding command is run:

      org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test]
    5. Set an ACL.

      Similarly, the kafka-acls.sh command must be run under the Kafka account.

      su kafka
      export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf" 
      kafka-acls.sh --authorizer-properties zookeeper.connect=emr-header-1:2181/kafka-1.0.0 --add --allow-principal User:test --operation Write --topic test
    6. Run kafka-console-producer.sh again.
      su test
      export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/home/test/kafka_client_test.conf"
      kafka-console-producer.sh --producer.config /home/test/producer.conf --topic test --broker-list emr-worker-1:9092

      Normal output is as follows:

      [2018-02-28 22:25:36,178] INFO Kafka commitId : aaa7af6d4a11b29d (org.apache.kafka.common.utils.AppInfoParser)
      >alibaba
      >E-MapReduce
      >
  4. Run kafka-console-consumer.sh with the test user.

    After kafka-console-producer.sh is successfully run and data is written to the topic, you can run kafka-console-consumer.sh to perform a consumption test.

    1. Add consumer.conf.

      Put the file in /home/test/consumer.conf. The content of the file is as follows:

      security.protocol=SASL_PLAINTEXT
      sasl.mechanism=GSSAPI
    2. Run kafka-console-consumer.sh.
      su test
      # Kafka_client_test.conf is used in the same way as kafka-console-producer.sh.
      export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/home/test/kafka_client_test.conf"
      kafka-console-consumer.sh --consumer.config consumer.conf  --topic test --bootstrap-server emr-worker-1:9092 --group test-group  --from-beginning

      Because no permissions are set, an error is reported:

      org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: test-group
    3. Set an ACL.
      su kafka
      export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf" 
      # test-group permission
      kafka-acls.sh --authorizer-properties zookeeper.connect=emr-header-1:2181/kafka-1.0.0 --add --allow-principal User:test --operation Read --group test-group
      # topic permission
      kafka-acls.sh --authorizer-properties zookeeper.connect=emr-header-1:2181/kafka-1.0.0 --add --allow-principal User:test --operation Read --topic test
    4. Run kafka-console-consumer.sh again.
      su test
      # Kafka_client_test.conf is used in the same way as kafka-console-producer.sh.
      export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/home/test/kafka_client_test.conf"
      kafka-console-consumer.sh --consumer.config consumer.conf  --topic test --bootstrap-server emr-worker-1:9092 --group test-group  --from-beginning

      Normal output is as follows:

      alibaba
      E-MapReduce