このトピックでは、E-MapReduce(EMR)Hadoop クラスターで Spark Streaming ジョブを実行して、Kafka クラスターのデータを処理する方法について説明します。
背景情報
EMR Hadoop クラスターと Kafka クラスターは、オープンソースソフトウェアに基づいて実行されます。そのため、データ開発中に関連する公式ドキュメントを参考にすることができます。
Spark 公式ドキュメント: streaming-kafka-integration および structured-streaming-kafka-integration
EMR デモ: GitHub
Kerberos 認証が有効になっている Kafka クラスターへのアクセス方法
EMR では、Kerberos 認証が有効になっている Kafka クラスターを作成できます。 Hadoop クラスターでジョブを実行して、Kerberos 認証が有効になっている Kafka クラスターにアクセスできます。アクセス方法は、Hadoop クラスターで Kerberos 認証が有効になっているかどうかによって異なります。
Kerberos 認証が無効になっている Hadoop クラスター: Kafka クラスターの Kerberos 認証に使用される kafka_client_jaas.conf ファイルと krb5.conf ファイルを提供します。
Kerberos 認証が有効になっている Hadoop クラスター: Hadoop クラスターの Kerberos 認証に使用される kafka_client_jaas.conf ファイルと krb5.conf ファイルを提供します。 Kafka クラスターは、Kerberos 認証のクロスドメイン信頼機能に基づいて認証できます。
クロスドメイン信頼機能の詳細については、「レルム間の相互信頼」をご参照ください。
どちらの方法でも、ジョブの実行時に Kerberos 認証をサポートするために、kafka_client_jaas.conf ファイルと krb5.conf ファイルを提供する必要があります。
kafka_client_jaas.conf ファイルの内容:
KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true serviceName="kafka" keyTab="/path/to/kafka.keytab" principal="kafka/emr-header-1.cluster-12345@EMR.12345.COM"; };
説明keytab ファイルの取得方法については、「MIT Kerberos 認証の構成」をご参照ください。
krb5.conf ファイルは、Kafka クラスターの /etc/ ディレクトリから取得できます。
Spark Streaming を使用して Kerberos 認証が有効になっている Kafka クラスターにアクセスする
Hadoop クラスターの各ノードの /etc/hosts ファイルに、Kafka クラスターの各ノードの長いドメイン名と IP アドレスを追加します。ノードの長いドメイン名と IP アドレスは、そのノードの /etc/hosts ファイルで取得できます。長いドメイン名は、emr-xxx-x.cluster-xxx
の形式です。
Spark Streaming ジョブを実行して Kerberos 認証が有効になっている Kafka クラスターにアクセスする場合、spark-submit コマンドラインで kafka_client_jaas.conf ファイルと kafka.keytab ファイルを指定できます。
spark-submit --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config={{PWD}}/kafka_client_jaas.conf -Djava.security.krb5.conf={{PWD}}/krb5.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config={{PWD}}//kafka_client_jaas.conf -Djava.security.krb5.conf={{PWD}}/krb5.conf" --files /local/path/to/kafka_client_jaas.conf,/local/path/to/kafka.keytab,/local/path/to/krb5.conf --class xx.xx.xx.KafkaSample --num-executors 2 --executor-cores 2 --executor-memory 1g --master yarn-cluster xxx.jar arg1 arg2 arg3
kafka_client_jaas.conf ファイルでは、keytab ファイルのパスは相対パスである必要があります。 keyTab パラメーターが次の形式で構成されていることを確認してください。
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
serviceName="kafka"
keyTab="kafka.keytab"
principal="kafka/emr-header-1.cluster-12345@EMR.12345.COM";
};
Spark SQL ステートメントを使用して Kafka にアクセスする
SQL ステートメントの例:
spark-sql --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*
/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*
には、Kafka データソースへのアクセスに使用するデータソースのタイプが含まれています。 EMR クラスターで Spark 2 を使用している場合は、上記のステートメントの spark3
を spark2
に変更する必要があります。
次の例は、テーブルを作成し、テーブルからデータをクエリする方法を示しています。
create table test_kafka
using loghub
options(kafka.bootstrap.servers='alikafka-post-cn-7mz2sqqr****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-7mz2sqqr****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-7mz2sqqr****-3-vpc.alikafka.aliyuncs.com:9092',
subscribe='test_topic',
startingoffsets='earliest'
)
select * from test_kafka;
付録
サンプルコードについては、GitHub をご覧ください。
パラメーターの詳細については、「Structured Streaming + Kafka Integration Guide」をご参照ください。