全部產品
Search
文件中心

Microservices Engine:MapReduce模型

更新時間:Aug 01, 2025

MapReduce模型是SchedulerX自主研發的輕量級分布式跑批模型。通過MapJobProcessor或MapReduceJobProcessor介面將接入的Worker組成分散式運算引擎進行巨量資料跑批。相對於傳統的巨量資料跑批(例如Hadoop、Spark等),MapReduce無需將資料匯入巨量資料平台,且無額外儲存及計算成本,即可實現秒層級海量資料處理,具有成本低、速度快、編程簡單等特性。

注意事項

  • 單個子任務的大小不能超過64 KB。

  • ProcessResult的result傳回值不能超過1000 Byte。

  • 如果使用reduce,所有子任務結果會緩衝在Master節點,該情況對Master節點記憶體壓力較大,建議子任務個數和result傳回值不要太大。如果沒有reduce需求,使用MapJobProcessor介面即可。

  • SchedulerX不保證子任務絕對執行一次。在特殊條件下會failover,可能導致子任務重複執行,需要業務方自行實現等冪。

介面

繼承類MapJobProcessor

介面

解釋

是否必選

public ProcessResult process(JobContext context) throws Exception;

每個子任務執行業務的入口,需從context中擷取taskName,您需自行判斷子任務名稱。邏輯處理完成後,返回ProcessResult

public ProcessResult map(List<? extends Object> taskList, String taskName);

執行map方法可以將一批子任務分布至多台機器上執行,可以多次執行map方法。如果taskList為空白,返回失敗。執行完成後,返回ProcessResult

public void kill(JobContext context);

前端kill任務會觸發該方法,需自行實現如何中斷業務。

繼承類MapReduceJobProcessor

介面

解釋

是否必選

public ProcessResult process(JobContext context) throws Exception;

每個子任務執行業務的入口,需從context裡擷取taskName,您需自行判斷子任務名稱。邏輯處理完成後,返回ProcessResult

public ProcessResult map(List<? extends Object> taskList, String taskName);

執行map方法可以將一批子任務分布至多台機器上執行,可以多次執行map方法。如果taskList是空,返回失敗。執行完成後,返回ProcessResult

public ProcessResult reduce(JobContext context);

所有Worker節點的子任務執行完成後,會回調reduce方法。reduce由Master節點執行,一般用於資料彙總或通知下遊,也可以用於工作流程的上下遊資料傳遞。

reduce方法能處理所有子任務的結果。

  1. 子任務通過return ProcessResult(true, result)返回結果(例如返回訂單號)。

  2. 執行reduce方法時,通過context擷取所有子任務的狀態(context.getTaskStatuses())和結果(context.getTaskResults()),並進行相應的邏輯處理。

public void kill(JobContext context);

前端kill任務會觸發該方法,需自行實現如何中斷業務。

public boolean runReduceIfFail(JobContext context)

當存在子任務失敗情況下,是否執行reduce方法。預設配置為:子任務失敗時,仍然執行reduce方法。

操作步驟

  1. 登入MSE SchedulerX控制台,在左側導覽列,單擊任務管理

  2. 任務管理頁面,單擊建立任務

  3. 建立任務面板,執行模式下拉式清單選擇MapReduce,在進階配置地區配置相關資訊。

    配置項

    說明

    分發策略

    說明

    需用戶端版本>=1.10.3。

    • 輪詢策略(預設):每個Worker平均分配等量子任務,適用於每個子任務處理耗時基本一致的情境。

    • 負載最優策略:由主節點自動感知Worker節點的負載情況,適用於子任務和Worker機器處理耗時有較大差異的情境。

    子任務單機並發數

    即單機執行線程數,預設為5。如需加快執行速度,可以調大該值。如果下遊或者資料庫無法承接,可適當調小。

    子任務失敗重試次數

    子任務失敗會自動重試,預設為0。

    子任務失敗稍候再試

    子任務失敗稍候再試,單位:秒,預設為0。

    子任務failover策略

    說明

    需用戶端版本>=1.8.12。

    當執行節點宕機下線後,是否將子任務重新分發給其他機器執行。開啟該配置後,發生failover時,子任務可能會重複執行,需自行做好等冪。

    主節點參與執行

    說明

    需用戶端版本>=1.8.12。

    主節點是否參與子任務執行。線上可運行Worker數量必須不低於2台,在子任務數量特別大時,推薦關閉該參數。

    子任務分發方式

    • 推模型:每台機器平均分配子任務。

    • 拉模型:每台機器主動拉取子任務,沒有木桶效應,支援動態擴容拉取子任務。拉取過程中,所有子任務會緩衝在Master節點,對記憶體有壓力,建議子任務數不超過10,000。

    其他配置項,請參見任務管理進階配置參數說明

