すべてのプロダクト
Search
ドキュメントセンター

DataWorks:EMR MR ノードの作成

最終更新日:Jul 09, 2025

複数の並列マップタスクを使用して大規模なデータセットを処理するために、E-MapReduce(EMR)MR ノードを作成できます。 EMR MR ノードは、大規模なデータセットでの並列計算の高速化に役立ちます。 このトピックでは、EMR MR ノードを作成し、ノード上でタスクを開発する方法について説明します。 この例では、タスクは OSS バケット内の Object Storage Service (OSS) オブジェクトからデータを読み取り、OSS オブジェクト内の単語数をカウントするために使用されます。

前提条件

  • EMR クラスタが DataWorks に登録されていること。 詳細については、「DataStudio (旧バージョン): EMR 計算リソースを関連付ける」をご参照ください。

  • (RAM ユーザーを使用してタスクを開発する場合に必要) RAM ユーザーが DataWorks ワークスペースにメンバーとして追加され、[開発] ロールまたは [ワークスペース管理者] ロールが割り当てられていること。 ワークスペース管理者ロールには、必要以上の権限があります。 ワークスペース管理者ロールを割り当てる場合は注意してください。 メンバーの追加方法の詳細については、「ワークスペースメンバーを追加し、ロールを割り当てる」をご参照ください。

  • サーバーレスリソースグループが購入され、構成されていること。 構成には、ワークスペースとの関連付けとネットワーク構成が含まれます。 詳細については、「サーバーレスリソースグループを作成して使用する」をご参照ください。

  • ワークフローが DataStudio で作成されていること。 詳細については、「ワークフローを作成する」をご参照ください。

  • EMR MR ノードでオープンソースコードを参照する場合、オープンソースコードが EMR JAR リソースとしてアップロードされていることを確認してください。 詳細については、「EMR リソースを作成して使用する」をご参照ください。

  • EMR MR ノードでユーザー定義関数 (UDF) を参照する場合、UDF が EMR JAR リソースとしてアップロードされ、EMR に登録されていることを確認してください。 UDF の登録方法の詳細については、「EMR 関数を作成する」をご参照ください。

  • OSS バケットが作成されていること。 このトピックのタスク開発のサンプルコードを使用するには、OSS バケットを準備する必要があります。 OSS バケットの作成方法の詳細については、「バケットを作成する」をご参照ください。

制限事項

  • このタイプのノードは、サーバーレスリソースグループまたはスケジューリング専用の排他的リソースグループでのみ実行できます。 サーバーレスリソースグループを使用することをお勧めします。

  • DataWorks で DataLake またはカスタムクラスタのメタデータを管理する場合、最初にクラスタで EMR-HOOK を構成する必要があります。 クラスタで EMR-HOOK を構成しないと、メタデータがリアルタイムで表示されず、監査ログが生成されず、データ系列が DataWorks に表示されません。 また、EMR ガバナンスタスクも実行できません。 EMR-HOOK の構成方法については、「Hive 拡張機能を使用してデータ系列と履歴アクセス情報を記録する」をご参照ください。

初期データと JAR リソースパッケージを準備する

初期データを準備する

input01.txt という名前で、次の初期データを含むファイルを作成します。

hadoop emr hadoop dw
hive hadoop
dw emr

初期データを格納するファイルをアップロードする

  1. OSS コンソール にログインします。 左側のナビゲーションウィンドウで、[バケット] をクリックします。

  2. バケットページで、目的のバケットを見つけ、バケット名をクリックして [オブジェクト] ページに移動します。

    この例では、onaliyun-bucket-2 バケットを使用します。

  3. [ディレクトリの作成] をクリックして、初期データと JAR リソースを格納するために使用するディレクトリを作成します。

    • [ディレクトリ名]emr/datas/wordcount02/inputs に設定して、初期データを格納するために使用するディレクトリを作成します。

    • [ディレクトリ名]emr/jars に設定して、JAR リソースを格納するために使用するディレクトリを作成します。

  4. 初期データを格納するファイルを emr/datas/wordcount02/inputs ディレクトリにアップロードします。

    • /emr/datas/wordcount02/inputs ディレクトリに移動し、[オブジェクトのアップロード] をクリックします。

    • [アップロードするファイル] セクションで、[ファイルを選択] をクリックし、input01.txt ファイルをバケットにアップロードします。

