本ページでは、E-MapReduce で Apache Druid Kafka インデックス作成サービスを使用して、リアルタイムで Kafka データを取り込む方法について説明します。

Kafka インデックス作成サービスは、Apache Druid によって起動され、Apache Druid のインデックス作成サービスを使用してリアルタイムで Kafka データを取り込む拡張機能です。 拡張機能により、ミドルマネージャーでいくつかのインデックス作成タスクを開始する Overload のスーパーバイザーが有効になります。 これらのタスクは Kafka クラスターに接続してトピックデータを取り込み、インデックスの作成を完了します。 データ取り込みフォーマットファイルを準備し、RESTful API を介して手動でスーパーバイザを起動する必要があります。

Kafka クラスターとの相互作用

最初の相互作用は、E-MapReduce Druid クラスターと Kafka クラスターの間です。 2 つのクラスターの相互作用構成は、Hadoop クラスターの相互作用構成に似ています。 接続とホストを設定する必要があります。 標準モードの Kafka クラスターの場合、以下の手順を実行します。
  1. クラスター間の通信を確認します。 (2 つのクラスターが同じセキュリティグループ内にあるか、各クラスターが異なるセキュリティグループに関連付けられており、アクセスルールがこれらのセキュリティグループに対して構成されています。)
  2. Kafka クラスターのホストを E-MapReduce Druid クラスターの各ノードのホストリストに書き込みます。 Kafka クラスターのホスト名は、emr-header-1.cluster-xxxxxxxx などの長い名前にする必要があります。
高セキュリティモードの Kafka クラスターの場合、次の操作を実行します (最初の 2 つの手順は標準モードのクラスターの手順と同じです)。
  1. 2 つのクラスター間の通信を確認します (2 つのクラスターが同じセキュリティグループ内にあるか、各クラスターが異なるセキュリティグループに関連付けられており、これらのセキュリティグループに対してアクセスルールが構成されています)。
  2. Kafka クラスターのホストを E-MapReduce Druid クラスターの各ノードのホストリストに書き込みます。 Kafka クラスターのホスト名は、emr-header-1.cluster-xxxxxxxx などの長い名前にする必要があります。
  3. 2 つのクラスター間に Kerberos クロスドメイン相互信頼を設定します。 詳細については、「クロスリージョンアクセス」をご参照ください。 双方向の相互信頼が推奨されます。
  4. クライアントセキュリティ構成ファイルを準備します。
    KafkaClient {
          com.sun.security.auth.module.Krb5LoginModule required
          useKeyTab=true
          storeKey=true
          keyTab="/etc/ecm/druid-conf/druid.keytab"
          principal="druid@EMR. 1234. COM";
      };

    構成ファイルを E-MapReduce Druid クラスター内のすべてのノードと同期し、次のような特定のディレクトリに配置します。/tmp/kafka/kafka_client_jaas.conf

  5. E-MapReduce Druid 構成ページの overlord.jvmで以下を追加します。
    Add Djava.security.auth.login.config=/tmp/kafka/kafka_client_jaas.conf
  6. E-MapReduce Druid 構成ページのmiddleManager.runtime で以下のオプションを構成します。druid.indexer.runner.javaOpts=-Djava.security.auth.login.confi=/tmp/kafka/kafka_client_jaas.conf およびその他の jvm 起動パラメーター。
  7. E-MapReduce DruidDruid サービスを再起動します。

