すべてのプロダクト
Search
ドキュメントセンター

DataWorks:EMR MR ノード

最終更新日:Oct 28, 2025

E-MapReduce (EMR) MR ノードを作成して、複数の並列マップタスクを使用して大規模なデータセットを処理できます。EMR MR ノードは、データ処理効率を大幅に向上させるのに役立ちます。このトピックでは、EMR MR ノードを開発および構成する方法について説明します。この例では、ノードを使用して OSS バケット内の Object Storage Service (OSS) オブジェクトからデータを読み取り、OSS オブジェクト内の単語数をカウントします。

前提条件

  • Alibaba Cloud EMR クラスターを作成し、DataWorks にアタッチしていること。詳細については、「Data Development (新規): EMR 計算リソースのアタッチ」をご参照ください。

  • (オプション) Resource Access Management (RAM) ユーザーの場合は、タスク開発のためにワークスペースに追加され、[開発者] または [ワークスペース管理者] ロールが割り当てられていることを確認してください。ワークスペース管理者ロールには広範な権限があります。このロールは慎重に付与してください。メンバーの追加に関する詳細については、「ワークスペースへのメンバーの追加」をご参照ください。

    Alibaba Cloud アカウントを使用している場合は、このステップをスキップできます。
  • オープンソースコードがリソースとしてアップロードされるか、ユーザー定義関数 (UDF) が [リソース管理: すべて] ペインにリソースとしてアップロードされていること。この前提条件は、EMR MR ノードでオープンソースコードまたは UDF を参照する場合に満たす必要があります。詳細については、「リソース管理」をご参照ください。

  • OSS バケットが作成されていること。このトピックのタスク開発用のサンプルコードを使用するには、OSS バケットを準備する必要があります。OSS バケットの作成方法の詳細については、「バケットの作成」をご参照ください。

  • EMR MR ノードが作成されていること。詳細については、「自動トリガーワークフローのノードを作成する」をご参照ください。

制限事項

  • このタイプのノードは、サーバーレスリソースグループまたは専用スケジューリングリソースグループでのみ実行できます。サーバーレスリソースグループを使用することをお勧めします。

  • DataWorks で DataLake またはカスタムクラスターのメタデータを管理する場合は、最初にクラスターで EMR-HOOK を構成する必要があります。EMR-HOOK の構成方法の詳細については、「Hive 拡張機能を使用してデータリネージと履歴アクセス情報を記録する」をご参照ください。

    説明

    クラスターで EMR-HOOK を構成しない場合、メタデータはリアルタイムで表示できず、監査ログは生成できず、データリネージは DataWorks で表示できません。EMR ガバナンスタスクも実行できません。

初期データと JAR リソースパッケージの準備

初期データの準備

input01.txt という名前のファイルを作成し、次の初期データを含めます。

hadoop emr hadoop dw
hive hadoop
dw emr

初期データを格納するファイルのアップロード

  1. OSS コンソールにログインします。左側のナビゲーションウィンドウで、[バケット] をクリックします。

  2. [バケット] ページで、目的のバケットを見つけてバケット名をクリックし、[オブジェクト] ページに移動します。

    この例では、onaliyun-bucket-2 バケットが使用されます。

  3. [オブジェクト] ページで、[ディレクトリの作成] をクリックして、初期データと JAR リソースを格納するために使用されるディレクトリを作成します。

    • [ディレクトリ名]emr/datas/wordcount02/inputs に設定して、初期データを格納するために使用されるディレクトリを作成します。

    • [ディレクトリ名]emr/jars に設定して、JAR リソースを格納するために使用されるディレクトリを作成します。

  4. 初期データを格納するファイルを emr/datas/wordcount02/inputs ディレクトリにアップロードします。

    • /emr/datas/wordcount02/inputs ディレクトリに移動し、[オブジェクトのアップロード] をクリックします。

    • [アップロードするファイル] セクションで、[ファイルの選択] をクリックし、input01.txt ファイルをバケットにアップロードします。

