从 EMR-3.11.0 版本开始,E-MapReduce 支持将 E-MapReduce Druid 作为 E-MapReduce 的一个集群类型。

背景信息

将 E-MapReduce Druid 作为一种单独的集群类型,而不再是在 Hadoop 集群中增加 Druid 组件,主要基于以下几方面的考虑:

  • E-MapReduce Druid 可以完全脱离 Hadoop 来使用。
  • 大数据量情况下,E-MapReduce Druid 对内存要求比较高,尤其是 Broker节点和 Historical 节点。E-MapReduce Druid 本身不受 YARN 管控 ,在多服务运行时容易发生资源抢夺。
  • Hadoop 作为基础设施,其规模可以比较大,而 E-MapReduce Druid 集群可以比较小,两者配合起来工作灵活性更高。

创建Druid集群

在创建集群时选择Druid集群类型即可,具体创建集群操作请参见创建集群
说明 您在创建 E-MapReduce Druid 集群时可以勾选 YARN 和 Superset 服务,E-MapReduce Druid 集群自带的 HDFS 和 YARN 仅供测试使用,原因如背景信息所述。对于生产环境,我们强烈建议您采用专门的 Hadoop 集群。

配置集群

  • 配置使用 HDFS 作为 E-MapReduce Druid 的 deep storage。

    对于独立的 E-MapReduce Druid 集群,如果您需要将索引数据存放在另外一个 Hadoop 集群的 HDFS 上,则您首先需要设置两个集群的连通性(请参见下文的与 Hadoop 集群交互),然后在 E-MapReduce Druid 配置页面,配置以下两个选项并重启服务即可(配置项位于配置页面的 common.runtime)。

    • druid.storage.type设置为hdfs
    • druid.storage.storageDirectory:HDFS 目录,强烈建议填写完整目录,例如:hdfs://emr-header-1.cluster-xxxxxxxx:9000/druid/segments
    说明 如果 Hadoop 集群为 HA 集群,emr-header-1.cluster-xxxxx:9000 需要改成 emr-cluster,或者把端口 9000 改成 8020,下同。
  • 配置使用 OSS 作为 E-MapReduce Druid 的 deep storage。

    E-MapReduce Druid 支持以 OSS 作为 deep storage,借助于 E-MapReduce 的免 AccessKey 能力,E-MapReduce Druid 不用做 AccessKey 配置即可访问 OSS。由于 OSS 的访问能力是借助于 HDFS 的 OSS 功能实现的,因此在配置时,druid.storage.type需要仍然配置为 HDFS。

    • druid.storage.type: hdfs
    • druid.storage.storageDirectory: (如 oss://emr-druid-cn-hangzhou/segments

    由于 OSS 访问借助了 HDFS,因此您需要选择以下两种方案之一:

    • 建集群的时候选择安装 HDFS,系统自动配好(安装好 HDFS 您可以不使用它,关闭它,或者仅作为测试用途)。
    • 在 E-MapReduce Druid 的配置目录/etc/ecm/druid-conf/druid/_common/下新建hdfs-site.xml,内容如下,然后将该文件拷贝至所有节点的相同目录下。
      <?xml version="1.0"?>
        <configuration>
          <property>
            <name>fs.oss.impl</name>
            <value>com.aliyun.fs.oss.nat.NativeOssFileSystem</value>
          </property>
          <property>
            <name>fs.oss.buffer.dirs</name>
            <value>file:///mnt/disk1/data,...</value>
          </property>
          <property>
            <name>fs.oss.impl.disable.cache</name>
            <value>true</value>
          </property>
        </configuration>

      其中fs.oss.buffer.dirs可以设置多个路径。

  • 配置使用 RDS 作为 E-MapReduce Druid 的元数据存储。

    默认情况下 E-MapReduce Druid 利用 header-1节点上的本地 MySQL 数据库作为元数据存储。您也可以配置使用阿里云 RDS 作为元数据存储。

    下面以 RDS MySQL 版为例演示配置。在具体配置之前,请先确保:

    • RDS MySQL 实例已经被创建。
    • 为 E-MapReduce Druid 访问 RDS MySQL 创建了单独的账户(不推荐使用 root),假设账户名为 druid,密码为 druidpw。
    • 为 E-MapReduce Druid 元数据创建单独的 MySQL 数据库,假设数据库名为 druiddb。
    • 确保账户 druid 有权限访问 druiddb。

    在 E-MapReduce 管理控制台,进入 E-MapReduce Druid 集群,单击 Druid 组件,选择配置选项卡,找到 common.runtime 配置文件。单击自定义配置,添加如下三个配置项:

    • druid.metadata.storage.connector.connectURI,值为 jdbc:mysql://rm-xxxxx.mysql.rds.aliyuncs.com:3306/druiddb。
    • druid.metadata.storage.connector.user,值为druid。
    • druid.metadata.storage.connector.password,值为druidpw。

    依次单击右上角的保存部署配置文件到主机重启所有组件,配置即可生效。

    登录RDS 管理控制台,查看 druiddb 创建表的情况,如果正常,您将会看到一些 druid 自动创建的表。

  • 配置组件内存。

    E-MapReduce Druid 组件内存设置主要包括两方面:堆内存(通过 jvm.config 配置)和 direct 内存(通过 jvm.config 和 runtime.properteis 配置)。在创建集群时,E-MapReduce 会自动生成一套配置,不过在某些情况下您仍然可能需要自己调整内存配置。

    要调整组件内存配置,您可以通过 E-MapReduce 控制台进入到集群组件,在页面上进行操作。

    说明 对于 direct 内存,调整时请确保:
    -XX:MaxDirectMemorySize >= druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1)

