本ページでは、E-MapReduce で Apache Druid Kafka インデックス作成サービスを使用して、リアルタイムで Kafka データを取り込む方法について説明します。
Kafka インデックス作成サービスは、Apache Druid によって起動され、Apache Druid のインデックス作成サービスを使用してリアルタイムで Kafka データを取り込む拡張機能です。 拡張機能により、ミドルマネージャーでいくつかのインデックス作成タスクを開始する Overload のスーパーバイザーが有効になります。 これらのタスクは Kafka クラスターに接続してトピックデータを取り込み、インデックスの作成を完了します。 データ取り込みフォーマットファイルを準備し、RESTful API を介して手動でスーパーバイザを起動する必要があります。
Kafka クラスターとの相互作用
- クラスター間の通信を確認します。 (2 つのクラスターが同じセキュリティグループ内にあるか、各クラスターが異なるセキュリティグループに関連付けられており、アクセスルールがこれらのセキュリティグループに対して構成されています。)
- Kafka クラスターのホストを E-MapReduce Druid クラスターの各ノードのホストリストに書き込みます。 Kafka クラスターのホスト名は、emr-header-1.cluster-xxxxxxxx などの長い名前にする必要があります。
- 2 つのクラスター間の通信を確認します (2 つのクラスターが同じセキュリティグループ内にあるか、各クラスターが異なるセキュリティグループに関連付けられており、これらのセキュリティグループに対してアクセスルールが構成されています)。
- Kafka クラスターのホストを E-MapReduce Druid クラスターの各ノードのホストリストに書き込みます。 Kafka クラスターのホスト名は、emr-header-1.cluster-xxxxxxxx などの長い名前にする必要があります。
- 2 つのクラスター間に Kerberos クロスドメイン相互信頼を設定します。 詳細については、「クロスリージョンアクセス」をご参照ください。 双方向の相互信頼が推奨されます。
- クライアントセキュリティ構成ファイルを準備します。
KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/ecm/druid-conf/druid.keytab" principal="druid@EMR. 1234. COM"; };
構成ファイルを E-MapReduce Druid クラスター内のすべてのノードと同期し、次のような特定のディレクトリに配置します。/tmp/kafka/kafka_client_jaas.conf。
- E-MapReduce Druid 構成ページの overlord.jvmで以下を追加します。
Add Djava.security.auth.login.config=/tmp/kafka/kafka_client_jaas.conf
- E-MapReduce Druid 構成ページのmiddleManager.runtime で以下のオプションを構成します。
druid.indexer.runner.javaOpts=-Djava.security.auth.login.confi=/tmp/kafka/kafka_client_jaas.conf
およびその他の jvm 起動パラメーター。 - E-MapReduce DruidDruid サービスを再起動します。
Apache Druid の Kafka インデックスサービスを使用して、リアルタイムで Kafka データを取り込む
- Kafka クラスター (またはゲートウェイ) で次のコマンドを実行して、metrics という名前のトピックを作成します。
--If the Kafka high-security mode is enabled: export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf" -- kafka-topics.sh --create --zookeeper emr-header-1:2181,emr-header-2,emr-header-3/kafka-1.0.0 --partitions 1 --replication-factor 1 --topic metrics
必要に応じてパラメーターを調整できます。 - -zookeeper の /kafka-1.0.0 セクションはパスであり、Kafka クラスターの構成ページで Kafka サービスの zookeeper.connect の値を確認できます。 独自の Kafka クラスターを構築する場合、parmname —zookeeper パラメーターは、実際の構成に応じて変更できます。
- データソースのデータ形式記述ファイルを定義します。 metrics-kafka.json という名前を付けて、現在のディレクトリ (または指定した別のディレクトリ)
に配置します。
{ "type": "kafka", "dataSchema": { "dataSource": "metrics-kafka", "parser": { "type": "string", "parseSpec": { "timestampSpec": { "column": "time", "format": "auto" }, "dimensionsSpec": { "dimensions": ["url", "user"] }, "format": "json" } }, "granularitySpec": { "type": "uniform", "segmentGranularity": "hour", "queryGranularity": "none" }, "metricsSpec": [{ "type": "count", "name": "views" }, { "name": "latencyMs", "type": "doubleSum", "fieldName": "latencyMs" } ] }, "ioConfig": { "topic": "metrics", "consumerProperties": { "bootstrap.servers": "emr-worker-1.cluster-xxxxxxxx:9092 (the bootstrap.servers of your Kafka clusters)", "group.id": "kafka-indexing-service", "security.protocol": "SASL_PLAINTEXT", "sasl.mechanism": "GSSAPI" }, "taskCount": 1, replicas: 1 "taskDuration": "PT1H" }, "tuningConfig": { "type": "kafka", "maxRowsInMemory": "100000" } }
注 ioConfig.consumerProperties.security.protocol および ioConfig.consumerProperties.sasl.mechanism はセキュリティ関連のオプションであり、標準モードの Kafka クラスターには必要ありません。 - 次のコマンドを実行して、Kafka スーパーバイザーを追加します。
curl --negotiate -u:druid -b ~/cookies -c ~/cookies -XPOST -H 'Content-Type: application/json' -d @metrics-kafka.json http://emr-header-1.cluster-1234:18090/druid/indexer/v1/supervisor
—negotiate
、-u
、-b
、および-c
オプションは高セキュリティモードの Druid クラスター用です。 - Kafka クラスターでコンソールプロデューサーを有効にします。
--If the high-security mode of Kafka is enabled: export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf" echo -e "security.protocol=SASL_PLAINTEXT\nsasl.mechanism=GSSAPI" > /tmp/Kafka/producer.conf -- Kafka-console-producer.sh --producer.config /tmp/kafka/producer.conf --broker-list emr-worker-1:9092,emr-worker-2:9092,emr-worker-3:9092 --topic metrics >
—producer.config /tmp/Kafka/producer.conf オプションは、高セキュリティモードの Kafka クラスター用です。
- kafka_console_producer のコマンドプロンプトでデータを入力します。
{"time": "2018-03-06T09:57:58Z", "url": "/foo/bar", "user": "alice", "latencyMs": 32} {"time": "2018-03-06T09:57:59Z", "url": "/", "user": "bob", "latencyMs": 11} {"time": "2018-03-06T09:58:00Z", "url": "/foo/bar", "user": "bob", "latencyMs": 45}
タイムスタンプは、次の Python コマンドで生成できます。python -c 'import datetime; print(datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"))'
- metrics-search.json という名前のクエリファイルを準備します。
{ "queryType" : "search", "dataSource" : "metrics-kafka", "intervals" : ["2018-03-02T00:00:00.000/2018-03-08T00:00:00.000"], "granularity" : "all", "searchDimensions": [ "url", "user" ], "query": { "type": "insensitive_contains", "value": "bob" } }
- Druid クラスターのマスターノードでクエリを実行します。
curl --negotiate -u:Druid -b ~/cookies -c ~/cookies -XPOST -H 'Content-Type: application/json' -d @metrics-search.json http://emr-header-1.cluster-1234:18082/druid/v2/?pretty
—negotiate
、-u
、-b
、および-c
オプションは、高セキュリティモードの Druid クラスター用です。 - 次のようなクエリ結果が表示されます。
[ { "timestamp" : "2018-03-06T09:00:00.000Z", "result": { "dimension" : "user", "value" : "bob", "count": 2, } ] } ]