原理&最佳實務

Schedulerx2.0分散式運算原理&最佳實務

Demo

處理單表資料(單表ID連續)

  1. 主任務讀取最小ID和最大ID。

    select min(id), max(id) from Tab1;
  2. 根據ID的range進行分頁,每個task包含兩個欄位:startId和endId。

  3. 每個task通過ID的range擷取資料。

    select * from Tab1 where id >= startId and id < endId;

以下為範例程式碼:

class PageTask {
    private long startId;
    private long endId;

    public PageTask(long startId, long endId) {
        this.startId = startId;
        this.endId = endId;
    }

    public long getStartId() {
        return startId;
    }

    public long getEndId() {
        return endId;
    }
}
@Component
public class ScanSingleTableJobProcessor extends MapJobProcessor {

    @Autowired
    private XXXService xxxService;

    private final int PAGE_SIZE = 500;

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String tableName = context.getJobParameters(); //多個job後端代碼可以一致,通過控制台配置job參數表示表名
        String taskName = context.getTaskName();
        Object task = context.getTask();
        if (isRootTask(context)) {
            Pair<Long, Long> idPair = queryMinAndMaxId(tableName);
            long minId = idPair.getFirst();
            long maxId = idPair.getSecond();
            List<PageTask> tasks = Lists.newArrayList();
            int step = (int) ((maxId - minId) / PAGE_SIZE); //計算分頁數量
            if (step > 0) {
                for (long i = minId; i < maxId; i+=step) {
                    tasks.add(new PageTask(i, (i+step > maxId ? maxId : i+step)));
                }
            } else {
                tasks.add(new PageTask(minId, maxId));
            }
            return map(tasks, "PageTask");
        } else if (taskName.equals("PageTask")) {
            PageTask pageTask = (PageTask)task;
            long startId = pageTask.getStartId();
            long endId = pageTask.getEndId();
            List<Record> records = queryRecord(tableName, startId, endId);
            //TODO handle records
            return new ProcessResult(true);
        }

        return new ProcessResult(false);
    }

    private Pair<Long, Long> queryMinAndMaxId(String tableName) {
        //TODO select min(id),max(id) from [tableName]
        return new Pair<Long, Long>(1L, 10000L);
    }

    private List<Record> queryRecord(String tableName, long startId, long endId) {
        List<Record> records = Lists.newArrayList();
        //TODO select * from [tableName] where id>=[startId] and id<[endId]
        return records;
    }

}

處理單表資料(單表ID不連續)

  1. 資料庫採用分桶策略,增加一個bucket欄位作為索引。

  2. 例如1024個桶,資料庫每添加一行記錄時,將訂單號或者ID進行hash,例如訂單號%1024,落在bucket欄位中。

  3. 基本上每個桶是平均的,針對每個桶,都可以通過以下SQL語句全量查詢結果。

    select * from Tab1 where bucket=xxx;

以下為範例程式碼:

@Component
public class ScanSingleTableJobProcessor extends MapJobProcessor {

    @Autowired
    private XXXService xxxService;

    private final int PAGE_SIZE = 500;

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String tableName = context.getJobParameters(); //多個job後端代碼可以一致,通過控制台配置job參數表示表名。
        String taskName = context.getTaskName();
        Object task = context.getTask();
        if (isRootTask(context)) {
            List<Integer> tasks = Lists.newArrayList();
            for (int i = 0; i< 1024; i++) {
                tasks.add(i);
            }    
            return map(tasks, "BucketTask");
        } else if (taskName.equals("BucketTask")) {
            int bucketId = (int)task;
            List<Record> records = queryRecord(tableName, bucketId);
            //TODO handle records
            return new ProcessResult(true);
        }

        return new ProcessResult(false);
    }

    private List<Record> queryRecord(String tableName, int bucketId) {
        List<Record> records = Lists.newArrayList();
        //TODO select * from #{tableName} where bucket= #{bucketId}
        return records;
    }

}

處理分庫分表資料

class PageTask {
    private String tableName;
    private long startId;
    private long endId;

    public PageTask(String tableName, long startId, long endId) {
        this.tableName = tableName;
        this.startId = startId;
        this.endId = endId;
    }

    public String getTableName() {
        return tableName;
    }

