All Products
Search
Document Center

SchedulerX:Job Golang

Last Updated:Jul 02, 2025

Anda dapat menggunakan SDK untuk Golang untuk menghubungkan aplikasi Golang ke SchedulerX. Dengan cara ini, Anda dapat menjadwalkan antarmuka Prosesor secara berkala. Job tidak dapat dihentikan karena Golang menjalankan job menggunakan goroutine.

Jenis Job

Job mandiri

Tulis kode layanan untuk mengimplementasikan antarmuka Processor.

type Processor interface {
    Process(ctx *processor.JobContext) (*ProcessResult, error)
}

Contoh kode:

package main

import (
	"fmt"
	"github.com/alibaba/schedulerx-worker-go/processor"
	"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
	"time"
)

var _ processor.Processor = &HelloWorld{}

type HelloWorld struct{}

func (h *HelloWorld) Process(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
	fmt.Println("[Process] Mulai memproses tugas saya: Hello world!")
	// mock execute task
	time.Sleep(3 * time.Second)
	ret := new(processor.ProcessResult)
	ret.SetStatus(processor.InstanceStatusSucceed)
	fmt.Println("[Process] Selesai memproses tugas saya: Hello world!")
	return ret, nil
}

Job siaran

Job siaran sharding dalam bahasa Java didukung. Antarmuka yang didukung:

  • PreProcess: Node master menjalankan PreProcess sekali sebelum semua node pekerja menjalankan Process.

  • Process: Hasil hanya dikembalikan setelah semua node pekerja menjalankan Process.

  • PostProcess: Setelah semua node pekerja menjalankan Process, node master menjalankan PostProcess sekali untuk mendapatkan hasil dari Process di semua node.

Catatan

Versi SDK SchedulerX untuk Golang harus 0.0.2 atau lebih baru.

Contoh kode:

package main

import (
	"fmt"
	"github.com/alibaba/schedulerx-worker-go/processor"
	"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
	"github.com/alibaba/schedulerx-worker-go/processor/taskstatus"
	"math/rand"
	"strconv"
)

type TestBroadcast struct{}

// Process semua mesin akan mengeksekusinya.
func (t TestBroadcast) Process(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
	value := rand.Intn(10)
	fmt.Printf("Total jumlah sharding=%d, sharding=%d, value=%d\n", ctx.ShardingNum(), ctx.ShardingId(), value)
	ret := new(processor.ProcessResult)
	ret.SetSucceed()
	ret.SetResult(strconv.Itoa(value))
	return ret, nil
}

// PreProcess hanya satu mesin yang akan mengeksekusinya.
func (t TestBroadcast) PreProcess(ctx *jobcontext.JobContext) error {
	fmt.Println("TestBroadcastJob PreProcess")
	return nil
}

// PostProcess hanya satu mesin yang akan mengeksekusinya.
func (t TestBroadcast) PostProcess(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
	fmt.Println("TestBroadcastJob PostProcess")
	allTaskResults := ctx.TaskResults()
	allTaskStatuses := ctx.TaskStatuses()
	num := 0

	for key, val := range allTaskResults {
		fmt.Printf("%v:%v\n", key, val)
		if allTaskStatuses[key] == taskstatus.TaskStatusSucceed {
			valInt, _ := strconv.Atoi(val)
			num += valInt
		}
	}

	fmt.Printf("TestBroadcastJob PostProcess(), num=%d\n", num)
	ret := new(processor.ProcessResult)
	ret.SetSucceed()
	ret.SetResult(strconv.Itoa(num))
	return ret, nil
}

Job MapReduce

Job Map dan MapReduce dalam bahasa Java didukung.

Catatan

Versi SDK SchedulerX untuk Golang harus 0.0.4 atau lebih baru.

