すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:Serverless Spark を使用して PySpark ストリーミングジョブを送信する

最終更新日:Nov 09, 2025

ビッグデータの時代において、ストリーム処理はリアルタイムデータ分析に不可欠です。EMR Serverless Spark は、サーバーの管理を不要にすることでリアルタイムのデータ処理を簡素化し、効率を向上させる、強力でスケーラブルなプラットフォームです。このトピックでは、EMR Serverless Spark を使用して PySpark ストリーミングジョブを送信する方法を説明し、ストリーム処理におけるプラットフォームの使いやすさとメンテナンス性を示します。

前提条件

ワークスペースが作成されていること。詳細については、「ワークスペースの作成」をご参照ください。

手順

ステップ 1: リアルタイム Dataflow クラスターを作成し、メッセージを生成する

  1. EMR on ECS ページで、Kafka サービスを含む Dataflow クラスターを作成します。詳細については、「クラスターの作成」をご参照ください。

  2. EMR クラスターのマスターノードにログインします。詳細については、「クラスターへのログイン」をご参照ください。

  3. 次のコマンドを実行してディレクトリを切り替えます。

    cd /var/log/emr/taihao_exporter
  4. 次のコマンドを実行して Topic を作成します。

    # taihaometrics という名前の Topic を 10 個のパーティションと 2 のレプリケーション係数で作成します。
    kafka-topics.sh --partitions 10 --replication-factor 2 --bootstrap-server core-1-1:9092 --topic taihaometrics --create
  5. 次のコマンドを実行してメッセージを送信します。

    # kafka-console-producer を使用して taihaometrics Topic にメッセージを送信します。
    tail -f metrics.log | kafka-console-producer.sh --broker-list core-1-1:9092 --topic taihaometrics

ステップ 2: ネットワーク接続を作成する

  1. [ネットワーク接続] ページに移動します。

    1. EMR コンソールの左側のナビゲーションウィンドウで、[EMR Serverless] > [Spark] を選択します。

    2. [Spark] ページで、ターゲットワークスペースの名前をクリックします。

    3. [EMR Serverless Spark] ページで、左側のナビゲーションウィンドウにある [ネットワーク接続] をクリックします。

  2. [ネットワーク接続] ページで、[ネットワーク接続の作成] をクリックします。

  3. [ネットワーク接続の作成] ダイアログボックスで、次のパラメーターを設定し、[OK] をクリックします。

    パラメーター

    説明

    [名前]

    新しい接続の名前を入力します。例: connection_to_emr_kafka。

    [Virtual Private Cloud (VPC)]

    EMR クラスターと同じ VPC を選択します。

    利用可能な VPC がない場合は、[VPC の作成] をクリックして VPC コンソールに移動し、VPC を作成します。詳細については、「VPC の作成と管理」をご参照ください。

    [VSwitch]

    EMR クラスターと同じ VPC にある vSwitch を選択します。

    現在のゾーンで利用可能な vSwitch がない場合は、[VSwitch] をクリックして VPC コンソールに移動し、vSwitch を作成します。詳細については、「vSwitch の作成と管理」をご参照ください。

    [ステータス][成功] に変わると、ネットワーク接続が作成されます。

ステップ 3: EMR クラスターにセキュリティグループルールを追加する

  1. クラスターノードの vSwitch の CIDR ブロックを取得します。

    [ノード管理] ページで、ノードグループ名をクリックして、関連付けられている vSwitch 情報を表示します。次に、VPC コンソールにログインし、[VSwitch] ページで vSwitch の CIDR ブロックを取得します。

    image

  2. セキュリティグループルールを追加します。

    1. [クラスター管理] ページで、ターゲットクラスターの ID をクリックします。

    2. [基本情報] ページで、[クラスターセキュリティグループ] の横にあるリンクをクリックします。

    3. [セキュリティグループの詳細] ページの [アクセスルール] セクションで、[ルールの追加] をクリックします。次のパラメーターを設定し、[OK] をクリックします。

      パラメーター

      説明

      [ソース]

      前のステップで取得した vSwitch の CIDR ブロックを入力します。

      重要

      外部からの攻撃によるセキュリティリスクを防ぐため、権限付与オブジェクトを 0.0.0.0/0 に設定しないでください。

      [宛先 (このインスタンス)]

      ポート 9092 を入力します。

ステップ 4: JAR パッケージを OSS にアップロードする

kafka.zip ファイルを解凍し、すべての JAR パッケージを Object Storage Service (OSS) にアップロードします。詳細については、「簡易アップロード」をご参照ください。

ステップ 5: リソースファイルをアップロードする

  1. EMR Serverless Spark ページで、左側のナビゲーションウィンドウにある [ファイル管理] をクリックします。

  2. [ファイル管理] ページで、[ファイルのアップロード] をクリックします。

  3. [ファイルのアップロード] ダイアログボックスで、アップロードエリアをクリックし、pyspark_ss_demo.py ファイルを選択します。

ステップ 6: ストリーミングジョブを作成して開始する

  1. EMR Serverless Spark ページで、左側のナビゲーションウィンドウにある [データ開発] をクリックします。

  2. [開発] タブで、image アイコンをクリックします。

  3. 名前を入力し、ジョブタイプとして [ストリーミングジョブ] > [PySpark] を選択し、[OK] をクリックします。

  4. 新しい開発タブで、次のパラメーターを設定し、他のパラメーターはデフォルト値のままにして、[保存] をクリックします。

    パラメーター

    説明

    [メイン Python リソース]

    前のステップで [リソースアップロード] ページにアップロードした pyspark_ss_demo.py ファイルを選択します。

    [エンジンバージョン]

    Spark のバージョンです。詳細については、「エンジンバージョン」をご参照ください。

    [ランタイムパラメーター]

    EMR クラスターの core-1-1 ノードの内部 IP アドレスです。アドレスは、EMR クラスターの [ノード管理] ページの Core ノードグループで確認できます。

    [Spark 構成]

    Spark の構成情報です。次のコードに例を示します。

    spark.jars oss://path/to/commons-pool2-2.11.1.jar,oss://path/to/kafka-clients-2.8.1.jar,oss://path/to/spark-sql-kafka-0-10_2.12-3.3.1.jar,oss://path/to/spark-token-provider-kafka-0-10_2.12-3.3.1.jar
    spark.emr.serverless.network.service.name connection_to_emr_kafka
    説明
    • spark.jars: ランタイムにロードする外部 JAR パッケージのパスを指定します。パスをステップ 4 でアップロードしたすべての JAR パッケージの実際のパスに置き換えます。

    • spark.emr.serverless.network.service.name: ネットワーク接続の名前を指定します。名前をステップ 2 で作成したネットワーク接続の名前に置き換えます。

  5. [公開] をクリックします。

  6. [ジョブの公開] ダイアログボックスで、[OK] をクリックします。

  7. ストリーミングジョブを開始します。

    1. [O&M に移動] をクリックします。

    2. [開始] をクリックします。

ステップ 7: ログを表示する

  1. [ログ探索] タブをクリックします。

  2. [ログ探索] タブでは、アプリケーションの実行に関する情報と返された結果を表示できます。

    image

参考資料

PySpark 開発プロセスの詳細については、「PySpark 開発クイックスタート」をご参照ください。