Anda dapat menjalankan pekerjaan Java dalam proses aplikasi. Topik ini menjelaskan cara mengelola pekerjaan Java.
Mode Eksekusi
Pekerjaan Java mendukung mode eksekusi berikut:
Operasi Mandiri: Pekerjaan dijalankan pada pekerja acak dari grup
groupId.Jalankan Siaran: Pekerjaan dijalankan secara bersamaan pada semua pekerja yang termasuk dalam grup
groupId, dan sistem menunggu hingga semua pekerja menyelesaikan pekerjaan.Visual MapReduce: Model MapReduce. Anda harus mengaktifkan Professional Edition. Jumlah maksimum tugas dibatasi hingga 1.000. Anda dapat memeriksa catatan eksekusi terperinci, log operasional, dan tumpukan tugas berdasarkan kata kunci.
MapReduce: Model MapReduce biasa. Dalam mode ini, Anda dapat memproses sejumlah besar tugas secara paralel. Namun, Anda hanya dapat memeriksa informasi berjalan dari tugas-tugas tersebut. Kami merekomendasikan Anda memilih mode ini ketika jumlah tugas kurang dari 1.000.000.
Jalankan Shard: Mode ini menggunakan shard statis dan shard dinamis untuk menangani bisnis komputasi data besar.
Jika Anda memilih mode Operasi Mandiri atau Jalankan Siaran, Anda perlu mengimplementasikan kelas JavaProcessor. Jika Anda memilih mode Visual MapReduce, MapReduce, atau Jalankan Shard, Anda perlu mengimplementasikan kelas MapJobProcessor.
Path kelas Processor adalah path lengkap dari kelas implementasi. Contoh: com.apache.armon.test.schedulerx.processor.MySimpleJob.
Jika Anda tidak mengunggah paket JAR, SchedulerX akan mencari kelas implementasi Processor di classpath proses aplikasi Anda. Oleh karena itu, Anda harus mengompilasi ulang dan menerbitkan ulang aplikasi setiap kali Anda memodifikasi pekerjaan untuk aplikasi tersebut.
Jika Anda mengunggah paket JAR, SchedulerX secara dinamis memuat paket JAR dan Processor. Dalam hal ini, Anda tidak perlu menerbitkan ulang aplikasi setelah Anda memodifikasi pekerjaan untuk aplikasi tersebut.
Model Pemrograman
Pekerjaan Java mendukung model pemrograman berikut: JavaProcessor dan MapJobProcessor.
JavaProcessor
Opsional:
public void preProcess(JobContext context) throws ExceptionWajib:
public ProcessResult process(JobContext context) throws ExceptionOpsional:
public void postProcess(JobContext context)Opsional:
public void kill(JobContext context)
MapJobProcessor
Wajib:
public ProcessResult process(JobContext context) throws ExceptionOpsional:
public void postProcess(JobContext context)Opsional:
public void kill(JobContext context)Wajib:
public ProcessResult map(List<? extends Object> taskList, String taskName)
ProcessResult
Semua proses harus mengembalikan ProcessResult. ProcessResult berisi status eksekusi, hasil pekerjaan, serta pesan kesalahan.
Jika ditampilkan
return new ProcessResult(true), itu menunjukkan bahwa pekerjaan telah selesai.Jika ditampilkan
return new ProcessResult(false, ErrorMsg)atau pengecualian dilempar, itu menunjukkan bahwa pekerjaan gagal.Pekerjaan selesai dan hasil eksekusi:
return new ProcessResult(true, result)dikembalikan.resultadalah string dan tidak boleh lebih besar dari 1.000 byte.
Pekerjaan Contoh (Halo, SchedulerX 2.0)
@Component
public class MyProcessor1 extends JavaProcessor {
@Override
public ProcessResult process(JobContext context) throws Exception {
//TODO
System.out.println("Halo, schedulerx2.0!");
return new ProcessResult(true);
}
} Pekerjaan contoh yang dapat dihentikan
@Component
public class MyProcessor2 extends JavaProcessor {
private volatile boolean stop = false;
@Override
public ProcessResult process(JobContext context) throws Exception {
int N = 10000;
while (!stop && N >= 0) {
//TODO
N--;
}
return new ProcessResult(true);
}
@Override
public void kill(JobContext context) {
stop = true;
}
@Override
public void preProcess(JobContext context) {
stop = false; //Jika pekerjaan diluncurkan menggunakan Spring dan bean adalah Singleton bean, Anda harus menggunakan preProcess untuk mereset flag.
}
} Pekerjaan contoh yang menggunakan model Map untuk pemrosesan batch
/**
* Lakukan pemrosesan batch terdistribusi pada tabel individu.
* 1. Pekerjaan root digunakan untuk meminta tabel dan mendapatkan minId dan maxId.
* 2. Bangun PageTask dan panggil metode map untuk mendistribusikan tugas.
* 3. Proses data jika PageTask diperoleh di level berikutnya.
*
*/
@Component
public class ScanSingleTableJobProcessor extends MapJobProcessor {
private static final int pageSize = 100;
static class PageTask {
private int startId;
private int endId;
public PageTask(int startId, int endId) {
this.startId = startId;
this.endId = endId;
}
public int getStartId() {
return startId;
}
public int getEndId() {
return endId;
}
}
@Override
public ProcessResult process(JobContext context) {
String taskName = context.getTaskName();
Object task = context.getTask();
if (isRootTask(context)) {
System.out.println("mulai tugas root");
Pair<Integer, Integer> idPair = queryMinAndMaxId();
int minId = idPair.getFirst();
int maxId = idPair.getSecond();
List<PageTask> taskList = Lists.newArrayList();
int step = (int) ((maxId - minId) / pageSize); //Hitung jumlah halaman.
for (int i = minId; i < maxId; i+=step) {
taskList.add(new PageTask(i, (i+step > maxId ? maxId : i+step)));
}
return map(taskList, "Level1Dispatch"); //Proses memanggil metode map untuk mendistribusikan tugas.
} else if (taskName.equals("Level1Dispatch")) {
PageTask record = (PageTask)task;
long startId = record.getStartId();
long endId = record.getEndId();
//TODO
return new ProcessResult(true);
}
return new ProcessResult(true);
}
@Override
public void postProcess(JobContext context) {
//TODO
System.out.println("semua tugas selesai.");
}
private Pair<Integer, Integer> queryMinAndMaxId() {
//TODO select min(id),max(id) from xxx
return null;
}
}