Job Map

  1. Tulis kode layanan untuk mengimplementasikan antarmuka MapJobProcessor.

    package main
    
    import (
    	"encoding/json"
    	"errors"
    	"fmt"
    	"github.com/alibaba/schedulerx-worker-go/processor"
    	"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
    	"github.com/alibaba/schedulerx-worker-go/processor/mapjob"
    	"strconv"
    	"time"
    )
    
    type TestMapJob struct {
    	*mapjob.MapJobProcessor
    }
    
    func (mr *TestMapJob) Kill(jobCtx *jobcontext.JobContext) error {
    	//TODO implement me
    	panic("implement me")
    }
    
    // Process model MapReduce digunakan untuk pemindaian terdistribusi pesanan untuk konfirmasi timeout
    func (mr *TestMapJob) Process(jobCtx *jobcontext.JobContext) (*processor.ProcessResult, error) {
    	var (
    		num = 10
    		err error
    	)
    	taskName := jobCtx.TaskName()
    
    	if jobCtx.JobParameters() != "" {
    		num, err = strconv.Atoi(jobCtx.JobParameters())
    		if err != nil {
    			return nil, err
    		}
    	}
    
    	if mr.IsRootTask(jobCtx) {
    		fmt.Println("mulai tugas root")
    		var messageList []interface{}
    		for i := 1; i <= num; i++ {
    			var str = fmt.Sprintf("id_%d", i)
    			messageList = append(messageList, str)
    		}
    		fmt.Println(messageList)
    		return mr.Map(jobCtx, messageList, "Level1Dispatch")
    	} else if taskName == "Level1Dispatch" {
    		var task []byte = jobCtx.Task()
    		var str string
    		err = json.Unmarshal(task, &str)
    		fmt.Printf("str=%s\n", str)
    		time.Sleep(100 * time.Millisecond)
    		fmt.Println("Selesai Memproses...")
    		if str == "id_5" {
    			return processor.NewProcessResult(
    				processor.WithFailed(),
    				processor.WithResult(str),
    			), errors.New("test")
    		}
    		return processor.NewProcessResult(
    			processor.WithSucceed(),
    			processor.WithResult(str),
    		), nil
    	}
    	return processor.NewProcessResult(processor.WithFailed()), nil
    }
    
  2. Daftarkan job Map pada klien.

    package main
    
    import (
    	"github.com/alibaba/schedulerx-worker-go"
    	"github.com/alibaba/schedulerx-worker-go/processor/mapjob"
    )
    
    func main() {
    	// Ini hanya contoh, konfigurasi nyata perlu diperoleh dari platform
    	cfg := &schedulerx.Config{
    		Endpoint:  "acm.aliyun.com",
    		Namespace: "433d8b23-xxx-xxx-xxx-90d4d1b9a4af",
    		GroupId:   "xueren_sub",
    		AppKey:    "xxxxxx",
    	}
    	client, err := schedulerx.GetClient(cfg)
    	if err != nil {
    		panic(err)
    	}
    	task := &TestMapJob{
    		mapjob.NewMapJobProcessor(),
    	}
    
    	client.RegisterTask("TestMapJob", task)
    	select {}
    }
    

