SchedulerX: Visual MapReduce model

Last Updated: Dec 04, 2025

The visual MapReduce model is built on the MapReduce model and adds visualized O&M capabilities. You do not need to modify backend code; you only need to change the execution mode of a job to Visual MapReduce in the SchedulerX console. The console then displays a page that lists all tasks generated for the job. On this page, you can view the details and results of each task and re-run individual tasks.

Precautions

  • SDK versions V1.12.2 and earlier are subject to security vulnerabilities. Make sure that the SDK version is later than V1.12.2.

  • Only SchedulerX Professional Edition supports the visual MapReduce model.

  • The number of tasks cannot exceed 1,000.

  • A task cannot exceed 64 KB in size.

  • To display custom label information for tasks, you must implement the specified interface on the task objects.

  • The return value of the result field of ProcessResult cannot exceed 1,000 bytes in size.

  • If you use the reduce method, the results of all tasks are cached on the master node, which puts high memory pressure on that node. We recommend that you keep the number of tasks small and the result field short. If you do not need the reduce method, use the MapJobProcessor interface instead (see the sketch after this list).

  • If a failover is triggered, SchedulerX may run a task more than once. You must ensure that your tasks are idempotent.
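
The following is a minimal sketch of a processor that skips the reduce phase, assuming the MapJobProcessor and ProcessResult classes of the MapReduce model. The class name AccountSyncJob and the alreadyProcessed idempotence helper are hypothetical and stand in for your own business logic.

    import java.util.ArrayList;
    import java.util.List;

    import com.alibaba.schedulerx.worker.domain.JobContext;
    import com.alibaba.schedulerx.worker.processor.MapJobProcessor;
    import com.alibaba.schedulerx.worker.processor.ProcessResult;

    public class AccountSyncJob extends MapJobProcessor {

        @Override
        public ProcessResult process(JobContext context) throws Exception {
            if (isRootTask(context)) {
                // Keep the task count at or below 1,000 for the visual MapReduce model.
                List<String> accountIds = new ArrayList<>();
                for (int i = 0; i < 100; i++) {
                    accountIds.add("AC" + i);
                }
                return map(accountIds, "syncAccount");
            }
            String accountId = (String) context.getTask();
            // A failover can cause the same task to run more than once,
            // so skip work that an earlier run already completed.
            if (alreadyProcessed(accountId)) {
                return new ProcessResult(true);
            }
            // ... business logic for one account ...
            // Keep the result short; it must not exceed 1,000 bytes.
            return new ProcessResult(true, "ok");
        }

        // Hypothetical idempotence check, for example a processed flag in your database.
        private boolean alreadyProcessed(String accountId) {
            return false;
        }
    }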

Interfaces

  • The visual MapReduce model inherits all interfaces of the MapReduce model. The job processing code of the visual MapReduce model is the same as that of the MapReduce model. For more information, see MapReduce.

  • Optional: You can specify labels for each task that is executed by the visual MapReduce model. To display task labels, you must implement the com.alibaba.schedulerx.worker.processor.BizSubTask interface for the task objects (a minimal sketch follows the table below).

    Interface | Description | Required
    public Map<String, String> labelMap() | Outputs the label information of tasks, such as the account name, product code, city, and other custom attributes of the task objects. | No
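
    For illustration, the following is a minimal sketch of a task object that implements labelMap. The class name, fields, and label keys are examples only; a complete sample is shown later in this topic.

    import java.util.HashMap;
    import java.util.Map;

    import com.alibaba.schedulerx.worker.processor.BizSubTask;

    public class OrderTask implements BizSubTask {

        private String productCode;
        private String city;

        public OrderTask(String productCode, String city) {
            this.productCode = productCode;
            this.city = city;
        }

        @Override
        public Map<String, String> labelMap() {
            // Each entry is displayed as a label of the task in the console.
            Map<String, String> labels = new HashMap<>();
            labels.put("Product code", productCode);
            labels.put("City", city);
            return labels;
        }
    }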

Comparison between MapReduce and visual MapReduce

Comparison item | MapReduce | Visual MapReduce
Number of tasks | More than one million | 1,000 or fewer
Task development mode | Same for both models | Same for both models
Task list | Not supported | Supported
Task running details | Not supported | Supported. The details include the execution records, execution status, logs, tracing analysis, and running stacks of each task.
Task labels | Not supported | Supported. After the BizSubTask interface is implemented for a task, you can view its business labels.
Task operations | Not supported | Supported. You can stop and re-run a single task.

Sample code for developing a job

Process multiple accounts at a time

Example: You want to process multiple bank accounts at a time. Each bank account is treated as an independent task that runs in parallel across the cluster. Each task in the task list displays its account information for easy query, which helps you quickly understand the processing status and execution details of each bank account. The following sample code is for your reference.

  1. Define a custom object for each bank account so that its label information can be displayed. Implement the com.alibaba.schedulerx.worker.processor.BizSubTask interface and the labelMap method for the object.

    import java.util.HashMap;
    import java.util.Map;

    import com.alibaba.schedulerx.worker.processor.BizSubTask;

    public class ParallelAccountInfo implements BizSubTask {

        /**
         * Primary key.
         */
        private long id;

        /**
         * Account name.
         */
        private String name;

        /**
         * Account ID.
         */
        private String accountId;

        public ParallelAccountInfo(long id, String name, String accountId) {
            this.id = id;
            this.name = name;
            this.accountId = accountId;
        }

        /**
         * Implement the labelMap method to specify the labels of a task.
         */
        @Override
        public Map<String, String> labelMap() {
            Map<String, String> labelMap = new HashMap<>();
            labelMap.put("Account name", name);
            return labelMap;
        }
    }

    After the interface is implemented for a task object, the task list displays that object's label information, such as the account name. You can use the label information to understand the business processing status of each bank account and to search for bank accounts by label.

  2. Create a processor that handles the business logic of each bank account and extends com.alibaba.schedulerx.worker.processor.MapReduceJobProcessor.

    import java.util.LinkedList;
    import java.util.List;

    import com.alibaba.fastjson.JSON;
    import com.alibaba.schedulerx.worker.domain.JobContext;
    import com.alibaba.schedulerx.worker.processor.MapReduceJobProcessor;
    import com.alibaba.schedulerx.worker.processor.ProcessResult;
    import org.apache.commons.lang3.StringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ParallelJob extends MapReduceJobProcessor {

        private static final Logger logger = LoggerFactory.getLogger("schedulerx");

        @Override
        public ProcessResult reduce(JobContext context) throws Exception {
            return new ProcessResult(true);
        }

        @Override
        public ProcessResult process(JobContext context) throws Exception {
            if (isRootTask(context)) {
                logger.info("Build parallel computing tasks.");
                List<ParallelAccountInfo> list = new LinkedList<>();
                /**
                 * For the root task, build the parallel computing task objects.
                 * In actual scenarios, you can load task objects based on your business requirements
                 * and implement the BizSubTask interface for the task objects.
                 * Sample scenarios:
                 * 1. Load unprocessed customer account information from the database.
                 * 2. Construct an area table to distribute jobs by area, such as province, city, and district.
                 * 3. Classify tasks by business label, such as electric appliance, daily necessity, and food.
                 * 4. Classify tasks by time-based values, such as a month (for example, January or February).
                 */
                for (int i = 0; i < 20; i++) {
                    list.add(new ParallelAccountInfo(i, "CUS" + StringUtils.leftPad(i + "", 4, "0"),
                            "AC" + StringUtils.leftPad(i + "", 12, "0")));
                }
                return map(list, "transfer");
            } else {
                /**
                 * For a non-root task, obtain the task information and process the task.
                 */
                ParallelAccountInfo obj = (ParallelAccountInfo) context.getTask();
                // Process the task based on the obtained account information.
                // do something
                logger.info("Process task information: {}", JSON.toJSONString(obj));
                return new ProcessResult(true);
            }
        }
    }
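
    In this sample, the root task calls map(list, "transfer") to split the job into 20 account-level tasks that are distributed across the cluster under the task name transfer. Each non-root invocation then receives a single ParallelAccountInfo object from context.getTask() and processes only that account, which keeps the number of tasks and the size of each task within the limits listed in the Precautions section.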

    After you develop and deploy the job, configure a scheduled task in the SchedulerX console to run the job at the specified point in time. For more information, see the Procedure section of this topic.

Procedure

Configure a visual MapReduce model for a job

  1. Log on to the SchedulerX console. In the left-side navigation pane, click Task Management.

  2. On the Task Management page, click Create task.

  3. In the Create task panel, select Visual MapReduce from the Execution mode drop-down list.

  4. In the Advanced Configuration section, configure parameters based on your business requirements. For more information about other parameters, see Advanced parameters for job management.

    • distribution policy: The policy that is used to distribute tasks to workers.
      - Polling Scheme (default): The system distributes tasks evenly so that each worker receives the same number of tasks. This policy is suitable for scenarios in which each worker requires almost the same amount of time to process a task.
      - Worker load optimal strategy: The master node automatically detects the loads of worker nodes and distributes tasks accordingly. This policy is suitable for scenarios in which workers require significantly different amounts of time to process tasks.
      Note: This parameter is displayed only if the client version is V1.10.14 or later.

    • Number of single-machine concurrent subtasks: The number of execution threads on a worker. Default value: 5. To speed up execution, specify a larger value. If downstream services or databases cannot handle the load, specify a smaller value.

    • Number of failed retries of subtasks: The number of times a failed task is retried. Default value: 0.

    • Sub-task failure retry interval: The interval between two consecutive retries. Unit: seconds. Default value: 0.

    • Subtask Failover Strategy: Specifies whether to redistribute a task to another worker if the worker that was executing it fails and stops. If you turn on this switch, a task may be executed more than once when a failover is triggered, so you must ensure that tasks are idempotent (see the sketch after this list).
      Note: This parameter is displayed only if the client version is V1.8.13 or later.

    • The master node participates in the execution: Specifies whether the master node participates in the execution of tasks. If you turn off this switch, at least two workers must be available to run tasks. If an extremely large number of tasks exist, we recommend that you turn off this switch.
      Note: This parameter is applicable to agents of V1.8.13 and later.
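
    Because a failover can cause the same task to be executed more than once, the business logic of each task should be idempotent. The following is a minimal sketch of one possible deduplication guard; the IdempotenceGuard class is hypothetical, and in production you would typically back it with a database unique key or another durable store instead of an in-memory set.

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    public class IdempotenceGuard {

        // Records execution keys that have already been processed.
        private final Set<String> processed = ConcurrentHashMap.newKeySet();

        /**
         * Returns true only the first time a given execution key is seen,
         * so a retried or failed-over task can detect that its work was already done.
         */
        public boolean tryAcquire(long jobInstanceId, long taskId) {
            return processed.add(jobInstanceId + ":" + taskId);
        }
    }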

View task details of the job in the console

After the job is executed, you can find the job on the Execution List page and click Details in the Operation column to view the execution details of the tasks.

  • On the Subtask list tab, view the execution status of each task.

  • On the Subtask list tab, find the task that you want to manage and click Log in the Operation column to view the business log information of each task and analyze the execution result.

  • When the execution records of the job are available, click ThreadDump on the Current execution details tab to view the thread execution status of the relevant machine and analyze the error details of the job.

  • After Tracing Analysis is connected, you can click the value in the TraceId column of the task on the Subtask list tab to query the details of the corresponding trace. For more information, see Integrate tracing analysis.