Apache Druid の Kafka インデックスサービスを使用して、リアルタイムで Kafka データを取り込む

  1. Kafka クラスター (またはゲートウェイ) で次のコマンドを実行して、metrics という名前のトピックを作成します。
    --If the Kafka high-security mode is enabled:
     export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf"
     --
     kafka-topics.sh --create --zookeeper emr-header-1:2181,emr-header-2,emr-header-3/kafka-1.0.0 --partitions 1 --replication-factor 1 --topic metrics

    必要に応じてパラメーターを調整できます。 - -zookeeper/kafka-1.0.0 セクションはパスであり、Kafka クラスターの構成ページで Kafka サービスの zookeeper.connect の値を確認できます。 独自の Kafka クラスターを構築する場合、parmname —zookeeper パラメーターは、実際の構成に応じて変更できます。

  2. データソースのデータ形式記述ファイルを定義します。 metrics-kafka.json という名前を付けて、現在のディレクトリ (または指定した別のディレクトリ) に配置します。
    {
         "type": "kafka",
         "dataSchema": {
             "dataSource": "metrics-kafka",
             "parser": {
                 "type": "string",
                 "parseSpec": {
                     "timestampSpec": {
                         "column": "time",
                         "format": "auto"
                     },
                     "dimensionsSpec": {
                         "dimensions": ["url", "user"]
                     },
                     "format": "json"
                 }
             },
             "granularitySpec": {
                 "type": "uniform",
                 "segmentGranularity": "hour",
                 "queryGranularity": "none"
             },
             "metricsSpec": [{
                     "type": "count",
                     "name": "views"
                 },
                 {
                     "name": "latencyMs",
                     "type": "doubleSum",
                     "fieldName": "latencyMs"
                 }
             ]
         },
         "ioConfig": {
             "topic": "metrics",
             "consumerProperties": {
                 "bootstrap.servers": "emr-worker-1.cluster-xxxxxxxx:9092 (the bootstrap.servers of your Kafka clusters)",
                 "group.id": "kafka-indexing-service",
                 "security.protocol": "SASL_PLAINTEXT",
                 "sasl.mechanism": "GSSAPI"
             },
             "taskCount": 1,
             replicas: 1
             "taskDuration": "PT1H"
         },
         "tuningConfig": {
             "type": "kafka",
             "maxRowsInMemory": "100000"
         }
     }
    ioConfig.consumerProperties.security.protocol および ioConfig.consumerProperties.sasl.mechanism はセキュリティ関連のオプションであり、標準モードの Kafka クラスターには必要ありません。
  3. 次のコマンドを実行して、Kafka スーパーバイザーを追加します。
    curl --negotiate -u:druid -b ~/cookies -c ~/cookies -XPOST -H 'Content-Type: application/json' -d @metrics-kafka.json http://emr-header-1.cluster-1234:18090/druid/indexer/v1/supervisor

    —negotiate-u-b、および -c オプションは高セキュリティモードの Druid クラスター用です。

  4. Kafka クラスターでコンソールプロデューサーを有効にします。
    --If the high-security mode of Kafka is enabled:
     export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf"
     echo -e "security.protocol=SASL_PLAINTEXT\nsasl.mechanism=GSSAPI" > /tmp/Kafka/producer.conf
     --
     Kafka-console-producer.sh --producer.config /tmp/kafka/producer.conf --broker-list emr-worker-1:9092,emr-worker-2:9092,emr-worker-3:9092 --topic metrics
     >

    —producer.config /tmp/Kafka/producer.conf オプションは、高セキュリティモードの Kafka クラスター用です。

  5. kafka_console_producer のコマンドプロンプトでデータを入力します。
    {"time": "2018-03-06T09:57:58Z", "url": "/foo/bar", "user": "alice", "latencyMs": 32}
     {"time": "2018-03-06T09:57:59Z", "url": "/", "user": "bob", "latencyMs": 11}
     {"time": "2018-03-06T09:58:00Z", "url": "/foo/bar", "user": "bob", "latencyMs": 45}
    タイムスタンプは、次の Python コマンドで生成できます。
    python -c 'import datetime; print(datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"))'
  6. metrics-search.json という名前のクエリファイルを準備します。
    {
         "queryType" : "search",
         "dataSource" : "metrics-kafka",
         "intervals" : ["2018-03-02T00:00:00.000/2018-03-08T00:00:00.000"],
         "granularity" : "all",
         "searchDimensions": [
             "url",
             "user"
         ],
         "query": {
             "type": "insensitive_contains",
             "value": "bob"
         }
     }
  7. Druid クラスターのマスターノードでクエリを実行します。
    curl --negotiate -u:Druid -b ~/cookies -c ~/cookies -XPOST -H 'Content-Type: application/json' -d @metrics-search.json http://emr-header-1.cluster-1234:18082/druid/v2/?pretty

    —negotiate-u-b、および -c オプションは、高セキュリティモードの Druid クラスター用です。

  8. 次のようなクエリ結果が表示されます。
    [ {
       "timestamp" : "2018-03-06T09:00:00.000Z",
       "result": {
         "dimension" : "user",
         "value" : "bob",
         "count": 2,
       } ]
     } ]