Java調度任務可以在您的應用進程中執行。本文介紹如何管理Java類型的任務。
執行模式
Java任務類型支援單機、廣播、可視化MapReduce、MapReduce和分區運行模式:
單機:在同一個
groupId下的機器隨機挑一台執行。廣播:同一個
groupId下的所有機器同時執行。可視化MapReduce:屬於MapReduce模型任務,需開通專業版。可支援至1000以內子任務量,並且支援基於業務關鍵字的可視化子任務執行詳細記錄查詢,支援基於業務關鍵字的子任務作業記錄或堆棧查詢。
MapReduce:常規MapReduce模型任務,支援超大數量的子任務平行處理,僅可查詢子任務運行匯總資訊,建議子任務在100萬以下時選擇。
分區運行:包括靜態分區和動態分批,用於處理巨量資料業務需求。
單機和廣播需要實現JavaProcessor;可視化MapReduce、MapReduce和分區運行需要實現MapJobProcessor。
Processor類路徑,即實作類別的全路徑名,例如com.apache.armon.test.schedulerx.processor.MySimpleJob:
如果不上傳JAR包,SchedulerX會去您的應用進程中的classpath下尋找processor實作類別,所以每次修改需要重新編譯和發布。
如果上傳了JAR包,每次會熱載入JAR包和processor,不需要重新發布應用。
編程模型
Java任務支援兩種編程模型:JavaProcessor和MapJobProcessor。
JavaProcessor
可選:
public void preProcess(JobContext context) throws Exception必需:
public ProcessResult process(JobContext context) throws Exception可選:
public void postProcess(JobContext context)可選:
public void kill(JobContext context)
MapJobProcessor
必需:
public ProcessResult process(JobContext context) throws Exception可選:
public void postProcess(JobContext context)可選:
public void kill(JobContext context)必需:
public ProcessResult map(List<? extends Object> taskList, String taskName)
ProcessResult
每個process需要返回ProcessResult,用來表示任務執行的狀態、結果和錯誤資訊。
任務運行成功:
return new ProcessResult(true)。任務運行失敗:
return new ProcessResult(false, ErrorMsg)或者直接拋異常。任務運行成功並且返回結果:
return new ProcessResult(true, result)。result是一個字串,不能大於1000位元組。
HelloSchedulerx2.0任務樣本
@Component
public class MyProcessor1 extends JavaProcessor {
@Override
public ProcessResult process(JobContext context) throws Exception {
//TODO
System.out.println("Hello, schedulerx2.0!");
return new ProcessResult(true);
}
} 支援Kill功能的任務樣本
@Component
public class MyProcessor2 extends JavaProcessor {
private volatile boolean stop = false;
@Override
public ProcessResult process(JobContext context) throws Exception {
int N = 10000;
while (!stop && N >= 0) {
//TODO
N--;
}
return new ProcessResult(true);
}
@Override
public void kill(JobContext context) {
stop = true;
}
@Override
public void preProcess(JobContext context) {
stop = false; //如果是通過Spring啟動,Bean是單例,需要通過preProcess把標記為複位
}
} 通過Map模型批量處理任務樣本
/**
* 對一張單表進行分布式批量處理
* 1. 根任務先查詢一張表,擷取minId,maxId
* 2. 構造PageTask,通過map進行分發
* 3. 下一級擷取到如果是PageTask,則進行資料處理
*
*/
@Component
public class ScanSingleTableJobProcessor extends MapJobProcessor {
private static final int pageSize = 100;
static class PageTask {
private int startId;
private int endId;
public PageTask(int startId, int endId) {
this.startId = startId;
this.endId = endId;
}
public int getStartId() {
return startId;
}
public int getEndId() {
return endId;
}
}
@Override
public ProcessResult process(JobContext context) {
String taskName = context.getTaskName();
Object task = context.getTask();
if (isRootTask(context)) {
System.out.println("start root task");
Pair<Integer, Integer> idPair = queryMinAndMaxId();
int minId = idPair.getFirst();
int maxId = idPair.getSecond();
List<PageTask> taskList = Lists.newArrayList();
int step = (int) ((maxId - minId) / pageSize); //計算分頁數量
for (int i = minId; i < maxId; i+=step) {
taskList.add(new PageTask(i, (i+step > maxId ? maxId : i+step)));
}
return map(taskList, "Level1Dispatch"); //process調用map方法完成子任務分發
} else if (taskName.equals("Level1Dispatch")) {
PageTask record = (PageTask)task;
long startId = record.getStartId();
long endId = record.getEndId();
//TODO
return new ProcessResult(true);
}
return new ProcessResult(true);
}
@Override
public void postProcess(JobContext context) {
//TODO
System.out.println("all tasks is finished.");
}
private Pair<Integer, Integer> queryMinAndMaxId() {
//TODO select min(id),max(id) from xxx
return null;
}
}