すべてのプロダクト
Search
ドキュメントセンター

DataWorks:EMR Spark Streaming ノードの作成

最終更新日:Apr 21, 2026

EMR Spark Streaming ノードは、高スループットのリアルタイムデータストリームを処理し、フォールトトレランスを提供することで、データストリームのエラーから迅速に回復するのに役立ちます。このトピックでは、EMR Spark Streaming ノードを作成し、データタスクを開発する方法について説明します。

前提条件

  • Alibaba Cloud E-MapReduce (EMR) クラスターが作成され、DataWorks に登録されていること。詳細については、「DataStudio (レガシ):EMR コンピュートリソースの登録」をご参照ください。

  • (RAM ユーザーを使用してタスクを開発する場合に必須) RAM ユーザーがメンバーとして DataWorks ワークスペースに追加され、開発者またはワークスペース管理者ロールが割り当てられていること。ワークスペース管理者ロールには必要以上の権限が含まれています。このロールを割り当てる際には注意が必要です。メンバーの追加方法の詳細については、「ワークスペースへのメンバーの追加」をご参照ください。

  • サーバーレスリソースグループが購入され、設定されていること。設定には、ワークスペースとの関連付けやネットワーク設定が含まれます。詳細については、「サーバーレスリソースグループの作成と使用」をご参照ください。

  • DataStudio でワークフローが作成されていること。

    異なるタイプのコンピュートエンジンでの開発操作は、DataStudio のワークフローに基づいて実行されます。したがって、ノードを作成する前に、ワークフローを作成する必要があります。詳細については、「ワークフローの作成」をご参照ください。

制限事項

  • このタイプのタスクは、サーバーレスリソースグループ (推奨) または専用スケジューリングリソースグループでのみ実行されます。

  • EMR on ACK Spark クラスターでは、タスク開発用の EMR Spark Streaming ノードを作成することはできません。

ステップ 1:EMR Spark Streaming ノードの作成

  1. DataStudio ページに移動します。

    DataWorks コンソールにログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、データ開発・運用保守 > データ開発を選択します。表示されたページで、ドロップダウンリストから目的のワークスペースを選択し、[データ開発へ] をクリックします。

  2. EMR Spark Streaming ノードを作成します。

    1. 対象のワークフローを右クリックし、Create Node > EMR > EMR Spark Streaming を選択します。

      説明

      または、[作成] にカーソルを合わせ、Create Node > EMR > EMR Spark Streaming を選択することもできます。

    2. Create Node ダイアログボックスで、Name を入力し、[エンジンインスタンス]Node TypePath を選択します。[OK] をクリックします。これにより、EMR Spark Streaming ノードの設定ページが開きます。

      説明

      ノード名には、大文字、小文字、漢字、数字、アンダースコア (_)、ピリオド (.) を使用できます。

ステップ 2:EMR Spark Streaming タスクの開発

EMR Spark Streaming ノードの設定ページで、作成したノードをダブルクリックしてタスク開発ページを開きます。

EMR JAR リソースの作成と参照

DataLake クラスターを使用している場合は、以下の手順に従って EMR JAR リソースを参照します。

説明

EMR Spark Streaming ノードが DataWorks にアップロードできない大きなリソースに依存している場合、そのリソースを HDFS に保存し、コード内で参照することができます。例:

spark-submit --master yarn
--deploy-mode cluster
--name SparkPi
--driver-memory 4G
--driver-cores 1
--num-executors 5
--executor-memory 4G
--executor-cores 1
--class org.apache.spark.examples.JavaSparkPi
hdfs:///tmp/jars/spark-examples_2.11-2.4.8.jar 100
  1. EMR JAR リソースを作成します。詳細については、「EMR リソースの作成と使用」をご参照ください。この機能を初めて使用する場合は、Authorize を実行する必要があります。

  2. EMR JAR リソースを参照します。

    1. [EMR Spark Streaming] ノードを開き、コードエディタに移動します。

    2. EMR > リソースノードで目的のリソースを見つけ、右クリックしてInsert Resource Path を選択します。

    3. リソースを選択すると、ノードエディターに ##@resource_reference{""} 形式の文が表示されます。この文がリソースを参照します。次に、spark-submit コマンドを入力します。コマンド内のリソースパッケージ、バケット名、パスはデモ用です。実際の値に置き換えてください。

      ##@resource_reference{"examples-1.2.0-shaded.jar"}
      --master yarn-cluster --executor-cores 2 --executor-memory 2g --driver-memory 1g --num-executors 2 --class com.aliyun.emr.example.spark.streaming.JavaLoghubWordCount examples-1.2.0-shaded.jar <logService-project> <logService-store> <group> <endpoint> <access-key-id> <access-key-secret>

spark-submit コードの開発

EMR Spark Streaming ノードのエディターで、ジョブの spark-submit コマンドを入力します。例:

