このチュートリアルでは、Object Storage Service (OSS) からテキストファイルを読み取る単語数カウントの例を使用して、E-MapReduce (EMR) ノード用の MapReduce (MR) ジョブを開発および設定する方法について説明します。MR ノードを作成することで、大規模なデータセットを複数の並列 Map タスクに分割し、データ処理効率を大幅に向上させることができます。
前提条件
Alibaba Cloud E-MapReduce (EMR) クラスターを作成し、DataWorks に登録済みであること。詳細については、「Data Studio: EMR コンピューティングリソースの関連付け」をご参照ください。
(オプション。RAM ユーザーの場合は必須) タスク開発を担当する Resource Access Management (RAM) ユーザーをワークスペースに追加し、[開発者] または [ワークスペース管理者] ロールを割り当てます。ワークスペース管理者ロールは広範な権限を持つため、慎重に付与してください。メンバーの追加に関する詳細については、「ワークスペースにメンバーを追加する」をご参照ください。
Alibaba Cloud アカウントを使用している場合は、このステップをスキップできます。
このトピックの例に従うには、まず OSS バケットを作成する必要があります。詳細については、「バケットの作成」をご参照ください。
制限事項
このタイプのタスクは、サーバーレスリソースグループ (推奨) または排他的スケジューリングリソースグループでのみ実行できます。
DataWorks で DataLake またはカスタムクラスターのメタデータを管理するには、まずクラスターに EMR-HOOK を設定する必要があります。詳細については、「Hive EMR-HOOK の設定」をご参照ください。
説明クラスターに EMR-HOOK が設定されていない場合、DataWorks はメタデータのリアルタイム表示、監査ログの生成、データリネージの表示、または EMR 関連のデータガバナンス タスクを実行できません。
ソースデータと JAR パッケージの準備
ソースデータの準備
次の内容で input01.txt という名前のサンプルファイルを作成します。
hadoop emr hadoop dw
hive hadoop
dw emrソースデータのアップロード
OSS コンソールにログインします。左側のナビゲーションウィンドウで、[バケットリスト] をクリックします。
ターゲットバケットの名前をクリックして、[ファイル管理] ページに移動します。
このチュートリアルでは、
onaliyun-bucket-2をサンプルバケットとして使用します。[ディレクトリの作成] をクリックして、ソースデータと JAR リソース用のディレクトリを作成します。
[ディレクトリ名] を
emr/datas/wordcount02/inputsに設定して、ソースデータ用のディレクトリを作成します。[ディレクトリ名] を
emr/jarsに設定して、JAR リソース用のディレクトリを作成します。
ソースデータファイルをディレクトリにアップロードします。
/emr/datas/wordcount02/inputsパスに移動し、[ファイルのアップロード] をクリックします。[アップロードするファイル] セクションで [ファイルのスキャン] をクリックし、
input01.txtファイルをバケットに追加して、[ファイルのアップロード] をクリックします。
OSS 用の MapReduce JAR の生成
IDEA プロジェクトを開き、pom 依存関係を追加します。
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-common</artifactId> <version>2.8.5</version> <!-- EMR MR のバージョンは 2.8.5 です。 --> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.8.5</version> </dependency>MapReduce で OSS ファイルを読み書きするには、次のパラメーターを設定します。
重要警告:Alibaba Cloud アカウントの AccessKey は、すべての API 操作に対する完全な権限を付与します。API 呼び出しや日常の O&M (運用保守) には RAM ユーザーを使用することを推奨します。AccessKey ID と AccessKey Secret をプロジェクトコードやその他の一般にアクセス可能な場所にハードコーディングしないでください。AccessKey が漏洩すると、アカウント内のすべてのリソースのセキュリティが侵害されます。以下のコード例は参考用です。AccessKey の認証情報を安全に保管し、慎重に取り扱ってください。
conf.set("fs.oss.accessKeyId", "${accessKeyId}"); conf.set("fs.oss.accessKeySecret", "${accessKeySecret}"); conf.set("fs.oss.endpoint","${endpoint}");パラメーターの説明:
${accessKeyId}:ご利用の Alibaba Cloud アカウントの AccessKey ID。${accessKeySecret}:ご利用の Alibaba Cloud アカウントの AccessKey Secret。${endpoint}:OSS にアクセスするためのエンドポイント。エンドポイントは、クラスターが存在するリージョンによって決まります。OSS バケットはクラスターと同じリージョンにある必要があります。詳細については、「リージョンとエンドポイント」をご参照ください。
次の Java コードは、公式の Hadoop WordCount の例を基に、OSS ファイルにアクセスするための AccessKey ID と AccessKey Secret の設定を追加したものです。
Java コードを編集した後、JAR パッケージをビルドします。この例では、JAR パッケージの名前として
onaliyun_mr_wordcount-1.0-SNAPSHOT.jarを使用します。
操作手順
EMR MR ノード設定ページで、ジョブを開発します。
EMR MR タスクの開発
ユースケースに応じて、次のいずれかの方法を選択します。
アップロードと参照
DataWorks では、ローカルリソースを DataStudio にアップロードし、ノードで参照できます。EMR MR ノードで必要なリソースが大きすぎて DataWorks UI からアップロードできない場合は、リソースを Object Storage Service (OSS) に保存し、コード内で参照できます。
EMR JAR リソースの作成
詳細については、「リソース管理」をご参照ください。「ソースデータと JAR パッケージの準備」で生成した JAR パッケージを、JAR リソースディレクトリ
emr/jarsに保存します。[クリックしてアップロード] をクリックして JAR リソースをアップロードします。[ストレージパス]、[データソース]、[リソースグループ] を選択します。
[保存] をクリックします。

