Anda dapat menggunakan SDK untuk Golang untuk menghubungkan aplikasi Golang ke SchedulerX. Dengan cara ini, Anda dapat menjadwalkan antarmuka Prosesor secara berkala. Job tidak dapat dihentikan karena Golang menjalankan job menggunakan goroutine.
Jenis Job
Job mandiri
Tulis kode layanan untuk mengimplementasikan antarmuka Processor.
type Processor interface {
Process(ctx *processor.JobContext) (*ProcessResult, error)
}Contoh kode:
package main
import (
"fmt"
"github.com/alibaba/schedulerx-worker-go/processor"
"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
"time"
)
var _ processor.Processor = &HelloWorld{}
type HelloWorld struct{}
func (h *HelloWorld) Process(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
fmt.Println("[Process] Mulai memproses tugas saya: Hello world!")
// mock execute task
time.Sleep(3 * time.Second)
ret := new(processor.ProcessResult)
ret.SetStatus(processor.InstanceStatusSucceed)
fmt.Println("[Process] Selesai memproses tugas saya: Hello world!")
return ret, nil
}
Job siaran
Job siaran sharding dalam bahasa Java didukung. Antarmuka yang didukung:
PreProcess: Node master menjalankan PreProcess sekali sebelum semua node pekerja menjalankan Process.
Process: Hasil hanya dikembalikan setelah semua node pekerja menjalankan Process.
PostProcess: Setelah semua node pekerja menjalankan Process, node master menjalankan PostProcess sekali untuk mendapatkan hasil dari Process di semua node.
Versi SDK SchedulerX untuk Golang harus 0.0.2 atau lebih baru.
Contoh kode:
package main
import (
"fmt"
"github.com/alibaba/schedulerx-worker-go/processor"
"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
"github.com/alibaba/schedulerx-worker-go/processor/taskstatus"
"math/rand"
"strconv"
)
type TestBroadcast struct{}
// Process semua mesin akan mengeksekusinya.
func (t TestBroadcast) Process(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
value := rand.Intn(10)
fmt.Printf("Total jumlah sharding=%d, sharding=%d, value=%d\n", ctx.ShardingNum(), ctx.ShardingId(), value)
ret := new(processor.ProcessResult)
ret.SetSucceed()
ret.SetResult(strconv.Itoa(value))
return ret, nil
}
// PreProcess hanya satu mesin yang akan mengeksekusinya.
func (t TestBroadcast) PreProcess(ctx *jobcontext.JobContext) error {
fmt.Println("TestBroadcastJob PreProcess")
return nil
}
// PostProcess hanya satu mesin yang akan mengeksekusinya.
func (t TestBroadcast) PostProcess(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
fmt.Println("TestBroadcastJob PostProcess")
allTaskResults := ctx.TaskResults()
allTaskStatuses := ctx.TaskStatuses()
num := 0
for key, val := range allTaskResults {
fmt.Printf("%v:%v\n", key, val)
if allTaskStatuses[key] == taskstatus.TaskStatusSucceed {
valInt, _ := strconv.Atoi(val)
num += valInt
}
}
fmt.Printf("TestBroadcastJob PostProcess(), num=%d\n", num)
ret := new(processor.ProcessResult)
ret.SetSucceed()
ret.SetResult(strconv.Itoa(num))
return ret, nil
}
Job MapReduce
Job Map dan MapReduce dalam bahasa Java didukung.
Versi SDK SchedulerX untuk Golang harus 0.0.4 atau lebih baru.
Job Map
Tulis kode layanan untuk mengimplementasikan antarmuka
MapJobProcessor.package main import ( "encoding/json" "errors" "fmt" "github.com/alibaba/schedulerx-worker-go/processor" "github.com/alibaba/schedulerx-worker-go/processor/jobcontext" "github.com/alibaba/schedulerx-worker-go/processor/mapjob" "strconv" "time" ) type TestMapJob struct { *mapjob.MapJobProcessor } func (mr *TestMapJob) Kill(jobCtx *jobcontext.JobContext) error { //TODO implement me panic("implement me") } // Process model MapReduce digunakan untuk pemindaian terdistribusi pesanan untuk konfirmasi timeout func (mr *TestMapJob) Process(jobCtx *jobcontext.JobContext) (*processor.ProcessResult, error) { var ( num = 10 err error ) taskName := jobCtx.TaskName() if jobCtx.JobParameters() != "" { num, err = strconv.Atoi(jobCtx.JobParameters()) if err != nil { return nil, err } } if mr.IsRootTask(jobCtx) { fmt.Println("mulai tugas root") var messageList []interface{} for i := 1; i <= num; i++ { var str = fmt.Sprintf("id_%d", i) messageList = append(messageList, str) } fmt.Println(messageList) return mr.Map(jobCtx, messageList, "Level1Dispatch") } else if taskName == "Level1Dispatch" { var task []byte = jobCtx.Task() var str string err = json.Unmarshal(task, &str) fmt.Printf("str=%s\n", str) time.Sleep(100 * time.Millisecond) fmt.Println("Selesai Memproses...") if str == "id_5" { return processor.NewProcessResult( processor.WithFailed(), processor.WithResult(str), ), errors.New("test") } return processor.NewProcessResult( processor.WithSucceed(), processor.WithResult(str), ), nil } return processor.NewProcessResult(processor.WithFailed()), nil }Daftarkan job Map pada klien.
package main import ( "github.com/alibaba/schedulerx-worker-go" "github.com/alibaba/schedulerx-worker-go/processor/mapjob" ) func main() { // Ini hanya contoh, konfigurasi nyata perlu diperoleh dari platform cfg := &schedulerx.Config{ Endpoint: "acm.aliyun.com", Namespace: "433d8b23-xxx-xxx-xxx-90d4d1b9a4af", GroupId: "xueren_sub", AppKey: "xxxxxx", } client, err := schedulerx.GetClient(cfg) if err != nil { panic(err) } task := &TestMapJob{ mapjob.NewMapJobProcessor(), } client.RegisterTask("TestMapJob", task) select {} }
Job MapReduce
Tulis kode layanan untuk mengimplementasikan antarmuka
MapReduceJobProcessor.package main import ( "encoding/json" "fmt" "github.com/alibaba/schedulerx-worker-go/processor" "github.com/alibaba/schedulerx-worker-go/processor/jobcontext" "github.com/alibaba/schedulerx-worker-go/processor/mapjob" "github.com/alibaba/schedulerx-worker-go/processor/taskstatus" "strconv" "time" ) type OrderInfo struct { Id string `json:"id"` Value int `json:"value"` } func NewOrderInfo(id string, value int) *OrderInfo { return &OrderInfo{Id: id, Value: value} } type TestMapReduceJob struct { *mapjob.MapReduceJobProcessor } func (mr *TestMapReduceJob) Kill(jobCtx *jobcontext.JobContext) error { //TODO implement me panic("implement me") } // Process model MapReduce digunakan untuk pemindaian terdistribusi pesanan untuk konfirmasi timeout func (mr *TestMapReduceJob) Process(jobCtx *jobcontext.JobContext) (*processor.ProcessResult, error) { var ( num = 1000 err error ) taskName := jobCtx.TaskName() if jobCtx.JobParameters() != "" { num, err = strconv.Atoi(jobCtx.JobParameters()) if err != nil { return nil, err } } if mr.IsRootTask(jobCtx) { fmt.Println("mulai tugas root, taskId=%d", jobCtx.TaskId()) var orderInfos []interface{} for i := 1; i <= num; i++ { orderInfos = append(orderInfos, NewOrderInfo(fmt.Sprintf("id_%d", i), i)) } return mr.Map(jobCtx, orderInfos, "OrderInfo") } else if taskName == "OrderInfo" { orderInfo := new(OrderInfo) if err := json.Unmarshal(jobCtx.Task(), orderInfo); err != nil { fmt.Printf("tugas bukan OrderInfo, task=%+v\n", jobCtx.Task()) } fmt.Printf("taskId=%d, orderInfo=%+v\n", jobCtx.TaskId(), orderInfo) time.Sleep(1 * time.Millisecond) return processor.NewProcessResult( processor.WithSucceed(), processor.WithResult(strconv.Itoa(orderInfo.Value)), ), nil } return processor.NewProcessResult(processor.WithFailed()), nil } func (mr *TestMapReduceJob) Reduce(jobCtx *jobcontext.JobContext) (*processor.ProcessResult, error) { allTaskResults := jobCtx.TaskResults() allTaskStatuses := jobCtx.TaskStatuses() count := 0 fmt.Printf("reduce: total tugas=%d\n", len(allTaskResults)) for key, val := range allTaskResults { if key == 0 { continue } if allTaskStatuses[key] == taskstatus.TaskStatusSucceed { num, err := strconv.Atoi(val) if err != nil { return nil, err } count += num } } fmt.Printf("reduce: jumlah tugas sukses=%d\n", count) return processor.NewProcessResult( processor.WithSucceed(), processor.WithResult(strconv.Itoa(count)), ), nil }
Daftarkan job MapReduce pada klien.
package main import ( "github.com/alibaba/schedulerx-worker-go" "github.com/alibaba/schedulerx-worker-go/processor/mapjob" ) func main() { // Ini hanya contoh, konfigurasi nyata perlu diperoleh dari platform cfg := &schedulerx.Config{ Endpoint: "acm.aliyun.com", Namespace: "433d8b23-xxx-xxx-xxx-90d4d1b9a4af", GroupId: "xueren_sub", AppKey: "xxxxxx", } client, err := schedulerx.GetClient(cfg) if err != nil { panic(err) } task := &TestMapReduceJob{ mapjob.NewMapReduceJobProcessor(), } client.RegisterTask("TestMapReduceJob", task) select {} }
Hentikan Job
Prosesor Golang menjalankan job menggunakan goroutine. Goroutine tidak dapat dihentikan. Namun, Anda dapat menggunakan antarmuka KillProcessor untuk membunuh prosesor.
Versi SDK untuk Golang harus 1.0.2 atau lebih baru.
Contoh kode:
package main
import (
"fmt"
"time"
"github.com/alibaba/schedulerx-worker-go/processor"
"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
)
var _ processor.Processor = &HelloWorld{}
type HelloWorld struct{}
var Stop = false
func (h *HelloWorld) Process(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
fmt.Println("[Process] Mulai memproses tugas saya: Hello world!")
// mock execute task
for i := 0; i < 10; i++ {
fmt.Printf("Hello%d\n", i)
time.Sleep(2 * time.Second)
if Stop {
break
}
}
ret := new(processor.ProcessResult)
ret.SetSucceed()
fmt.Println("[Process] Selesai memproses tugas saya: Hello world!")
return ret, nil
}
func (h *HelloWorld) Kill(ctx *jobcontext.JobContext) error {
fmt.Println("[Kill] Mulai membunuh tugas saya: Hello world!")
Stop = true
return nil
}