EMR MR ノードを使用して OSS オブジェクトを読み取り、JAR パッケージを生成する

  1. 既存の IntelliJ IDEA プロジェクトを開き、Project Object Model (POM) 依存関係を追加します。

            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-common</artifactId>
                <version>2.8.5</version> <!--EMR MR が使用するバージョンは 2.8.5 です。-->
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>2.8.5</version>
            </dependency>
  2. OSS オブジェクトからデータを読み書きするために、次のパラメータを構成します。

    重要

    Alibaba Cloud アカウントには、すべての API オペレーションを呼び出す権限があります。 Alibaba Cloud アカウントの AccessKey ペアが漏洩した場合、Alibaba Cloud アカウント内のすべてのリソースが重大なセキュリティリスクにさらされる可能性があります。 Alibaba Cloud アカウントの AccessKey ID と AccessKey シークレットをプロジェクトコードまたは簡単に見つけられる場所に保存しないことをお勧めします。 API オペレーションの呼び出しや日常の O&M の実行には、RAM ユーザーを使用することをお勧めします。 次のサンプルコードは参照用にのみ提供されています。 Alibaba Cloud アカウントの AccessKey ペアは機密にしてください。

    conf.set("fs.oss.accessKeyId", "${accessKeyId}");
    conf.set("fs.oss.accessKeySecret", "${accessKeySecret}");
    conf.set("fs.oss.endpoint","${endpoint}");

    パラメータの説明:

    • ${accessKeyId}: Alibaba Cloud アカウントの AccessKey ID。

    • ${accessKeySecret}: Alibaba Cloud アカウントの AccessKey シークレット。

    • ${endpoint}: OSS のエンドポイント。 エンドポイントは、EMR クラスタが存在するリージョンによって決まります。 EMR クラスタが存在するリージョンで OSS をアクティブ化する必要があります。 詳細については、「OSO リージョンとエンドポイント」をご参照ください。

    このトピックでは、Java コードを使用して Hadoop 公式ウェブサイトの WordCount の例を変更します。 AccessKey ID と AccessKey シークレットの構成がコードに追加されます。 これにより、ジョブに OSS オブジェクトにアクセスする権限が付与されます。

    サンプルコード

    package cn.apache.hadoop.onaliyun.examples;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    public class EmrWordCount {
        public static class TokenizerMapper
                extends Mapper<Object, Text, Text, IntWritable> {
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();
    
            public void map(Object key, Text value, Context context
            ) throws IOException, InterruptedException {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                    word.set(itr.nextToken());
                    context.write(word, one);
                }
            }
        }
    
        public static class IntSumReducer
                extends Reducer<Text, IntWritable, Text, IntWritable> {
            private IntWritable result = new IntWritable();
    
            public void reduce(Text key, Iterable<IntWritable> values,
                               Context context
            ) throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                result.set(sum);
                context.write(key, result);
            }
        }
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length < 2) {
                System.err.println("Usage: wordcount <in> [<in>...] <out>");
                System.exit(2);
            }
            conf.set("fs.oss.accessKeyId", "${accessKeyId}"); // AccessKey ID を設定します
            conf.set("fs.oss.accessKeySecret", "${accessKeySecret}"); // AccessKey シークレットを設定します
            conf.set("fs.oss.endpoint", "${endpoint}"); // エンドポイントを設定します
            Job job = Job.getInstance(conf, "word count");
            job.setJarByClass(EmrWordCount.class);
            job.setMapperClass(TokenizerMapper.class);
            job.setCombinerClass(IntSumReducer.class);
            job.setReducerClass(IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            for (int i = 0; i < otherArgs.length - 1; ++i) {
                FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
            }
            FileOutputFormat.setOutputPath(job,
                    new Path(otherArgs[otherArgs.length - 1]));
            System.exit(job.waitForCompletion(true) ?  0 : 1);
        }
    }          
  3. コードを JAR ファイルにパッケージ化します。

    上記のコードを記述して保存した後、コードを JAR パッケージに圧縮します。 この例では、onaliyun_mr_wordcount-1.0-SNAPSHOT.jar という名前のパッケージが生成されます。

