このセクションでは、Java SDK を使用してクラスター、ジョブ、および実行プランを迅速に作成する方法について説明します。

SDK サンプルコードの呼び出し、生成には OpenAPI エクスプローラ の使用を推奨します。 OpenAPI エクスプローラにより、クラウドサービスの API を呼び出し、SDK サンプルコードを動的に生成し、すばやくインターフェイスを取得できます。

前提条件

以下の方法で Maven プロジェクトを作成して Maven の依存関係を追加できます。
<dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-core</artifactId>
            <version>2.3.9</version>
       </dependency> 
       <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-emr</artifactId>
            <version>2.2.2</version>
        </dependency>

関連する JAR ファイルをローカルディスクにダウンロードすることもできます。 Eclipse の例で考えます。 以下の方法で JAR ファイルをダウンロードできます。

  1. 以下の JAR ファイルをダウンロードします。

    aliyun-java-sdk-core-2.3.9.jar

    aliyun-java-sdk-emr-2.2.2.jar

  2. JAR ファイルをプロジェクトフォルダにコピーします。
  3. Eclipse でプロジェクト名を右クリックして [プロパティ] > [Java ビルドパス] > [JAR を追加] の順に選択します。
  4. ステップ 2 でコピーしたすべての JAR ファイルを選択します。

Eclipse プロジェクトに Alibaba Cloud E-MapReduce OpenAPI Java SDK を使用できるようになります。

クライアントを初期化

IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
    DefaultAcsClient client = new DefaultAcsClient(profile);

SDK における E-MapReduce 上のすべての操作はこのクライアントを使用して実行可能です。

