You can run Java jobs in application processes. This topic describes how to manage Java jobs.
Execution modes
Java jobs support the following execution modes:
Standalone operation: The job is executed on a random worker from the groupId group.
Broadcast run: The job is concurrently executed on all workers that belong to the groupId group and the system waits until all workers complete the job.
Visual MapReduce: a MapReduce model. You must activate Professional Edition. The maximum number of tasks is limited to 1,000. You can query detailed execution records, operational logs and stacks of tasks by keyword.
MapReduce: a regular MapReduce model. In this mode, you can process a large number of tasks in parallel, but you can query only the running information of tasks. We recommend that you select this mode when the number of tasks is less than 1,000,000.
Shard run: This mode uses static shards and dynamic shards to handle big data computing workloads.
If you select the Standalone operation or Broadcast run mode, your processor must extend the JavaProcessor class. If you select the Visual MapReduce, MapReduce, or Shard run mode, your processor must extend the MapJobProcessor class.
The path of a Processor class is the fully qualified name of the implementation class. Example: com.apache.armon.test.schedulerx.processor.MySimpleJob.
If you do not upload a JAR package, SchedulerX searches for Processor implementation classes in the classpath of your application process. Therefore, you must recompile and republish an application each time you modify the job for the application.
If you upload a JAR package, SchedulerX dynamically loads the JAR package and Processor. In this case, you do not need to republish the application after you modify the job for the application.
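To illustrate why the full class path is required, the following is a minimal sketch of how a class can be looked up on the classpath by its fully qualified name and instantiated through reflection. It is not the loading code that SchedulerX uses. HelloTask and ProcessorLookup are hypothetical names that exist only in this example.

```java
import java.util.concurrent.Callable;

// Hypothetical stand-in for a processor implementation; not part of SchedulerX.
class HelloTask implements Callable<String> {
    @Override
    public String call() {
        return "hello";
    }
}

// Hypothetical helper that resolves a class by name, as a scheduler might.
class ProcessorLookup {
    // Loads the class with the given fully qualified name from the current
    // classpath and creates an instance through its no-argument constructor.
    static Object instantiate(String className) {
        try {
            Class<?> type = Class.forName(className);
            return type.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            // Covers a missing class, a missing constructor, and failed construction.
            throw new IllegalArgumentException("Cannot load processor class: " + className, e);
        }
    }

    public static void main(String[] args) {
        Object task = instantiate(HelloTask.class.getName());
        System.out.println(((HelloTask) task).call());
    }
}
```

A name that is misspelled or missing its package prefix fails at lookup time in this sketch, which is why the configured path must match the implementation class exactly.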
Programming models
Java jobs support the following programming models: JavaProcessor and MapJobProcessor.
JavaProcessor
Optional: public void preProcess(JobContext context) throws Exception
Required: public ProcessResult process(JobContext context) throws Exception
Optional: public void postProcess(JobContext context)
Optional: public void kill(JobContext context)
MapJobProcessor
Required: public ProcessResult process(JobContext context) throws Exception
Optional: public void postProcess(JobContext context)
Optional: public void kill(JobContext context)
Required: public ProcessResult map(List<? extends Object> taskList, String taskName)
ProcessResult
Every process method must return a ProcessResult object. The ProcessResult object contains the execution status, the execution result, and the error message of the job.
If return new ProcessResult(true) is executed, the job is successful.
If return new ProcessResult(false, ErrorMsg) is executed or an exception is thrown, the job fails.
If return new ProcessResult(true, result) is executed, the job is successful and the execution result is returned. result is a string and cannot be larger than 1,000 bytes.
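Because the result string is limited to 1,000 bytes, it can help to trim long results before they are passed to ProcessResult. The following is a minimal sketch of such a helper. It assumes that the limit is measured in UTF-8 bytes, which this topic does not state, and ResultLimiter is a hypothetical class that is not part of SchedulerX.

```java
// Hypothetical helper; not part of the SchedulerX SDK.
class ResultLimiter {
    // Limit taken from this topic; UTF-8 as the byte encoding is an assumption.
    static final int MAX_RESULT_BYTES = 1000;

    // Returns the longest prefix of text whose UTF-8 encoding fits in maxBytes.
    // The cut never falls inside a multi-byte character or a surrogate pair.
    static String truncateUtf8(String text, int maxBytes) {
        int bytes = 0;
        int i = 0;
        while (i < text.length()) {
            int codePoint = text.codePointAt(i);
            // Number of bytes that UTF-8 uses for this code point.
            int width = codePoint < 0x80 ? 1
                    : codePoint < 0x800 ? 2
                    : codePoint < 0x10000 ? 3 : 4;
            if (bytes + width > maxBytes) {
                break;
            }
            bytes += width;
            i += Character.charCount(codePoint);
        }
        return text.substring(0, i);
    }
}
```

In a processor, the helper could be applied just before returning, for example return new ProcessResult(true, ResultLimiter.truncateUtf8(result, ResultLimiter.MAX_RESULT_BYTES)).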
Sample job (Hello, SchedulerX 2.0)
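import com.alibaba.schedulerx.worker.domain.JobContext;
import com.alibaba.schedulerx.worker.processor.JavaProcessor;
import com.alibaba.schedulerx.worker.processor.ProcessResult;
import org.springframework.stereotype.Component;

@Component
public class MyProcessor1 extends JavaProcessor {
    @Override
    public ProcessResult process(JobContext context) throws Exception {
        // TODO: add the job logic here.
        System.out.println("Hello, schedulerx2.0!");
        return new ProcessResult(true);
    }
}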
Sample job that can be terminated
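import com.alibaba.schedulerx.worker.domain.JobContext;
import com.alibaba.schedulerx.worker.processor.JavaProcessor;
import com.alibaba.schedulerx.worker.processor.ProcessResult;
import org.springframework.stereotype.Component;

@Component
public class MyProcessor2 extends JavaProcessor {
    // volatile makes the flag that kill sets visible to the thread that runs process.
    private volatile boolean stop = false;

    @Override
    public void preProcess(JobContext context) {
        // If the job is launched by using Spring and the bean is a singleton bean,
        // you must use preProcess to reset the flag.
        stop = false;
    }

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        int n = 10000;
        while (!stop && n >= 0) {
            // TODO: process one unit of work per iteration.
            n--;
        }
        return new ProcessResult(true);
    }

    @Override
    public void kill(JobContext context) {
        stop = true;
    }
}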
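The termination sample relies on a volatile flag that one thread sets and the processing loop reads. The following is a minimal sketch of that pattern in plain Java, without the SchedulerX SDK. StoppableLoop and StoppableLoopDemo are hypothetical names, and reset, run, and requestStop only mirror the roles of preProcess, process, and kill in the sample.

```java
// Hypothetical stand-in for a processor with a cooperative stop flag.
class StoppableLoop {
    // volatile guarantees that a write from another thread becomes visible here.
    private volatile boolean stop = false;

    // Clears the flag so that a reused (singleton) instance can run again.
    void reset() {
        stop = false;
    }

    // Loops until requestStop is called from another thread.
    void run() {
        while (!stop) {
            Thread.yield(); // placeholder for one unit of work
        }
    }

    void requestStop() {
        stop = true;
    }

    boolean stopRequested() {
        return stop;
    }
}

class StoppableLoopDemo {
    // Runs the loop on a worker thread, requests a stop after delayMillis,
    // and reports whether the worker exited within one second.
    static boolean runAndStop(StoppableLoop loop, long delayMillis) {
        loop.reset();
        Thread worker = new Thread(loop::run);
        worker.setDaemon(true);
        worker.start();
        try {
            Thread.sleep(delayMillis);
            loop.requestStop();
            worker.join(1000);
        } catch (InterruptedException e) {
            loop.requestStop();
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        StoppableLoop loop = new StoppableLoop();
        System.out.println("first run stopped: " + runAndStop(loop, 20));
        System.out.println("second run stopped: " + runAndStop(loop, 20));
    }
}
```

If reset were skipped, the second run would see the flag left over from the first run and exit without doing any work, which is the situation that the preProcess reset in the sample guards against.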
Sample job that uses the Map model for batch processing
import java.util.List;

import com.alibaba.schedulerx.worker.domain.JobContext;
import com.alibaba.schedulerx.worker.processor.MapJobProcessor;
import com.alibaba.schedulerx.worker.processor.ProcessResult;
import com.google.common.collect.Lists;
import org.springframework.stereotype.Component;
// Pair: import the Pair utility class that your project provides.

/**
 * Perform distributed batch processing on a single table.
 * 1. The root task queries the table to obtain minId and maxId.
 * 2. The root task builds PageTask objects and calls the map method to distribute them.
 * 3. Each worker that receives a PageTask at the next level processes its data.
 */
@Component
public class ScanSingleTableJobProcessor extends MapJobProcessor {
    private static final int pageSize = 100;

    static class PageTask {
        private int startId;
        private int endId;

        public PageTask(int startId, int endId) {
            this.startId = startId;
            this.endId = endId;
        }

        public int getStartId() {
            return startId;
        }

        public int getEndId() {
            return endId;
        }
    }

    @Override
    public ProcessResult process(JobContext context) {
        String taskName = context.getTaskName();
        Object task = context.getTask();
        if (isRootTask(context)) {
            System.out.println("start root task");
            Pair<Integer, Integer> idPair = queryMinAndMaxId();
            if (idPair == null) {
                return new ProcessResult(false, "failed to query minId and maxId");
            }
            int minId = idPair.getFirst();
            int maxId = idPair.getSecond();
            List<PageTask> taskList = Lists.newArrayList();
            // Each PageTask covers the inclusive ID range [startId, endId] and holds at most pageSize IDs.
            // Stepping by pageSize also avoids an endless loop when the range is smaller than one page.
            for (int i = minId; i <= maxId; i += pageSize) {
                taskList.add(new PageTask(i, Math.min(i + pageSize - 1, maxId)));
            }
            return map(taskList, "Level1Dispatch"); // The process calls the map method to distribute tasks.
        } else if ("Level1Dispatch".equals(taskName)) {
            PageTask record = (PageTask) task;
            long startId = record.getStartId();
            long endId = record.getEndId();
            // TODO: process the rows whose IDs are between startId and endId.
            return new ProcessResult(true);
        }
        return new ProcessResult(true);
    }

    @Override
    public void postProcess(JobContext context) {
        // TODO: add logic that runs after all tasks are complete.
        System.out.println("all tasks are finished.");
    }

    private Pair<Integer, Integer> queryMinAndMaxId() {
        // TODO: select min(id), max(id) from xxx
        return null;
    }
}
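The root task in the sample splits the ID range of the table into PageTask ranges before it calls map. The following is a minimal, SDK-independent sketch of one way to do that split, assuming that each range is inclusive on both ends and holds at most pageSize IDs. PageSplitter is a hypothetical helper, and the int[] pairs stand in for PageTask objects.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; not part of the SchedulerX SDK.
class PageSplitter {
    // Splits the inclusive range [minId, maxId] into consecutive inclusive
    // ranges of at most pageSize IDs. Each element is {startId, endId}.
    // Returns an empty list when minId is greater than maxId.
    static List<int[]> split(int minId, int maxId, int pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        List<int[]> pages = new ArrayList<>();
        // long arithmetic avoids int overflow when maxId is close to Integer.MAX_VALUE.
        for (long start = minId; start <= maxId; start += pageSize) {
            long end = Math.min(start + pageSize - 1, maxId);
            pages.add(new int[] {(int) start, (int) end});
        }
        return pages;
    }

    public static void main(String[] args) {
        for (int[] page : split(1, 250, 100)) {
            System.out.println(page[0] + " to " + page[1]);
        }
    }
}
```

Stepping by the page size, rather than by the range divided by the page size, keeps every task bounded to pageSize IDs and cannot produce a zero step when the table holds fewer IDs than one page.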