すべてのプロダクト
Search
ドキュメントセンター

SchedulerX:Golang ジョブ

最終更新日:Jan 14, 2025

Golang アプリケーションを SchedulerX に接続するために Golang 用 SDK を使用できます。この方法で、Processor インターフェースを定期的にスケジュールできます。 Golang はゴルーチンを使用してジョブを実行するため、ジョブを停止することはできません。

ジョブタイプ

スタンドアロンジョブ

サービスコードを記述して、Processor インターフェースを実装します。

type Processor interface {
    Process(ctx *processor.JobContext) (*ProcessResult, error)
}

サンプルコード:

package main

import (
	"fmt"
	"github.com/alibaba/schedulerx-worker-go/processor"
	"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
	"time"
)

var _ processor.Processor = &HelloWorld{}

type HelloWorld struct{}

func (h *HelloWorld) Process(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
	fmt.Println("[Process] タスクの処理を開始します: Hello world!") // タスクの処理を開始します
	// タスク実行のモック
	time.Sleep(3 * time.Second)
	ret := new(processor.ProcessResult)
	ret.SetStatus(processor.InstanceStatusSucceed)
	fmt.Println("[Process] タスクの処理を終了します: Hello world!") // タスクの処理を終了します
	return ret, nil
}

ブロードキャストジョブ

Java 言語でのシャーディングブロードキャストジョブがサポートされています。サポートされているインターフェース:

  • PreProcess: マスターノードは、すべてのワーカーノードが Process を実行する前に PreProcess を 1 回実行します。

  • Process: すべてのワーカーノードが Process を実行した後にのみ結果が返されます。

  • PostProcess: すべてのワーカーノードが Process を実行した後、マスターノードは PostProcess を 1 回実行して、すべてのノードの Process の結果を取得します。

説明

Golang 用 SchedulerX SDK のバージョンは 0.0.2 以降である必要があります。

サンプルコード:

package main

import (
	"fmt"
	"github.com/alibaba/schedulerx-worker-go/processor"
	"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
	"github.com/alibaba/schedulerx-worker-go/processor/taskstatus"
	"math/rand"
	"strconv"
)

type TestBroadcast struct{}

// Process すべてのマシンがこれを実行します。
func (t TestBroadcast) Process(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
	value := rand.Intn(10)
	fmt.Printf("シャーディングの総数 = %d、シャーディング = %d、値 = %d\n", ctx.ShardingNum(), ctx.ShardingId(), value) // シャーディングの総数、シャーディング、値
	ret := new(processor.ProcessResult)
	ret.SetSucceed()
	ret.SetResult(strconv.Itoa(value))
	return ret, nil
}

// PreProcess 1 台のマシンのみがこれを実行します。
func (t TestBroadcast) PreProcess(ctx *jobcontext.JobContext) error {
	fmt.Println("TestBroadcastJob PreProcess")
	return nil
}

// PostProcess 1 台のマシンのみがこれを実行します。
func (t TestBroadcast) PostProcess(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
	fmt.Println("TestBroadcastJob PostProcess")
	allTaskResults := ctx.TaskResults()
	allTaskStatuses := ctx.TaskStatuses()
	num := 0

	for key, val := range allTaskResults {
		fmt.Printf("%v:%v\n", key, val)
		if allTaskStatuses[key] == taskstatus.TaskStatusSucceed {
			valInt, _ := strconv.Atoi(val)
			num += valInt
		}
	}

	fmt.Printf("TestBroadcastJob PostProcess()、num = %d\n", num) // num
	ret := new(processor.ProcessResult)
	ret.SetSucceed()
	ret.SetResult(strconv.Itoa(num))
	return ret, nil
}

MapReduce ジョブ

Java 言語での Map ジョブと MapReduce ジョブがサポートされています。

説明

Golang 用 SchedulerX SDK のバージョンは 0.0.4 以降である必要があります。

