This topic describes how to use the Kafka Indexing Service plug-in that is provided by Apache Druid in E-MapReduce (EMR) to consume Kafka data in real time.

Prerequisites

A Druid cluster and a Kafka cluster are created in the EMR console. For more information, see Create a cluster.

Background information

Kafka Indexing Service is a plug-in that is provided by Apache Druid. Kafka Indexing Service allows you to use the indexing service of Apache Druid to consume Kafka data in real time. The plug-in starts a supervisor on an Overlord. After the supervisor is started, indexing tasks are run on MiddleManager nodes. These tasks connect to the Kafka cluster, consume data from Kafka topics, and create indexes. You only need to prepare a file that describes the format of the data to be consumed and start a supervisor by calling the related RESTful API.

Configure the interaction between a Druid cluster and a Kafka cluster

To configure the interaction between a Druid cluster and a Kafka cluster, you must establish a connection between the clusters and add the hosts of the Kafka cluster to the host list of each node in the Druid cluster. The operations are similar to the operations that are performed to configure the interaction between a Druid cluster and a Hadoop cluster.
  • For a non-high-security Kafka cluster, perform the following steps to configure the interaction with a Druid cluster:
    1. Make sure that the two clusters can communicate with each other. The clusters must be in the same security group, or in different security groups between which access rules are configured.
    2. Add the hosts of the Kafka cluster to the host list of each node in the Druid cluster.
      Important The hostnames of the Kafka cluster are long domain names, such as emr-header-1.cluster-xxxxxxxx.
  • For a high-security Kafka cluster, perform the following steps to configure the interaction with a Druid cluster:
    1. Make sure that the two clusters can communicate with each other. The clusters must be in the same security group, or in different security groups between which access rules are configured.
    2. Add the hosts of the Kafka cluster to the host list of each node in the Druid cluster.
      Important The hostnames of the Kafka cluster are long domain names, such as emr-header-1.cluster-xxxxxxxx.
    3. Implement Kerberos cross-realm trust for the clusters. For more information, see Configure cross-realm trust. We recommend that you implement bidirectional cross-realm trust for the clusters.
    4. Prepare a security configuration file for the Kafka client. Sample file content:
      KafkaClient {
          com.sun.security.auth.module.Krb5LoginModule required
          useKeyTab=true
          storeKey=true
          keyTab="/etc/ecm/druid-conf/druid.keytab"
          principal="druid@EMR.1234.COM";
      };

      Synchronize the configuration file to all nodes of the Druid cluster and place the configuration file in a specific directory. Example: /tmp/kafka/kafka_client_jaas.conf.

    5. On the Configure tab of the Druid service page, click overlord.jvm in the Service Configuration section and add the following information:
      -Djava.security.auth.login.config=/tmp/kafka/kafka_client_jaas.conf
    6. On the Configure tab of the Druid service page, click middleManager.runtime in the Service Configuration section, add -Djava.security.auth.login.config=/tmp/kafka/kafka_client_jaas.conf to the druid.indexer.runner.javaOpts parameter, and then configure other Java Virtual Machine (JVM) startup parameters based on your business requirements. For an example of the resulting property, see the sketch after these steps.
    7. Restart the Druid service.
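
For reference, after step 6 the middleManager.runtime configuration might contain a property similar to the following line. This is a sketch: the -Xmx value is illustrative only, and you should keep any JVM options that druid.indexer.runner.javaOpts already contains in your cluster.

    druid.indexer.runner.javaOpts=-Xmx2g -Djava.security.auth.login.config=/tmp/kafka/kafka_client_jaas.conf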

Use Kafka Indexing Service to consume Kafka data in real time

  1. Run the following command on the Kafka cluster or gateway cluster to create a Kafka topic named metrics:
    # Run the following export command only if the Kafka cluster is a high-security cluster.
    export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf"
    
    kafka-topics.sh --create --zookeeper emr-header-1:2181,emr-header-2:2181,emr-header-3:2181 --partitions 1 --replication-factor 1 --topic metrics

    When you create a Kafka topic, set the parameters in the preceding command based on your environment. To obtain the value of the --zookeeper parameter, log on to the EMR console, go to the Configure tab of the Kafka service page, and view the value of the zookeeper.connect parameter. If you use a self-managed Kafka cluster, configure the --zookeeper parameter based on the actual configuration of the cluster.
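
    Optionally, you can verify that the topic was created. The following command is a sketch; use the same --zookeeper value as in the create command:

    kafka-topics.sh --describe --zookeeper emr-header-1:2181,emr-header-2:2181,emr-header-3:2181 --topic metrics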

  2. Define a data-format description file for the Kafka data source and place the file in a specific directory. In this example, the file is named metrics-kafka.json.
    {
         "type": "kafka",
         "dataSchema": {
             "dataSource": "metrics-kafka",
             "parser": {
                 "type": "string",
                 "parseSpec": {
                     "timestampSpec": {
                         "column": "time",
                         "format": "auto"
                     },
                     "dimensionsSpec": {
                         "dimensions": ["url", "user"]
                     },
                     "format": "json"
                 }
             },
             "granularitySpec": {
                 "type": "uniform",
                 "segmentGranularity": "hour",
                 "queryGranularity": "none"
             },
             "metricsSpec": [{
                     "type": "count",
                     "name": "views"
                 },
                 {
                     "name": "latencyMs",
                     "type": "doubleSum",
                     "fieldName": "latencyMs"
                 }
             ]
         },
         "ioConfig": {
             "topic": "metrics",
             "consumerProperties": {
                 "bootstrap.servers": "emr-worker-1.cluster-xxxxxxxx:9092 (bootstrap.servers of your Kafka cluster)",
                 "group.id": "kafka-indexing-service",
                 "security.protocol": "SASL_PLAINTEXT",
                 "sasl.mechanism": "GSSAPI"
             },
             "taskCount": 1,
             "replicas": 1,
             "taskDuration": "PT1H"
         },
         "tuningConfig": {
             "type": "kafka",
             "maxRowsInMemory": "100000"
         }
     }
    Note Replace the value of the ioConfig.consumerProperties.bootstrap.servers parameter with the bootstrap.servers value of your Kafka cluster. The ioConfig.consumerProperties.security.protocol and ioConfig.consumerProperties.sasl.mechanism parameters are required only for high-security Kafka clusters.
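
    Optionally, before you submit the file, you can check that it is well-formed JSON:

    python -m json.tool metrics-kafka.json
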
  3. Run the following command to add a Kafka supervisor:
    curl --negotiate -u:druid -b ~/cookies -c ~/cookies -XPOST -H 'Content-Type: application/json' -d @metrics-kafka.json http://emr-header-1.cluster-1234:18090/druid/indexer/v1/supervisor

    The --negotiate, -u, -b, and -c parameters are required only for high-security Druid clusters.
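
    After the supervisor is added, you can check its status by calling the supervisor API of the Overlord. The following command is a sketch: the supervisor ID equals the dataSource name, and the --negotiate, -u, -b, and -c parameters are again required only for high-security Druid clusters.

    curl --negotiate -u:druid -b ~/cookies -c ~/cookies http://emr-header-1.cluster-1234:18090/druid/indexer/v1/supervisor/metrics-kafka/status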

  4. Start the console producer client on the Kafka cluster.
    # Run the following export and echo commands only if the Kafka cluster is a high-security cluster.
    export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf"
    echo -e "security.protocol=SASL_PLAINTEXT\nsasl.mechanism=GSSAPI" > /tmp/kafka-producer.conf
    
    kafka-console-producer.sh --producer.config /tmp/kafka-producer.conf --broker-list emr-header-1:9092,emr-header-2:9092,emr-header-3:9092 --topic metrics

    The --producer.config /tmp/kafka-producer.conf parameter is required only for high-security Kafka clusters.

  5. Enter the following data records in the command prompt window of the kafka-console-producer.sh command:
    {"time": "2018-03-06T09:57:58Z", "url": "/foo/bar", "user": "alice", "latencyMs": 32}
    {"time": "2018-03-06T09:57:59Z", "url": "/", "user": "bob", "latencyMs": 11}
    {"time": "2018-03-06T09:58:00Z", "url": "/foo/bar", "user": "bob", "latencyMs": 45}
    You can run the following Python command to generate the timestamps in the preceding code:
    python -c 'import datetime; print(datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"))'
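
    You can also generate a record with the current timestamp and pipe it to the producer directly. This is a sketch that reuses the producer configuration from the previous step; adjust the broker list to your cluster, and omit --producer.config for a non-high-security cluster:

    python -c 'import datetime, json; print(json.dumps({"time": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"), "url": "/foo/bar", "user": "alice", "latencyMs": 32}))' | kafka-console-producer.sh --producer.config /tmp/kafka-producer.conf --broker-list emr-header-1:9092,emr-header-2:9092,emr-header-3:9092 --topic metrics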
  6. Prepare a query file. In this example, the file is named metrics-search.json.
    {
         "queryType" : "search",
         "dataSource" : "metrics-kafka",
         "intervals" : ["2018-03-02T00:00:00.000/2018-03-08T00:00:00.000"],
         "granularity" : "all",
         "searchDimensions": [
             "url",
             "user"
         ],
         "query": {
             "type": "insensitive_contains",
             "value": "bob"
         }
     }
  7. Run the following command on the master node of the Druid cluster:
    curl --negotiate -u:druid -b ~/cookies -c ~/cookies -XPOST -H 'Content-Type: application/json' -d @metrics-search.json 'http://emr-header-1.cluster-1234:18082/druid/v2/?pretty'

    The --negotiate, -u, -b, and -c parameters are required only for high-security Druid clusters.

    The following information is returned:
    [ {
       "timestamp" : "2018-03-06T09:00:00.000Z",
       "result" : [ {
         "dimension" : "user",
         "value" : "bob",
         "count" : 2
       } ]
     } ]
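
When you no longer need to ingest the topic, you can terminate the supervisor by calling the supervisor API of the Overlord. The following command is a sketch: on older Druid versions the endpoint is named shutdown instead of terminate, and the --negotiate, -u, -b, and -c parameters are required only for high-security Druid clusters.

    curl --negotiate -u:druid -b ~/cookies -c ~/cookies -XPOST http://emr-header-1.cluster-1234:18090/druid/indexer/v1/supervisor/metrics-kafka/terminate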