ステップ 1: EMR MR ノードを作成する

  1. DataStudio ページに移動します。

    DataWorks コンソール にログインします。 上部のナビゲーションバーで、目的のリージョンを選択します。 左側のナビゲーションウィンドウで、[データ開発と O&M] > [データ開発] を選択します。 表示されるページで、ドロップダウンリストから目的のワークスペースを選択し、[データ開発に移動] をクリックします。

  2. EMR MR ノードを作成します。

    1. 目的のワークフローを見つけ、ワークフローの名前を右クリックし、[ノードの作成] > [EMR] > [EMR MR] を選択します。

      説明

      または、ポインタを [作成] アイコンの上に移動し、[ノードの作成] > [EMR] > [EMR MR] を選択することもできます。

    2. [ノードの作成] ダイアログボックスで、[名前][エンジンインスタンス][ノードタイプ][パス] パラメータを構成します。 [確認] をクリックします。 EMR MR ノードの構成タブが表示されます。

      説明

      ノード名には、文字、数字、アンダースコア(_) 、ピリオド(.) のみ使用できます。

ステップ 2: EMR MR タスクを開発する

ビジネス要件に基づいて、次のいずれかの方法を使用して、EMR MR ノードの構成タブで MR タスクを開発できます。

  • オンプレミス マシンから DataStudio にリソースをアップロードしてから、リソースを参照します。 詳細については、このトピックの 方法 1: EMR JAR リソースをアップロードして参照する セクションを参照してください。 この方法を使用することをお勧めします。

  • OSS REF メソッドを使用して OSS リソースを参照します。 詳細については、このトピックの 方法 2: OSS リソースを参照する セクションを参照してください。

方法 1: EMR JAR リソースをアップロードして参照する

DataWorks では、リソースを参照する前に、オンプレミス マシンから DataStudio にリソースをアップロードできます。 EMR MR ノードが大量のリソースに依存している場合、DataWorks コンソールを使用してリソースをアップロードすることはできません。 この場合、リソースを Hadoop Distributed File System (HDFS) に格納し、EMR MR ノードのコードでリソースを参照できます。

  1. EMR JAR リソースを作成します。

    EMR JAR リソースの作成方法の詳細については、「EMR リソースを作成して使用する」をご参照ください。 この例では、初期データと JAR リソースパッケージを準備する セクションで生成された JAR パッケージは、emr/jars ディレクトリに格納されます。 このディレクトリは、JAR リソースを格納するために使用されます。 EMR JAR リソースを初めて使用するときは、[承認] をクリックして、DataWorks が EMR JAR リソースにアクセスすることを承認します。 次に、[アップロード] をクリックして JAR リソースをアップロードします。新建JAR资源

  2. JAR パッケージを参照します。

    1. [EMR MR] ノードを開きます。 ノードの構成タブが表示されます。

    2. [リソース] の下の [EMR] フォルダで参照するリソースを見つけ、リソース名を右クリックし、[リソースパスの挿入] を選択します。 この例では、リソースは onaliyun_mr_wordcount-1.0-SNAPSHOT.jar です。引用资源

    3. 次の図に示すメッセージが [EMR MR] ノードの構成タブに表示された場合、コードリソースが参照されます。 次に、次のコードを実行します。 次のコードの情報は、実際の情報に置き換える必要があります。 情報には、リソースパッケージ名、バケット名、ディレクトリが含まれます。

      ##@resource_reference{"onaliyun_mr_wordcount-1.0-SNAPSHOT.jar"}
      onaliyun_mr_wordcount-1.0-SNAPSHOT.jar cn.apache.hadoop.onaliyun.examples.EmrWordCount oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs
      説明

      EMR MR ノードのコードを記述するときは、コメントを追加できません。

方法 2: OSS リソースを参照する