    public long getStartId() {
        return startId;
    }

    public long getEndId() {
        return endId;
    }
}
@Component
public class ScanShardingTableJobProcessor extends MapJobProcessor {

    @Autowired
    private XXXService xxxService;

    private final int PAGE_SIZE = 500;

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String taskName = context.getTaskName();
        Object task = context.getTask();
        if (isRootTask(context)) {
            //先分庫
            List<String> dbList = getDbList();
            return map(dbList, "DbTask");
        } else if (taskName.equals("DbTask")) {
            //根據分庫去分表
            String dbName = (String)task;
            List<String> tableList = getTableList(dbName);
            return map(tableList, "TableTask");
        } else if (taskName.equals("TableTask")) {
            //如果一個分表也很大,再分頁
            String tableName = (String)task;
            Pair<Long, Long> idPair = queryMinAndMaxId(tableName);
            long minId = idPair.getFirst();
            long maxId = idPair.getSecond();
            List<PageTask> tasks = Lists.newArrayList();
            int step = (int) ((maxId - minId) / PAGE_SIZE); //計算分頁數量
            if (step > 0) {
                for (long i = minId; i < maxId; i+=step) {
                    tasks.add(new PageTask(tableName, i, (i+step > maxId ? maxId : i+step)));
                }
            } else {
                tasks.add(new PageTask(tableName, minId, maxId));
            }
            return map(tasks, "PageTask");
        } else if (taskName.equals("PageTask")) {
            PageTask pageTask = (PageTask)task;
            String tableName = pageTask.getTableName();
            long startId = pageTask.getStartId();
            long endId = pageTask.getEndId();
            List<Record> records = queryRecord(tableName, startId, endId);
            //TODO handle records
            return new ProcessResult(true);
        }

        return new ProcessResult(false);
    }

    private List<String> getDbList() {
        List<String> dbList = Lists.newArrayList();
        //TODO 返回分庫列表
        return dbList;
    }

    private List<String> getTableList(String dbName) {
        List<String> tableList = Lists.newArrayList();
        //TODO 返回分表列表
        return tableList;
    }

    private Pair<Long, Long> queryMinAndMaxId(String tableName) {
        //TODO select min(id),max(id) from [tableName]
        return new Pair<Long, Long>(1L, 10000L);
    }

    private List<Record> queryRecord(String tableName, long startId, long endId) {
        List<Record> records = Lists.newArrayList();
        //TODO select * from [tableName] where id>=[startId] and id<[endId]
        return records;
    }

}

處理50條訊息,reduce返回結果

@Component
public class TestMapReduceJobProcessor extends MapReduceJobProcessor {

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String taskName = context.getTaskName();
        int dispatchNum=50;
        if (isRootTask(context)) {
            System.out.println("start root task");
            List<String> msgList = Lists.newArrayList();
            for (int i = 0; i <= dispatchNum; i++) {
                msgList.add("msg_" + i);
            }
            return map(msgList, "Level1Dispatch");
        } else if (taskName.equals("Level1Dispatch")) {
            String task = (String)context.getTask();
            System.out.println(task);
            return new ProcessResult(true);
        }

        return new ProcessResult(false);
    }

    @Override
    public ProcessResult reduce(JobContext context) throws Exception {
        return new ProcessResult(true, "TestMapReduceJobProcessor.reduce");
    }

}

處理50條訊息並且返回子任務結果由reduce匯總

@Component
public class TestMapReduceJobProcessor extends MapReduceJobProcessor {

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String taskName = context.getTaskName();
        int dispatchNum = 50;
        if (context.getJobParameters() != null) {
            dispatchNum = Integer.valueOf(context.getJobParameters());
        }
        if (isRootTask(context)) {
            System.out.println("start root task");
            List<String> msgList = Lists.newArrayList();
            for (int i = 0; i <= dispatchNum; i++) {
                msgList.add("msg_" + i);
            }
            return map(msgList, "Level1Dispatch");
        } else if (taskName.equals("Level1Dispatch")) {
            String task = (String)context.getTask();
            Thread.sleep(2000);
            return new ProcessResult(true, task);
        }

        return new ProcessResult(false);
    }

    @Override
    public ProcessResult reduce(JobContext context) throws Exception {
        for (Entry<Long, String> result : context.getTaskResults().entrySet()) {
            System.out.println("taskId:" + result.getKey() + ", result:" + result.getValue());
        }
        return new ProcessResult(true, "TestMapReduceJobProcessor.reduce");
    }
}