全部產品
Search
文件中心

SchedulerX:廣播

更新時間:Jul 06, 2024

廣播執行表示一個任務執行個體會廣播到該分組所有Worker上執行,當所有Worker都執行完成,該任務才算完成。任意一台Worker執行失敗,都算該任務失敗。

應用情境

  • 批量營運
    • 定時廣播所有機器運行某個指令碼。
    • 定時廣播所有機器清理緩衝。
    • 動態拉起每台機器的某個服務,最後由一台機器回收結果修改資料庫。
  • 資料彙總
    • 使用JavaProcessor,preProcess的時候初始化Redis緩衝或者資料庫。
    • 每台機器執行process的時候,根據自己業務返回result。
    • postProcess的時候,擷取所有機器的執行結果做匯總,更新緩衝或者資料庫。

任務類型

任務類型可以選擇多種,例如指令碼或者Java任務。如果選擇Java,還支援preProcess和postProcess進階特性。

使用Java任務需要繼承JavaProcessor(1.0.7及以上版本),介面如下:

  • 必需:public ProcessResult process(JobContext context) throws Exception
  • 可選:public void preProcess(JobContext context)
  • 可選:public ProcessResult postProcess(JobContext context)

preProcess會在所有機器執行process之前執行,且只會執行一次。

postProcess會在所有機器執行process且都成功執行之後執行一次,可以返回結果,作為工作流程資料轉送。

Demo樣本

@Component
public class TestBroadcastJob extends JavaProcessor {

    /**
     * 只有一台機器會執行
     */
    @Override
    public void preProcess(JobContext context) {
        System.out.println("TestBroadcastJob.preProcess");
    }

    /**
     * 所有機器會執行
     */
    @Override
    public ProcessResult process(JobContext context) throws Exception {
        int value = new Random().nextInt(10);
        System.out.println("分區總數=" + context.getShardingNum() + ", 分區號=" + context.getShardingId() + ", "
                + "taskId=" + context.getTaskId() + ", value=" + value);
        return new ProcessResult(true, String.valueOf(value));
    }

    /**
     * 只有一台機器會執行
     */
    @Override
    public ProcessResult postProcess(JobContext context) {
        System.out.println("TestBroadcastJob.postProcess");
        Map<Long, String> allTaskResults = context.getTaskResults();
        Map<Long, TaskStatus> allTaskStatuses = context.getTaskStatuses();
        int num = 0;
        for (Entry<Long, String> entry : allTaskResults.entrySet()) {
            System.out.println(entry.getKey() + ":" + entry.getValue());
            if (allTaskStatuses.get(entry.getKey()).equals(TaskStatus.SUCCESS)) {
                num += Integer.valueOf(entry.getValue());
            }
        }
        System.out.println("TestBroadcastJob.postProcess(), num=" + num);
        return new ProcessResult(true, String.valueOf(num));
    }

}