EMR JAR リソースの参照
作成した [EMR MR] ノードを開き、コードエディターページに移動します。
左側のリソース管理ペインで、参照したいリソース (例:
onaliyun_mr_wordcount-1.0-SNAPSHOT.jar) を見つけ、右クリックして [リソースの参照] を選択します。リソースを参照すると、コードエディターに確認メッセージが表示されます。次に、次のコマンドを入力します。コマンド内の JAR パッケージ名、バケット名、パスは例です。実際の値に置き換えてください。
##@resource_reference{"onaliyun_mr_wordcount-1.0-SNAPSHOT.jar"} onaliyun_mr_wordcount-1.0-SNAPSHOT.jar cn.apache.hadoop.onaliyun.examples.EmrWordCount oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs説明EMR MR ノードエディターではコメント文はサポートされていません。
OSS からの参照
OSS REF を使用して、ノードから OSS リソースを直接参照できます。EMR ノードが実行されると、DataWorks は参照された OSS リソースをローカル実行環境に自動的にダウンロードします。この方法は、EMR ジョブが JAR 依存関係を持つ場合や、他のスクリプトファイルに依存する場合に便利です。
JAR リソースのアップロード
コードの開発が完了したら、OSS コンソールにログインし、ご利用のリージョンの左側のナビゲーションウィンドウで [バケットリスト] をクリックします。
ターゲットバケットの名前をクリックして、[ファイル管理] ページに移動します。
このチュートリアルでは、
onaliyun-bucket-2をサンプルバケットとして使用します。JAR リソースをそのディレクトリにアップロードします。
「
emr/jars」ディレクトリに移動し、[ファイルのアップロード] をクリックします。その後、「[アップロードするファイル]」セクションで、[ファイルをスキャン] をクリックします。onaliyun_mr_wordcount-1.0-SNAPSHOT.jarファイルをバケットに追加し、[ファイルのアップロード] をクリックします。
JAR リソースの参照
作成した EMR MR ノードの設定ページで、コードを編集して JAR リソースを参照します。
作成した EMR MR ノードの設定ページで、コードを編集して JAR リソースを参照します。
hadoop jar ossref://onaliyun-bucket-2/emr/jars/onaliyun_mr_wordcount-1.0-SNAPSHOT.jar cn.apache.hadoop.onaliyun.examples.EmrWordCount oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs説明コマンドのフォーマットは
hadoop jar <実行する JAR へのパス> <完全修飾メインクラス名> <入力ディレクトリ> <出力ディレクトリ>です。次の表に、実行する JAR へのパスを説明します。
パラメーター
説明
実行する JAR へのパス
フォーマット:
ossref://{endpoint}/{Bucket}/{object}endpoint:OSS にアクセスするためのエンドポイント。このパラメーターを空のままにすると、EMR クラスターと同じリージョンでのみ OSS にアクセスできます。OSS バケットは EMR クラスターと同じリージョンにある必要があります。
Bucket:オブジェクトを格納するための OSS コンテナー。各 [Bucket] には一意の名前があります。OSS コンソールにログインして、アカウント配下のすべての [Bucket] を表示できます。
object:[Bucket] に格納されている特定のオブジェクト (ファイル名やパスなど)。
(任意) 詳細パラメーターの設定
特定のパラメーターは、ノード構成ペインの右側にあるの下の、[EMR ノードパラメーター] または [DataWorks パラメーター] セクションで設定できます。
説明使用可能な詳細パラメーターは、次の表に示すように、EMR クラスタータイプによって若干異なります。
追加の オープンソースの Spark プロパティは、ノード構成ペインの右側にある の下の [EMR ノードパラメーター] または [DataWorks パラメーター] セクションで構成できます。
Datalake またはカスタムクラスター
詳細パラメーター
説明
queue
ジョブを送信するためのスケジューリングキューを指定します。デフォルトは `default` キューです。EMR YARN の詳細については、「基本的なキュー設定」をご参照ください。
priority
優先度。デフォルト値は 1 です。
FLOW_SKIP_SQL_ANALYZE
SQL 文の実行メソッドを指定します。有効な値:
true:複数の SQL 文を一度に実行します。false(デフォルト):一度に 1 つの SQL 文を実行します。
説明このパラメーターは、データ開発環境でのテストにのみ使用できます。
その他
詳細設定セクションでカスタム MR ジョブパラメーターを追加することもできます。コードを送信すると、DataWorks は
-D key=value文を使用して新しいパラメーターをコマンドに自動的に追加します。Hadoop クラスター
詳細パラメーター
説明
queue
ジョブを送信するためのスケジューリングキューを指定します。デフォルトは `default` キューです。EMR YARN の詳細については、「基本的なキュー設定」をご参照ください。
priority
優先度。デフォルト値は 1 です。
USE_GATEWAY
ジョブをゲートウェイクラスター経由で送信するかどうかを指定します。有効な値:
true:ジョブをゲートウェイクラスター経由で送信します。false(デフォルト):ジョブをゲートウェイクラスター経由で送信しません。ジョブはデフォルトでヘッダーノードに送信されます。
説明このノードが存在するクラスターがゲートウェイクラスターに関連付けられておらず、このパラメーターを
trueに設定した場合、EMR ジョブの送信は失敗します。タスクの実行
Run Configuration の [コンピューティングリソース] セクションで、[コンピューティングリソース] と [リソースグループ] を設定します。
説明タスクのリソース要件に基づいて [スケジューリング CU] を設定することもできます。デフォルトの CU 値は
0.25です。パブリックネットワークまたは VPC 経由でデータソースにアクセスするには、データソースとの接続テストに合格したスケジューリングリソースグループを使用する必要があります。詳細については、「ネットワーク接続ソリューション」をご参照ください。
ツールバーのパラメーターダイアログボックスで、作成したデータソースを選択し、[実行] をクリックします。
ノードタスクを定期的に実行するには、スケジューリングプロパティを設定します。詳細については、「ノードのスケジューリング設定」をご参照ください。
ノードタスクを設定した後、ノードをデプロイする必要があります。詳細については、「ノードとワークフローのデプロイ」をご参照ください。
タスクがデプロイされた後、オペレーションセンターで実行ステータスを表示できます。詳細については、「オペレーションセンター入門」をご参照ください。
結果の表示
OSS コンソールにログインします。ターゲットバケットの指定された出力ディレクトリで出力ファイルを表示できます。この例では、パスは emr/datas/wordcount02/outputs です。

DataWorks で統計を読み取ります。
EMR Hive ノードを作成します。詳細については、「スケジュールされたワークフローのノードを作成する」をご参照ください。
EMR Hive ノードで、OSS にマウントされた外部 Hive テーブルを作成し、テーブルデータを読み取ります。サンプルコード:
CREATE EXTERNAL TABLE IF NOT EXISTS wordcount02_result_tb ( `word` STRING COMMENT '単語', `cout` STRING COMMENT '数' ) ROW FORMAT delimited fields terminated by '\t' location 'oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs/'; SELECT * FROM wordcount02_result_tb;次の図に結果を示します。
