In the broadcast model, a job instance is distributed to every worker in the group to which the job belongs. The job instance is completed only after all workers complete it. If any worker fails to complete it, the job instance is considered failed.
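The completion rule above can be sketched in plain Java. This is only an illustration and not SchedulerX code: the class `BroadcastStatusSketch`, its two enums, and the `aggregate` method are hypothetical names. The sketch also assumes that a single failed worker marks the job as failed right away, which the description above does not specify.

```java
import java.util.Collection;

/** Illustrative only: derives a job status from per-worker statuses. */
final class BroadcastStatusSketch {

    enum WorkerState { RUNNING, SUCCESS, FAILED }

    enum JobState { RUNNING, SUCCESS, FAILED }

    /**
     * Returns FAILED if any worker failed, SUCCESS if every worker
     * succeeded, and RUNNING otherwise. An empty group yields SUCCESS,
     * which is an arbitrary choice made for this sketch.
     */
    static JobState aggregate(Collection<WorkerState> workers) {
        boolean allSucceeded = true;
        for (WorkerState state : workers) {
            if (state == WorkerState.FAILED) {
                return JobState.FAILED;   // one failure fails the whole job
            }
            if (state != WorkerState.SUCCESS) {
                allSucceeded = false;     // at least one worker is still running
            }
        }
        return allSucceeded ? JobState.SUCCESS : JobState.RUNNING;
    }
}
```

For example, calling `aggregate` with two `SUCCESS` entries and one `FAILED` entry returns `JobState.FAILED`.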

Scenarios

  • Batch O&M operations
    • Run a script on all workers at a scheduled time.
    • Clear the cache on all workers at a scheduled time.
    • Dynamically enable a service on all workers, have one worker collect the execution results, and then update databases.
  • Data aggregation
    • If you use JavaProcessor, initialize Redis caches or databases in the preProcess method.
    • In the process method, have each worker return an execution result based on your business logic.
    • In the postProcess method, aggregate the execution results from all workers and update caches or databases.
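The three aggregation phases listed above can be simulated in plain Java without any SchedulerX classes. This is a minimal sketch under stated assumptions: `AggregationSketch` is a hypothetical class, its in-memory map merely stands in for a Redis cache or database, and the workers are simulated by sequential method calls rather than by real machines.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative only: initialize, collect per-worker results, then aggregate. */
final class AggregationSketch {

    // Stands in for a shared Redis cache or database (assumption of this sketch).
    private final Map<String, Integer> store = new ConcurrentHashMap<>();

    /** Phase 1: reset the shared state before any worker runs. */
    void preProcess() {
        store.clear();
    }

    /** Phase 2: each worker reports the result of its own business logic. */
    void process(String workerId, int value) {
        store.put(workerId, value);
    }

    /** Phase 3: sum the results once every worker has reported. */
    int postProcess() {
        int total = 0;
        for (int value : store.values()) {
            total += value;
        }
        return total;
    }
}
```

Calling `preProcess()`, then `process("worker-1", 3)` and `process("worker-2", 4)`, then `postProcess()` returns 7 in this simulation.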

Supported types of jobs

The broadcast model supports multiple types of jobs, such as script jobs and Java jobs. For Java jobs, the broadcast model also supports two advanced methods: preProcess and postProcess.

If you use Java jobs, you must use JavaProcessor 1.0.7 or later. The interface provides the following methods:

  • Required: public ProcessResult process(JobContext context) throws Exception
  • Optional: public void preProcess(JobContext context)
  • Optional: public ProcessResult postProcess(JobContext context)

The preProcess method is called only once, by a single worker, before the process method is called.

The postProcess method is called only once, by a single worker, after all workers have called the process method. The result that it returns can be passed on to workflows.

Examples

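import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;

import org.springframework.stereotype.Component;

// SchedulerX client classes. Verify the package names against your client version.
import com.alibaba.schedulerx.common.domain.TaskStatus;
import com.alibaba.schedulerx.worker.domain.JobContext;
import com.alibaba.schedulerx.worker.processor.JavaProcessor;
import com.alibaba.schedulerx.worker.processor.ProcessResult;

@Component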
public class TestBroadcastJob extends JavaProcessor {

    /**
     * Only one worker calls the method.
     */
    @Override
    public void preProcess(JobContext context) {
        System.out.println("TestBroadcastJob.preProcess");
    }

    /**
     * All workers call the method.
     */
    @Override
    public ProcessResult process(JobContext context) throws Exception {
        int value = new Random().nextInt(10);
        System.out.println("Total number of shards=" + context.getShardingNum() + ", Shard index=" + context.getShardingId() + ", "
                + "taskId=" + context.getTaskId() + ", value=" + value);
        return new ProcessResult(true, String.valueOf(value));
    }

    /**
     * Only one worker calls the method.
     */
    @Override
    public ProcessResult postProcess(JobContext context) {
        System.out.println("TestBroadcastJob.postProcess");
        Map<Long, String> allTaskResults = context.getTaskResults();
        Map<Long, TaskStatus> allTaskStatuses = context.getTaskStatuses();
        int num = 0;
        for (Entry<Long, String> entry : allTaskResults.entrySet()) {
            System.out.println(entry.getKey() + ":" + entry.getValue());
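            // Count only successful tasks. The constant-first equals avoids a NullPointerException if a status is missing.
            if (TaskStatus.SUCCESS.equals(allTaskStatuses.get(entry.getKey()))) {
                num += Integer.parseInt(entry.getValue());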
            }
        }
        System.out.println("TestBroadcastJob.postProcess(), num=" + num);
        return new ProcessResult(true, String.valueOf(num));
    }

}