現在のノードは、OSS REF メソッドを使用して OSS リソースを参照できます。 ノードでタスクを実行すると、DataWorks はノードコードで指定された OSS リソースを自動的にロードします。 この方法は、EMR タスクで JAR 依存関係が必要な場合、または EMR タスクがスクリプトに依存する必要がある場合によく使用されます。

  1. JAR ファイルをアップロードします。

    1. OSS コンソール にログインします。 上部のナビゲーションバーで、目的のリージョンを選択します。 次に、左側のナビゲーションウィンドウで、[バケット] をクリックします。

    2. バケットページで、目的のバケットを見つけ、バケット名をクリックして [オブジェクト] ページに移動します。

      この例では、onaliyun-bucket-2 バケットを使用します。

    3. JAR ファイルを作成したディレクトリにアップロードします。

      emr/jars ディレクトリに移動します。 [オブジェクトのアップロード] をクリックします。 [アップロードするファイル] セクションで、[ファイルを選択] をクリックし、onaliyun_mr_wordcount-1.0-SNAPSHOT.jar ファイルを追加します。 次に、[オブジェクトのアップロード] をクリックします。

  2. JAR ファイルを参照します。

    JAR ファイルを参照するために使用するコードを記述します。

    EMR MR ノードの構成タブで、JAR ファイルを参照するために使用するコードを記述します。

    hadoop jar ossref://onaliyun-bucket-2/emr/jars/onaliyun_mr_wordcount-1.0-SNAPSHOT.jar cn.apache.hadoop.onaliyun.examples.EmrWordCount oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs
    説明

    上記のコマンドは、hadoop jar <参照して実行する JAR ファイルが格納されているパス> <実行するメインクラスのフルネーム> <読み取るファイルが格納されているパス> <結果が格納されているパス> の形式です。

    パラメータの説明:

    パラメータ

    説明

    参照して実行する JAR ファイルが格納されているパス

    パスは ossref://{endpoint}/{bucket}/{object} 形式です。

    • endpoint: OSS のエンドポイント。 endpoint パラメータを空のままにすると、現在の EMR クラスタと同じリージョンにある OSS バケット内のリソースのみを参照できます。

    • bucket: OSS でオブジェクトを格納するために使用されるコンテナ。 各 bucket には一意の名前があります。 OSS コンソール にログインして、現在のログインアカウント内のすべての Bucket を表示できます。

    • object: bucket に格納されているファイル名またはパス。

(オプション) 詳細パラメータを構成する

現在のノードの構成タブの [詳細設定] タブで詳細パラメータを構成できます。 パラメータの構成方法の詳細については、「Spark Configuration」を参照してください。 次の表に、さまざまなタイプの EMR クラスタに対して構成できる詳細パラメータを示します。

DataLake クラスタまたはカスタムクラスタ: EMR on ECS ページで作成

詳細パラメータ

説明

queue

ジョブがコミットされるスケジューリングキュー。 デフォルト値: default。 EMR YARN の詳細については、「YARN スケジューラ」をご参照ください。

priority

優先度。 デフォルト値: 1。

その他

DataWorks コンソールの [詳細設定] タブで、EMR MR ノードのカスタムパラメータを詳細パラメータとして追加できます。 DataWorks で EMR MR ノードのコードをコミットすると、DataWorks は -D key=value 形式のコマンドにカスタムパラメータを追加します。

Hadoop クラスタ: EMR on ECS ページで作成

詳細パラメータ

説明

queue

ジョブがコミットされるスケジューリングキュー。 デフォルト値: default。 EMR YARN の詳細については、「YARN スケジューラ」をご参照ください。

priority

優先度。 デフォルト値: 1。

USE_GATEWAY

現在のノードでゲートウェイクラスタを使用してジョブをコミットするかどうかを指定します。 有効な値:

  • true: ゲートウェイクラスタを使用してジョブをコミットします。

  • false (デフォルト): ゲートウェイクラスタを使用せずにジョブをコミットします。 ジョブはマスターノードに自動的にコミットされます。

説明

ノードが属する EMR クラスタがゲートウェイクラスタに関連付けられていないが、USE_GATEWAY パラメータが true に設定されている場合、ジョブのコミットに失敗する可能性があります。

