すべてのプロダクト
Search
ドキュメントセンター

DataWorks:EMR MR ノード

最終更新日:Feb 05, 2026

このチュートリアルでは、Object Storage Service (OSS) からテキストファイルを読み取る単語数カウントの例を使用して、E-MapReduce (EMR) ノード用の MapReduce (MR) ジョブを開発および設定する方法について説明します。MR ノードを作成することで、大規模なデータセットを複数の並列 Map タスクに分割し、データ処理効率を大幅に向上させることができます。

前提条件

  • Alibaba Cloud E-MapReduce (EMR) クラスターを作成し、DataWorks に登録済みであること。詳細については、「Data Studio: EMR コンピューティングリソースの関連付け」をご参照ください。

  • (オプション。RAM ユーザーの場合は必須) タスク開発を担当する Resource Access Management (RAM) ユーザーをワークスペースに追加し、[開発者] または [ワークスペース管理者] ロールを割り当てます。ワークスペース管理者ロールは広範な権限を持つため、慎重に付与してください。メンバーの追加に関する詳細については、「ワークスペースにメンバーを追加する」をご参照ください。

    Alibaba Cloud アカウントを使用している場合は、このステップをスキップできます。
  • このトピックの例に従うには、まず OSS バケットを作成する必要があります。詳細については、「バケットの作成」をご参照ください。

制限事項

  • このタイプのタスクは、サーバーレスリソースグループ (推奨) または排他的スケジューリングリソースグループでのみ実行できます。

  • DataWorks で DataLake またはカスタムクラスターのメタデータを管理するには、まずクラスターに EMR-HOOK を設定する必要があります。詳細については、「Hive EMR-HOOK の設定」をご参照ください。

    説明

    クラスターに EMR-HOOK が設定されていない場合、DataWorks はメタデータのリアルタイム表示、監査ログの生成、データリネージの表示、または EMR 関連のデータガバナンス タスクを実行できません。

ソースデータと JAR パッケージの準備

ソースデータの準備

次の内容で input01.txt という名前のサンプルファイルを作成します。

hadoop emr hadoop dw
hive hadoop
dw emr

ソースデータのアップロード

  1. OSS コンソールにログインします。左側のナビゲーションウィンドウで、[バケットリスト] をクリックします。

  2. ターゲットバケットの名前をクリックして、[ファイル管理] ページに移動します。

    このチュートリアルでは、onaliyun-bucket-2 をサンプルバケットとして使用します。

  3. [ディレクトリの作成] をクリックして、ソースデータと JAR リソース用のディレクトリを作成します。

    • [ディレクトリ名]emr/datas/wordcount02/inputs に設定して、ソースデータ用のディレクトリを作成します。

    • [ディレクトリ名]emr/jars に設定して、JAR リソース用のディレクトリを作成します。

  4. ソースデータファイルをディレクトリにアップロードします。

    • /emr/datas/wordcount02/inputs パスに移動し、[ファイルのアップロード] をクリックします。

    • [アップロードするファイル] セクションで [ファイルのスキャン] をクリックし、input01.txt ファイルをバケットに追加して、[ファイルのアップロード] をクリックします。

