SchedulerX: MapReduce

Last Updated: Feb 01, 2024

MapReduce is a lightweight model developed by SchedulerX for distributed batch data processing. By implementing the MapJobProcessor or MapReduceJobProcessor class, you can use the connected workers as a distributed computing engine that processes large amounts of data in batches. Compared with traditional big data batch processing engines such as Hadoop and Spark, this model is cheaper, faster, and simpler to program: data is processed in seconds without being imported into a big data platform and without additional storage or computing costs.

Precautions

  • A task cannot exceed 64 KB in size.

  • The result field of ProcessResult cannot exceed 1,000 bytes in size.

  • If you use the reduce method, the results of all tasks are cached on the master node, which puts high memory pressure on the master node. We recommend that you limit the number of tasks and keep the result field small. If you do not need the reduce method, use the MapJobProcessor class instead.

  • If a failover is triggered, SchedulerX may run a task more than once. You must implement the idempotence of tasks on your own, for example as shown in the sketch below.
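
The following is a minimal sketch of one way to make task processing idempotent: each task is deduplicated by a business key before it is handled. The in-memory set is only a placeholder to keep the sketch short; in production the check must be backed by a shared store, such as a database table with a unique key, so that it survives failovers and is visible to all workers.

@Component
public class IdempotentMsgJobProcessor extends MapJobProcessor {

    // Placeholder for a shared dedupe store. An in-memory set is neither persistent nor
    // shared across workers; replace it with a database table that has a unique key.
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        if (isRootTask(context)) {
            List<String> msgList = Lists.newArrayList();
            for (int i = 0; i < 10; i++) {
                msgList.add("msg_" + i);
            }
            return map(msgList, "MsgTask");
        }
        String msg = (String) context.getTask();
        // Build a key that stays stable if the same task is re-executed after a failover.
        String bizKey = context.getTaskName() + "_" + msg;
        if (!processedKeys.add(bizKey)) {
            // The task was already processed once; skip it instead of handling it twice.
            return new ProcessResult(true);
        }
        // TODO handle msg
        return new ProcessResult(true);
    }
}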

Methods

  • MapJobProcessor derived class

    Method: public ProcessResult process(JobContext context) throws Exception;
    Description: The entry point of task execution. You must implement the logic that obtains the task name from the context and processes the corresponding task. After the method is executed, ProcessResult is returned.
    Required: Yes

    Method: public ProcessResult map(List<? extends Object> taskList, String taskName);
    Description: Distributes a batch of tasks to multiple workers. You can call the map method multiple times. If taskList is empty, an error is returned. After the method is executed, ProcessResult is returned.
    Required: Yes

    Method: public void kill(JobContext context);
    Description: Triggered when a job instance is terminated in the console. You must implement the logic that interrupts your running workloads.
    Required: No

  • MapReduceJobProcessor derived class

    Method: public ProcessResult process(JobContext context) throws Exception;
    Description: The entry point of task execution. You must implement the logic that obtains the task name from the context and processes the corresponding task. After the method is executed, ProcessResult is returned.
    Required: Yes

    Method: public ProcessResult map(List<? extends Object> taskList, String taskName);
    Description: Distributes a batch of tasks to multiple workers. You can call the map method multiple times. If taskList is empty, an error is returned. After the method is executed, ProcessResult is returned.
    Required: Yes

    Method: public ProcessResult reduce(JobContext context);
    Description: Called back after the tasks on all workers have been executed. The reduce method runs on the master node and is usually used to aggregate data, send notifications downstream, or pass data between upstream and downstream jobs in a workflow. It handles the results of all tasks:
      1. Each task returns its execution result, such as an order ID, by using return new ProcessResult(true, result).
      2. When the reduce method is called, it obtains the statuses (context.getTaskStatuses()) and the execution results (context.getTaskResults()) of all tasks from the context and processes them (see the sketch after this method list).
    Required: Yes

    Method: public void kill(JobContext context);
    Description: Triggered when a job instance is terminated in the console. You must implement the logic that interrupts your running workloads.
    Required: No

    Method: public boolean runReduceIfFail(JobContext context)
    Description: Specifies whether to call the reduce method if a task fails. Default behavior: the reduce method is executed even if tasks fail.
    Required: No
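
The following is a minimal sketch of how these methods fit together in a MapReduceJobProcessor subclass. The task names and payloads are illustrative, and the reduce method only prints the collected results; fuller examples are provided in the Demos section of this topic.

@Component
public class SimpleMapReduceJobProcessor extends MapReduceJobProcessor {

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        if (isRootTask(context)) {
            // The root task only splits the work and distributes it with map().
            List<String> tasks = Lists.newArrayList("part_0", "part_1", "part_2");
            return map(tasks, "PartTask");
        } else if (context.getTaskName().equals("PartTask")) {
            String part = (String) context.getTask();
            // TODO process the part
            // Return the per-task result so that reduce() can aggregate it later.
            return new ProcessResult(true, "result_of_" + part);
        }
        return new ProcessResult(false);
    }

    @Override
    public ProcessResult reduce(JobContext context) throws Exception {
        // Runs on the master node after all tasks have finished: aggregate the results.
        for (Map.Entry<Long, String> entry : context.getTaskResults().entrySet()) {
            System.out.println("taskId:" + entry.getKey() + ", result:" + entry.getValue());
        }
        return new ProcessResult(true, "aggregated");
    }

    @Override
    public boolean runReduceIfFail(JobContext context) {
        // Skip reduce() if any task fails. The default behavior is to run reduce() anyway.
        return false;
    }

    @Override
    public void kill(JobContext context) {
        // TODO interrupt the running workload, for example by setting a stop flag
        // that the processing loop checks.
    }
}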

Procedure

  1. Log on to the SchedulerX console. In the left-side navigation pane, click Task Management.

  2. On the Task Management page, click Create Task.

  3. In the Create Task panel, select MapReduce from the Execution mode drop-down list and configure the related parameters in the Advanced Configuration section. The following describes the parameters.

    Parameter: Distribution policy
    Note: The agent version must be 1.10.3 or later.
    Description:
    • Polling Scheme (default): The system distributes the same number of tasks to each worker. This policy is suitable for scenarios in which each worker takes approximately the same amount of time to process a task.
    • workerLoad optimal strategy: The master node automatically detects the load of each worker node and distributes tasks based on the load. This policy is suitable for scenarios in which the time required to process a task differs greatly among workers.

    Parameter: Number of single-machine concurrent subtasks
    Description: The number of execution threads on a worker. Default value: 5. To speed up execution, specify a larger value. If downstream services or databases cannot withstand the load, specify a smaller value.

    Parameter: Number of failed retries of subtasks
    Description: The number of times a failed task is automatically retried. Default value: 0.

    Parameter: Subtask failure retry interval
    Description: The interval between two consecutive retries of a failed task. Unit: seconds. Default value: 0.

    Parameter: Subtask Failover Strategy
    Note: The agent version must be 1.8.12 or later.
    Description: Specifies whether to redistribute a task to another worker if the worker that is executing it is stopped or fails. If you turn on this switch, a task may be executed more than once when a failover is triggered, so you must implement the idempotence of tasks on your own.

    Parameter: The master node participates in the execution
    Note: The agent version must be 1.8.12 or later.
    Description: Specifies whether the master node participates in the execution of tasks. At least two workers must be available to run tasks. If an extremely large number of tasks exist, we recommend that you turn off this switch.

    Parameter: Subtask distribution method
    Description:
    • Push model: The system evenly distributes tasks to all workers.
    • Pull model: Each worker pulls tasks at its own pace, so slow workers do not become a bottleneck, and workers can be dynamically scaled out to pull tasks. During the pull process, all tasks are cached on the master node, which puts high pressure on its memory. We recommend that you do not distribute more than 10,000 tasks at a time.

    For more information about parameters, see Advanced parameters for job management.

Demos

Process data in a single table (consecutive IDs)

  1. The job reads the minimum and maximum IDs.

    select min(id), max(id) from Tab1;
  2. Pagination is performed based on the range of IDs. Each task contains the startId and endId fields.

  3. Each task obtains data by ID range.

    select * from Tab1 where id >= startId and id < endId;

Example:

class PageTask {
    private long startId;
    private long endId;

    public PageTask(long startId, long endId) {
        this.startId = startId;
        this.endId = endId;
    }

    public long getStartId() {
        return startId;
    }

    public long getEndId() {
        return endId;
    }
}
@Component
public class ScanSingleTableJobProcessor extends MapJobProcessor {

    @Autowired
    private XXXService xxxService;

    private final int PAGE_SIZE = 500;

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String tableName = context.getJobParameters(); // The backend code of multiple jobs can be consistent. Configure the job parameter in the SchedulerX console to specify the table name.
        String taskName = context.getTaskName();
        Object task = context.getTask();
        if (isRootTask(context)) {
            Pair<Long, Long> idPair = queryMinAndMaxId(tableName);
            long minId = idPair.getFirst();
            long maxId = idPair.getSecond();
            List<PageTask> tasks = Lists.newArrayList();
            int step = (int) ((maxId - minId) / PAGE_SIZE); // Calculate the ID step of each page task; the ID range is split into about PAGE_SIZE tasks.
            if (step > 0) {
                for (long i = minId; i < maxId; i+=step) {
                    tasks.add(new PageTask(i, (i+step > maxId ? maxId : i+step)));
                }
            } else {
                tasks.add(new PageTask(minId, maxId));
            }
            return map(tasks, "PageTask");
        } else if (taskName.equals("PageTask")) {
            PageTask pageTask = (PageTask)task;
            long startId = pageTask.getStartId();
            long endId = pageTask.getEndId();
            List<Record> records = queryRecord(tableName, startId, endId);
            //TODO handle records
            return new ProcessResult(true);
        }

        return new ProcessResult(false);
    }

    private Pair<Long, Long> queryMinAndMaxId(String tableName) {
        //TODO select min(id),max(id) from [tableName]
        return new Pair<Long, Long>(1L, 10000L);
    }

    private List<Record> queryRecord(String tableName, long startId, long endId) {
        List<Record> records = Lists.newArrayList();
        //TODO select * from [tableName] where id>=[startId] and id<[endId]
        return records;
    }

}

Process data in a single table (non-consecutive IDs)

  1. The database adopts a bucketing strategy and adds a bucket field as an index.

  2. For example, there are 1,024 buckets. Each time a row is inserted, its order number or ID is hashed, for example, order number % 1024, and the result is written to the bucket field.

  3. The data is distributed roughly evenly across the buckets. For each bucket, you can run the following SQL statement to query all of its data:

    select * from Tab1 where bucket=xxx;

Example:

@Component
public class ScanSingleTableJobProcessor extends MapJobProcessor {

    @Autowired
    private XXXService xxxService;

    private final int PAGE_SIZE = 500;

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String tableName = context.getJobParameters(); // The backend code of multiple jobs can be consistent. Configure the job parameter in the SchedulerX console to specify the table name. 
        String taskName = context.getTaskName();
        Object task = context.getTask();
        if (isRootTask(context)) {
            List<Integer> tasks = Lists.newArrayList();
            for (int i = 0; i< 1024; i++) {
                tasks.add(i);
            }    
            return map(tasks, "BucketTask");
        } else if (taskName.equals("BucketTask")) {
            int bucketId = (int)task;
            List<Record> records = queryRecord(tableName, bucketId);
            //TODO handle records
            return new ProcessResult(true);
        }

        return new ProcessResult(false);
    }

    private List<Record> queryRecord(String tableName, int bucketId) {
        List<Record> records = Lists.newArrayList();
        //TODO select * from #{tableName} where bucket= #{bucketId}
        return records;
    }

}

Process data in sharded databases and tables

class PageTask {
    private String tableName;
    private long startId;
    private long endId;

    public PageTask(String tableName, long startId, long endId) {
        this.tableName = tableName;
        this.startId = startId;
        this.endId = endId;
    }

    public String getTableName() {
        return tableName;
    }

    public long getStartId() {
        return startId;
    }

    public long getEndId() {
        return endId;
    }
}
@Component
public class ScanShardingTableJobProcessor extends MapJobProcessor {

    @Autowired
    private XXXService xxxService;

    private final int PAGE_SIZE = 500;

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String taskName = context.getTaskName();
        Object task = context.getTask();
        if (isRootTask(context)) {
            //Perform database sharding.
            List<String> dbList = getDbList();
            return map(dbList, "DbTask");
        } else if (taskName.equals("DbTask")) {
            //Perform table sharding based on the database sharding result.
            String dbName = (String)task;
            List<String> tableList = getTableList(dbName);
            return map(tableList, "TableTask");
        } else if (taskName.equals("TableTask")) {
            //Perform pagination if the table shard is large.
            String tableName = (String)task;
            Pair<Long, Long> idPair = queryMinAndMaxId(tableName);
            long minId = idPair.getFirst();
            long maxId = idPair.getSecond();
            List<PageTask> tasks = Lists.newArrayList();
            int step = (int) ((maxId - minId) / PAGE_SIZE); // Calculate the ID step of each page task; the ID range is split into about PAGE_SIZE tasks.
            if (step > 0) {
                for (long i = minId; i < maxId; i+=step) {
                    tasks.add(new PageTask(tableName, i, (i+step > maxId ? maxId : i+step)));
                }
            } else {
                tasks.add(new PageTask(tableName, minId, maxId));
            }
            return map(tasks, "PageTask");
        } else if (taskName.equals("PageTask")) {
            PageTask pageTask = (PageTask)task;
            String tableName = pageTask.getTableName();
            long startId = pageTask.getStartId();
            long endId = pageTask.getEndId();
            List<Record> records = queryRecord(tableName, startId, endId);
            //TODO handle records
            return new ProcessResult(true);
        }

        return new ProcessResult(false);
    }

    private List<String> getDbList() {
        List<String> dbList = Lists.newArrayList();
        //TODO Return the list of database shards.
        return dbList;
    }

    private List<String> getTableList(String dbName) {
        List<String> tableList = Lists.newArrayList();
        // TODO Return the list of table shards.
        return tableList;
    }

    private Pair<Long, Long> queryMinAndMaxId(String tableName) {
        //TODO select min(id),max(id) from [tableName]
        return new Pair<Long, Long>(1L, 10000L);
    }

    private List<Record> queryRecord(String tableName, long startId, long endId) {
        List<Record> records = Lists.newArrayList();
        //TODO select * from [tableName] where id>=[startId] and id<[endId]
        return records;
    }

}

Process 50 messages and call the reduce method to return the result

@Component
public class TestMapReduceJobProcessor extends MapReduceJobProcessor {

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String taskName = context.getTaskName();
        int dispatchNum = 50;
        if (isRootTask(context)) {
            System.out.println("start root task");
            List<String> msgList = Lists.newArrayList();
            for (int i = 0; i < dispatchNum; i++) {
                msgList.add("msg_" + i);
            }
            return map(msgList, "Level1Dispatch");
        } else if (taskName.equals("Level1Dispatch")) {
            String task = (String)context.getTask();
            System.out.println(task);
            return new ProcessResult(true);
        }

        return new ProcessResult(false);
    }

    @Override
    public ProcessResult reduce(JobContext context) throws Exception {
        return new ProcessResult(true, "TestMapReduceJobProcessor.reduce");
    }

}

Process 50 messages and call the reduce method to aggregate the execution results of all tasks

@Component
public class TestMapReduceJobProcessor extends MapReduceJobProcessor {

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String taskName = context.getTaskName();
        int dispatchNum = 50;
        if (context.getJobParameters() != null) {
            dispatchNum = Integer.valueOf(context.getJobParameters());
        }
        if (isRootTask(context)) {
            System.out.println("start root task");
            List<String> msgList = Lists.newArrayList();
            for (int i = 0; i < dispatchNum; i++) {
                msgList.add("msg_" + i);
            }
            return map(msgList, "Level1Dispatch");
        } else if (taskName.equals("Level1Dispatch")) {
            String task = (String)context.getTask();
            Thread.sleep(2000);
            return new ProcessResult(true, task);
        }

        return new ProcessResult(false);
    }

    @Override
    public ProcessResult reduce(JobContext context) throws Exception {
        for (Entry<Long, String> result : context.getTaskResults().entrySet()) {
            System.out.println("taskId:" + result.getKey() + ", result:" + result.getValue());
        }
        return new ProcessResult(true, "TestMapReduceJobProcessor.reduce");
    }
}