全部產品
Search
文件中心

E-MapReduce:Kafka Indexing Service

更新時間:Jul 01, 2024

本文介紹如何在E-MapReduce中使用Apache Druid Kafka Indexing Service即時消費Kafka資料。

前提條件

已建立E-MapReduce的Druid叢集和Kafka叢集,詳情請參見建立叢集

背景資訊

Kafka Indexing Service是Apache Druid推出的使用Apache Druid的Indexing Service服務即時消費Kafka資料的外掛程式。該外掛程式會在Overlord中啟動一個Supervisor,Supervisor啟動後會在Middlemanager中啟動indexing task,這些task會串連到Kafka叢集消費topic資料,並完成索引建立。您只需要準備一個資料消費格式檔案,通過REST API手動啟動Supervisor。

配置Druid叢集與Kafka叢集互動

E-MapReduce Druid叢集與Kafka叢集互動的配置方式與Hadoop叢集類似,均需要設定連通性和Hosts。
  • 對於非安全Kafka叢集,請按照以下步驟操作:
    1. 確保叢集間能夠通訊(兩個叢集在一個安全性群組下,或兩個叢集在不同安全性群組,但兩個安全性群組之間配置了訪問規則)。
    2. 將Kafka叢集的Hosts寫入到E-MapReduce Druid叢集每一個節點的Hosts列表中。
      重要 Kafka叢集的hostname應採用長名形式,例如emr-header-1.cluster-xxxxxxxx。
  • 對於安全Kafka叢集,您需要執行下列操作(前兩步與非安全Kafka叢集相同):
    1. 確保叢集間能夠通訊(兩個叢集在一個安全性群組下,或兩個叢集在不同安全性群組,但兩個安全性群組之間配置了訪問規則)。
    2. 將Kafka叢集的hosts寫入到E-MapReduce Druid叢集每一個節點的hosts列表中。
      重要 Kafka叢集的hostname應採用長名形式,例如emr-header-1.cluster-xxxxxxxx。
    3. 設定兩個叢集間的Kerberos跨域互信(詳情請參見跨域互信),推薦做雙向互信。
    4. 準備一個用戶端安全設定檔,檔案內容格式如下。
      KafkaClient {
            com.sun.security.auth.module.Krb5LoginModule required
            useKeyTab=true
            storeKey=true
            keyTab="/etc/ecm/druid-conf/druid.keytab"
            principal="druid@EMR.1234.COM";
        };

      檔案準備好後,將該設定檔同步到E-MapReduce Druid叢集的所有節點上,放置於某一個目錄下面(例如/tmp/kafka/kafka_client_jaas.conf)。

    5. 在E-MapReduce Druid配置頁面的overlord.jvm中新增如下選項。
      -Djava.security.auth.login.config=/tmp/kafka/kafka_client_jaas.conf
    6. 在E-MapReduce Druid配置頁面的middleManager.runtime中配置druid.indexer.runner.javaOpts=-Djava.security.auth.login.confi=/tmp/kafka/kafka_client_jaas.conf和其他JVM啟動參數。
    7. 重啟Druid服務。