EMR MR ノードを使用して OSS オブジェクトを読み取り、JAR パッケージを生成する

  1. 既存の IntelliJ IDEA プロジェクトを開き、Project Object Model (POM) 依存関係を追加します。

            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-common</artifactId>
                <version>2.8.5</version> <!--EMR MR で使用されるバージョンは 2.8.5 です。-->
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>2.8.5</version>
            </dependency>
  2. OSS オブジェクトからデータを読み書きするために、次のパラメーターを構成します。

    重要

    Alibaba Cloud アカウントには、すべての API 操作を呼び出す権限があります。Alibaba Cloud アカウントの AccessKey ペアが漏洩すると、Alibaba Cloud アカウント内のすべてのリソースが高いセキュリティリスクにさらされる可能性があります。Alibaba Cloud アカウントの AccessKey ID と AccessKey シークレットをプロジェクトコードや簡単に見つけられる場所に保存しないことをお勧めします。API 操作の呼び出しや日常の O&M の実行には、RAM ユーザーを使用することをお勧めします。次のサンプルコードは参照用にのみ提供されています。Alibaba Cloud アカウントの AccessKey ペアは機密情報として扱ってください。

    conf.set("fs.oss.accessKeyId", "${accessKeyId}");
    conf.set("fs.oss.accessKeySecret", "${accessKeySecret}");
    conf.set("fs.oss.endpoint","${endpoint}");

    パラメーターの説明:

    • ${accessKeyId}: Alibaba Cloud アカウントの AccessKey ID。

    • ${accessKeySecret}: Alibaba Cloud アカウントの AccessKey シークレット。

    • ${endpoint}: OSS のエンドポイント。エンドポイントは、EMR クラスターが存在するリージョンによって決まります。EMR クラスターが存在するリージョンで OSS をアクティブ化する必要があります。詳細については、「リージョンとエンドポイント」をご参照ください。

    このトピックでは、Java コードを使用して Hadoop 公式ウェブサイトの WordCount の例を変更します。AccessKey ID と AccessKey シークレットの構成がコードに追加されます。これにより、ジョブに OSS オブジェクトにアクセスする権限が付与されます。

    サンプルコード

    package cn.apache.hadoop.onaliyun.examples;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    public class EmrWordCount {
        public static class TokenizerMapper
                extends Mapper<Object, Text, Text, IntWritable> {
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();
    
            public void map(Object key, Text value, Context context
            ) throws IOException, InterruptedException {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                    word.set(itr.nextToken());
                    context.write(word, one);
                }
            }
        }
    
        public static class IntSumReducer
                extends Reducer<Text, IntWritable, Text, IntWritable> {
            private IntWritable result = new IntWritable();
    
            public void reduce(Text key, Iterable<IntWritable> values,
                               Context context
            ) throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                result.set(sum);
                context.write(key, result);
            }
        }
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length < 2) {
                System.err.println("Usage: wordcount <in> [<in>...] <out>");
                System.exit(2);
            }
            conf.set("fs.oss.accessKeyId", "${accessKeyId}"); // 
            conf.set("fs.oss.accessKeySecret", "${accessKeySecret}"); // 
            conf.set("fs.oss.endpoint", "${endpoint}"); //
            Job job = Job.getInstance(conf, "word count");
            job.setJarByClass(EmrWordCount.class);
            job.setMapperClass(TokenizerMapper.class);
            job.setCombinerClass(IntSumReducer.class);
            job.setReducerClass(IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            for (int i = 0; i < otherArgs.length - 1; ++i) {
                FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
            }
            FileOutputFormat.setOutputPath(job,
                    new Path(otherArgs[otherArgs.length - 1]));
            System.exit(job.waitForCompletion(true) ?  0 : 1);
        }
    }
                                    
  3. 上記のコードを記述した後、コードを JAR パッケージに圧縮します。この例では、onaliyun_mr_wordcount-1.0-SNAPSHOT.jar という名前のパッケージが生成されます。