サンプル コード

  • クラスター
    • クラスターの作成
      public static void main(String[] args) {
            IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
            DefaultAcsClient client = new DefaultAcsClient(profile);
            final CreateClusterRequest request = new CreateClusterRequest();
            request.setName("Your-Cluster-Name");
            // if you did not specify security group id, it will create a new security group with given name
            request.setSecurityGroupId("Your-Security-Group-Id"); // (1)
            request.setAutoRenew(false);
            request.setChargeType("PostPaid"); // PostPaid or PrePaid
            request.setClusterType("HADOOP"); // HADOOP or HBase (2)
            request.setEmrVer("EMR-1.3.0"); // emr image version
            request.setIsOpenPublicIp(true);
            request.setLogEnable(true);
            request.setLogPath("oss://Your-Bucket/Your-Folder");
            request.setMasterPwdEnable(true); // enable master password
            request.setMasterPwd("Aa123456789"); // set master node's password
            request.setZoneId("cn-hangzhou-b"); // set zone
             // I / O 最適化パラメーター ECS インスタンスタイプやクラウドディスクタイプなどの使用可能なハードウェア設定は、指定された ECS インスタンスシリーズによって決まります。
            // For more information about available configurations, see the Buy Now page of ECS.
            // https://ecs.console.aliyun.com/#/create/postpay/
            request.setIoOptimized(true); // You can specify I/O optimization parameters.
            request.setInstanceGeneration("ecs-2"); // You can specify ecs-2 as an ECS instance series. 有効値 : ecs-1 および ecs-2
            request.setNetType("classic"); // ネットワークタイプとして classic を指定できます。 有効な値 : classic および vpc
            List<CreateClusterRequest.EcsOrder> ecsOrders = new ArrayList<CreateClusterRequest.EcsOrder>();
            CreateClusterRequest.EcsOrder masterOrder = new CreateClusterRequest.EcsOrder();
            masterOrder.setIndex(1);
            masterOrder.setDiskCapacity(50);
            masterOrder.setDiskCount(2);
            masterOrder.setDiskType("CLOUD_EFFICIENCY"); // specify disk type (2)
            masterOrder.setInstanceType("ecs.n1.large"); // specify ecs instance type
            masterOrder.setNodeCount(1);
            masterOrder.setNodeType("MASTER"); // MASTER or CORE (2)
            ecsOrders.add(masterOrder);
            CreateClusterRequest.EcsOrder coreOrder = new CreateClusterRequest.EcsOrder();
            coreOrder.setIndex(2);
            coreOrder.setDiskCapacity(50);
            coreOrder.setDiskCount(4);
            coreOrder.setDiskType("CLOUD_EFFICIENCY");
            coreOrder.setInstanceType("ecs.n1.large");
            coreOrder.setNodeCount(3);
            coreOrder.setNodeType("CORE");
            ecsOrders.add(coreOrder);
            request.setEcsOrders(ecsOrders);
            try {
                CreateClusterResponse response = client.getAcsResponse(request);
                String clusterId = response.getClusterId(); // cluster id
                // TODO do something with this cluster
            } catch (Exception e) {
                // TODO do something
            }
        }
      • クラスターを作成する場合は、このクラスターをホスティングするセキュリティグループを指定する必要があります。 セキュリティグループの ID を指定しなかった場合は、このセキュリティグループの名前を指定する必要があります。 クラスター作成時にセキュリティグループを作成する必要があります。
      • 詳細は、「Enumeration」をご参照ください。
      • 上記のコードスニペットは、古典的なネットワークでクラスターを作成します。 VPC ネットワーククラスターを作成する場合は、リクエストメソッドで vpc を指定して以下のとおり vpcid と vswitchid を指定する必要があります。
        request.setNetType("vpc"); // You can specify vpc as a network type.
        request.setVpcId("your-vpcId");
        request.setVSwitchId("your-switchId");
      • 高可用性パラメーターを指定できます。 高可用性パラメーターの詳細は、「クラスターの作成 (Create a cluster)」のハードウェア設定のセクションをご参照ください。
        request.setHighAvailabilityEnable(true);
      • 利用可能なソフトウェアコンポーネントを指定できます。 利用可能なソフトウェアコンポーネントの詳細は、「クラスターの作成 (Create a cluster)」のソフトウェア設定のセクションをご参照ください。
        List<String> soft = new ArrayList<String>();
        soft.add("presto");
        soft.add("oozie");
        request.setOptionSoftWareLists(soft);
      • 設定項目を指定できます。 詳細は、こちらをクリックしてください。.
        Request. setconfigurations ("Oss: // your-bucket/your-conf.json ");
      • ブートストラップ操作を指定できます。 詳細は、こちらをクリックしてください。
        List<CreateClusterRequest.BootstrapAction> bootstrapActionLists = new ArrayList<CreateClusterRequest.BootstrapAction>();
        CreateClusterRequest.BootstrapAction bootstrapActionList = new CreateClusterRequest.BootstrapAction();
        bootstrapActionList.setName("bootstrapName");
        bootstrapActionList.setPath("oss://emr-agent-pack/bootstrap/run-if.py");
        bootstrapActionList.setArg("instance.isMaster=true mkdir -p /tmp/abc");
        bootstrapActionLists.add(bootstrapActionList);
        request.setBootstrapActions(bootstrapActionLists);
    • クラスターの詳細を照会
      public static void main(String[] args) {
            IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
            IAcsClient client = new DefaultAcsClient(profile);
            final DescribeClusterRequest request = new DescribeClusterRequest();
            request.setId("C-XXXXXXXXXXXXXXXX"); // cluster id
            try {
                DescribeClusterResponse response = client.getAcsResponse(request);
                DescribeClusterResponse.ClusterInfo clusterInfo = response.getClusterInfo();
                // TODO do something with this cluster
            } catch (Exception e) {
                // TODO do something
            }
        }
    • クラスターの一覧を照会
      public static void main(String[] args) {
                IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
                DefaultAcsClient client = new DefaultAcsClient(profile);
                final ListClustersRequest request = new ListClustersRequest();
                request.setPageNumber(1);
                request.setIsDesc(true);
                request.setPageSize(20);
                try {
                    ListClustersResponse response = client.getAcsResponse(request);
                    List<ListClustersResponse.ClusterInfo> clusterInfos = response.getClusters();
                    for (ListClustersResponse.ClusterInfo clusterInfo : clusterInfos) {
                        // TODO do something with this cluster
                    }
                } catch (Exception e) {
                    // TODO do something
                }
            }
    • クラスターを解放
      public static void main(String[] args) {
                  IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your AccessKeyId>", "<Your AccessKeySecret>");
                  DefaultAcsClient client = new DefaultAcsClient(profile);
                  ReleaseClusterRequest request = new ReleaseClusterRequest();
                  request.setId("C-XXXXXXXXXXXXXXXX"); // specify the cluster id you want to release.
                  try {
                      ReleaseClusterResponse response = client.getAcsResponse(request);
                  } catch (Exception e) {
                      // TODO do something
                  }
              }
  • ジョブ
    • ジョブの作成
      public static void main(String[] args) {
            IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
            DefaultAcsClient client = new DefaultAcsClient(profile);
            final CreateJobRequest request = new CreateJobRequest();
            request.setName("Your-Job-Name");
            request.setRunParameter("--master yarn-client --driver-memory 4g --executor-memory 4g --executor-cores 2 --num-executors 4 --class com.test.RemoteDebug ossref://Your-Bucket/Resource.jar 1000\"");
            request.setFailAct("CONTINUE"); // STOP or CONTINUE
            request.setType("SPARK"); // SPARK or HADOOP or HIVE or PIG
      try {
                  CreateJobResponse response = client.getAcsResponse(request);
                  String jobId = response.getId();
                  // TODO do something with this job
              } catch (Exception e) {
                  // TODO do something
              }
          }
      ```
    • ジョブの削除
      重要 ジョブが実行プランによって使用中のときは、このジョブを削除することはできません。 このジョブを削除する前に、この実行プランを削除するか、この実行プランを変更する必要があります。
      public static void main(String[] args) {
                IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
                IAcsClient client = new DefaultAcsClient(profile);
                final DeleteJobRequest request = new DeleteJobRequest();
                request.setId("J-XXXXXXXXXXXXXXXX"); // set  job id
                try {
                    DeleteJobResponse response = client.getAcsResponse(request);
                } catch (Exception e) {
                    // TODO do something
                }
            }
  • 実行プラン
    • 実行プランの作成
      public static void main(String[] args) {
                IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
                IAcsClient client = new DefaultAcsClient(profile);
                final CreateExecutionPlanRequest request = new CreateExecutionPlanRequest();
                request.setName("Your-ExecutionPlan-Name");
                request.setCreateClusterOnDemand(false);
                request.setStrategy("RUN_MANUALLY"); // RUN_MANUALLY or SCHEDULE
                request.setClusterId("C-XXXXXXXXXXXXXXXX"); // specify an existing running cluster
                List<String> jobIds = new ArrayList<String>();
                jobIds.add("J-XXXXXXXXXXXXXXXX"); // specify a job
                request.setJobIdLists(jobIds);
                try {
                    CreateExecutionPlanResponse response = client.getAcsResponse(request);
                    String executionPlanId = response.getId();
                    // TODO do something with this execution plan
                } catch (Exception e) {
                    // TODO do something
                }
            }

      上記のコードスニペットは、RUN_MANUALLY タイプの実行プランを作成します。 また、この実行プランには既存のクラスターが指定されています。

      SCHEDULE タイプの実行プランを作成する場合は、上記のコードスニペットを変更して以下のコードスニペットを追加する必要があります。
      request.setStrategy("SCHEDULE"); // RUN_MANUALLY or SCHEDULE
                request.setStartTime(new Date().getTime()); // set start time
                request.setTimeUnit("DAY"); // DAY or HOUR
                request.setTimeInterval(1); // set time interval
      ジョブを実行するための新しいクラスターを作成する実行プランを作成する場合は、上記のコードスニペットを変更して以下のコードスニペットを追加する必要があります。
      request.setCreateClusterOnDemand(true);
                request.setClusterType("HADOOP");
                request.setClusterName("Your-Cluster-Name");
                request.setEmrVer("EMR-1.3.0");
                request.setSecurityGroupId("Your-Security-Group-Id");
                request.setIsOpenPublicIp(true);
                 // The I/O optimization parameters. ECS インスタンスタイプやクラウドディスクタイプなどの使用可能なハードウェア設定は、指定された ECS インスタンスシリーズによって決まります。
                // For more information about how to select these configurations, see the Buy Now page of Elastic Compute Service.
                // https://ecs.console.aliyun.com/#/create/postpay/
                request.setIoOptimized(true); // You can specify true to enable I/O optimization.
                request.setInstanceGeneration("ecs-2"); // You can specify esc-2 as an ECS instance series. Valid values: ecs-1 and ecs-2.
                request.setNetType("classic"); // You can specify classic as a network type. Valid values: classic and vpc.
                request.setLogEnable(true);
                request.setLogPath("oss://xxx");
                request.setEcsOrders(); // TODO you can refer to the configurations when you create a cluster. ecsOder のタイプは CreateExecutionPlanRequest.EcsOrder です。ご注意ください。 ここでいう ecsOder のタイプは CreateClusterRequest.EcsOrder とは異なります。

      前のパラメーターを指定してクラスターを構成できます。 これらのパラメーターの詳細は、「クラスターを作成する」のセクションをご参照ください。 新しいクラスターでジョブを実行する必要がある実行プランの場合、この実行プランを実行するたびに一時クラスターが作成されます。 一時クラスターは、指定されたクラスター構成に基づいて作成され、実行プランが完了すると解放されます。 クラスタを作成する一般的なプロセスとは異なり、この場合はクラスター作成時にセキュリティグループ名ではなくセキュリティグループ ID を指定する必要があります。

      オンデマンド作成オプションと定期スケジュールオプションは相互に排他的ではありません。 実行プランがスケジューリングされた時間に開始されると、新しいクラスターが作成されます。

    • 実行プランの削除
      public static void main(String[] args) {
                IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
                DefaultAcsClient client = new DefaultAcsClient(profile);
                final DeleteExecutionPlanRequest request = new DeleteExecutionPlanRequest();
                request.setId("WF-XXXXXXXXXXXXXXXX"); // set execution plan id
                try {
                    DeleteExecutionPlanResponse response = client.getAcsResponse(request);
                } catch (Exception e) {
                    // TODO do something
                }
            }
    • 実行プランの実行
      重要 実行中またはスケジューリング中の実行プランは実行できません。
      public static void main(String[] args) {
                IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your-AccessKeyId>", "<Your-AccessKeySecret>");
                IAcsClient client = new DefaultAcsClient(profile);
                RunExecutionPlanRequest request = new RunExecutionPlanRequest();
                request.setId("WF-XXXXXXXXXXXXXXXX"); // specify the execution plan id which to run
                try {
                    RunExecutionPlanResponse response = client.getAcsResponse(request);
                    String instanceId = response.getExecutionPlanInstanceId();
                    // TODO do something with this instance
                } catch (Exception e) {
                    // TODO do something
                }
            }
    • スケジューリング済みの実行プランを中断
      定期的に実行される実行プランの場合 (次の図を参照) は、以下のとおり SDK が提供するメソッドを呼び出すことによりこの実行プランを中断できます。
      public static void main(String[] args) {
                IClientProfile clientProfile = DefaultProfile.GetProfile("cn-shanghai", "<your-access-key-id>", "<your-access-key-secret>");
                DefaultAcsClient client = new DefaultAcsClient(profile);
                SuspendExecutionPlanSchedulerRequest request = new SuspendExecutionPlanSchedulerRequest();
                request.setId("WF-XXXXXXXXXXXXXXXX"); // specify the execution plan id you want to suspend
                try {
                    SuspendExecutionPlanSchedulerResponse response = client.getAcsResponse(request);
                } catch (Exception e) {
                    // TODO do something
                }
            }
    • 実行プランの再開
      定期的に実行される実行プランの場合 (次の図を参照) は、以下のとおり SDK が提供するメソッドを呼び出すことによりこの実行プランを再開できます。
      public static void main(String[] args) {
                IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your AccessKeyId>", "<Your AccessKeySecret>"); 
                DefaultAcsClient client = new DefaultAcsClient(profile);
                ResumeExecutionPlanSchedulerRequest request = new ResumeExecutionPlanSchedulerRequest();
                request.setId("WF-XXXXXXXXXXXXXXXX"); // specify the execution plan id you want to suspend
                try {
                    ResumeExecutionPlanSchedulerResponse response = client.getAcsResponse(request);
                } catch (Exception e) {
                    // TODO do something
                }
            }
    • 実行プランの実行ログを照会
      public static void main(String[] args) {
                IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", "<Your AccessKeyId>", "<Your AccessKeySecret>");
                DefaultAcsClient client = new DefaultAcsClient(profile);
                ListExecutionPlanInstancesRequest request = new ListExecutionPlanInstancesRequest();
                // specify execution plan ids
                List<String> executionPlanIds = new ArrayList<String>();
                executionPlanIds.add("WF-XXXXXXXXXXXXXXX1");
                executionPlanIds.add("WF-XXXXXXXXXXXXXXX2");
                executionPlanIds.add("WF-XXXXXXXXXXXXXXX3");
                request.setExecutionPlanIdLists(executionPlanIds); // (1)
                // specify order key (ordered by id)
                request.setIsDesc(true);
                // specify page number and page size, default page number is 1 and default page size is 10.
                request.setPageSize(20);
                request.setPageNumber(1);
                // specify if you want to list latest instance for each execution plan id.
                request.setOnlyLastInstance(true); // (2) default is false
                try {
                    ListExecutionPlanInstancesResponse response = client.getAcsResponse(request);
                    for (ListExecutionPlanInstancesResponse.ExecutionPlanInstance instance : response.getExecutionPlanInstances()) {
                        // TODO do something with each instance
                    }
                } catch (Exception e) {
                    // TODO do something
                }
            }
      • 実行中のログを照会するため複数の実行プラン ID を指定できます。
      • 最新の実行ログを照会すると、指定された実行プランの最新の実行ログのみが返されます。 この機能を使用して、最後の実行プランが正常に実行されたかどうかを確認したり、実行プランの最新のログを照会したりすることができます。