使用Kafka Indexing Service即時消費Kafka資料

  1. 在Kafka叢集(或Gateway)上執行以下命令建立一個名稱為metrics的topic。
    -- 如果開啟了Kafka高安全。
    export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf"
    
    kafka-topics.sh --create --zookeeper emr-header-1:2181,emr-header-2:2181,emr-header-3:2181 --partitions 1 --replication-factor 1 --topic metrics

    實際建立topic時,您需要根據您的環境配置來替換上述命令中的各個參數。其中,--zookeeper參數中路徑的擷取方式是:登入阿里雲 E-MapReduce 控制台> 進入Kafka叢集的Kafka服務的配置頁面,查看zookeeper.connect配置項的值。如果您的Kafka叢集是自建叢集,則您需要根據叢集的實際配置來替換--zookeeper參數。

  2. 定義資料來源的資料格式描述檔案(名稱命名為metrics-kafka.json),並放置在目前的目錄下(或放置在其他您指定的目錄上)。
    {
         "type": "kafka",
         "dataSchema": {
             "dataSource": "metrics-kafka",
             "parser": {
                 "type": "string",
                 "parseSpec": {
                     "timestampSpec": {
                         "column": "time",
                         "format": "auto"
                     },
                     "dimensionsSpec": {
                         "dimensions": ["url", "user"]
                     },
                     "format": "json"
                 }
             },
             "granularitySpec": {
                 "type": "uniform",
                 "segmentGranularity": "hour",
                 "queryGranularity": "none"
             },
             "metricsSpec": [{
                     "type": "count",
                     "name": "views"
                 },
                 {
                     "name": "latencyMs",
                     "type": "doubleSum",
                     "fieldName": "latencyMs"
                 }
             ]
         },
         "ioConfig": {
             "topic": "metrics",
             "consumerProperties": {
                 "bootstrap.servers": "emr-worker-1.cluster-xxxxxxxx:9092(您 Kafka 叢集的 bootstrap.servers)",
                 "group.id": "kafka-indexing-service",
                 "security.protocol": "SASL_PLAINTEXT",
                 "sasl.mechanism": "GSSAPI"
             },
             "taskCount": 1,
             "replicas": 1,
             "taskDuration": "PT1H"
         },
         "tuningConfig": {
             "type": "kafka",
             "maxRowsInMemory": "100000"
         }
     }
    說明 ioConfig.consumerProperties.security.protocolioConfig.consumerProperties.sasl.mechanism為安全相關選項(非安全Kafka叢集不需要)。
  3. 執行如下命令添加Kafka Supervisor。
    curl --negotiate -u:druid -b ~/cookies -c ~/cookies -XPOST -H 'Content-Type: application/json' -d @metrics-kafka.json http://emr-header-1.cluster-1234:18090/druid/indexer/v1/supervisor

    其中--negotiate-u-b-c是針對安全E-MapReduce Druid叢集的選項。

  4. 在Kafka叢集上開啟一個Console Producer。
    # 如果開啟了Kafka高安全:
    export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf"
    echo -e "security.protocol=SASL_PLAINTEXT\nsasl.mechanism=GSSAPI" > /tmp/kafka-producer.conf
    
    kafka-console-producer.sh --producer.config /tmp/kafka-producer.conf --broker-list emr-header-1:9092,emr-header-2:9092,emr-header-3:9092 --topic metrics

    其中,--producer.config /tmp/kafka-producer.conf是針對安全Kafka叢集的選項。

  5. Kafka-console-producer.sh的命令提示字元下輸入資料。
    {"time": "2018-03-06T09:57:58Z", "url": "/foo/bar", "user": "alice", "latencyMs": 32}
    {"time": "2018-03-06T09:57:59Z", "url": "/", "user": "bob", "latencyMs": 11}
    {"time": "2018-03-06T09:58:00Z", "url": "/foo/bar", "user": "bob", "latencyMs": 45}
    時間戳記可用如下Python命令產生。
    python -c 'import datetime; print(datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"))'
  6. 準備名為metrics-search.json的查詢檔案。
    {
         "queryType" : "search",
         "dataSource" : "metrics-kafka",
         "intervals" : ["2018-03-02T00:00:00.000/2018-03-08T00:00:00.000"],
         "granularity" : "all",
         "searchDimensions": [
             "url",
             "user"
         ],
         "query": {
             "type": "insensitive_contains",
             "value": "bob"
         }
     }
  7. 在E-MapReduce Druid叢集的Master節點上執行如下命令。
    curl --negotiate -u:Druid -b ~/cookies -c ~/cookies -XPOST -H 'Content-Type: application/json' -d @metrics-search.json http://emr-header-1.cluster-1234:18082/druid/v2/?pretty

    其中--negotiate-u-b-c是針對安全 E-MapReduce Druid叢集的選項。

    返回結果樣本如下。
    [ {
       "timestamp" : "2018-03-06T09:00:00.000Z",
       "result" : [ {
         "dimension" : "user",
         "value" : "bob",
         "count" : 2
       } ]
     } ]