MR タスクを実行する

  1. ツールバーの 高级运行 アイコンをクリックします。 [パラメータ] ダイアログボックスで、[リソースグループ名] ドロップダウンリストから目的のリソースグループを選択し、[実行] をクリックします。

    説明
    • インターネットまたは Virtual Private Cloud (VPC) 経由で計算リソースにアクセスする場合は、計算リソースに接続されているスケジューリング用リソースグループを使用する必要があります。 詳細については、「ネットワーク接続ソリューション」をご参照ください。

    • 後続の操作でリソースグループを変更する場合は、高级运行 [ (パラメータ付きで実行)] アイコンをクリックして、[パラメータ] ダイアログボックスでリソースグループを変更できます。

  2. 上部ツールバーの 保存 アイコンをクリックして、SQL 文を保存します。

  3. オプション。 スモークテストを実行します。

    ノードをコミットするとき、またはノードをコミットした後に、開発環境でノードのスモークテストを実行できます。 詳細については、「スモークテストを実行する」をご参照ください。

ステップ 3: スケジューリングプロパティを構成する

システムにノードでタスクを定期的に実行させる場合は、ノードの構成タブの右側のナビゲーションウィンドウで [プロパティ] をクリックして、ビジネス要件に基づいてタスクスケジューリングプロパティを構成できます。 詳細については、「概要」をご参照ください。

説明

タスクをコミットする前に、[プロパティ] タブで [再実行] パラメータと [親ノード] パラメータを構成する必要があります。

ステップ 4: タスクをデプロイする

ノードのタスクが構成されたら、タスクをコミットしてデプロイする必要があります。 タスクをコミットしてデプロイすると、システムはスケジューリング構成に基づいてタスクを定期的に実行します。

  1. 上部ツールバーの 保存 アイコンをクリックして、タスクを保存します。

  2. 上部ツールバーの 提交 アイコンをクリックして、タスクをコミットします。

    [送信] ダイアログボックスで、[変更の説明] パラメータを構成します。 次に、ビジネス要件に基づいて、タスクのコミット後にタスクコードを確認するかどうかを決定します。

    説明
    • タスクをコミットする前に、[プロパティ] タブで [再実行] パラメータと [親ノード] パラメータを構成する必要があります。

    • コードレビュー機能を使用して、タスクのコード品質を確保し、無効なタスクコードによって発生するタスク実行エラーを防ぐことができます。 コードレビュー機能を有効にすると、コミットされたタスクコードは、コードレビューに合格した後でのみデプロイできます。 詳細については、「コードレビュー」をご参照ください。

標準モードのワークスペースを使用する場合は、タスクをコミットした後に、本番環境にタスクをデプロイする必要があります。 ノードのタスクをデプロイするには、ノードの構成タブの右上隅にある [デプロイ] をクリックします。 詳細については、「ノードをデプロイする」をご参照ください。

次のステップ

タスクをコミットしてデプロイすると、タスクはスケジューリング構成に基づいて定期的に実行されます。 対応するノードの構成タブの右上隅にある [オペレーションセンター] をクリックしてオペレーションセンターに移動し、タスクのスケジューリングステータスを表示できます。 詳細については、「自動トリガーされたタスクを表示および管理する」をご参照ください。

結果を表示する

  • OSS コンソール にログインします。 次に、初期データが格納されている emr/datas/wordcount02/outputs ディレクトリで結果を表示できます。目标Bucket

  • DataWorks コンソールで統計結果を表示します。

    1. EMR Hive ノードを作成します。 詳細については、「EMR Hive ノードを作成する」をご参照ください。

    2. EMR Hive ノードで、OSS にマウントされた Hive 外部テーブルを作成します。 次に、Hive 外部テーブルを使用して、OSS の Hive テーブルからデータを読み取ります。 サンプルコード:

      CREATE EXTERNAL TABLE IF NOT EXISTS wordcount02_result_tb
      (
          `word` STRING COMMENT '単語',
          `count` STRING COMMENT 'カウント'   
      ) 
      ROW FORMAT delimited fields terminated by '\t'
      location 'oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs/';
      
      SELECT * FROM wordcount02_result_tb;

      次の図は結果を示しています。运行结果