访问 Druid web 页面

E-MapReduce Druid 自带三个 Web 页面:
  • Overlord:http://emr-header-1.cluster-1234:18090,用于查看 task 运行情况。
  • Coordinator:http://emr-header-1.cluster-1234:18081,用于查看 segments 存储情况,并设置 rule 加载和丢弃 segments。
  • Router(EMR-3.23.0 及以上版本):http://emr-header-1.cluster-1234:18888,也称之为 console,是新版 Druid 的统一入口。
E-MapReduce 提供三种方式访问 E-MapReduce Druid 的 Web 页面:
  • 在集群管理页面,单击访问链接与端口,找到 Druid overlord 或 Druid coordinator 链接,单击链接进入(暂不支持)。
  • 通过 SSH 隧道方式建立 SSH 隧道,开启代理浏览器访问。
  • 通过公网 IP+端口访问,如 http://123.123.123.123:18090(不推荐,请通过安全组设置合理控制公网访问集群权限)。

批量索引

  • 与 Hadoop 集群交互

    您在创建 E-MapReduce Druid 集群时如果勾选了 HDFS 和 YARN(自带 Hadoop 集群),那么系统将会自动为您配置好与 HDFS 和 YARN 的交互,您无需做额外操作。下面的介绍是配置独立 E-MapReduce Druid 集群与独立 Hadoop 集群之间交互,这里假设 E-MapReduce Druid 集群 cluster id 为 1234,Hadoop 集群 cluster id 为 5678。另外请严格按照指导进行操作,如果操作不当,集群可能就不会按照预期工作。

    对于与非安全独立 Hadoop 集群交互,请按照如下操作进行:

    1. 确保集群间能够通信(两个集群在一个安全组下,或两个集群在不同安全组,但两个安全组之间配置了访问规则)。
    2. 在 E-MapReduce Druid 集群的每个节点的指定路径下,放置一份 Hadoop 集群中 /etc/ecm/hadoop-conf 路径下的 core-site.xml、hdfs-site.xml、yarn-site.xml、 mapred-site.xml 文件。 这些文件在 E-MapReduce Druid 集群节点上放置的路径与 E-MapReduce 集群的版本有关,详情说明如下:
      • EMR-3.23.0 及以上版本:/etc/ecm/druid-conf/druid/cluster/_common
      • EMR-3.23.0 以下版本:/etc/ecm/druid-conf/druid/_common
      说明 如果创建集群时选了自带 Hadoop,则在上述目录下会有几个软链接指向自带 Hadoop 的配置,请先移除这些软链接。
    3. 将 Hadoop 集群的 hosts 写入到 E-MapReduce Druid 集群的 hosts 列表中,注意 Hadoop 集群的 hostname 应采用长名形式,如 emr-header-1.cluster-xxxxxxxx,且最好将 Hadoop 的 hosts 放在本集群 hosts 之后,例如:
      ...
      10.157.*.*    emr-as.cn-hangzhou.aliyuncs.com
      10.157.*.*    eas.cn-hangzhou.emr.aliyuncs.com
      192.168.*.*   emr-worker-1.cluster-1234 emr-worker-1 emr-header-2.cluster-1234 emr-header-2 iZbp1h9g7boqo9x23qb****
      192.168.*.*   emr-worker-2.cluster-1234 emr-worker-2 emr-header-3.cluster-1234 emr-header-3 iZbp1eaa5819tkjx55y****
      192.168.*.*   emr-header-1.cluster-1234 emr-header-1 iZbp1e3zwuvnmakmsje****
      --以下为hadoop集群的hosts信息
      192.168.*.*   emr-worker-1.cluster-5678 emr-header-2.cluster-5678 iZbp195rj7zvx8qar4f****
      192.168.*.*   emr-worker-2.cluster-5678 emr-header-3.cluster-5678 iZbp15vy2rsxoegki4q****
      192.168.*.*   emr-header-1.cluster-5678 iZbp10tx4egw3wfnh5o****
    对于安全 Hadoop 集群,请按如下操作进行:
    1. 确保集群间能够通信(两个集群在一个安全组下,或两个集群在不同安全组,但两个安全组之间配置了访问规则)。
    2. 在 E-MapReduce Druid 集群的每个节点的指定路径下,放置一份 Hadoop 集群 /etc/ecm/hadoop-conf 路径下的 core-site.xml、hdfs-site.xml、yarn-site.xml、 mapred-site.xml 文件,并修改 core-site.xml 中 hadoop.security.authentication.use.hasfalse

      其中,core-site.xml、hdfs-site.xml、yarn-site.xml、 mapred-site.xml 文件在 E-MapReduce Druid 集群节点上放置的路径与 E-MapReduce 集群的版本有关,详情说明如下:

      • EMR-3.23.0 及以上版本:/etc/ecm/druid-conf/druid/cluster/_common
      • EMR-3.23.0 以下版本:/etc/ecm/druid-conf/druid/_common
      说明 如果创建集群时选了自带 Hadoop,则在上述目录下会有几个软链接指向自带 Hadoop 的配置,请先移除这些软链接。

      其中,hadoop.security.authentication.use.has是一个客户端配置,目的是让用户能够使用 AccessKey 进行认证。如果使用 Kerberos 认证方式,则需要 disable 该配置。

    3. 将 Hadoop 集群的 hosts 写入到 E-MapReduce Druid 集群每个节点的 hosts 列表中,注意 Hadoop 集群的 hostname 应采用长名形式,如 emr-header-1.cluster-xxxxxxxx,且最好将 Hadoop 的 hosts放在本集群hosts之后。
    4. 设置两个集群间的 Kerberos 跨域互信(详情请参见跨域互信)。
    5. 在 Hadoop 集群的所有节点下都创建一个本地 druid 账户(useradd -m -g hadoop druid),或者设置 druid.auth.authenticator.kerberos.authToLocal(具体预发规则请参见这里)创建 Kerberos 账户到本地账户的映射规则。推荐第一种做法,操作简便不易出错。
      说明 默认在安全 Hadoop 集群中,所有 Hadoop 命令必须运行在一个本地的账户中,该本地账户需要与 principal 的 name 部分同名。YARN 也支持将一个 principal 映射至本地一个账户,即上文第二种做法。
    6. 重启 Druid 服务。
  • 使用 Hadoop 对批量数据创建索引
    E-MapReduce Druid 自带了一个名为 wikiticker 的例子,位于${DRUID_HOME}/quickstart/tutorial下面(${DRUID_HOME}默认为 /usr/lib/druid-current)。wikiticker 文件(wikiticker-2015-09-12-sampled.json.gz)的每一行是一条记录,每条记录是个 json 对象。其格式如下所示:
    ```json
    {
        "time": "2015-09-12T00:46:58.771Z",
        "channel": "#en.wikipedia",
        "cityName": null,
        "comment": "added project",
        "countryIsoCode": null,
        "countryName": null,
        "isAnonymous": false,
        "isMinor": false,
        "isNew": false,
        "isRobot": false,
        "isUnpatrolled": false,
        "metroCode": null,
        "namespace": "Talk",
        "page": "Talk:Oswald Tilghman",
        "regionIsoCode": null,
        "regionName": null,
        "user": "GELongstreet",
        "delta": 36,
        "added": 36,
        "deleted": 0
    }
    ```

    使用 Hadoop 对批量数据创建索引,请按照如下步骤进行操作:

    1. 将该压缩文件解压,并放置于 HDFS 的一个目录下(如 hdfs://emr-header-1.cluster-5678:9000/druid)。 在 Hadoop 集群上执行如下命令。
      ### 如果是在独立Hadoop集群上进行操作,做好两个集群互信之后需要拷贝一个 druid.keytab到Hadoop集群再kinit。
       kinit -kt /etc/ecm/druid-conf/druid.keytab druid
       ###
       hdfs dfs -mkdir hdfs://emr-header-1.cluster-5678:9000/druid
       hdfs dfs -put ${DRUID_HOME}/quickstart/tutorial/wikiticker-2015-09-12-sampled.json hdfs://emr-header-1.cluster-5678:9000/druid
      说明
      • 对于安全集群执行 HDFS 命令前先修改 /etc/ecm/hadoop-conf/core-site.xmlhadoop.security.authentication.use.hasfalse
      • 请确保已经在 Hadoop 集群每个节点上创建名为 druid 的 Linux 账户。
    2. 准备一个数据索引任务文件 ${DRUID_HOME}/quickstart/tutorial/wikiticker-index.json,如下所示:
      {
           "type" : "index_hadoop",
           "spec" : {
               "ioConfig" : {
                   "type" : "hadoop",
                   "inputSpec" : {
                       "type" : "static",
                       "paths" : "hdfs://emr-header-1.cluster-5678:9000/druid/wikiticker-2015-09-12-sampled.json"
                   }
               },
               "dataSchema" : {
                   "dataSource" : "wikiticker",
                   "granularitySpec" : {
                       "type" : "uniform",
                       "segmentGranularity" : "day",
                       "queryGranularity" : "none",
                       "intervals" : ["2015-09-12/2015-09-13"]
                   },
                   "parser" : {
                       "type" : "hadoopyString",
                       "parseSpec" : {
                           "format" : "json",
                           "dimensionsSpec" : {
                               "dimensions" : [
                                   "channel",
                                   "cityName",
                                   "comment",
                                   "countryIsoCode",
                                   "countryName",
                                   "isAnonymous",
                                   "isMinor",
                                   "isNew",
                                   "isRobot",
                                   "isUnpatrolled",
                                   "metroCode",
                                   "namespace",
                                   "page",
                                   "regionIsoCode",
                                   "regionName",
                                   "user"
                               ]
                           },
                           "timestampSpec" : {
                               "format" : "auto",
                               "column" : "time"
                           }
                       }
                   },
                   "metricsSpec" : [
                       {
                           "name" : "count",
                           "type" : "count"
                       },
                       {
                           "name" : "added",
                           "type" : "longSum",
                           "fieldName" : "added"
                       },
                       {
                           "name" : "deleted",
                           "type" : "longSum",
                           "fieldName" : "deleted"
                       },
                       {
                           "name" : "delta",
                           "type" : "longSum",
                           "fieldName" : "delta"
                       },
                       {
                           "name" : "user_unique",
                           "type" : "hyperUnique",
                           "fieldName" : "user"
                       }
                   ]
               },
               "tuningConfig" : {
                   "type" : "hadoop",
                   "partitionsSpec" : {
                       "type" : "hashed",
                       "targetPartitionSize" : 5000000
                   },
                   "jobProperties" : {
                       "mapreduce.job.classloader": "true"
                   }
               }
           },
           "hadoopDependencyCoordinates": ["org.apache.hadoop:hadoop-client:2.8.5"]
       }
      说明
      • spec.ioConfig.type 设置为 hadoop
      • spec.ioConfig.inputSpec.paths 为输入文件路径。
      • tuningConfig.typehadoop
      • tuningConfig.jobProperties 设置了 mapreduce job 的 classloader。
      • hadoopDependencyCoordinates 制定了 hadoop client 的版本。
    3. 在 E-MapReduce Druid 集群上运行批量索引命令。
      cd ${DRUID_HOME}
       curl --negotiate -u:druid -b ~/cookies -c ~/cookies -XPOST -H 'Content-Type:application/json' -d @quickstart/tutorial/wikiticker-index.json http://emr-header-1.cluster-1234:18090/druid/indexer/v1/task

      其中 - -negotiate-u-b-c 等选项是针对安全E-MapReduce Druid集群的。Overlord的端口默认为18090。

    4. 查看作业运行情况。

      在浏览器访问http://emr-header-1.cluster-1234:18090/console.html查看作业运行情况。

    5. 根据 Druid 语法查询数据。
      Druid 有自己的查询语法。请准备一个描述您如何查询 json 格式的查询文件,如下所示为对 wikiticker 数据的一个 top N 查询(${DRUID_HOME}/quickstart/tutorial/wikiticker-top-pages.json):
      {
           "queryType" : "topN",
           "dataSource" : "wikiticker",
           "intervals" : ["2015-09-12/2015-09-13"],
           "granularity" : "all",
           "dimension" : "page",
           "metric" : "edits",
           "threshold" : 25,
           "aggregations" : [
               {
                   "type" : "longSum",
                   "name" : "edits",
                   "fieldName" : "count"
               }
           ]
       }
      在命令行界面运行下面的命令即可看到查询结果:
      cd ${DRUID_HOME}
       curl --negotiate -u:druid -b ~/cookies -c ~/cookies -XPOST -H 'Content-Type:application/json' -d @quickstart/tutorial/wikiticker-top-pages.json 'http://emr-header-1.cluster-1234:18082/druid/v2/?pretty'

      其中 - -negotiate-u-b-c 等选项是针对安全 E-MapReduce Druid 集群的。如果一切正常,您将能看到具体的查询结果。

实时索引

对于数据从 Kafka 集群实时到 E-MapReduce Druid 集群进行索引,我们推荐使用 Kafka Indexing Service 扩展,提供了高可靠保证,支持 exactly-once 语义。详情请参见 Kafka Indexing Service 中“使用 Druid Kafka Indexing Service 实时消费Kafka数据”一节。

如果您的数据实时打到了阿里云日志服务(SLS),并想用 E-MapReduce Druid 实时索引这部分数据,我们提供了 SLS Indexing Service 扩展。使用 SLS Indexing Service 避免了您额外建立并维护 Kafka 集群的开销。SLS Indexing Service 的作用与 Kafka Indexing Service 相同,也提供高可靠保证和 Exactly-Once语义。在这里,您完全可以把 SLS 当成一个 Kafka 来使用。详情请参见 SLS-Indexing-Service

Kafka Indexing Service 和 SLS Indexing Service 是类似的,都使用拉的方式从数据源拉取数据到 E-MapReduce Druid 集群,并提供高可靠保证和 exactly-once 语义。

索引失败问题分析思路

当发现索引失败时,一般遵循如下排错思路:
  • 对于批量索引
    1. 如果 curl 直接返回错误,或者不返回,检查一下输入文件格式。或者 curl 加上 -v 参数,观察 REST API 的返回情况。
    2. 在 Overlord 页面观察作业执行情况,如果失败,查看页面上的 logs。
    3. 在很多情况下并没有生成 logs,如果是 Hadoop 作业,打开 YARN 页面查看是否有索引作业生成,并查看作业执行 log。
    4. 如果上述情况都没有定位到错误,需要登录到 E-MapReduce Druid 集群,查看 Overlord 的执行日志(位于/mnt/disk1/log/druid/overlord—emr-header-1.cluster-xxxx.log),如果是 HA 集群,查看您提交作业的那个Overlord。
    5. 如果作业已经被提交到 Middlemanager,但是从 Middlemanager 返回了失败,则需要从 Overlord 中查看作业提交到了那个worker,并登录到相应的 worker,查看 Middlemanager 的日志(位于/mnt/disk1/log/druid/middleManager-emr-header-1.cluster-xxxx.log)。
  • 对于 Kafka Indexing Service 和 SLS Indexing Service
    1. 首先查看 Overlord 的 Web 页面:http://emr-header-1:18090, 查看 Supervisor 的运行状态,检查 payload 是否合理。
    2. 查看失败 task 的 log。
    3. 如果不能从 task log 定位出失败原因,则需要从 Overlord log 排查问题。