全部产品
Search
文档中心

应用实时监控服务ARMS:Asynq v0.26.0以下的版本如何实现不断链

更新时间:Mar 12, 2026

本文介绍在Asynq v0.26.0以下的版本如何实现不断链。

解决方式

跟v0.26.0 不同,低版本在Task中没有可以支持SpanContext 传递的载体,为了实现Trace的传递,需要在payload 中手动传递Trace上下文。

// Task represents a unit of work to be performed.
type Task struct {
    // typename indicates the type of task to be performed.
    typename string

    // payload holds data needed to perform the task.
    payload []byte

    // opts holds options for the task.
    opts []Option

    // w is the ResultWriter for the task.
    w *ResultWriter
}

如Task的Payload 传递的数据结构体如下所示:

type WelcomeEmailPayload struct {
	UserID   int    `json:"user_id"`
	Email    string `json:"email"`
	Username string `json:"username"`
}

需要给结构体增加一个字段传递上线,如下所示:

type WelcomeEmailPayload struct {
	UserID   int    `json:"user_id"`
	Email    string `json:"email"`
	Username string `json:"username"`
    Header   map[string]string `json:"header"`
}

在创建Task前创建一个Span,并将上下文写入PayLoad中:

var task asynq.Task
tracer := otel.GetTracerProvider().Tracer("")
opts := append([]tracex.SpanStartOption{}, tracex.WithSpanKind(tracex.SpanKindClient))
//将创建的span的trace信息写入body中传到服务端,这里只是demo,可以根据代码情况自行调整
ctx, span := tracer.Start(context.Background(), "Push Task", opts...)
var headerMap propagation.MapCarrier
headerMap = make(map[string]string)
otel.GetTextMapPropagator().Inject(ctx, headerMap)
//set span 上下文到header中
for k, v := range headerMap {
  task.Header[k] = v
}
defer span.End()
//... push task to server

在获取到Task的地方回复上下文:

var headerMap propagation.MapCarrier
headerMap = make(map[string]string)
ctxRequest := context.Background()
//get task header
var task asynq.Task
for k, v := range task.Header {
  headerMap[k] = v
}
xxCtx := otel.GetTextMapPropagator().Extract(ctxRequest, headerMap)
tracer := otel.GetTracerProvider().Tracer("")
opts := append([]trace.SpanStartOption{}, trace.WithSpanKind(trace.SpanKindServer))
_, span := tracer.Start(xxCtx, "Recv Task", opts...)
defer span
//... other

通过上述手动传递的方式,实现了Span上下文的传递。

编译后需要打开应用探针的OpenTelemetry开关配置,重启即生效。