OSS 用の MapReduce JAR の生成

  1. IDEA プロジェクトを開き、pom 依存関係を追加します。

            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-common</artifactId>
                <version>2.8.5</version> <!-- EMR MR のバージョンは 2.8.5 です。 -->
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>2.8.5</version>
            </dependency>
  2. MapReduce で OSS ファイルを読み書きするには、次のパラメーターを設定します。

    重要

    警告:Alibaba Cloud アカウントの AccessKey は、すべての API 操作に対する完全な権限を付与します。API 呼び出しや日常の O&M (運用保守) には RAM ユーザーを使用することを推奨します。AccessKey ID と AccessKey Secret をプロジェクトコードやその他の一般にアクセス可能な場所にハードコーディングしないでください。AccessKey が漏洩すると、アカウント内のすべてのリソースのセキュリティが侵害されます。以下のコード例は参考用です。AccessKey の認証情報を安全に保管し、慎重に取り扱ってください。

    conf.set("fs.oss.accessKeyId", "${accessKeyId}");
    conf.set("fs.oss.accessKeySecret", "${accessKeySecret}");
    conf.set("fs.oss.endpoint","${endpoint}");

    パラメーターの説明:

    • ${accessKeyId}:ご利用の Alibaba Cloud アカウントの AccessKey ID。

    • ${accessKeySecret}:ご利用の Alibaba Cloud アカウントの AccessKey Secret。

    • ${endpoint}:OSS にアクセスするためのエンドポイント。エンドポイントは、クラスターが存在するリージョンによって決まります。OSS バケットはクラスターと同じリージョンにある必要があります。詳細については、「リージョンとエンドポイント」をご参照ください。

    次の Java コードは、公式の Hadoop WordCount の例を基に、OSS ファイルにアクセスするための AccessKey ID と AccessKey Secret の設定を追加したものです。

    サンプルコード

    package cn.apache.hadoop.onaliyun.examples;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    public class EmrWordCount {
        public static class TokenizerMapper
                extends Mapper<Object, Text, Text, IntWritable> {
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();
    
            public void map(Object key, Text value, Context context
            ) throws IOException, InterruptedException {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                    word.set(itr.nextToken());
                    context.write(word, one);
                }
            }
        }
    
        public static class IntSumReducer
                extends Reducer<Text, IntWritable, Text, IntWritable> {
            private IntWritable result = new IntWritable();
    
            public void reduce(Text key, Iterable<IntWritable> values,
                               Context context
            ) throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                result.set(sum);
                context.write(key, result);
            }
        }
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length < 2) {
                System.err.println("Usage: wordcount <in> [<in>...] <out>");
                System.exit(2);
            }
            conf.set("fs.oss.accessKeyId", "${accessKeyId}"); // 
            conf.set("fs.oss.accessKeySecret", "${accessKeySecret}"); // 
            conf.set("fs.oss.endpoint", "${endpoint}"); //
            Job job = Job.getInstance(conf, "word count");
            job.setJarByClass(EmrWordCount.class);
            job.setMapperClass(TokenizerMapper.class);
            job.setCombinerClass(IntSumReducer.class);
            job.setReducerClass(IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            for (int i = 0; i < otherArgs.length - 1; ++i) {
                FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
            }
            FileOutputFormat.setOutputPath(job,
                    new Path(otherArgs[otherArgs.length - 1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
                                    
  3. Java コードを編集した後、JAR パッケージをビルドします。この例では、JAR パッケージの名前として onaliyun_mr_wordcount-1.0-SNAPSHOT.jar を使用します。

操作手順

  1. EMR MR ノード設定ページで、ジョブを開発します。

    EMR MR タスクの開発

    ユースケースに応じて、次のいずれかの方法を選択します。

    アップロードと参照

    DataWorks では、ローカルリソースを DataStudio にアップロードし、ノードで参照できます。EMR MR ノードで必要なリソースが大きすぎて DataWorks UI からアップロードできない場合は、リソースを Object Storage Service (OSS) に保存し、コード内で参照できます。

    1. EMR JAR リソースの作成

      1. 詳細については、「リソース管理」をご参照ください。「ソースデータと JAR パッケージの準備」で生成した JAR パッケージを、JAR リソースディレクトリ emr/jars に保存します。[クリックしてアップロード] をクリックして JAR リソースをアップロードします。

      2. [ストレージパス][データソース][リソースグループ] を選択します。

      3. [保存] をクリックします。

      image

    2. EMR JAR リソースの参照

      1. 作成した [EMR MR] ノードを開き、コードエディターページに移動します。

      2. 左側のリソース管理ペインで、参照したいリソース (例: onaliyun_mr_wordcount-1.0-SNAPSHOT.jar) を見つけ、右クリックして [リソースの参照] を選択します。

      3. リソースを参照すると、コードエディターに確認メッセージが表示されます。次に、次のコマンドを入力します。コマンド内の JAR パッケージ名、バケット名、パスは例です。実際の値に置き換えてください。

        ##@resource_reference{"onaliyun_mr_wordcount-1.0-SNAPSHOT.jar"}
        onaliyun_mr_wordcount-1.0-SNAPSHOT.jar cn.apache.hadoop.onaliyun.examples.EmrWordCount oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs
        説明

        EMR MR ノードエディターではコメント文はサポートされていません。

    OSS からの参照

    OSS REF を使用して、ノードから OSS リソースを直接参照できます。EMR ノードが実行されると、DataWorks は参照された OSS リソースをローカル実行環境に自動的にダウンロードします。この方法は、EMR ジョブが JAR 依存関係を持つ場合や、他のスクリプトファイルに依存する場合に便利です。

    1. JAR リソースのアップロード

      1. コードの開発が完了したら、OSS コンソールにログインし、ご利用のリージョンの左側のナビゲーションウィンドウで [バケットリスト] をクリックします。

      2. ターゲットバケットの名前をクリックして、[ファイル管理] ページに移動します。

        このチュートリアルでは、onaliyun-bucket-2 をサンプルバケットとして使用します。

      3. JAR リソースをそのディレクトリにアップロードします。

        emr/jars」ディレクトリに移動し、[ファイルのアップロード] をクリックします。その後、「[アップロードするファイル]」セクションで、[ファイルをスキャン] をクリックします。onaliyun_mr_wordcount-1.0-SNAPSHOT.jar ファイルをバケットに追加し、[ファイルのアップロード] をクリックします。

    2. JAR リソースの参照

      作成した EMR MR ノードの設定ページで、コードを編集して JAR リソースを参照します。

      作成した EMR MR ノードの設定ページで、コードを編集して JAR リソースを参照します。

      hadoop jar ossref://onaliyun-bucket-2/emr/jars/onaliyun_mr_wordcount-1.0-SNAPSHOT.jar cn.apache.hadoop.onaliyun.examples.EmrWordCount oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs
      説明

      コマンドのフォーマットは hadoop jar <実行する JAR へのパス> <完全修飾メインクラス名> <入力ディレクトリ> <出力ディレクトリ> です。

      次の表に、実行する JAR へのパスを説明します。

      パラメーター

      説明

      実行する JAR へのパス

      フォーマット:ossref://{endpoint}/{Bucket}/{object}

      • endpoint:OSS にアクセスするためのエンドポイント。このパラメーターを空のままにすると、EMR クラスターと同じリージョンでのみ OSS にアクセスできます。OSS バケットは EMR クラスターと同じリージョンにある必要があります。

      • Bucket:オブジェクトを格納するための OSS コンテナー。各 [Bucket] には一意の名前があります。OSS コンソールにログインして、アカウント配下のすべての [Bucket] を表示できます。

      • object[Bucket] に格納されている特定のオブジェクト (ファイル名やパスなど)。

    (任意) 詳細パラメーターの設定

    特定のパラメーターは、ノード構成ペインの右側にあるの下の、[EMR ノードパラメーター] または [DataWorks パラメーター] セクションで設定できます。

    説明
    • 使用可能な詳細パラメーターは、次の表に示すように、EMR クラスタータイプによって若干異なります。

    • 追加の オープンソースの Spark プロパティは、ノード構成ペインの右側にある の下の [EMR ノードパラメーター] または [DataWorks パラメーター] セクションで構成できます。

    Datalake またはカスタムクラスター

    詳細パラメーター

    説明

    queue

    ジョブを送信するためのスケジューリングキューを指定します。デフォルトは `default` キューです。EMR YARN の詳細については、「基本的なキュー設定」をご参照ください。

    priority

    優先度。デフォルト値は 1 です。

    FLOW_SKIP_SQL_ANALYZE

    SQL 文の実行メソッドを指定します。有効な値:

    • true:複数の SQL 文を一度に実行します。

    • false (デフォルト):一度に 1 つの SQL 文を実行します。

    説明

    このパラメーターは、データ開発環境でのテストにのみ使用できます。

    その他

    詳細設定セクションでカスタム MR ジョブパラメーターを追加することもできます。コードを送信すると、DataWorks は -D key=value 文を使用して新しいパラメーターをコマンドに自動的に追加します。

    Hadoop クラスター

    詳細パラメーター

    説明

    queue

    ジョブを送信するためのスケジューリングキューを指定します。デフォルトは `default` キューです。EMR YARN の詳細については、「基本的なキュー設定」をご参照ください。

    priority

    優先度。デフォルト値は 1 です。

    USE_GATEWAY

    ジョブをゲートウェイクラスター経由で送信するかどうかを指定します。有効な値:

    • true:ジョブをゲートウェイクラスター経由で送信します。

    • false (デフォルト):ジョブをゲートウェイクラスター経由で送信しません。ジョブはデフォルトでヘッダーノードに送信されます。

    説明

    このノードが存在するクラスターがゲートウェイクラスターに関連付けられておらず、このパラメーターを true に設定した場合、EMR ジョブの送信は失敗します。

    タスクの実行

    1. Run Configuration[コンピューティングリソース] セクションで、[コンピューティングリソース][リソースグループ] を設定します。

      説明
      • タスクのリソース要件に基づいて [スケジューリング CU] を設定することもできます。デフォルトの CU 値は 0.25 です。

      • パブリックネットワークまたは VPC 経由でデータソースにアクセスするには、データソースとの接続テストに合格したスケジューリングリソースグループを使用する必要があります。詳細については、「ネットワーク接続ソリューション」をご参照ください。

    2. ツールバーのパラメーターダイアログボックスで、作成したデータソースを選択し、[実行] をクリックします。

  2. ノードタスクを定期的に実行するには、スケジューリングプロパティを設定します。詳細については、「ノードのスケジューリング設定」をご参照ください。

  3. ノードタスクを設定した後、ノードをデプロイする必要があります。詳細については、「ノードとワークフローのデプロイ」をご参照ください。

  4. タスクがデプロイされた後、オペレーションセンターで実行ステータスを表示できます。詳細については、「オペレーションセンター入門」をご参照ください。

結果の表示

  • OSS コンソールにログインします。ターゲットバケットの指定された出力ディレクトリで出力ファイルを表示できます。この例では、パスは emr/datas/wordcount02/outputs です。目标Bucket

  • DataWorks で統計を読み取ります。

    1. EMR Hive ノードを作成します。詳細については、「スケジュールされたワークフローのノードを作成する」をご参照ください。

    2. EMR Hive ノードで、OSS にマウントされた外部 Hive テーブルを作成し、テーブルデータを読み取ります。サンプルコード:

      CREATE EXTERNAL TABLE IF NOT EXISTS wordcount02_result_tb
      (
          `word` STRING COMMENT '単語',
          `cout` STRING COMMENT '数'   
      ) 
      ROW FORMAT delimited fields terminated by '\t'
      location 'oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs/';
      
      SELECT * FROM wordcount02_result_tb;

      次の図に結果を示します。运行结果