SchedulerX: Golang jobs

Last Updated: Mar 08, 2024

You can use the SchedulerX SDK for Golang to connect Golang applications to SchedulerX and schedule jobs that implement the Processor interface on a regular basis. Running jobs cannot be stopped, because Golang executes jobs in goroutines.

Standalone jobs

Write business code to implement the Processor interface.

type Processor interface {
    Process(ctx *jobcontext.JobContext) (*ProcessResult, error)
}

Example:

package main

import (
	"fmt"
	"github.com/alibaba/schedulerx-worker-go/processor"
	"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
	"time"
)

var _ processor.Processor = &HelloWorld{}

type HelloWorld struct{}

func (h *HelloWorld) Process(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
	fmt.Println("[Process] Start process my task: Hello world!")
	// mock execute task
	time.Sleep(3 * time.Second)
	ret := new(processor.ProcessResult)
	ret.SetStatus(processor.InstanceStatusSucceed)
	fmt.Println("[Process] End process my task: Hello world!")
	return ret, nil
}
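
To run this job, the processor must also be registered with a SchedulerX client. The following is a minimal sketch that mirrors the registration code shown in the Map job section below; the configuration values are placeholders obtained from the platform, and the task name "HelloWorld" is an assumption that must match the job configured in the console. Broadcast processors can be registered the same way.

package main

import (
	"github.com/alibaba/schedulerx-worker-go"
)

func main() {
	// This is just an example; the real configuration must be obtained from the platform.
	cfg := &schedulerx.Config{
		Endpoint:  "acm.aliyun.com",
		Namespace: "433d8b23-xxx-xxx-xxx-90d4d1b9a4af",
		GroupId:   "xueren_sub",
		AppKey:    "xxxxxx",
	}
	client, err := schedulerx.GetClient(cfg)
	if err != nil {
		panic(err)
	}

	// The task name is assumed to match the name configured for the job in the console.
	client.RegisterTask("HelloWorld", &HelloWorld{})

	// Block so that the worker keeps running and receives scheduled jobs.
	select {}
}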

Broadcast jobs

Broadcast jobs, equivalent to sharding broadcast jobs in the Java SDK, are supported. The following interfaces are provided:

  • PreProcess: The master node runs PreProcess once before all the worker nodes run Process.

  • Process: All worker nodes run Process. Results are returned only after every worker node has finished.

  • PostProcess: After all worker nodes run Process, the master node runs PostProcess once to obtain the results of Process on all nodes.

Note

The version of SchedulerX SDK for Golang must be 0.0.2 or later.

Example:

package main

import (
	"fmt"
	"github.com/alibaba/schedulerx-worker-go/processor"
	"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
	"github.com/alibaba/schedulerx-worker-go/processor/taskstatus"
	"math/rand"
	"strconv"
)

type TestBroadcast struct{}

// Process is executed on every worker machine.
func (t TestBroadcast) Process(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
	value := rand.Intn(10)
	fmt.Printf("Total sharding num=%d, sharding=%d, value=%d\n", ctx.ShardingNum(), ctx.ShardingId(), value)
	ret := new(processor.ProcessResult)
	ret.SetSucceed()
	ret.SetResult(strconv.Itoa(value))
	return ret, nil
}

// PreProcess is executed on only one machine (the master node), before Process runs.
func (t TestBroadcast) PreProcess(ctx *jobcontext.JobContext) error {
	fmt.Println("TestBroadcastJob PreProcess")
	return nil
}

// PostProcess is executed on only one machine (the master node), after all Process calls finish.
func (t TestBroadcast) PostProcess(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
	fmt.Println("TestBroadcastJob PostProcess")
	allTaskResults := ctx.TaskResults()
	allTaskStatuses := ctx.TaskStatuses()
	num := 0

	for key, val := range allTaskResults {
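		// key identifies one worker node's task; val is the string result its Process call returned.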
		fmt.Printf("%v:%v\n", key, val)
		if allTaskStatuses[key] == taskstatus.TaskStatusSucceed {
			valInt, _ := strconv.Atoi(val)
			num += valInt
		}
	}

	fmt.Printf("TestBroadcastJob PostProcess(), num=%d\n", num)
	ret := new(processor.ProcessResult)
	ret.SetSucceed()
	ret.SetResult(strconv.Itoa(num))
	return ret, nil
}

MapReduce jobs

Map and MapReduce jobs, equivalent to those in the Java SDK, are supported.

Note

The version of SchedulerX SDK for Golang must be 0.0.4 or later.

Map jobs

  1. Write business code to implement MapJobProcessor.

    package main
    
    import (
    	"encoding/json"
    	"errors"
    	"fmt"
    	"github.com/alibaba/schedulerx-worker-go/processor"
    	"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
    	"github.com/alibaba/schedulerx-worker-go/processor/mapjob"
    	"strconv"
    	"time"
    )
    
    type TestMapJob struct {
    	*mapjob.MapJobProcessor
    }
    
    func (mr *TestMapJob) Kill(jobCtx *jobcontext.JobContext) error {
    	//TODO implement me
    	panic("implement me")
    }
    
    // Process uses the MapReduce model to scan orders for timeout confirmation in a distributed manner.
    func (mr *TestMapJob) Process(jobCtx *jobcontext.JobContext) (*processor.ProcessResult, error) {
    	var (
    		num = 10
    		err error
    	)
    	taskName := jobCtx.TaskName()
    
    	if jobCtx.JobParameters() != "" {
    		num, err = strconv.Atoi(jobCtx.JobParameters())
    		if err != nil {
    			return nil, err
    		}
    	}
    
    	if mr.IsRootTask(jobCtx) {
    		fmt.Println("start root task")
    		var messageList []interface{}
    		for i := 1; i <= num; i++ {
    			var str = fmt.Sprintf("id_%d", i)
    			messageList = append(messageList, str)
    		}
    		fmt.Println(messageList)
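    		// Distribute messageList as subtasks named "Level1Dispatch"; each subtask is delivered back to Process with TaskName() == "Level1Dispatch".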
    		return mr.Map(jobCtx, messageList, "Level1Dispatch")
    	} else if taskName == "Level1Dispatch" {
    		var task []byte = jobCtx.Task()
    		var str string
    		err = json.Unmarshal(task, &str)
    		fmt.Printf("str=%s\n", str)
    		time.Sleep(100 * time.Millisecond)
    		fmt.Println("Finish Process...")
    		if str == "id_5" {
    			return processor.NewProcessResult(
    				processor.WithFailed(),
    				processor.WithResult(str),
    			), errors.New("test")
    		}
    		return processor.NewProcessResult(
    			processor.WithSucceed(),
    			processor.WithResult(str),
    		), nil
    	}
    	return processor.NewProcessResult(processor.WithFailed()), nil
    }
    
  2. Register a Map job on the client.

    package main
    
    import (
    	"github.com/alibaba/schedulerx-worker-go"
    	"github.com/alibaba/schedulerx-worker-go/processor/mapjob"
    )
    
    func main() {
    	// This is just an example; the real configuration must be obtained from the platform.
    	cfg := &schedulerx.Config{
    		Endpoint:  "acm.aliyun.com",
    		Namespace: "433d8b23-xxx-xxx-xxx-90d4d1b9a4af",
    		GroupId:   "xueren_sub",
    		AppKey:    "xxxxxx",
    	}
    	client, err := schedulerx.GetClient(cfg)
    	if err != nil {
    		panic(err)
    	}
    	task := &TestMapJob{
    		mapjob.NewMapJobProcessor(),
    	}
    
    	client.RegisterTask("TestMapJob", task)
    	select {}
    }
    

MapReduce jobs

  1. Write business code to implement MapReduceJobProcessor.

    package main
    
    import (
    	"encoding/json"
    	"fmt"
    	"github.com/alibaba/schedulerx-worker-go/processor"
    	"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
    	"github.com/alibaba/schedulerx-worker-go/processor/mapjob"
    	"github.com/alibaba/schedulerx-worker-go/processor/taskstatus"
    	"strconv"
    	"time"
    )
    
    type OrderInfo struct {
    	Id    string `json:"id"`
    	Value int    `json:"value"`
    }
    
    func NewOrderInfo(id string, value int) *OrderInfo {
    	return &OrderInfo{Id: id, Value: value}
    }
    
    type TestMapReduceJob struct {
    	*mapjob.MapReduceJobProcessor
    }
    
    func (mr *TestMapReduceJob) Kill(jobCtx *jobcontext.JobContext) error {
    	//TODO implement me
    	panic("implement me")
    }
    
    // Process uses the MapReduce model to scan orders for timeout confirmation in a distributed manner.
    func (mr *TestMapReduceJob) Process(jobCtx *jobcontext.JobContext) (*processor.ProcessResult, error) {
    	var (
    		num = 1000
    		err error
    	)
    	taskName := jobCtx.TaskName()
    
    	if jobCtx.JobParameters() != "" {
    		num, err = strconv.Atoi(jobCtx.JobParameters())
    		if err != nil {
    			return nil, err
    		}
    	}
    
    	if mr.IsRootTask(jobCtx) {
    		fmt.Println("start root task, taskId=%d", jobCtx.TaskId())
    		var orderInfos []interface{}
    		for i := 1; i <= num; i++ {
    			orderInfos = append(orderInfos, NewOrderInfo(fmt.Sprintf("id_%d", i), i))
    		}
    		return mr.Map(jobCtx, orderInfos, "OrderInfo")
    	} else if taskName == "OrderInfo" {
    		orderInfo := new(OrderInfo)
    		if err := json.Unmarshal(jobCtx.Task(), orderInfo); err != nil {
    			fmt.Printf("task is not OrderInfo, task=%+v\n", jobCtx.Task())
    		}
    		fmt.Printf("taskId=%d, orderInfo=%+v\n", jobCtx.TaskId(), orderInfo)
    		time.Sleep(1 * time.Millisecond)
    		return processor.NewProcessResult(
    			processor.WithSucceed(),
    			processor.WithResult(strconv.Itoa(orderInfo.Value)),
    		), nil
    	}
    	return processor.NewProcessResult(processor.WithFailed()), nil
    }
    
    func (mr *TestMapReduceJob) Reduce(jobCtx *jobcontext.JobContext) (*processor.ProcessResult, error) {
    	allTaskResults := jobCtx.TaskResults()
    	allTaskStatuses := jobCtx.TaskStatuses()
    	count := 0
    	fmt.Printf("reduce: all task count=%d\n", len(allTaskResults))
    	for key, val := range allTaskResults {
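    		// Skip the root task's own entry (task ID 0) so that only subtask results are aggregated.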
    		if key == 0 {
    			continue
    		}
    		if allTaskStatuses[key] == taskstatus.TaskStatusSucceed {
    			num, err := strconv.Atoi(val)
    			if err != nil {
    				return nil, err
    			}
    			count += num
    		}
    	}
    	fmt.Printf("reduce: succeed task count=%d\n", count)
    	return processor.NewProcessResult(
    		processor.WithSucceed(),
    		processor.WithResult(strconv.Itoa(count)),
    	), nil
    }

  2. Register a MapReduce job on the client.

    package main
    
    import (
    	"github.com/alibaba/schedulerx-worker-go"
    	"github.com/alibaba/schedulerx-worker-go/processor/mapjob"
    )
    
    func main() {
    	// This is just an example; the real configuration must be obtained from the platform.
    	cfg := &schedulerx.Config{
    		Endpoint:  "acm.aliyun.com",
    		Namespace: "433d8b23-xxx-xxx-xxx-90d4d1b9a4af",
    		GroupId:   "xueren_sub",
    		AppKey:    "xxxxxx",
    	}
    	client, err := schedulerx.GetClient(cfg)
    	if err != nil {
    		panic(err)
    	}
    
    	task := &TestMapReduceJob{
    		mapjob.NewMapReduceJobProcessor(),
    	}
        
    	client.RegisterTask("TestMapReduceJob", task)
    	select {}
    }
    
