このトピックでは、E-MapReduce(EMR)で Apache Druid によって提供される Kafka Indexing Service プラグインを使用して、Kafka データをリアルタイムで消費する方法について説明します。
前提条件
Druid クラスタと Kafka クラスタは、EMR コンソールで作成されます。詳細については、「クラスタの作成」をご参照ください。
背景情報
Kafka Indexing Service は、Apache Druid によって提供されるプラグインです。 Kafka Indexing Service を使用すると、Apache Druid の Indexing Service を使用して Kafka データをリアルタイムで消費できます。 Kafka Indexing Service プラグインは、Overlord でスーパーバイザーを起動します。スーパーバイザーが起動されると、Indexing タスクは MiddleManager で実行されます。これらのタスクは、Kafka クラスタに接続できます。このようにして、Kafka トピックのデータが消費され、インデックスが作成されます。消費されるデータの形式を記述したファイルを用意し、関連する RESTful API を呼び出してスーパーバイザーを起動するだけで済みます。
Druid クラスタと Kafka クラスタ間のインタラクションを構成する
Druid クラスタと Kafka クラスタ間のインタラクションを構成するには、クラスタ間の接続を確立し、Kafka クラスタのホストを Druid クラスタ内の各ノードのホストリストに追加する必要があります。操作は、Druid クラスタと Hadoop クラスタ間のインタラクションを構成するために行われる操作に似ています。
非高セキュリティ Kafka クラスタの場合、次の手順を実行して Druid クラスタとのインタラクションを構成します。
2 つのクラスタが相互に通信できることを確認します。これは、2 つのクラスタが同じセキュリティグループにあるか、2 つのクラスタがアクセスルールが構成されている異なるセキュリティグループにあることを示します。
Kafka クラスタのホストを Druid クラスタ内の各ノードのホストリストに追加します。
重要Kafka クラスタのホスト名は、長いドメイン名の形式です。例:emr-header-1.cluster-xxxxxxxx。
高セキュリティ Kafka クラスタの場合、次の手順を実行して Druid クラスタとのインタラクションを構成します。
2 つのクラスタが相互に通信できることを確認します。これは、2 つのクラスタが同じセキュリティグループにあるか、2 つのクラスタがアクセスルールが構成されている異なるセキュリティグループにあることを示します。
Kafka クラスタのホストを Druid クラスタ内の各ノードのホストリストに追加します。
重要Kafka クラスタのホスト名は、長いドメイン名の形式です。例:emr-header-1.cluster-xxxxxxxx。
クラスタの Kerberos クロスレルム信頼を実装します。詳細については、「レルム間の相互信頼」をご参照ください。クラスタの双方向クロスレルム信頼を実装することをお勧めします。
Kafka クライアントのセキュリティ構成ファイルを準備します。ファイル内容の例:
KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/ecm/druid-conf/druid.keytab" principal="druid@EMR.1234.COM"; };構成ファイルを Druid クラスタのすべてのノードに同期し、特定のディレクトリに配置します。例:/tmp/kafka/kafka_client_jaas.conf。
Druid サービスページの [構成] タブで、[サービス構成] セクションの overlord.jvm をクリックし、次の情報を追加します:
-Djava.security.auth.login.config=/tmp/kafka/kafka_client_jaas.confDruid サービスページの [構成] タブで、[サービス構成] セクションの middleManager.runtime をクリックし、
druid.indexer.runner.javaOptsパラメータを Djava.security.auth.login.confi=/tmp/kafka/kafka_client_jaas.conf に設定し、ビジネス要件に基づいて他の Java 仮想マシン(JVM)起動パラメータを構成します。Druid サービスを再起動します。
Kafka Indexing Service を使用して Kafka データをリアルタイムで消費する
Kafka クラスタまたはゲートウェイクラスタで次のコマンドを実行して、metrics という名前の Kafka トピックを作成します。
-- The Kafka cluster is a high-security Kafka cluster. export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf" kafka-topics.sh --create --zookeeper emr-header-1:2181,emr-header-2:2181,emr-header-3:2181 --partitions 1 --replication-factor 1 --topic metricsKafkaトピックを作成する場合は、環境設定に基づいて、上記のコマンドのパラメーターを設定します。--zookeeper パラメーターに必要なパスを取得するには、次の操作を実行します。EMRコンソール にログオンします。[設定] タブのKafkaサービスページに移動します。[設定] タブで、zookeeper.connect パラメーターの値を確認します。セルフマネージドKafkaクラスターを使用する場合は、クラスターの実際の構成に基づいて --zookeeper パラメーターを設定する必要があります。
Kafka データソースのデータ形式記述ファイルを定義し、特定のディレクトリに配置します。この例では、ファイル名は metrics-kafka.json です。
{ "type": "kafka", "dataSchema": { "dataSource": "metrics-kafka", "parser": { "type": "string", "parseSpec": { "timestampSpec": { "column": "time", "format": "auto" }, "dimensionsSpec": { "dimensions": ["url", "user"] }, "format": "json" } }, "granularitySpec": { "type": "uniform", "segmentGranularity": "hour", "queryGranularity": "none" }, "metricsSpec": [{ "type": "count", "name": "views" }, { "name": "latencyMs", "type": "doubleSum", "fieldName": "latencyMs" } ] }, "ioConfig": { "topic": "metrics", "consumerProperties": { "bootstrap.servers": "emr-worker-1.cluster-xxxxxxxx:9092 (bootstrap.servers of your Kafka cluster)", // Kafka クラスタの bootstrap.servers "group.id": "kafka-indexing-service", "security.protocol": "SASL_PLAINTEXT", "sasl.mechanism": "GSSAPI" }, "taskCount": 1, "replicas": 1, "taskDuration": "PT1H" }, "tuningConfig": { "type": "kafka", "maxRowsInMemory": "100000" } }説明ioConfig.consumerProperties.security.protocol パラメータと ioConfig.consumerProperties.sasl.mechanism パラメータは、高セキュリティ Kafka クラスタの場合にのみ必要です。
次のコマンドを実行して、Kafka スーパーバイザーを追加します。
curl --negotiate -u:druid -b ~/cookies -c ~/cookies -XPOST -H 'Content-Type: application/json' -d @metrics-kafka.json http://emr-header-1.cluster-1234:18090/druid/indexer/v1/supervisor--negotiate、-u、-b、および-cパラメータは、高セキュリティ Druid クラスタの場合にのみ必要です。Kafka クラスタでコンソールプロデューサークライアントを起動します。
# The Kafka cluster is a high-security Kafka cluster. export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf" echo -e "security.protocol=SASL_PLAINTEXT\nsasl.mechanism=GSSAPI" > /tmp/kafka-producer.conf kafka-console-producer.sh --producer.config /tmp/kafka-producer.conf --broker-list emr-header-1:9092,emr-header-2:9092,emr-header-3:9092 --topic metrics--producer.config /tmp/kafka-producer.conf パラメータは、高セキュリティ Kafka クラスタの場合にのみ必要です。
Kafka-console-producer.sh コマンドのコマンドプロンプトウィンドウに次のデータレコードを入力します。
{"time": "2018-03-06T09:57:58Z", "url": "/foo/bar", "user": "alice", "latencyMs": 32} {"time": "2018-03-06T09:57:59Z", "url": "/", "user": "bob", "latencyMs": 11} {"time": "2018-03-06T09:58:00Z", "url": "/foo/bar", "user": "bob", "latencyMs": 45}次の Python コマンドを実行して、上記のコードのタイムスタンプを生成できます。
python -c 'import datetime; print(datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"))'クエリファイルを準備します。この例では、ファイル名は metrics-search.json です。
{ "queryType" : "search", "dataSource" : "metrics-kafka", "intervals" : ["2018-03-02T00:00:00.000/2018-03-08T00:00:00.000"], "granularity" : "all", "searchDimensions": [ "url", "user" ], "query": { "type": "insensitive_contains", "value": "bob" } }Druid クラスタのマスターノードで次のコマンドを実行します。
curl --negotiate -u:Druid -b ~/cookies -c ~/cookies -XPOST -H 'Content-Type: application/json' -d @metrics-search.json http://emr-header-1.cluster-1234:18082/druid/v2/?pretty--negotiate、-u、-b、および-cパラメータは、高セキュリティ Druid クラスタの場合にのみ必要です。次の情報が返されます。
[ { "timestamp" : "2018-03-06T09:00:00.000Z", "result" : [ { "dimension" : "user", "value" : "bob", "count" : 2 } ] } ]