Map ジョブ

  1. サービスコードを記述して、MapJobProcessor インターフェースを実装します。

    package main
    
    import (
    	"encoding/json"
    	"errors"
    	"fmt"
    	"github.com/alibaba/schedulerx-worker-go/processor"
    	"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
    	"github.com/alibaba/schedulerx-worker-go/processor/mapjob"
    	"strconv"
    	"time"
    )
    
    type TestMapJob struct {
    	*mapjob.MapJobProcessor
    }
    
    func (mr *TestMapJob) Kill(jobCtx *jobcontext.JobContext) error {
    	//TODO 実装してください
    	panic("実装してください") // 実装してください
    }
    
    // Process MapReduce モデルは、タイムアウト確認のために分散スキャン注文に使用されます。
    func (mr *TestMapJob) Process(jobCtx *jobcontext.JobContext) (*processor.ProcessResult, error) {
    	var (
    		num = 10
    		err error
    	)
    	taskName := jobCtx.TaskName()
    
    	if jobCtx.JobParameters() != "" {
    		num, err = strconv.Atoi(jobCtx.JobParameters())
    		if err != nil {
    			return nil, err
    		}
    	}
    
    	if mr.IsRootTask(jobCtx) {
    		fmt.Println("ルートタスクを開始します") // ルートタスクを開始します
    		var messageList []interface{}
    		for i := 1; i <= num; i++ {
    			var str = fmt.Sprintf("id_%d", i)
    			messageList = append(messageList, str)
    		}
    		fmt.Println(messageList)
    		return mr.Map(jobCtx, messageList, "Level1Dispatch")
    	} else if taskName == "Level1Dispatch" {
    		var task []byte = jobCtx.Task()
    		var str string
    		err = json.Unmarshal(task, &str)
    		fmt.Printf("str = %s\n", str) // str
    		time.Sleep(100 * time.Millisecond)
    		fmt.Println("処理を完了します...") // 処理を完了します
    		if str == "id_5" {
    			return processor.NewProcessResult(
    				processor.WithFailed(),
    				processor.WithResult(str),
    			), errors.New("test")
    		}
    		return processor.NewProcessResult(
    			processor.WithSucceed(),
    			processor.WithResult(str),
    		), nil
    	}
    	return processor.NewProcessResult(processor.WithFailed()), nil
    }
    
  2. クライアントに Map ジョブを登録します。

    package main
    
    import (
    	"github.com/alibaba/schedulerx-worker-go"
    	"github.com/alibaba/schedulerx-worker-go/processor/mapjob"
    )
    
    func main() {
    	// これは単なる例であり、実際の構成はプラットフォームから取得する必要があります。
    	cfg := &schedulerx.Config{
    		Endpoint:  "acm.aliyun.com",
    		Namespace: "433d8b23-xxx-xxx-xxx-90d4d1b9a4af",
    		GroupId:   "xueren_sub",
    		AppKey:    "xxxxxx",
    	}
    	client, err := schedulerx.GetClient(cfg)
    	if err != nil {
    		panic(err)
    	}
    	task := &TestMapJob{
    		mapjob.NewMapJobProcessor(),
    	}
    
    	client.RegisterTask("TestMapJob", task)
    	select {}
    }
    

