E-MapReduce (EMR) Spark Streamingノードは、高スループットのストリーミングデータを処理するために使用できます。このタイプのノードはフォールトトレランスをサポートしており、エラーが発生したデータストリームを復元するのに役立ちます。このトピックでは、EMR Spark Streamingノードを作成および使用してデータを開発する方法について説明します。
前提条件
Alibaba Cloud EMR クラスタが作成され、DataWorks に登録されています。詳細については、「DataStudio (旧バージョン): EMR 計算リソースの関連付け」をご参照ください。
(RAM ユーザーを使用してタスクを開発する場合に必要) RAM ユーザーがメンバーとして DataWorks ワークスペースに追加され、[開発] ロールまたは [ワークスペース管理者] ロールが割り当てられています。ワークスペース管理者ロールには、必要以上の権限があります。 ワークスペース管理者ロールを割り当てる場合は注意してください。メンバーの追加方法の詳細については、「ワークスペースメンバーを追加し、ロールを割り当てる」をご参照ください。
サーバーレスリソースグループが購入され、構成されています。構成には、ワークスペースとの関連付けとネットワーク構成が含まれます。詳細については、「サーバーレスリソースグループの作成と使用」をご参照ください。
DataStudio でワークフローが作成されています。
DataStudio では、さまざまなタイプの計算エンジンの開発操作がワークフローに基づいて実行されます。したがって、ノードを作成する前に、ワークフローを作成する必要があります。詳細については、「ワークフローの作成」をご参照ください。
制限事項
このタイプのノードは、スケジューリング用の サーバーレスリソースグループ または専用リソースグループでのみ実行できます。サーバーレスリソースグループを使用することをお勧めします。
DataWorks で作成した EMR Spark Streamingノードは、EMR on ACK ページで作成された Spark クラスタでのデータ開発には使用できません。
ステップ 1: EMR Spark Streamingノードを作成する
DataStudio ページに移動します。
DataWorks コンソール にログインします。 上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、 を選択します。表示されたページで、ドロップダウンリストから目的のワークスペースを選択し、[データ開発に移動] をクリックします。
EMR Spark Streamingノードを作成します。
目的のワークフローを見つけ、ワークフローの名前を右クリックし、
を選択します。説明または、[作成] アイコンにポインターを移動し、
を選択することもできます。[ノードの作成] ダイアログボックスで、[名前]、[エンジンインスタンス]、[ノードタイプ]、[パス] パラメーターを構成します。[確認] をクリックします。EMR Spark Streamingノードの構成タブが表示されます。
説明ノード名には、文字、数字、アンダースコア (_)、ピリオド (.) のみを含めることができます。
ステップ 2: EMR Spark Streamingタスクを開発する
EMR Spark Streamingノードの構成タブで Spark Streamingタスクを開発できます。
EMR JAR リソースの作成と参照
EMR DataLake クラスタを使用する場合は、次の手順を実行して EMR JAR リソースを参照できます。
EMR Spark Streamingノードが大量のリソースに依存している場合、DataWorks コンソールを使用してリソースをアップロードすることはできません。この場合、リソースを Hadoop Distributed File System (HDFS) に保存してから、EMR Spark Streamingノードのコードでリソースを参照できます。サンプルコード:
spark-submit --master yarn
--deploy-mode cluster
--name SparkPi
--driver-memory 4G
--driver-cores 1
--num-executors 5
--executor-memory 4G
--executor-cores 1
--class org.apache.spark.examples.JavaSparkPi
hdfs:///tmp/jars/spark-examples_2.11-2.4.8.jar 100
EMR JAR リソースを作成します。詳細については、「EMR リソースの作成と使用」をご参照ください。EMR JAR リソースを初めて使用する場合は、[承認] をクリックして、DataWorks が EMR JAR リソースにアクセスすることを承認します。
EMR JAR リソースを参照します。
[EMR Spark Streaming] ノードを開きます。ノードの構成タブが表示されます。
[EMR] フォルダーの [リソース] の下にある参照するリソースを見つけ、リソース名を右クリックして、[リソースパスの挿入] を選択します。
##@resource_reference{""}
形式の句が EMR Spark Streamingノードの構成タブに表示された場合、リソースが参照されます。次に、次のコードを実行します。次のコードの情報は、実際の情報に置き換える必要があります。情報には、リソースパッケージ名、バケット名、ディレクトリが含まれます。##@resource_reference{"examples-1.2.0-shaded.jar"} --master yarn-cluster --executor-cores 2 --executor-memory 2g --driver-memory 1g --num-executors 2 --class com.aliyun.emr.example.spark.streaming.JavaLoghubWordCount examples-1.2.0-shaded.jar <logService-project> <logService-store> <group> <endpoint> <access-key-id> <access-key-secret>
SQL コードの開発
EMR Spark Streamingノードの構成タブで、ノードのコードを記述します。サンプルコード:
spark-submit --master yarn-cluster --executor-cores 2 --executor-memory 2g --driver-memory 1g --num-executors 2 --class com.aliyun.emr.example.spark.streaming.JavaLoghubWordCount examples-1.2.0-shaded.jar <logService-project> <logService-store> <group> <endpoint> <access-key-id> <access-key-secret>
この例では、
examples-1.2.0-shaded.jar
JAR パッケージは DataWorks コンソールにアップロードされています。access-key-id
とaccess-key-secret
は、Alibaba Cloud アカウントの AccessKey ID と AccessKey シークレットに置き換える必要があります。AccessKey ID と AccessKey シークレットを取得するには、DataWorks コンソール にログインし、右上隅のプロフィール画像にポインターを移動して、[AccessKey 管理] を選択します。EMR Spark Streamingノードのコードを記述するときにコメントを追加することはできません。
ワークスペースの DataStudio に複数の EMR 計算リソースが関連付けられている場合は、計算リソースから 1 つを選択する必要があります。ワークスペースの DataStudio に 1 つだけの EMR 計算リソースが関連付けられている場合は、データソースを選択する必要はありません。
(オプション) 詳細パラメーターを構成する
現在のノードの構成タブの [詳細設定] タブで詳細パラメーターを構成できます。パラメーターの構成方法の詳細については、「Spark Configuration」を参照してください。次の表は、構成できる詳細パラメーターについて説明しています。
DataLake クラスタ: EMR on ECS
詳細パラメーター | 説明 |
queue | ジョブがコミットされるスケジューリングキュー。デフォルト値: default。EMR YARN の詳細については、「YARN スケジューラ」をご参照ください。 |
priority | 優先度。デフォルト値: 1。 |
その他 | EMR Spark Streamingノードの [詳細設定] タブで SparkConf パラメーターを追加できます。DataWorks で EMR Spark Streamingノードのコードをコミットすると、DataWorks はカスタムパラメーターをコマンドに追加します。例: 説明 Ranger 権限制御を有効にする場合は、グローバル Spark パラメーターを構成する 際に その他のパラメーターの構成については、「グローバル Spark パラメーターの構成」をご参照ください。 |
Spark Streamingタスクを実行する
ツールバーの
アイコンをクリックします。[パラメーター] ダイアログボックスで、[リソースグループ名] ドロップダウンリストから目的のリソースグループを選択し、[実行] をクリックします。
説明インターネットまたは VPC (Virtual Private Cloud) 経由で計算リソースにアクセスする場合は、計算リソースに接続されているスケジューリング用のリソースグループを使用する必要があります。詳細については、「ネットワーク接続ソリューション」をご参照ください。
後続の操作でリソースグループを変更する場合は、
([パラメーター付きで実行]) アイコンをクリックして、[パラメーター] ダイアログボックスでリソースグループを変更できます。
上部ツールバーの
アイコンをクリックして、SQL 文を保存します。
オプション。スモークテストを実行します。
ノードをコミットするとき、またはノードをコミットした後に、開発環境でノードのスモークテストを実行できます。詳細については、「スモークテストを実行する」をご参照ください。
ステップ 3: スケジューリングプロパティを構成する
システムにノードのタスクを定期的に実行させる場合は、ノードの構成タブの右側のナビゲーションウィンドウで [プロパティ] をクリックして、ビジネス要件に基づいてタスクスケジューリングプロパティを構成できます。詳細については、「概要」をご参照ください。
タスクをコミットする前に、[プロパティ] タブで [再実行] パラメーターと [親ノード] パラメーターを構成する必要があります。
ステップ 4: タスクをデプロイする
ノードのタスクが構成されたら、タスクをコミットしてデプロイする必要があります。タスクをコミットしてデプロイすると、システムはスケジューリング構成に基づいてタスクを定期的に実行します。
上部ツールバーの
アイコンをクリックして、タスクを保存します。
上部ツールバーの
アイコンをクリックして、タスクをコミットします。
[送信] ダイアログボックスで、[変更の説明] パラメーターを構成します。次に、ビジネス要件に基づいて、タスクのコミット後にタスクコードを確認するかどうかを決定します。
説明タスクをコミットする前に、[プロパティ] タブで [再実行] パラメーターと [親ノード] パラメーターを構成する必要があります。
コードレビュー機能を使用して、タスクのコード品質を確保し、無効なタスクコードによって発生するタスク実行エラーを防ぐことができます。コードレビュー機能を有効にすると、コミットされたタスクコードは、コードレビューに合格した後でのみデプロイできます。詳細については、「コードレビュー」をご参照ください。
標準モードのワークスペースを使用する場合は、タスクをコミットした後に本番環境にデプロイする必要があります。ノードのタスクをデプロイするには、ノードの構成タブの右上隅にある [デプロイ] をクリックします。詳細については、「ノードのデプロイ」をご参照ください。
次のステップ
タスクをコミットしてデプロイすると、タスクはスケジューリング構成に基づいて定期的に実行されます。対応するノードの構成タブの右上隅にある [オペレーションセンター] をクリックしてオペレーションセンターに移動し、タスクのスケジューリングステータスを表示できます。詳細については、「自動トリガーされたタスクの表示と管理」をご参照ください。