SchedulerX: Visual MapReduce model

Last Updated: Mar 21, 2024

The visual MapReduce model builds on the MapReduce model to add visual O&M capabilities. To use it, you only need to change the distributed model of a job to visual MapReduce in the SchedulerX console; no backend code changes are required. The console then displays a page that lists all tasks generated for the job. On this page, you can view the details and results of each task and rerun individual tasks.

Precautions

  • Only SchedulerX Professional Edition supports the visual MapReduce model.

  • The number of tasks cannot exceed 1,000.

  • A task cannot exceed 64 KB in size.

  • To display custom label information for tasks, the task objects must implement the BizSubTask interface.

  • The return value of the result field of ProcessResult cannot exceed 1,000 bytes in size.

  • If you use the reduce method, the results of all tasks are cached on the master node, which puts heavy memory pressure on that node. We recommend that you keep the number of tasks and the size of the result field small. If you do not need the reduce method, use MapJobProcessor instead of MapReduceJobProcessor.

  • If a failover is triggered, SchedulerX may run a task more than once. Therefore, you must make your tasks idempotent.
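Two of these precautions, the 1,000-byte limit on the result field and task idempotence, have to be handled in your own code. The following is a minimal, JDK-only sketch under stated assumptions: `TaskPrecautions`, `truncateUtf8`, and `IdempotencyGuard` are hypothetical names that are not part of the SchedulerX SDK, and the sketch assumes the limit is measured in UTF-8 bytes. The in-memory guard only illustrates the check-then-record pattern; because a failover can move a task to another worker, a real job would keep processed keys in shared storage, such as a database table with a unique key on the account ID.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helpers for two precautions; not part of the SchedulerX SDK. */
final class TaskPrecautions {

    /** Assumed limit, taken from the precaution on the result field of ProcessResult. */
    public static final int MAX_RESULT_BYTES = 1000;

    private TaskPrecautions() {}

    /**
     * Truncates s so that its UTF-8 encoding fits in maxBytes, without splitting
     * a multi-byte character or a surrogate pair.
     */
    public static String truncateUtf8(String s, int maxBytes) {
        if (s == null) {
            return null;
        }
        int bytes = 0;
        int i = 0;
        while (i < s.length()) {
            int cp = s.codePointAt(i);
            // Number of bytes this code point occupies in UTF-8.
            int len = cp < 0x80 ? 1 : cp < 0x800 ? 2 : cp < 0x10000 ? 3 : 4;
            if (bytes + len > maxBytes) {
                break;
            }
            bytes += len;
            i += Character.charCount(cp);
        }
        return s.substring(0, i);
    }

    /** Illustrative idempotency guard keyed by a business ID such as an account ID. */
    public static final class IdempotencyGuard {
        // Per-JVM set of completed keys; a real job would use shared storage.
        private final Set<String> completed = ConcurrentHashMap.newKeySet();

        /**
         * Runs the action unless the key has already completed, and records the key only
         * after the action returns normally. Two concurrent calls with the same key can
         * still both run; a unique key in a database would close that gap.
         */
        public boolean runOnce(String key, Runnable action) {
            if (completed.contains(key)) {
                return false; // Duplicate run (for example, after a failover): skip it.
            }
            action.run(); // If this throws, the key is not recorded and the task can be retried.
            completed.add(key);
            return true;
        }
    }
}
```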

Interfaces

  • The visual MapReduce model inherits all interfaces of the MapReduce model. The job processing code of the visual MapReduce model is the same as that of the MapReduce model. For more information, see MapReduce.

  • (Optional) You can specify labels for each task that is executed by the visual MapReduce model. To display the labels, the task objects must implement the com.alibaba.schedulerx.worker.processor.BizSubTask interface.

    public Map<String, String> labelMap(): Outputs the label information of a task, such as the account name, product code, city, and other custom attributes of the task object. Required: No.
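A task object can return several labels from `labelMap()`. The sketch below is JDK-only so that it compiles without the SchedulerX SDK: `LabeledTask` is a hypothetical stand-in that only copies the `labelMap()` signature, and in a real job the class would implement `com.alibaba.schedulerx.worker.processor.BizSubTask` instead. The `RegionalOrderTask` class and its label names are illustrative.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class RegionalOrderTaskDemo {

    /** Hypothetical stand-in that copies the signature of BizSubTask.labelMap(); not the SDK interface. */
    public interface LabeledTask {
        Map<String, String> labelMap();
    }

    /** Hypothetical task object that carries three business labels. */
    public static final class RegionalOrderTask implements LabeledTask {
        private final String accountName;
        private final String productCode;
        private final String city;

        public RegionalOrderTask(String accountName, String productCode, String city) {
            this.accountName = accountName;
            this.productCode = productCode;
            this.city = city;
        }

        @Override
        public Map<String, String> labelMap() {
            // LinkedHashMap keeps insertion order, which makes logged labels predictable.
            Map<String, String> labels = new LinkedHashMap<>();
            labels.put("Account name", accountName);
            labels.put("Product code", productCode);
            labels.put("City", city);
            return labels;
        }
    }
}
```

The ordered map only affects iteration order in your own code and logs; this sketch makes no claim about how the console orders labels.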

Comparison between MapReduce and visual MapReduce

  • Number of tasks: MapReduce supports more than one million tasks. Visual MapReduce supports up to 1,000 tasks.

  • Task development mode: The same for both models.

  • Task list: Not supported by MapReduce. Supported by visual MapReduce.

  • Task running details: Not supported by MapReduce. Supported by visual MapReduce. The details include the execution records, execution status, logs, tracing analysis, and running stacks of each task.

  • Task labels: Not supported by MapReduce. Supported by visual MapReduce. After the task objects implement the BizSubTask interface, you can view their business labels.

  • Task operations: Not supported by MapReduce. Supported by visual MapReduce. You can stop and rerun a single task.
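Because visual MapReduce accepts at most 1,000 tasks, a job with more business records than that has to group records into batches before it distributes them. The following JDK-only sketch shows one way to do this; `TaskBatcher.partition` is a hypothetical helper, and in a real job each batch would be wrapped in a task object that implements `BizSubTask` (for example, with a label for the batch range). Each resulting task object must also stay within the 64 KB limit, so very large batches are not an option either.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that keeps the number of tasks within a limit. */
final class TaskBatcher {

    private TaskBatcher() {}

    /**
     * Splits items into at most maxTasks contiguous batches. All batches have the
     * same size except possibly the last one, which may be smaller.
     */
    static <T> List<List<T>> partition(List<T> items, int maxTasks) {
        if (maxTasks <= 0) {
            throw new IllegalArgumentException("maxTasks must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        if (items.isEmpty()) {
            return batches;
        }
        // Smallest batch size that keeps the batch count within maxTasks (ceiling division).
        int batchSize = (items.size() + maxTasks - 1) / maxTasks;
        for (int from = 0; from < items.size(); from += batchSize) {
            int to = Math.min(from + batchSize, items.size());
            // Copy the slice so each batch is independent of the source list.
            batches.add(new ArrayList<>(items.subList(from, to)));
        }
        return batches;
    }
}
```

For 2,500 records and a limit of 1,000 tasks, the batch size is 3, which yields 834 batches; the last batch holds a single record.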

Demo: Develop a job

Process accounts in batches

Example: You want to process multiple bank accounts at the same time. Each bank account is treated as an independent task, and the tasks run in parallel across the cluster. The task list displays the account information of each task, which makes it easy to query tasks and to quickly check the processing status and execution details of each bank account. The following demo code is provided for reference.

  1. Define a custom task object for each bank account. To display its label information, the object must implement the com.alibaba.schedulerx.worker.processor.BizSubTask interface and its labelMap method.

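import com.alibaba.schedulerx.worker.processor.BizSubTask;

import java.util.HashMap;
import java.util.Map;

public class ParallelAccountInfo implements BizSubTask {

    /**
     * Primary key
     */
    private long id;

    private String name;

    private String accountId;

    public ParallelAccountInfo(long id, String name, String accountId) {
        this.id = id;
        this.name = name;
        this.accountId = accountId;
    }

    // Getters let getter-based JSON serializers (such as fastjson in the processor demo) read the private fields.
    public long getId() { return id; }

    public String getName() { return name; }

    public String getAccountId() { return accountId; }

    /**
     * Implements the labelMap method, which specifies the labels displayed for this task in the task list.
     * @return a map of label names to label values
     */
    @Override
    public Map<String, String> labelMap() {
        Map<String, String> labelMap = new HashMap<>();
        labelMap.put("Account name", name);
        return labelMap;
    }
}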

After the task objects implement the interface, the task list displays the label information of each task, such as the account name. You can use the labels to track the business processing status of each bank account and to search for bank accounts by label.

  2. Create a processor that extends com.alibaba.schedulerx.worker.processor.MapReduceJobProcessor to process the business logic of each bank account.

import com.alibaba.fastjson.JSON; // assumes fastjson
import com.alibaba.schedulerx.worker.domain.JobContext;
import com.alibaba.schedulerx.worker.processor.MapReduceJobProcessor;
import com.alibaba.schedulerx.worker.processor.ProcessResult;
import org.apache.commons.lang3.StringUtils; // assumes Apache Commons Lang 3
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

public class ParallelJob extends MapReduceJobProcessor {

    private static final Logger logger = LoggerFactory.getLogger("schedulerx");

    /**
     * Called after all tasks finish. The results of all tasks are cached on the master node,
     * so keep the number of tasks and the size of each result small.
     */
    @Override
    public ProcessResult reduce(JobContext context) throws Exception {
        return new ProcessResult(true);
    }

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        if (isRootTask(context)) {
            logger.info("Build parallel computing tasks.");
            List<ParallelAccountInfo> list = new ArrayList<>();
            /*
             * For the root task, build the parallel computing task objects.
             * In actual scenarios, load task objects based on your business requirements and implement the BizSubTask interface for them.
             * Sample scenarios:
             * 1. Load unprocessed customer account information from the database.
             * 2. Construct an area table to distribute jobs by area, such as province, city, and district.
             * 3. Classify tasks by business label, such as electric appliance, daily necessity, and food.
             * 4. Classify tasks by time-based value, such as a month (January, February, and so on).
             */
            for (int i = 0; i < 20; i++) {
                list.add(new ParallelAccountInfo(i, "CUS" + StringUtils.leftPad(String.valueOf(i), 4, "0"),
                        "AC" + StringUtils.leftPad(String.valueOf(i), 12, "0")));
            }
            // Distribute the task objects to workers as tasks named "transfer".
            return map(list, "transfer");
        } else {
            // For a non-root task, obtain the task object that the root task distributed.
            ParallelAccountInfo obj = (ParallelAccountInfo) context.getTask();
            // Process the task based on the obtained task object.
            // do something
            logger.info("Process task information: {}", JSON.toJSONString(obj));
            return new ProcessResult(true);
        }
    }
}

3. After you develop and deploy the job, configure a scheduled task in the SchedulerX console to run the job at the specified points in time. For more information, see Procedure.
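To see the flow of the demo without a SchedulerX cluster, the following JDK-only sketch simulates it locally: a root step builds 20 account tasks, a fixed pool of 5 threads (mirroring the default value of Number of single-machine concurrent subtasks) processes them, and a final step aggregates the results the way a `reduce` implementation could. `LocalMapReduceSimulation` and its members are hypothetical. The sketch has no master node, no distribution across workers, no retries, and no failover, so treat it as a mental model only, not as how SchedulerX runs tasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical local simulation of the map, process, and reduce steps; does not use SchedulerX. */
final class LocalMapReduceSimulation {

    /** Simplified, hypothetical copy of the demo's task object. */
    static final class Account {
        public final long id;
        public final String accountId;

        Account(long id, String accountId) {
            this.id = id;
            this.accountId = accountId;
        }
    }

    /** Mirrors the root-task branch: build task objects with zero-padded account IDs. */
    static List<Account> buildTasks(int count) {
        List<Account> tasks = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            tasks.add(new Account(i, String.format("AC%012d", i)));
        }
        return tasks;
    }

    /** Mirrors the non-root branch: process one task and return a small result string. */
    static String process(Account account) {
        return "processed:" + account.accountId;
    }

    /** Processes all tasks on a fixed thread pool, then "reduces" by counting successful results. */
    static int runAndReduce(List<Account> tasks, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (Account account : tasks) {
                futures.add(pool.submit(() -> process(account)));
            }
            int succeeded = 0;
            for (Future<String> future : futures) {
                String result = future.get(); // Waits for the task and rethrows its failure.
                if (result.startsWith("processed:")) {
                    succeeded++;
                }
            }
            return succeeded;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```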

Procedure

Configure a visual MapReduce model for a job

  1. Log on to the SchedulerX console. In the left-side navigation pane, click Task Management.

  2. On the Task Management page, click Create task.

  3. In the Create task panel, select Visual MapReduce from the Execution mode drop-down list.

  4. In the Advanced Configuration section, configure parameters based on your business requirements.

    • distribution policy: Specifies how tasks are distributed to workers. Valid values:

      • Polling Scheme: The system evenly distributes the same number of tasks to each worker. This policy is suitable when each worker takes about the same amount of time to process a task. This is the default value.

      • workerLoad optimal strategy: The master node automatically detects the load of each worker node. This policy is suitable when the amount of time that workers take to process a task varies significantly.

      Note: The client version must be V1.10.14 or later.

    • Number of single-machine concurrent subtasks: The number of execution threads on each worker. Default value: 5. To speed up execution, specify a larger value. If downstream services or databases cannot handle the load, specify a smaller value.

    • Number of failed retries of subtasks: The number of times a failed task is automatically retried. Default value: 0.

    • Sub-task failure retry interval: The interval between two consecutive retries of a failed task. Unit: seconds. Default value: 0.

    • Subtask Failover Strategy: Specifies whether to redistribute a task to another worker if the worker that runs the task fails and stops. If you turn on this switch, a task may be executed more than once when a failover is triggered, so you must make your tasks idempotent.

      Note: The client version must be V1.18.13 or later.

    • The master node participates in the execution: Specifies whether the master node also runs tasks. At least two workers must be available to run tasks. If the number of tasks is extremely large, we recommend that you turn off this switch.

      Note: The client version must be V1.18.13 or later.

For more information about the parameters, see Advanced parameters for job management.
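The difference between the two distribution policies can be illustrated with a toy model. The JDK-only sketch below is an assumption-laden illustration, not the SchedulerX scheduling algorithm: `roundRobin` hands out tasks in turn, similar to Polling Scheme, while `leastLoaded` greedily gives each task to the worker that would finish it first, using hypothetical per-worker costs (`secondsPerTask`) as a stand-in for the load that the master node detects.

```java
/** Hypothetical toy model of two task distribution policies; not SchedulerX code. */
final class DistributionPolicies {

    private DistributionPolicies() {}

    /** Round-robin: task i goes to worker i % workers, so counts differ by at most one. */
    static int[] roundRobin(int taskCount, int workers) {
        int[] assignment = new int[taskCount];
        for (int i = 0; i < taskCount; i++) {
            assignment[i] = i % workers;
        }
        return assignment;
    }

    /**
     * Load-aware greedy: each task goes to the worker whose estimated finish time
     * (current busy time plus its cost per task) is lowest. Ties go to the lower index.
     */
    static int[] leastLoaded(int taskCount, double[] secondsPerTask) {
        int workers = secondsPerTask.length;
        double[] busyUntil = new double[workers];
        int[] assignment = new int[taskCount];
        for (int i = 0; i < taskCount; i++) {
            int best = 0;
            for (int w = 1; w < workers; w++) {
                if (busyUntil[w] + secondsPerTask[w] < busyUntil[best] + secondsPerTask[best]) {
                    best = w;
                }
            }
            busyUntil[best] += secondsPerTask[best];
            assignment[i] = best;
        }
        return assignment;
    }

    /** Counts how many tasks each worker received. */
    static int[] countPerWorker(int[] assignment, int workers) {
        int[] counts = new int[workers];
        for (int w : assignment) {
            counts[w]++;
        }
        return counts;
    }
}
```

With 12 tasks and per-task costs of 1, 1, and 4 seconds, `leastLoaded` assigns 6, 5, and 1 tasks, so all workers finish within 6 seconds. `roundRobin` would give each worker 4 tasks and leave the slow worker busy for 16 seconds, which is why a load-aware policy fits workloads where processing time varies widely.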

View task details of the job in the console

After the job is executed, find the job on the Execution List page and click Details in the Operation column to view the execution details of the tasks.

  • On the Subtask list tab, view the execution status of each task.

  • On the Subtask list tab, find the task that you want to manage and click Log in the Operation column to view the business logs of the task and analyze its execution result.

  • When execution records of the job are available, click ThreadDump on the Current execution details tab to view the thread status of the relevant machine and analyze the errors of the job.

  • After Tracing Analysis is integrated into SchedulerX, click the value in the TraceId field on the Subtask list tab to query the traces of each task. For more information, see Integrate tracing analysis.

References

Enterprise grade distributed batch processing solution

MapReduce