This section describes how to use Apache Druid Kafka Indexing Service in E-MapReduce to ingest Kafka data in real time.

The Kafka Indexing Service is an Apache Druid extension that ingests Kafka data in real time through Apache Druid's indexing service. The extension runs supervisors on the Overlord node, which start indexing tasks on MiddleManager nodes. These tasks connect to the Kafka cluster, consume the topic data, and build the indexes. You need to prepare a data ingestion format file and manually start the supervisor through the RESTful API.

Interaction with the Kafka cluster

The first step is to configure the interaction between the E-MapReduce Druid cluster and the Kafka cluster. The configuration is similar to that for a Hadoop cluster: you must set up connectivity and the hosts files. For standard mode Kafka clusters, complete the following steps:
  1. Ensure that the two clusters can communicate with each other. (The two clusters are in the same security group, or each cluster is in a different security group with access rules configured between the security groups.)
  2. Write the hosts of the Kafka cluster to the hosts list of each node on the E-MapReduce Druid cluster. Note that the hostname of the Kafka cluster should be a long name, such as emr-header-1.cluster-xxxxxxxx.
For high-security mode Kafka clusters, complete the following operations (the first two steps are the same as those for standard mode clusters):
  1. Ensure that the two clusters can communicate with each other. (The two clusters are in the same security group, or each cluster is in a different security group with access rules configured between the security groups.)
  2. Write the hosts of the Kafka cluster to the hosts list of each node on the E-MapReduce Druid cluster. Note that the hostname of the Kafka cluster should be a long name, such as emr-header-1.cluster-xxxxxxxx.
  3. Set Kerberos cross-domain mutual trust between the two clusters. For details, see Cross-region access. Bidirectional mutual trust is recommended.
  4. Prepare a client security configuration file:
    KafkaClient {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        storeKey=true
        keyTab="/etc/ecm/druid-conf/druid.keytab"
        principal="druid@EMR.1234.COM";
    };

    Synchronize the configuration file to all nodes in the E-MapReduce Druid cluster and place it in a specific directory, such as /tmp/kafka/kafka_client_jaas.conf.

  5. In overlord.jvm on the E-MapReduce Druid configuration page, add:
    -Djava.security.auth.login.config=/tmp/kafka/kafka_client_jaas.conf
  6. In middleManager.runtime on the E-MapReduce Druid configuration page, configure druid.indexer.runner.javaOpts=-Djava.security.auth.login.config=/tmp/kafka/kafka_client_jaas.conf and other JVM startup parameters.
  7. Restart the E-MapReduce Druid service.

Use Apache Druid's Kafka Indexing Service to ingest Kafka data in real time

  1. Run the following command on the Kafka cluster (or gateway) to create a topic named metrics.
    --If the Kafka high-security mode is enabled:
     export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf"
     --
     kafka-topics.sh --create --zookeeper emr-header-1:2181,emr-header-2:2181,emr-header-3:2181/kafka-1.0.0 --partitions 1 --replication-factor 1 --topic metrics

    You can adjust the parameters as needed. The /kafka-1.0.0 part of the --zookeeper parameter is the chroot path; you can find its value in the zookeeper.connect setting on the Configuration page of the Kafka service in the Kafka cluster. If you built your own Kafka cluster, change the --zookeeper parameter according to your actual configuration.

  2. Define the data format description file for the data source. Name it metrics-kafka.json and place it in the current directory (or another directory that you have specified).
    {
         "type": "kafka",
         "dataSchema": {
             "dataSource": "metrics-kafka",
             "parser": {
                 "type": "string",
                 "parseSpec": {
                     "timestampSpec": {
                         "column": "time",
                         "format": "auto"
                     },
                     "dimensionsSpec": {
                         "dimensions": ["url", "user"]
                     },
                     "format": "json"
                 }
             },
             "granularitySpec": {
                 "type": "uniform",
                 "segmentGranularity": "hour",
                 "queryGranularity": "none"
             },
             "metricsSpec": [{
                     "type": "count",
                     "name": "views"
                 },
                 {
                     "name": "latencyMs",
                     "type": "doubleSum",
                     "fieldName": "latencyMs"
                 }
             ]
         },
         "ioConfig": {
             "topic": "metrics",
             "consumerProperties": {
                 "bootstrap.servers": "emr-worker-1.cluster-xxxxxxxx:9092 (the bootstrap.servers of your Kafka clusters)",
                 "group.id": "kafka-indexing-service",
                 "security.protocol": "SASL_PLAINTEXT",
                 "sasl.mechanism": "GSSAPI"
             },
             "taskCount": 1,
             replicas: 1
             "taskDuration": "PT1H"
         },
         "tuningConfig": {
             "type": "kafka",
             "maxRowsInMemory": "100000"
         }
     }
    Note: ioConfig.consumerProperties.security.protocol and ioConfig.consumerProperties.sasl.mechanism are security-related options and are not required for standard mode Kafka clusters. Set ioConfig.consumerProperties.bootstrap.servers to the bootstrap.servers addresses of your own Kafka cluster.
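    Before submitting the file, you can sanity-check its structure. The following Python sketch is illustrative only: the validate() helper is not part of Druid, and the inlined spec dict is abridged from metrics-kafka.json with a placeholder broker address.

    ```python
    import json

    # Abridged ingestion spec; use your own bootstrap.servers value.
    spec = {
        "type": "kafka",
        "dataSchema": {
            "dataSource": "metrics-kafka",
        },
        "ioConfig": {
            "topic": "metrics",
            "consumerProperties": {
                "bootstrap.servers": "emr-worker-1.cluster-xxxxxxxx:9092",
            },
            "taskCount": 1,
            "replicas": 1,
            "taskDuration": "PT1H",
        },
    }

    def validate(spec):
        """Raise ValueError if a field the supervisor needs is missing."""
        for key in ("type", "dataSchema", "ioConfig"):
            if key not in spec:
                raise ValueError("missing top-level key: %s" % key)
        if spec["type"] != "kafka":
            raise ValueError("supervisor type must be 'kafka'")
        if "topic" not in spec["ioConfig"]:
            raise ValueError("ioConfig.topic is required")
        return True

    # json.dumps(spec) is the request body POSTed to the Overlord.
    print(validate(spec))
    ```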
  3. Run the following command to add a Kafka supervisor.
    curl --negotiate -u:druid -b ~/cookies -c ~/cookies -XPOST -H 'Content-Type: application/json' -d @metrics-kafka.json http://emr-header-1.cluster-1234:18090/druid/indexer/v1/supervisor

    The --negotiate, -u, -b, and -c options are only required for high-security mode Druid clusters.
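    For reference, a hypothetical Python helper mirroring the curl command above; the Overlord host and port are the example values from this section, and actually sending the request (with urllib, requests, and so on) is left out so the sketch stays self-contained.

    ```python
    import json

    # Example Overlord endpoint from this section; replace with your own.
    OVERLORD = "http://emr-header-1.cluster-1234:18090"

    def build_supervisor_request(spec):
        """Return (url, headers, body) for POSTing an ingestion spec."""
        url = OVERLORD + "/druid/indexer/v1/supervisor"
        headers = {"Content-Type": "application/json"}
        body = json.dumps(spec)  # fails fast on a non-serializable spec
        return url, headers, body

    url, headers, body = build_supervisor_request({"type": "kafka"})
    print(url)
    ```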

  4. Enable a console producer on the Kafka cluster.
    --If the high-security mode of Kafka is enabled:
     export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf"
     echo -e "security.protocol=SASL_PLAINTEXT\nsasl.mechanism=GSSAPI" > /tmp/Kafka/producer.conf
     --
     kafka-console-producer.sh --producer.config /tmp/kafka/producer.conf --broker-list emr-worker-1:9092,emr-worker-2:9092,emr-worker-3:9092 --topic metrics
     >

    The --producer.config /tmp/kafka/producer.conf option is only required for high-security mode Kafka clusters.

  5. Enter data at the command prompt of kafka-console-producer.
    {"time": "2018-03-06T09:57:58Z", "url": "/foo/bar", "user": "alice", "latencyMs": 32}
     {"time": "2018-03-06T09:57:59Z", "url": "/", "user": "bob", "latencyMs": 11}
     {"time": "2018-03-06T09:58:00Z", "url": "/foo/bar", "user": "bob", "latencyMs": 45}
    The timestamp can be generated with the following Python command:
    python -c 'import datetime; print(datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"))'
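    The sample rows above can also be generated programmatically. A small sketch follows; the make_event() helper is illustrative, and the random latency values are placeholders.

    ```python
    import datetime
    import json
    import random

    def make_event(url, user):
        # Field names match metrics-kafka.json: the "time" timestamp
        # column, the url/user dimensions, and the latencyMs metric.
        return json.dumps({
            "time": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"),
            "url": url,
            "user": user,
            "latencyMs": random.randint(1, 100),
        })

    # Paste each printed line into the kafka-console-producer prompt.
    for line in (make_event("/foo/bar", "alice"), make_event("/", "bob")):
        print(line)
    ```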
  6. Prepare a query file named metrics-search.json.
    {
         "queryType" : "search",
         "dataSource" : "metrics-kafka",
         "intervals" : ["2018-03-02T00:00:00.000/2018-03-08T00:00:00.000"],
         "granularity" : "all",
         "searchDimensions": [
             "url",
             "user"
         ],
         "query": {
             "type": "insensitive_contains",
             "value": "bob"
         }
     }
  7. Execute the query on the master node of the Druid cluster.
    curl --negotiate -u:druid -b ~/cookies -c ~/cookies -XPOST -H 'Content-Type: application/json' -d @metrics-search.json http://emr-header-1.cluster-1234:18082/druid/v2/?pretty

    The --negotiate, -u, -b, and -c options are only required for high-security mode Druid clusters.

  8. You will see a query result similar to the following:
    [ {
        "timestamp" : "2018-03-06T09:00:00.000Z",
        "result" : [ {
            "dimension" : "user",
            "value" : "bob",
            "count" : 2
        } ]
    } ]
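    In a script, the response body can be parsed as ordinary JSON. A minimal sketch, with the response inlined (abridged); in a real run the body would come from the curl or HTTP client response.

    ```python
    import json

    # Abridged search response from the query above.
    body = '''
    [ {
      "timestamp" : "2018-03-06T09:00:00.000Z",
      "result" : [ {
        "dimension" : "user",
        "value" : "bob",
        "count" : 2
      } ]
    } ]
    '''

    hits = json.loads(body)
    for bucket in hits:
        for match in bucket["result"]:
            print(bucket["timestamp"], match["dimension"],
                  match["value"], match["count"])
    ```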