すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:Spark を使用した Kafka へのアクセス

最終更新日:Apr 01, 2025

このトピックでは、E-MapReduce(EMR)Hadoop クラスターで Spark Streaming ジョブを実行して、Kafka クラスターのデータを処理する方法について説明します。

背景情報

EMR Hadoop クラスターと Kafka クラスターは、オープンソースソフトウェアに基づいて実行されます。そのため、データ開発中に関連する公式ドキュメントを参考にすることができます。

Kerberos 認証が有効になっている Kafka クラスターへのアクセス方法

EMR では、Kerberos 認証が有効になっている Kafka クラスターを作成できます。 Hadoop クラスターでジョブを実行して、Kerberos 認証が有効になっている Kafka クラスターにアクセスできます。アクセス方法は、Hadoop クラスターで Kerberos 認証が有効になっているかどうかによって異なります。

  • Kerberos 認証が無効になっている Hadoop クラスター: Kafka クラスターの Kerberos 認証に使用される kafka_client_jaas.conf ファイルと krb5.conf ファイルを提供します。

  • Kerberos 認証が有効になっている Hadoop クラスター: Hadoop クラスターの Kerberos 認証に使用される kafka_client_jaas.conf ファイルと krb5.conf ファイルを提供します。 Kafka クラスターは、Kerberos 認証のクロスドメイン信頼機能に基づいて認証できます。

    クロスドメイン信頼機能の詳細については、「レルム間の相互信頼」をご参照ください。

どちらの方法でも、ジョブの実行時に Kerberos 認証をサポートするために、kafka_client_jaas.conf ファイルと krb5.conf ファイルを提供する必要があります。

  • kafka_client_jaas.conf ファイルの内容:

    KafkaClient {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        storeKey=true
        serviceName="kafka"
        keyTab="/path/to/kafka.keytab"
        principal="kafka/emr-header-1.cluster-12345@EMR.12345.COM";
    };
    説明

    keytab ファイルの取得方法については、「MIT Kerberos 認証の構成」をご参照ください。

  • krb5.conf ファイルは、Kafka クラスターの /etc/ ディレクトリから取得できます。

Spark Streaming を使用して Kerberos 認証が有効になっている Kafka クラスターにアクセスする

Hadoop クラスターの各ノードの /etc/hosts ファイルに、Kafka クラスターの各ノードの長いドメイン名と IP アドレスを追加します。ノードの長いドメイン名と IP アドレスは、そのノードの /etc/hosts ファイルで取得できます。長いドメイン名は、emr-xxx-x.cluster-xxx の形式です。

Spark Streaming ジョブを実行して Kerberos 認証が有効になっている Kafka クラスターにアクセスする場合、spark-submit コマンドラインで kafka_client_jaas.conf ファイルと kafka.keytab ファイルを指定できます。

spark-submit --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config={{PWD}}/kafka_client_jaas.conf -Djava.security.krb5.conf={{PWD}}/krb5.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config={{PWD}}//kafka_client_jaas.conf -Djava.security.krb5.conf={{PWD}}/krb5.conf" --files /local/path/to/kafka_client_jaas.conf,/local/path/to/kafka.keytab,/local/path/to/krb5.conf --class  xx.xx.xx.KafkaSample --num-executors 2 --executor-cores 2 --executor-memory 1g --master yarn-cluster xxx.jar arg1 arg2 arg3

kafka_client_jaas.conf ファイルでは、keytab ファイルのパスは相対パスである必要があります。 keyTab パラメーターが次の形式で構成されていることを確認してください。

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    serviceName="kafka"
    keyTab="kafka.keytab"
    principal="kafka/emr-header-1.cluster-12345@EMR.12345.COM";
};

Spark SQL ステートメントを使用して Kafka にアクセスする

SQL ステートメントの例:

spark-sql --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*
説明

/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/* には、Kafka データソースへのアクセスに使用するデータソースのタイプが含まれています。 EMR クラスターで Spark 2 を使用している場合は、上記のステートメントの spark3spark2 に変更する必要があります。

次の例は、テーブルを作成し、テーブルからデータをクエリする方法を示しています。

create table test_kafka
using loghub
  options(kafka.bootstrap.servers='alikafka-post-cn-7mz2sqqr****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-7mz2sqqr****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-7mz2sqqr****-3-vpc.alikafka.aliyuncs.com:9092',
          subscribe='test_topic',
          startingoffsets='earliest'
)

select * from test_kafka;

付録