E-MapReduce (EMR) Spark Streaming ノードは、スループットの高いストリーミングデータを処理するために使用できます。このタイプのノードはフォールトトレランスをサポートしており、エラーが発生したデータストリームを復元するのに役立ちます。このトピックでは、EMR Spark Streaming ノードを使用してデータを開発する方法について説明します。
前提条件
EMR クラスタが作成され、クラスタが DataWorks に登録されています。詳細については、「DataWorks に EMR クラスタを登録する」をご参照ください。
(RAM ユーザーを使用してタスクを開発する場合に必要) RAM ユーザーが DataWorks ワークスペースにメンバーとして追加され、[開発] ロールまたは [ワークスペースマネージャー] ロールが割り当てられています。ワークスペースマネージャーロールには、必要以上の権限があります。ワークスペースマネージャーロールを割り当てる場合は注意してください。メンバーを追加する方法の詳細については、「ワークスペースメンバーを追加し、ロールを割り当てる」をご参照ください。
説明Alibaba Cloud アカウントを使用している場合は、この操作をスキップできます。
ワークスペースディレクトリが作成されています。詳細については、「ワークスペースディレクトリ」をご参照ください。
EMR Spark Streaming ノードが作成されています。詳細については、「自動トリガーノードを作成する」をご参照ください。
制限
このタイプのノードは、サーバーレスリソースグループまたは専用スケジューリングリソースグループでのみ実行できます。サーバーレスリソースグループを使用することをお勧めします。
DataWorks で作成した EMR Spark Streaming ノードは、EMR on ACK ページで作成された Spark クラスタでのデータ開発には使用できません。
手順
EMR Spark Streaming ノードの構成タブで、次の操作を実行します。
EMR JAR リソースの作成と参照
EMR DataLake クラスタを使用する場合は、次の手順を実行して EMR JAR リソースを参照できます。
説明EMR Spark Streaming ノードが大量のリソースに依存している場合、DataWorks コンソールを使用してリソースをアップロードすることはできません。この場合、リソースを Hadoop 分散ファイルシステム (HDFS) に保存してから、EMR Spark Streaming ノードのコードでリソースを参照できます。サンプルコード:
spark-submit --master yarn --deploy-mode cluster --name SparkPi --driver-memory 4G --driver-cores 1 --num-executors 5 --executor-memory 4G --executor-cores 1 --class org.apache.spark.examples.JavaSparkPi hdfs:///tmp/jars/spark-examples_2.11-2.4.8.jar 100
EMR JAR リソースを作成します。
Data Studio ページの [リソース管理: すべて] ペインで EMR JAR リソースを作成します。詳細については、「リソース管理」をご参照ください。
リソースの構成タブで、[アップロード] をクリックして JAR パッケージをアップロードします。
[ストレージパス]、[データソース]、および [リソースグループ] パラメーターを構成します。
[保存] をクリックします。
EMR JAR リソースを参照します。
[EMR Spark Streaming] ノードを開きます。ノードの構成タブが表示されます。
Data Studio ページの左側のナビゲーションウィンドウの [リソース管理: すべて] ペインで参照するリソースを見つけ、リソース名を右クリックして、[リソースの参照] を選択します。
##@resource_reference{""} 形式の情報が [EMR Spark Streaming] ノードの構成タブに表示されている場合、コードリソースが参照されます。次に、次のコードを実行します。次のコードの情報は、実際の情報に置き換える必要があります。情報には、リソースパッケージ名、バケット名、パスが含まれます。
##@resource_reference{"examples-1.2.0-shaded.jar"} --master yarn-cluster --executor-cores 2 --executor-memory 2g --driver-memory 1g --num-executors 2 --class com.aliyun.emr.example.spark.streaming.JavaLoghubWordCount examples-1.2.0-shaded.jar <logService-project> <logService-store> <group> <endpoint> <access-key-id> <access-key-secret>
SQL コードの開発
EMR Spark Streaming ノードの構成タブで、ノードのコードを記述します。サンプルコード:
spark-submit --master yarn-cluster --executor-cores 2 --executor-memory 2g --driver-memory 1g --num-executors 2 --class com.aliyun.emr.example.spark.streaming.JavaLoghubWordCount examples-1.2.0-shaded.jar <logService-project> <logService-store> <group> <endpoint> <access-key-id> <access-key-secret>
説明この例では、
examples-1.2.0-shaded.jar
パッケージは DataWorks コンソールにアップロードされています。access-key-id
とaccess-key-secret
は、Alibaba Cloud アカウントの AccessKey ID と AccessKey シークレットに置き換える必要があります。AccessKey ID と AccessKey シークレットを取得するには、DataWorks コンソール にログインし、右上隅のプロフィール画像にポインターを移動して、[AccessKey] を選択します。EMR Spark Streaming ノードのコードを記述するときにコメントを追加することはできません。
(オプション) 詳細パラメーターの構成
[リアルタイム構成] タブの [EMR ノードパラメーター] セクションで特定のパラメーターを構成できます。パラメーターの構成方法の詳細については、「Spark Configuration」を参照してください。次の表に、構成できる詳細パラメーターを示します。
DataLake クラスタ: EMR on ECS ページで作成
詳細パラメーター
説明
FLOW_SKIP_SQL_ANALYZE
SQL 文の実行方法。有効値:
true
: 複数の SQL 文が同時に実行されます。false
(デフォルト): 一度に 1 つの SQL 文のみが実行されます。
説明このパラメーターは、DataWorks ワークスペースの開発環境でのテストにのみ使用できます。
queue
ジョブがコミットされるスケジューリングキュー。デフォルト値: default。EMR YARN については、「YARN スケジューラ」をご参照ください。
priority
優先度。デフォルト値: 1。
その他
EMR Spark Streaming ノードの詳細設定に SparkConf パラメーターを追加することもできます。DataWorks で EMR Spark Streaming ノードのコードをコミットすると、DataWorks はカスタムパラメーターをコマンドに自動的に追加します。例:
"spark.driver.memory" : "2g"
。説明Ranger 権限制御を有効にする場合は、グローバル Spark パラメーターを構成する際に
spark.hadoop.fs.oss.authorization.method=ranger
を追加して、Ranger 権限制御を有効にする必要があります。その他のパラメーターの構成については、「グローバル Spark パラメーターを構成する」をご参照ください。
SQL 文の実行
[デバッグ構成] タブで、[コンピューティングリソース] セクションの [コンピューティングリソース] パラメーターと、[DataWorks 構成] セクションの [リソースグループ] パラメーターを構成します。
説明タスク実行に必要なリソースに基づいて、[コンピューティング用 CU] パラメーターを構成することもできます。このパラメーターのデフォルト値は
0.25
です。インターネットまたは VPC 経由でデータソースにアクセスする場合は、データソースに接続されているスケジューリング用リソースグループを使用する必要があります。詳細については、「ネットワーク接続ソリューション」をご参照ください。
ノードの構成タブの上部にあるツールバーで、[実行] をクリックして SQL 文を実行します。
ノードのタスクを定期的に実行する場合は、ビジネス要件に基づいてスケジューリング情報を構成します。
ノードを構成した後、ノードをデプロイします。詳細については、「ノードまたはワークフローのデプロイ」をご参照ください。
ノードをデプロイした後、オペレーションセンターでノードのステータスを確認します。詳細については、「オペレーションセンターの概要」をご参照ください。