MapReduce ジョブ

  1. サービスコードを記述して、MapReduceJobProcessor インターフェースを実装します。

    package main
    
    import (
    	"encoding/json"
    	"fmt"
    	"github.com/alibaba/schedulerx-worker-go/processor"
    	"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
    	"github.com/alibaba/schedulerx-worker-go/processor/mapjob"
    	"github.com/alibaba/schedulerx-worker-go/processor/taskstatus"
    	"strconv"
    	"time"
    )
    
    type OrderInfo struct {
    	Id    string `json:"id"`
    	Value int    `json:"value"`
    }
    
    func NewOrderInfo(id string, value int) *OrderInfo {
    	return &OrderInfo{Id: id, Value: value}
    }
    
    type TestMapReduceJob struct {
    	*mapjob.MapReduceJobProcessor
    }
    
    func (mr *TestMapReduceJob) Kill(jobCtx *jobcontext.JobContext) error {
    	//TODO 実装してください
    	panic("実装してください") // 実装してください
    }
    
    // Process MapReduce モデルは、タイムアウト確認のために分散スキャン注文に使用されます。
    func (mr *TestMapReduceJob) Process(jobCtx *jobcontext.JobContext) (*processor.ProcessResult, error) {
    	var (
    		num = 1000
    		err error
    	)
    	taskName := jobCtx.TaskName()
    
    	if jobCtx.JobParameters() != "" {
    		num, err = strconv.Atoi(jobCtx.JobParameters())
    		if err != nil {
    			return nil, err
    		}
    	}
    
    	if mr.IsRootTask(jobCtx) {
    		fmt.Println("ルートタスクを開始します。taskId = %d", jobCtx.TaskId()) // ルートタスクを開始します、taskId
    		var orderInfos []interface{}
    		for i := 1; i <= num; i++ {
    			orderInfos = append(orderInfos, NewOrderInfo(fmt.Sprintf("id_%d", i), i))
    		}
    		return mr.Map(jobCtx, orderInfos, "OrderInfo")
    	} else if taskName == "OrderInfo" {
    		orderInfo := new(OrderInfo)
    		if err := json.Unmarshal(jobCtx.Task(), orderInfo); err != nil {
    			fmt.Printf("タスクは OrderInfo ではありません。task = %+v\n", jobCtx.Task()) // タスクは OrderInfo ではありません、task
    		}
    		fmt.Printf("taskId = %d、orderInfo = %+v\n", jobCtx.TaskId(), orderInfo) // taskId、orderInfo
    		time.Sleep(1 * time.Millisecond)
    		return processor.NewProcessResult(
    			processor.WithSucceed(),
    			processor.WithResult(strconv.Itoa(orderInfo.Value)),
    		), nil
    	}
    	return processor.NewProcessResult(processor.WithFailed()), nil
    }
    
    func (mr *TestMapReduceJob) Reduce(jobCtx *jobcontext.JobContext) (*processor.ProcessResult, error) {
    	allTaskResults := jobCtx.TaskResults()
    	allTaskStatuses := jobCtx.TaskStatuses()
    	count := 0
    	fmt.Printf("reduce: すべてのタスク数 = %d\n", len(allTaskResults)) // すべてのタスク数
    	for key, val := range allTaskResults {
    		if key == 0 {
    			continue
    		}
    		if allTaskStatuses[key] == taskstatus.TaskStatusSucceed {
    			num, err := strconv.Atoi(val)
    			if err != nil {
    				return nil, err
    			}
    			count += num
    		}
    	}
    	fmt.Printf("reduce: 成功したタスク数 = %d\n", count) // 成功したタスク数
    	return processor.NewProcessResult(
    		processor.WithSucceed(),
    		processor.WithResult(strconv.Itoa(count)),
    	), nil
    }
  1. クライアントに MapReduce ジョブを登録します。

    package main
    
    import (
    	"github.com/alibaba/schedulerx-worker-go"
    	"github.com/alibaba/schedulerx-worker-go/processor/mapjob"
    )
    
    func main() {
    	// これは単なる例であり、実際の構成はプラットフォームから取得する必要があります。
    	cfg := &schedulerx.Config{
    		Endpoint:  "acm.aliyun.com",
    		Namespace: "433d8b23-xxx-xxx-xxx-90d4d1b9a4af",
    		GroupId:   "xueren_sub",
    		AppKey:    "xxxxxx",
    	}
    	client, err := schedulerx.GetClient(cfg)
    	if err != nil {
    		panic(err)
    	}
    
    	task := &TestMapReduceJob{
    		mapjob.NewMapReduceJobProcessor(),
    	}
    
    	client.RegisterTask("TestMapReduceJob", task)
    	select {}
    }
    

ジョブの停止

Golang プロセッサは、ゴルーチンを使用してジョブを実行します。ゴルーチンを停止することはできません。 KillProcessor インターフェースを使用してプロセッサを強制終了できます。

説明

Golang 用 SDK のバージョンは 1.0.2 以降である必要があります。

サンプルコード:

package main

import (
	"fmt"
	"time"
	"github.com/alibaba/schedulerx-worker-go/processor"
	"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
)

var _ processor.Processor = &HelloWorld{}

type HelloWorld struct{}

var Stop = false

func (h *HelloWorld) Process(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
	fmt.Println("[Process] タスクの処理を開始します: Hello world!") // タスクの処理を開始します
	// タスク実行のモック
	for i := 0; i < 10; i++ {
		fmt.Printf("Hello%d\n", i)
		time.Sleep(2 * time.Second)
		if Stop {
			break
		}
	}
	ret := new(processor.ProcessResult)
	ret.SetSucceed()
	fmt.Println("[Process] タスクの処理を終了します: Hello world!") // タスクの処理を終了します
	return ret, nil
}

func (h *HelloWorld) Kill(ctx *jobcontext.JobContext) error {
	fmt.Println("[Kill] タスクの強制終了を開始します: Hello world!") // タスクの強制終了を開始します
	Stop = true
	return nil
}

参照