E-MapReduce V3.11.0 以降のバージョンは、E-MapReduce Druid クラスタータイプとしてサポートしています。

E-MapReduce Druid を (Druid サービスを Hadoop クラスターに追加する代わりに) 別のクラスタータイプとして使用する理由は以下のとおりです。
  • E-MapReduce Druid は Hadoop に依存せずに使用できます。
  • データ量が多い場合、E-Mapreduce Druid は多くのメモリを必要とします。特に Broker および Historical ノードでメモリを必要とします。 E-MapReduce Druid は Yarn によって制御されていないため、マルチサービス操作中にリソースの競合が発生します。
  • インフラストラクチャとして、Hadoop クラスターのノード数は比較的大きくなる傾向がありますが、E-MapReduce Druid クラスターではノード数が比較的小さくなります。 双方を利用することで、より柔軟に作業を実行できます。

Druid クラスターを作成する

クラスターの作成時に、クラスタータイプとして Druid を選択します。 E-MapReduce Druid クラスターの作成時に、Superset と Yarn を選択できます。 本ガイドの冒頭で説明したとおり、Druid クラスターの HDFS と Yarn はテスト専用です。 運用環境として専用の Hadoop クラスターを使用することを推奨します。

クラスターを設定する

  • E-MapReduce Druid のディープストレージとして HDFS を使用するようにクラスターを設定する

    スタンドアロンの E-MapReduce Druid クラスターの場合、インデックスデータを別の Hadoop クラスターの HDFS に保存する必要がある場合があります。 したがって、2 つのクラスター間の接続に関連する設定を完了する必要があります (詳細については、Hadoop クラスターとの連携をご参照ください)。 次に、E-MapReduce Druid の設定ページで以下の項目を設定し、サービスを再起動する必要があります。 設定項目は、設定ページの common.runtime にあります。

    • druid.storage.type: hdfs
    • druid.storage.storageDirectory: (hdfs ディレクトリは、hdfs://emr-header-1.cluster-xxxxxxxx:9000/druid/segments などの完全なディレクトリである必要があります。)
    Hadoop クラスターが HA クラスターの場合、emr-header-1.cluster-xxxxx:9000 を emr-cluster に変更するか、ポート 9000 をポート 8020 に変更する必要があります。
  • E-MapReduce Druid のディープストレージとして OSS を使用する

    E-MapReduce Druid は、OSS をディープストレージとして使用できます。 E-MapReduce の AccessKey-free 機能により、E-MapReduce Druid は AccessKey を設定することなく自動的に OSS にアクセスできます。 E-MapReduce Druid は HDFS の OSS 機能によって OSS にアクセスできるため、druid.storage.type は設定プロセス中も HDFS として設定する必要があります。

    • druid.storage.type: hdfs
    • druid.storage.storageDirectory: (たとえば、oss://emr-druid-cn-hangzhou/segments)

    E-MapReduce Druid は HDFS の OSS 機能によって OSS にアクセスできるため、次の 2 つのシナリオのいずれかを選択する必要があります。

    • クラスターの作成時に HDFS のインストールを選択します。 システムが自動的に設定されます。 (HDFS のインストール後、それを使用しないか、無効にするか、テスト目的でのみ使用するかを選択できます。)
    • hdfs-site.xml を、E-MapReduce Druidの設定ディレクトリ /etc/ecm/druid-conf/druid/_common/ に作成します。内容は次のとおりです。次に、ファイルをすべてのノードの同じディレクトリにコピーします。
      <? xml version="1.0" ? >
        <configuration>
          <property> 
            <name>fs.oss.impl</name>
            <value>com.aliyun.fs.oss.nat.NativeOssFileSystem</value> 
          </property>
          <property>
            <name>fs.oss.buffer.dirs</name> 
            <value>file:///mnt/disk1/data,...</value> 
          </property> 
          <property>
            <name>fs.oss.impl.disable.cache</name>
            <value>true</value> 
          </property>
        </configuration>

      fs.oss.buffer.dirs は複数のパスに設定できます。

  • RDS を使用して E-MapReduce Druid メタデータを保存する

    header-1 ノードの MySQL データベースを使用して、E-MapReduce Druid メタデータを保存します。 Alibaba Cloud RDS を使用してメタデータを保存することもできます。

    以下では、RDS MySQL を例として使用して、設定を示します。 始める前に、以下をご確認ください。

    • RDS MySQL インスタンスが作成されていること。
    • E-MapReduce Druid が RDS MySQL にアクセスするための別のアカウントが作成されていること (root は推奨されません)。 この例では、アカウント名 druid とパスワード druidpw を使用しています。
    • Druid メタデータ用に別の MySQL データベースを作成します。 データベースの名前が druiddb であるとします。
    • アカウント druid に druiddb にアクセスする権限があることを確認してください。

    E-MapReduce コンソールで、設定する E-MapReduce Druid クラスターの後ろにある [管理] をクリックします。 Druid サービスをクリックし、[設定]タブをクリックして common.runtime 設定ファイルを見つけます。 [カスタム設定]をクリックして次の 3 つの設定アイテムを追加します。

    • dbc:mysql://rm-xxxxx.mysql.rds.aliyuncs.com:3306/druiddb を値として持つ druid.metadata.storage.connector.connectURI
    • druid を値として持つ druid.metadata.storage.connector.user
    • druidpw を値として持つ druid.metadata.storage.connector.password

    [保存][クライアント設定をデプロイ][すべてのコンポーネントを再起動] をクリックして、設定を有効にします。

    RDS コンソールにログインして、druiddb によって作成されたテーブルを表示します。 druid によって自動的に作成されたテーブルも表示されます。

  • サービスメモリ設定

    Druid サービスのメモリは、ヒープメモリ (jvm.config で設定) とダイレクトメモリ (jvm.config と runtime.properteis で設定) で設定されます。 E-MapReduce は、クラスターを作成するときに一連の設定を自動的に行いします。 ただし、メモリの設定が必要となる場合があります。

    サービスメモリの設定を調整するには、E-MapReduce コンソールからクラスターサービスにアクセスし、ページで関連する操作を実行します。

    ダイレクトメモリの場合、次の点を確認してください。
    -XX:MaxDirectMemorySize が druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1) 以上であること。

E-MapReduce Druid のウェブページにアクセスする

E-MapReduce Druid には 2 つのWeb ページが付属しています。
  • Overlord:http://emr-header-1.cluster-1234:18090 は、タスクの実行ステータスを表示するために使用されます。
  • Coordinator:http://emr-header-1.cluster-1234:18081 は、セグメントのストレージステータスを表示するために使用されます。
EMR では、 つの方法 E-MapReduce Druid の Web ページにアクセスできます。
  • クラスタ管理ページで、[アクセスリンクとポート] をクリックし、Druid overlord または Druid coordinator のリンクを確認し、リンクをクリックして入力します (EMR-3.20.0 以降のバージョンを推奨します)。
  • SSHトンネリング (SSH tunneling)を使用して SSH トンネルを作成し、プロキシブラウザアクセスを有効にします。
  • 図のように、インターネットアドレスおよびポートを使用して Http: // 123.123.123.123: 18090 にアクセスします (非推奨。セキュリティグループ設定を使用して、インターネット経由のクラスターアクセスを適切に管理してください)。

バッチインデックス

  • Hadoop クラスターとの連携

    E-MapReduce Druid クラスターの作成時に HDFS と Yarn (独自の Hadoop クラスターを使用) を選択した場合、システムは HDFS と Yarn 間の連携を自動的に設定します。 次の例は、スタンドアロンの E-MapReduce Druid クラスターとスタンドアロンの Hadoop クラスター間の連携を設定する方法を示しています。 E-MapReduce Druid クラスター ID は 1234、Hadoop クラスター ID は 5678 であると想定されています。 また、指示の内容に従って正しく設定してください。 単純な設定ミスが原因でクラスターが期待どおりに動作しない場合があります。

    標準モードの Hadoop クラスターとの連携については、次の操作を実行します。

    1. 2 つのクラスター間の通信を確認します。 (各クラスターは異なるセキュリティグループに関連付けられ、アクセスルールは 2 つのセキュリティグループに対して設定されています。)
    2. Hadoop クラスターの /etc/ecm/hadoop-conf の core-site.xml、hdfs-site.xml、yarn-site.xml、mapred-site.xml を、E-MapReduce Druid クラスターの各ノードの /etc/ecm/duird-conf/druid/_common ディレクトリに配置します。 (クラスターの作成時に組み込みの Hadoop を選択した場合、このディレクトリー内のいくつかのソフトリンクは、E-MapReduce の Hadoop サービスの設定にマップされます。 最初にこれらのソフトリンクを削除してください。)
    3. Hadoop クラスターのホストを E-MapReduce Druid クラスターのホストリストに書き込みます。 Hadoop クラスターのホスト名は、emr-header-1.cluster-xxxxxxxx などの長い名前の形式にする必要があります。 次のように、Hadoop のホストを E-MapReduce Druid クラスターのホストの後に置くことを推奨します。
      ...
      10.157.201.36   emr-as.cn-hangzhou.aliyuncs.com
      10.157.64.5 eas.cn-hangzhou.emr.aliyuncs.com
      192.168.142.255 emr-worker-1.cluster-1234 emr-worker-1 emr-header-2.cluster-1234 emr-header-2 iZbp1h9g7boqo9x23qbifiZ
      192.168.143.0   emr-worker-2.cluster-1234 emr-worker-2 emr-header-3.cluster-1234 emr-header-3 iZbp1eaa5819tkjx55yr9xZ
      192.168.142.254 emr-header-1.cluster-1234 emr-header-1 iZbp1e3zwuvnmakmsjer2uZ
      高セキュリティモードの Hadoop クラスターの場合、次の操作を実行します。
      192.168.143.6   emr-worker-1.cluster-5678 emr-worker-1 emr-header-2.cluster-5678 emr-header-2 iZbp195rj7zvx8qar4f6b0Z
      192.168.143.7 emr-worker-2.cluster-5678 emr-worker-2 emr-header-3.cluster-5678 emr-header-3 iZbp15vy2rsxoegki4qhdpZ
      192.168.143.5 emr-header-1.cluster-5678 emr-header-1 iZbp10tx4egw3wfnh5oii1Z
    高セキュリティモードの Hadoop クラスターの場合、次の操作を実行します。
    1. 2 つのクラスター間の通信を確認します。 (各クラスターは異なるセキュリティグループに関連付けられ、アクセスルールは 2 つのセキュリティグループに対して設定されています。)
    2. Hadoop クラスターの /etc/ecm/hadoop-conf のcore-site.xml、hdfs-site.xml、yarn-site.xml、mapred-site.xml を、E-MapReduce Druid クラスターの各ノードの /etc/ecm/duird-conf/druid/_common ディレクトリに配置します。 (クラスターの作成時に組み込みの Hadoop を選択した場合、このディレクトリー内のいくつかのソフトリンクは、Hadoop 設定にマップされます。 最初にこれらのソフトリンクを削除してください。) core-site.xml の hadoop.security.authentication.use.hasfalse に変更します。 (この設定は、ユーザーの AccessKey 認証を有効にするためにクライアントで実行されます。 Kerberos 認証を使用する場合は、AccessKey 認証を無効にします。)
    3. Hadoop クラスターのホストを E-MapReduce Druid クラスターの各ノードのホストリストに書き込みます。 Hadoop クラスターのホスト名は、emr-header-1.cluster-xxxxxxxx などの長い名前の形式にする必要があります。 Hadoop のホストを E-MapReduce Druid クラスターのホストの後に置くことを推奨します。
    4. 2 つのクラスター間に Kerberos クロスドメイン相互信頼関係を設定します。 詳細については、「クロスリージョンアクセス」をご参照ください。
    5. Hadoop クラスターのすべてのノードでローカル druid アカウント (useradd-mg hadoop) を作成するか、druid.auth.authenticator.kerberos.authtomateを設定して、Kerberos アカウントからローカルアカウントへのマッピングルールを作成します。 (特定のプレリリースルールについては、「こちら」をご参照ください。) エラーなしで簡単に操作できるため、この方法を推奨します。
      高セキュリティモードの Hadoop クラスターでは、すべての Hadoop コマンドをローカルアカウントから実行する必要があります。 デフォルトでは、このローカルアカウントはプリンシパルと同じ名前である必要があります。 Yarn は、プリンシパルのローカルアカウントへのマッピングもサポートしています。
    6. E-MapReduce DruidDruid サービスを再起動します。
  • Hadoop を使用してバッチデータのインデックスを作成する
    Druid には、wikiticker という名前のサンプルが付属しています。パス ${DRUID_HOME}/quickstart/tutorial にあります。 ${DRUID_HOME} は、デフォルトで /usr/lib/druid-current に設定されています。 wikiticke ドキュメント (wikiticker-2015-09-12-sampled.json.gz) の各行はレコードです。 各レコードは json オブジェクトです。 形式は次のとおりです。
    ```json
    {
        "time": "2015-09-12T00:46:58.771Z",
        "channel": "#en.wikipedia",
        "cityName": null,
        "comment": "added project",
        "countryIsoCode": null,
        "countryName": null,
        "isAnonymous": false,
        "isMinor": false,
        "isNew": false,
        "isRobot": false,
        "isUnpatrolled": false,
        "metroCode": null,
        "namespace": "Talk",
        "page": "Talk:Oswald Tilghman",
        "regionIsoCode": null,
        "regionName": null,
        "user": "GELongstreet",
        "delta": 36,
        "added": 36,
        "deleted": 0
    }
    ```

    Hadoop を使用してバッチデータのインデックスを作成するには、次の手順を実行します。

    1. 圧縮ファイルを解凍し、HDFS のディレクトリに配置します (例:hdfs://emr-header-1.cluster-5678:9000/druid)。 Hadoop クラスターで次のコマンドを実行します。
      ### If you are operating on a standalone Hadoop cluster, copy a druid.keytab to Hadoop cluster after the mutual trust is established between the two clusters, and run the kinit command.
       kinit -kt /etc/ecm/druid-conf/druid.keytab druid
       ###
       hdfs dfs -mkdir hdfs://emr-header-1.cluster-5678:9000/druid
       hdfs dfs -put ${DRUID_HOME}/quickstart/wikiticker-2015-09-16-sampled.json hdfs://emr-header-1.cluster-5678:9000/druid
      • 高セキュリティモードのクラスターに対して HDFS コマンドを実行する前に、/etc/ecm/hadoop-conf/core-site.xmlhadoop.security.authentication.use.hasfalse に修正します。
      • Hadoop クラスターの各ノードに druid という名前の Linux アカウントが作成されていることを確認します。
    2. 次の設定を使用して、データのインデックス作成用にファイルを準備します。 ファイルパスは ${DRUID_HOME}/quickstart/tutorial/wikiticker-index.json に設定されています。
      {
           "type" : "index_hadoop",
           "spec" : {
               "ioConfig" : {
                   "type" : "hadoop",
                   "inputSpec" : {
                       "type" : "static",
                       "paths" : "hdfs://emr-header-1.cluster-5678:9000/druid/wikiticker-2015-09-16-sampled.json"
                   }
               },
               "dataSchema" : {
                   "dataSource" : "wikiticker",
                   "granularitySpec" : {
                       "type" : "uniform",
                       "segmentGranularity" : "day",
                       "queryGranularity" : "none",
                       "intervals" : ["2015-09-12/2015-09-13"]
                   },
                   "parser" : {
                       "type" : "hadoopyString",
                       "parseSpec" : {
                           "format" : "json",
                           "dimensionsSpec" : {
                               "dimensions" : [
                                   "channel",
                                   "cityName",
                                   "comment",
                                   "countryIsoCode",
                                   "countryName",
                                   "isAnonymous",
                                   "isMinor",
                                   "isNew",
                                   "isRobot",
                                   "isUnpatrolled",
                                   "metroCode",
                                   "namespace",
                                   "page",
                                   "regionIsoCode",
                                   "regionName",
                                   "user"
                               ]
                           },
                           "timestampSpec" : {
                               "format" : "auto",
                               "column" : "time"
                           }
                       }
                   },
                   "metricsSpec" : [
                       {
                           "name" : "count",
                           "type" : "count"
                       },
                       {
                           "name" : "added",
                           "type" : "longSum",
                           "fieldName" : "added"
                       },
                       {
                           "name" : "deleted",
                           "type" : "longSum",
                           "fieldName" : "deleted"
                       },
                       {
                           "name" : "delta",
                           "type" : "longSum",
                           "fieldName" : "delta"
                       },
                       {
                           "name" : "user_unique",
                           "type" : "hyperUnique",
                           "fieldName" : "user"
                       }
                   ]
               },
               "tuningConfig" : {
                   "type" : "hadoop",
                   "partitionsSpec" : {
                       "type" : "hashed",
                       "targetPartitionSize" : 5000000
                   },
                   "jobProperties" : {
                       "mapreduce.job.classloader": "true"
                   }
               }
           },
           "hadoopDependencyCoordinates": ["org.apache.hadoop:hadoop-client:2.7.2"]
       }
      • spec.ioConfig.typehadoop に設定されています。
      • spec.ioConfig.inputSpec.paths は入力ファイルのパスです。
      • tuningConfig.typeは、hadoop に設定されています。
      • tuningConfig.jobProperties は、mapreduce ジョブのクラスローダーを設定します。
      • hadoopDependencyCoordinates は、Hadoop クライアントのバージョンを開発します。
    3. E-MapReduce Druid クラスターでバッチインデックスコマンドを実行します。
      cd ${DRUID_HOME}
       curl --negotiate -u:druid -b ~/cookies -c ~/cookies -XPOST -H 'Content-Type:application/json' -d @quickstart/wikiticker-index.json http://emr-header-1.cluster-1234:18090/druid/indexer/v1/task

      - -negotiate-u-b、および -c オプションは、セキュアな E-MapReduce Druid クラスター用です。 Overload のポート番号はデフォルトで 18090 です。

    4. ジョブの実行状態を表示します。

      ブラウザのアドレスバーに http://emr-header-1.cluster-1234:18090/console.html と入力して、ジョブの実行ステータスを表示します。

    5. E-MapReduce Druid 構文に基づいてデータを照会します。
      E-MapReduce Druid には独自のクエリ構文があります。 クエリ方法を記述した json 形式のクエリファイルを準備する必要があります。 Wikiticker データへの topN クエリは次のとおりです。${DRUID_HOME}/quickstart/wikiticker-top-pages.json):
      {
           "queryType" : "topN",
           "dataSource" : "wikiticker",
           "intervals" : ["2015-09-12/2015-09-13"],
           "granularity" : "all",
           "dimension" : "page",
           "metric" : "edits",
           "threshold" : 25,
           "aggregations" : [
               {
                   "type" : "longSum",
                   "name" : "edits",
                   "fieldName" : "count"
               }
           ]
       }
      次のコマンドを実行して、クエリの結果を確認できます。
      cd ${DRUID_HOME}
       curl --negotiate -u:druid -b ~/cookies -c ~/cookies -XPOST -H 'Content-Type:application/json' -d @quickstart/wikiticker-top-pages.json 'http://emr-header-1.cluster-1234:18082/druid/v2/?pretty'

      - -negotiate-u-b-c などの項目は、高セキュリティモードの E-MapReduce Druid クラスター用の項目ですので注意してください。 通常の場合、特定のクエリの結果を確認できます。

リアルタイムインデックス

Kafka クラスターから E-MapReduce Druid クラスターにリアルタイムでデータのインデックスを作成する場合は、Kafka インデックスサービス拡張を使用して高い信頼性を確保し、1 回限りのセマンティクスをサポートすることをお勧めします。 詳細については、「Kafka インデックス作成サービス」のリアルタイムで Kafka データを使用するための Druid Kafka インデックスサービスの使用セクションをご参照ください。

Alibaba Cloud Log Service (SLS) がデータにリアルタイムでアクセスし、E-MapReduce Druid を使用してデータにリアルタイムでインデックスを付ける場合は、SLS インデックス作成サービス拡張を提供します。 SLS インデックス作成サービスを使用すると、Kafka クラスターの作成と保守のオーバーヘッドを回避できます。 SLS インデックス作成サービスは、高信頼性とKafka インデックスサービスのような 1 回限りのセマンティクスを提供します。 ここでは、SLS を Kafka として使用できます。

Kafka インデックス作成サービスと SLS インデックス作成サービスは似ています。 データソースからデータをプルモードで E-MapReduce Druid クラスターにプルし、高い信頼性と 1 回限りのセマンティクスを提供します。

インデックス作成の失敗を分析する

インデックス作成が失敗した場合、通常は次のトラブルシューティング手順に従います。
  • バッチデータインデックスの場合
    1. curl がエラーを直接返すか、値が返されない場合は、入力ファイルの形式を確認してください。 または curl に -v パラメーターを追加して、REST API から返された値を監視します。
    2. Overlord ページでジョブの実行を確認します。 失敗した場合は、ページのログを確認してください。
    3. 多くの場合、ログは生成されません。 Hadoop ジョブの場合、Yarn ページを開いて、インデックスジョブが生成されているかどうかを確認し、ジョブ実行ログを表示します。
    4. エラーが見つからない場合は、E-MapReduce Druid クラスターにログインし、Overlord の実行ログ (/mnt/disk1/log/druid/overlord—emr-header-1.cluster-xxxx.log) を表示します。 HA クラスターの場合は、ジョブを送信した Overload を確認します。
    5. ジョブが Middlemanager に送信されたが失敗が返された場合は、ジョブが送信されたワーカーを Overlord で表示し、ワーカーノードにログオンして、/mnt/disk1/log/druid/middleManager-emr-header-1.cluster-xxxx.log にある Middlemanager のログを表示する必要があります。
  • Kafka インデックス作成サービスおよび SLS インデックス作成サービスの場合
    1. まず、Http://emr-header-1:18090 から Overlord ウェブページを表示し、Supervisor の実行ステータスを確認し、ペイロードが有効かどうかを確認します。
    2. 失敗したタスクのログを表示します。
    3. タスクログから失敗の原因を特定できない場合、問題をトラブルシューティングするには、Overlord ログから始める必要があります。