手順

  1. EMR MR ノードの構成タブで、次の操作を実行します。

    EMR MR タスクの開発

    ビジネス要件に基づいて、次のいずれかの方法を使用して EMR MR タスクを開発できます。

    方法 1: EMR JAR リソースのアップロードと参照

    DataWorks では、オンプレミスのマシンから Data Studio にリソースをアップロードしてから、リソースを参照できます。EMR MR ノードが大量のリソースに依存している場合、DataWorks コンソールを使用してリソースをアップロードすることはできません。この場合、リソースを Hadoop 分散ファイルシステム (HDFS) に保存し、EMR MR ノードのコードでリソースを参照できます。

    1. EMR JAR リソースを作成します。

      1. EMR JAR リソースの作成方法の詳細については、「リソース管理」をご参照ください。この例では、このトピックの「初期データと JAR リソースパッケージの準備」セクションで生成された JAR パッケージは emr/jars ディレクトリに保存されます。[アップロード] をクリックして JAR パッケージをアップロードします。

      2. [ストレージパス][データソース]、および [リソースグループ] パラメーターを構成します。

      3. [保存] をクリックします。

      image

    2. EMR JAR リソースを参照します。

      1. [EMR MR] ノードを開きます。ノードの構成タブが表示されます。

      2. Data Studio ページの左側のナビゲーションウィンドウにある [リソース管理: すべて] ペインで参照したいリソースを見つけ、リソース名を右クリックして [リソースの参照] を選択します。この例では、リソースは onaliyun_mr_wordcount-1.0-SNAPSHOT.jar です。

      3. ##@resource_reference{""} 形式の情報が [EMR MR] ノードの構成タブに表示されたら、コードリソースが参照されます。次に、次のコードを実行します。次のコードの情報は、リソースパッケージ名、バケット名、パスなど、実際の情報に置き換える必要があります。

        ##@resource_reference{"onaliyun_mr_wordcount-1.0-SNAPSHOT.jar"}
        onaliyun_mr_wordcount-1.0-SNAPSHOT.jar cn.apache.hadoop.onaliyun.examples.EmrWordCount oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs
        説明

        EMR MR ノードのコードを記述する際にコメントを追加することはできません。

    方法 2: OSS リソースの参照

    現在のノードは、OSS REF メソッドを使用して OSS リソースを参照できます。ノードを実行すると、DataWorks はノードコードで指定された OSS リソースを自動的にロードします。この方法は、EMR タスクで JAR 依存関係が必要な場合や、EMR タスクがスクリプトに依存する必要があるシナリオで一般的に使用されます。

    1. JAR ファイルをアップロードします。

      1. OSS コンソールにログインします。上部のナビゲーションバーで目的のリージョンを選択します。次に、左側のナビゲーションウィンドウで [バケット] をクリックします。

      2. [バケット] ページで、目的のバケットを見つけてバケット名をクリックし、[オブジェクト] ページに移動します。

        この例では、onaliyun-bucket-2 バケットが使用されます。

      3. 作成したディレクトリに JAR ファイルをアップロードします。

        emr/jars ディレクトリに移動します。[オブジェクトのアップロード] をクリックします。[アップロードするファイル] セクションで、[ファイルの選択] をクリックし、onaliyun_mr_wordcount-1.0-SNAPSHOT.jar ファイルを追加します。次に、[オブジェクトのアップロード] をクリックします。

    2. JAR ファイルを参照します。

      JAR ファイルを参照するために使用されるコードを記述します。

      EMR MR ノードの構成タブで、JAR ファイルを参照するために使用されるコードを記述します。

      hadoop jar ossref://onaliyun-bucket-2/emr/jars/onaliyun_mr_wordcount-1.0-SNAPSHOT.jar cn.apache.hadoop.onaliyun.examples.EmrWordCount oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs
      説明

      上記のコマンドは次の形式です: hadoop jar <参照および実行される JAR ファイルが格納されているパス> <実行されるメインクラスの完全名> <読み取られるファイルが格納されているパス> <結果が格納されているパス>

      パラメーターの説明:

      パラメーター

      説明

      参照および実行される JAR ファイルが格納されているパス

      パスは ossref://{endpoint}/{bucket}/{object} 形式です。

      • endpoint: OSS のエンドポイント。endpoint パラメーターを空のままにすると、現在の EMR クラスターと同じリージョンにある OSS バケット内のリソースのみを参照できます。

      • bucket: OSS でオブジェクトを格納するために使用されるコンテナー。各 バケット には一意の名前があります。OSS コンソールにログインして、現在のログインアカウント内のすべての バケット を表示できます。

      • object: バケット に格納されているファイル名またはパス。

    (オプション) 詳細パラメーターの構成

    ノードの右側にある [プロパティ] タブの [EMR ノードパラメーター] の下にある [DataWorks パラメーター] で、次の表に固有のパラメーターを構成できます。

    説明
    • 次の表に、さまざまなタイプの EMR クラスターで構成できる詳細パラメーターを示します。

    • 追加の オープンソース Spark パラメーターについては、ノードの右側にある [プロパティ] タブの [EMR ノードパラメーター] の下にある [Spark パラメーター] で構成します。

    DataLake クラスターまたはカスタムクラスター: EMR on ECS ページで作成

    詳細パラメーター

    説明

    queue

    ジョブがコミットされるスケジューリングキュー。デフォルト値: default。EMR YARN の詳細については、「YARN スケジューラ」をご参照ください。

    priority

    優先度。デフォルト値: 1。

    FLOW_SKIP_SQL_ANALYZE

    SQL 文の実行方法。有効な値:

    • true: 複数の SQL 文を一度に実行します。

    • false (デフォルト): 一度に 1 つの SQL 文のみを実行します。

    説明

    このパラメーターは、DataWorks ワークスペースの開発環境でのテストにのみ使用できます。

    その他

    DataWorks コンソールの詳細設定で、EMR MR ノードのカスタムパラメーターを詳細パラメーターとして追加できます。DataWorks で EMR MR ノードのコードをコミットすると、DataWorks は自動的にカスタムパラメーターを -D key=value 形式のコマンドに追加します。

    Hadoop クラスター: EMR on ECS ページで作成

    詳細パラメーター

    説明

    queue

    ジョブがコミットされるスケジューリングキュー。デフォルト値: default。EMR YARN の詳細については、「YARN スケジューラ」をご参照ください。

    priority

    優先度。デフォルト値: 1。

    USE_GATEWAY

    現在のノードでジョブをコミットするためにゲートウェイクラスターを使用するかどうかを指定します。有効な値:

    • true: ゲートウェイクラスターを使用してジョブをコミットします。

    • false (デフォルト): ゲートウェイクラスターを使用せずにジョブをコミットします。ジョブは自動的にマスターノードにコミットされます。

    説明

    ノードが属する EMR クラスターがゲートウェイクラスターに関連付けられていないにもかかわらず、USE_GATEWAY パラメーターが true に設定されている場合、ジョブのコミットに失敗する可能性があります。

    SQL 文の実行

    1. ノードの構成タブの右側のナビゲーションウィンドウにある [デバッグ構成] タブで、[計算リソース] セクションの [計算リソース] パラメーターを構成し、DataWorks 構成セクションの [リソースグループ] パラメーターを構成します。

      説明
      • タスクの実行に必要なリソースに基づいて、[計算用 CU] パラメーターを構成することもできます。このパラメーターのデフォルト値は 0.25 です。

      • インターネットまたは VPC (VPC) 経由でデータソースにアクセスする場合は、データソースに接続されているスケジューリング用のリソースグループを使用する必要があります。詳細については、「ネットワーク接続ソリューション」をご参照ください。

    2. ノードの構成タブの上部ツールバーで、[実行] をクリックして SQL 文を実行します。

  2. ノード上のタスクを定期的に実行する場合は、ビジネス要件に基づいてスケジューリング情報を構成します。

  3. ノードを構成した後、ノードをデプロイします。詳細については、「ノードまたはワークフローのデプロイ」をご参照ください。

  4. ノードをデプロイした後、オペレーションセンターでノードのステータスを表示します。詳細については、「オペレーションセンター入門」をご参照ください。

結果の表示

  • OSS コンソールにログインします。その後、初期データが格納されている emr/datas/wordcount02/outputs ディレクトリで結果を表示できます。目标Bucket

  • DataWorks コンソールで統計結果を表示します。

    1. EMR Hive ノードを作成します。詳細については、「ノード開発」をご参照ください。

    2. EMR Hive ノードで、OSS にマウントされた Hive 外部テーブルを作成します。次に、Hive 外部テーブルを使用して OSS の Hive テーブルからデータを読み取ります。サンプルコード:

      CREATE EXTERNAL TABLE IF NOT EXISTS wordcount02_result_tb
      (
          `word` STRING COMMENT '単語',
          `count` STRING COMMENT 'カウント'   
      ) 
      ROW FORMAT delimited fields terminated by '\t'
      location 'oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs/';
      
      SELECT * FROM wordcount02_result_tb;

      次の図に結果を示します。运行结果