spark-submit --master yarn-cluster --executor-cores 2 --executor-memory 2g --driver-memory 1g --num-executors 2 --class com.aliyun.emr.example.spark.streaming.JavaLoghubWordCount examples-1.2.0-shaded.jar <logService-project> <logService-store> <group> <endpoint> <access-key-id> <access-key-secret>
説明
  • この例では、DataWorks にアップロードされたリソースは examples-1.2.0-shaded.jar です。

  • access-key-idaccess-key-secret を、ご利用の Alibaba Cloud アカウントの AccessKey ID と AccessKey Secret に置き換えてください。これらを取得するには、DataWorks コンソールにログインし、右上のプロフィール画像にカーソルを合わせて、AccessKey 管理ページに移動します。

  • EMR Spark Streaming ノードのコード編集中は、コメントはサポートされていません。

  • DataStudio でワークスペースに複数の EMR 計算リソースが関連付けられている場合は、ビジネス要件に合ったものを選択してください。リソースが 1 つしか関連付けられていない場合は、選択は不要です。

(オプション) 詳細設定の構成

ノードの [詳細設定] セクションで特定のプロパティを設定できます。プロパティの詳細については、「Spark Configuration」をご参照ください。次の表に、利用可能な詳細パラメーターを示します。

データレイク: EMR on ECS

パラメーター

説明

queue

ジョブのスケジューリングキュー。デフォルトのキューは default です。EMR YARN の詳細については、「基本的なキュー構成」をご参照ください。

priority

ジョブの優先度。デフォルト値は 1 です。

その他

このセクションでは、カスタムの SparkConf パラメーターを追加できます。コードを送信すると、DataWorks はこれらのパラメーターをコマンドに自動的に追加します。例:"spark.driver.memory" : "2g"

説明

Ranger を有効にしてアクセスを制御するには、グローバル Spark パラメーターの設定spark.hadoop.fs.oss.authorization.method=ranger 構成を追加します。

パラメーターの設定方法の詳細については、「グローバル Spark パラメーターの設定」をご参照ください。

タスクの実行

  1. ツールバーの 高级运行 アイコンをクリックします。Parameter ダイアログボックスで、作成したスケジューリングリソースグループを選択し、Running をクリックします。

    説明
    • パブリックネットワークまたは VPC 経由で計算リソースにアクセスするには、計算リソースに接続できるスケジューリングリソースグループを使用する必要があります。詳細については、「ネットワーク接続ソリューション」をご参照ください。

    • 後続の実行でリソースグループを変更する必要がある場合は、[パラメーターを指定して実行] 高级运行 アイコンをクリックし、使用するリソースグループを選択します。

  2. 保存 アイコンをクリックしてコードを保存します。

  3. (オプション) スモークテストを実行します。

    開発環境でスモークテストを実行する場合は、ノードをコミットする前または後でテストを実行します。詳細については、「スモークテストの実行」をご参照ください。

ステップ 3:スケジューリングプロパティの設定

システムにノード上のタスクを定期的に実行させたい場合は、ノードの設定タブの右側のナビゲーションウィンドウで [プロパティ] をクリックし、ビジネス要件に基づいてタスクのスケジューリングプロパティを設定できます。詳細については、「概要」をご参照ください。

説明

タスクをコミットする前に、[プロパティ] タブで [再実行][親ノード] パラメーターを設定する必要があります。

ステップ 4:タスクのデプロイ

ノード上のタスクが設定された後、タスクをコミットしてデプロイする必要があります。タスクをコミットしてデプロイすると、システムはスケジューリング設定に基づいてタスクを定期的に実行します。

  1. 上部のツールバーにある 保存 アイコンをクリックしてタスクを保存します。

  2. 上部のツールバーにある 提交 アイコンをクリックしてタスクをコミットします。

    [送信] ダイアログボックスで、[変更の説明] パラメーターを設定します。次に、ビジネス要件に基づいて、タスクをコミットした後にタスクコードをレビューするかどうかを決定します。

    説明
    • タスクをコミットする前に、[プロパティ] タブで [再実行][親ノード] パラメーターを設定する必要があります。

    • コードレビュー機能を使用して、タスクのコード品質を確保し、無効なタスクコードによるタスク実行エラーを防ぐことができます。コードレビュー機能を有効にすると、コミットされたタスクコードは、コードレビューを通過した後にのみデプロイできます。詳細については、「コードレビュー」をご参照ください。

標準モードのワークスペースを使用している場合、タスクをコミットした後に本番環境にタスクをデプロイする必要があります。ノードにタスクをデプロイするには、ノードの設定タブの右上隅にある [デプロイ] をクリックします。詳細については、「ノードのデプロイ」をご参照ください。

その他の操作

タスクをコミットしてデプロイすると、タスクはスケジューリング設定に基づいて定期的に実行されます。対応するノードの設定タブの右上隅にある [オペレーションセンター] をクリックしてオペレーションセンターに移動し、タスクのスケジューリングステータスを表示できます。詳細については、「定期タスクの管理」をご参照ください。