Job MapReduce

  1. Tulis kode layanan untuk mengimplementasikan antarmuka MapReduceJobProcessor.

    package main
    
    import (
    	"encoding/json"
    	"fmt"
    	"github.com/alibaba/schedulerx-worker-go/processor"
    	"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
    	"github.com/alibaba/schedulerx-worker-go/processor/mapjob"
    	"github.com/alibaba/schedulerx-worker-go/processor/taskstatus"
    	"strconv"
    	"time"
    )
    
    type OrderInfo struct {
    	Id    string `json:"id"`
    	Value int    `json:"value"`
    }
    
    func NewOrderInfo(id string, value int) *OrderInfo {
    	return &OrderInfo{Id: id, Value: value}
    }
    
    type TestMapReduceJob struct {
    	*mapjob.MapReduceJobProcessor
    }
    
    func (mr *TestMapReduceJob) Kill(jobCtx *jobcontext.JobContext) error {
    	//TODO implement me
    	panic("implement me")
    }
    
    // Process model MapReduce digunakan untuk pemindaian terdistribusi pesanan untuk konfirmasi timeout
    func (mr *TestMapReduceJob) Process(jobCtx *jobcontext.JobContext) (*processor.ProcessResult, error) {
    	var (
    		num = 1000
    		err error
    	)
    	taskName := jobCtx.TaskName()
    
    	if jobCtx.JobParameters() != "" {
    		num, err = strconv.Atoi(jobCtx.JobParameters())
    		if err != nil {
    			return nil, err
    		}
    	}
    
    	if mr.IsRootTask(jobCtx) {
    		fmt.Println("mulai tugas root, taskId=%d", jobCtx.TaskId())
    		var orderInfos []interface{}
    		for i := 1; i <= num; i++ {
    			orderInfos = append(orderInfos, NewOrderInfo(fmt.Sprintf("id_%d", i), i))
    		}
    		return mr.Map(jobCtx, orderInfos, "OrderInfo")
    	} else if taskName == "OrderInfo" {
    		orderInfo := new(OrderInfo)
    		if err := json.Unmarshal(jobCtx.Task(), orderInfo); err != nil {
    			fmt.Printf("tugas bukan OrderInfo, task=%+v\n", jobCtx.Task())
    		}
    		fmt.Printf("taskId=%d, orderInfo=%+v\n", jobCtx.TaskId(), orderInfo)
    		time.Sleep(1 * time.Millisecond)
    		return processor.NewProcessResult(
    			processor.WithSucceed(),
    			processor.WithResult(strconv.Itoa(orderInfo.Value)),
    		), nil
    	}
    	return processor.NewProcessResult(processor.WithFailed()), nil
    }
    
    func (mr *TestMapReduceJob) Reduce(jobCtx *jobcontext.JobContext) (*processor.ProcessResult, error) {
    	allTaskResults := jobCtx.TaskResults()
    	allTaskStatuses := jobCtx.TaskStatuses()
    	count := 0
    	fmt.Printf("reduce: total tugas=%d\n", len(allTaskResults))
    	for key, val := range allTaskResults {
    		if key == 0 {
    			continue
    		}
    		if allTaskStatuses[key] == taskstatus.TaskStatusSucceed {
    			num, err := strconv.Atoi(val)
    			if err != nil {
    				return nil, err
    			}
    			count += num
    		}
    	}
    	fmt.Printf("reduce: jumlah tugas sukses=%d\n", count)
    	return processor.NewProcessResult(
    		processor.WithSucceed(),
    		processor.WithResult(strconv.Itoa(count)),
    	), nil
    }
  1. Daftarkan job MapReduce pada klien.

    package main
    
    import (
    	"github.com/alibaba/schedulerx-worker-go"
    	"github.com/alibaba/schedulerx-worker-go/processor/mapjob"
    )
    
    func main() {
    	// Ini hanya contoh, konfigurasi nyata perlu diperoleh dari platform
    	cfg := &schedulerx.Config{
    		Endpoint:  "acm.aliyun.com",
    		Namespace: "433d8b23-xxx-xxx-xxx-90d4d1b9a4af",
    		GroupId:   "xueren_sub",
    		AppKey:    "xxxxxx",
    	}
    	client, err := schedulerx.GetClient(cfg)
    	if err != nil {
    		panic(err)
    	}
    
    	task := &TestMapReduceJob{
    		mapjob.NewMapReduceJobProcessor(),
    	}
        
    	client.RegisterTask("TestMapReduceJob", task)
    	select {}
    }
    

Hentikan Job

Prosesor Golang menjalankan job menggunakan goroutine. Goroutine tidak dapat dihentikan. Namun, Anda dapat menggunakan antarmuka KillProcessor untuk membunuh prosesor.

Catatan

Versi SDK untuk Golang harus 1.0.2 atau lebih baru.

Contoh kode:

package main

import (
	"fmt"
	"time"
	"github.com/alibaba/schedulerx-worker-go/processor"
	"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
)

var _ processor.Processor = &HelloWorld{}

type HelloWorld struct{}

var Stop = false

func (h *HelloWorld) Process(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
	fmt.Println("[Process] Mulai memproses tugas saya: Hello world!")
	// mock execute task
	for i := 0; i < 10; i++ {
		fmt.Printf("Hello%d\n", i)
		time.Sleep(2 * time.Second)
		if Stop {
			break
		}
	}
	ret := new(processor.ProcessResult)
	ret.SetSucceed()
	fmt.Println("[Process] Selesai memproses tugas saya: Hello world!")
	return ret, nil
}

func (h *HelloWorld) Kill(ctx *jobcontext.JobContext) error {
	fmt.Println("[Kill] Mulai membunuh tugas saya: Hello world!")
	Stop = true
	return nil
}

Referensi