MaxCompute: Obtain the execution progress of a task

Last Updated: Nov 20, 2024

This topic describes how to obtain the execution progress of a task by using the SDK for Java.

Background information

The SDK for Java provides the Instance#getTaskProgress method for you to obtain the status of workers in all stages. You can calculate the overall progress of a task based on the status of task workers.
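As a rough illustration of that calculation, the following minimal sketch sums the terminated and total worker counts across stages and converts the ratio to a percentage. It does not call the SDK: StageCounts and OverallProgressSketch are hypothetical names used only for this illustration, and the counts are made-up values standing in for what the per-stage progress objects would report.

```java
import java.util.Arrays;
import java.util.List;

class OverallProgressSketch {

    // Hypothetical stand-in for the worker counts of one stage; not an SDK type.
    static final class StageCounts {
        final int totalWorkers;
        final int terminatedWorkers;

        StageCounts(int totalWorkers, int terminatedWorkers) {
            this.totalWorkers = totalWorkers;
            this.terminatedWorkers = terminatedWorkers;
        }
    }

    // Returns the share of terminated workers across all stages as a whole percentage (0 to 100).
    static int overallPercent(List<StageCounts> stages) {
        long total = 0;
        long terminated = 0;
        for (StageCounts s : stages) {
            total += s.totalWorkers;
            terminated += s.terminatedWorkers;
        }
        // No workers reported yet: report 0 instead of dividing by zero.
        return total == 0 ? 0 : (int) (terminated * 100 / total);
    }

    public static void main(String[] args) {
        // Made-up counts: one finished stage (7 of 7) and one partly finished stage (1 of 3).
        List<StageCounts> stages = Arrays.asList(new StageCounts(7, 7), new StageCounts(3, 1));
        System.out.println(overallPercent(stages) + "%"); // prints "80%"
    }
}
```

Weighting by worker count rather than by stage count means that a stage with many workers contributes more to the overall figure than a small one.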

Before you obtain the execution progress of a task, you must be familiar with the following concepts: MaxCompute instance, MaxCompute task, Fuxi job, Fuxi task (stage), Fuxi instance (worker). The following content describes the relationships among these concepts:

  • In most cases, a MaxCompute instance corresponds to a MaxCompute task.

  • A MaxCompute task can consist of one or more Fuxi jobs. If a SQL statement is complex, MaxCompute automatically submits multiple Fuxi jobs to Fuxi.

  • A Fuxi job consists of multiple stages. Multiple workers run in a stage. Workers in the same stage have the same data processing logic.
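The relationships above can be pictured as a simple containment hierarchy. The following sketch is only an illustration of that hierarchy: the class names (FuxiHierarchySketch, MaxComputeTask, FuxiJob, Stage), the stage names, and the worker counts are all hypothetical and are not part of the SDK.

```java
import java.util.Arrays;
import java.util.List;

class FuxiHierarchySketch {

    // A Fuxi task (stage): a group of workers that share the same data processing logic.
    static final class Stage {
        final String name;
        final int workerCount;

        Stage(String name, int workerCount) {
            this.name = name;
            this.workerCount = workerCount;
        }
    }

    // A Fuxi job: a collection of stages.
    static final class FuxiJob {
        final List<Stage> stages;

        FuxiJob(List<Stage> stages) {
            this.stages = stages;
        }
    }

    // A MaxCompute task: one or more Fuxi jobs.
    static final class MaxComputeTask {
        final List<FuxiJob> jobs;

        MaxComputeTask(List<FuxiJob> jobs) {
            this.jobs = jobs;
        }
    }

    // Counts the stages in every Fuxi job of the task.
    static int countStages(MaxComputeTask task) {
        int count = 0;
        for (FuxiJob job : task.jobs) {
            count += job.stages.size();
        }
        return count;
    }

    // Counts the workers in every stage of every Fuxi job of the task.
    static int countWorkers(MaxComputeTask task) {
        int count = 0;
        for (FuxiJob job : task.jobs) {
            for (Stage stage : job.stages) {
                count += stage.workerCount;
            }
        }
        return count;
    }

    // Made-up example: a task that was split into two Fuxi jobs.
    static MaxComputeTask sampleTask() {
        FuxiJob first = new FuxiJob(Arrays.asList(new Stage("M1", 4), new Stage("R2_1", 2)));
        FuxiJob second = new FuxiJob(Arrays.asList(new Stage("M1", 3)));
        return new MaxComputeTask(Arrays.asList(first, second));
    }

    public static void main(String[] args) {
        MaxComputeTask task = sampleTask();
        System.out.println("stages=" + countStages(task) + ", workers=" + countWorkers(task));
        // prints "stages=3, workers=9"
    }
}
```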

[Figure: Stage]

Examples

The following code shows how to obtain the execution progress of stages:

import java.util.List;
import java.util.concurrent.CompletableFuture;

import com.aliyun.odps.Instance;
import com.aliyun.odps.LogView;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.task.SQLTask;

public class InstanceManange {

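    private static final String STAGE_FORMAT = "%-26s %13s  %5s  %9s  %7s  %7s  %6s";

    // Captured once when the class is loaded, so that ELAPSED TIME covers the whole run
    // instead of the duration of a single printStage() call.
    private static final long startTime = System.currentTimeMillis();

    public static void printStage(List<Instance.StageProgress> stageprogresses) throws OdpsException {
        // stageprogresses is the list returned by instance.getTaskProgress(taskName).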

        // Header columns: STAGES, STATUS, TOTAL, COMPLETED, RUNNING, PENDING, BACKUP.
        // Reuse STAGE_FORMAT so that the header lines up with the stage rows.
        System.out.println(String.format(STAGE_FORMAT, "STAGES", "STATUS", "TOTAL", "COMPLETED", "RUNNING", "PENDING", "BACKUP"));

        int sumComplete = 0;
        int sumTotal = 0;
        int stageCompletedSum = 0;

        for (Instance.StageProgress progress : stageprogresses) {
            String name = progress.getName();
            // Note: The value returned by getTotalWorkers() does not include backup workers. In most cases, the number of backup workers is 0.
            int backup = progress.getBackupWorkers();
            int total = progress.getTotalWorkers();
            int all = backup + total;
            int running = progress.getRunningWorkers();
            int completed = progress.getTerminatedWorkers();
            int pending = all - completed - running;

            Instance.StageProgress.Status status = progress.getStatus();
            String statusString = status != null ? status.toString() : "NULL";

            // Example row: M1_job_0  RUNNING  7  7  0  0  0
            String vertexStr = String.format(STAGE_FORMAT, name, statusString, total, completed, running, pending, backup);
            System.out.println(vertexStr);
            sumComplete += completed;
            sumTotal += total;

            // Count the stage as completed after it reaches the TERMINATED state.
            if (status == Instance.StageProgress.Status.TERMINATED) {
                stageCompletedSum += 1;
            }

        }

        String FOOTER_FORMAT = "%-15s  %-4s  %-25s";

        String verticesSummary = String.format("STAGES: %02d/%02d", stageCompletedSum, stageprogresses.size());
        final float progress = (sumTotal == 0) ? 0.0f : (float) sumComplete / (float) sumTotal;
        final int progressPercent = (int) (progress * 100);
        String progressStr = "" + progressPercent + "%";
        float et = (float) (System.currentTimeMillis() - startTime) / (float) 1000;
        String elapsedTime = "ELAPSED TIME: " + et + " s";

        System.out.println(String.format(FOOTER_FORMAT, verticesSummary, progressStr, elapsedTime));
        System.out.println();
    }

    public static void main(String[] args) throws Exception {
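        // Replace the empty strings with your AccessKey ID, AccessKey secret, endpoint, project name, and SQL statement.
        String ak = "";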
        String sk = "";
        String endpoint = "";
        String project = "";
        String sql = "";
        Odps odps = new Odps(new AliyunAccount(ak, sk));
        odps.setEndpoint(endpoint);
        odps.setDefaultProject(project);

        String taskName = "TestGetProgressTask";
        Instance instance = SQLTask.run(odps, project, sql, taskName, null, null);

        System.out.println("LogView:\n" + new LogView(odps).generateLogView(instance, 24) + "\n");
        instance.getTaskDetailJson2(taskName);

        CompletableFuture<Void> future = CompletableFuture.runAsync((() -> {
            try {
                while (true) {
                    printStage(instance.getTaskProgress(taskName));
                    Thread.sleep(200);
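                    // Stop polling after the instance terminates, regardless of whether it succeeded or failed.
                    // Checking only isSuccessful() would cause the loop to run forever if the task fails.
                    if (instance.isTerminated()) {
                        // Print the final progress. After a task is successfully executed, the returned List<Instance.StageProgress> may be empty.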
                        printStage(instance.getTaskProgress(taskName));
                        return;
                    }
                }
            } catch (OdpsException | InterruptedException e) {
                e.printStackTrace();
                return;
            }
        }));
        future.join();
    }

}

The following result is returned:


STAGES                           STATUS      TOTAL  COMPLETED  RUNNING  PENDING  BACKUP
M1_job_0                         RUNNING      1          0        0        1       0
M2_1_job_0                       WAITING      1          0        0        1       0
STAGES: 00/02     0%    ELAPSED TIME: 20.97 s