本文介绍在Asynq v0.26.0以下的版本如何实现不断链。
解决方式
跟v0.26.0 不同,低版本在Task中没有可以支持SpanContext 传递的载体,为了实现Trace的传递,需要在payload 中手动传递Trace上下文。
// Task represents a unit of work to be performed.
type Task struct {
// typename indicates the type of task to be performed.
typename string
// payload holds data needed to perform the task.
payload []byte
// opts holds options for the task.
opts []Option
// w is the ResultWriter for the task.
w *ResultWriter
}如Task的Payload 传递的数据结构体如下所示:
type WelcomeEmailPayload struct {
UserID int `json:"user_id"`
Email string `json:"email"`
Username string `json:"username"`
}需要给结构体增加一个字段传递上线,如下所示:
type WelcomeEmailPayload struct {
UserID int `json:"user_id"`
Email string `json:"email"`
Username string `json:"username"`
Header map[string]string `json:"header"`
}在创建Task前创建一个Span,并将上下文写入PayLoad中:
var task asynq.Task
tracer := otel.GetTracerProvider().Tracer("")
opts := append([]tracex.SpanStartOption{}, tracex.WithSpanKind(tracex.SpanKindClient))
//将创建的span的trace信息写入body中传到服务端,这里只是demo,可以根据代码情况自行调整
ctx, span := tracer.Start(context.Background(), "Push Task", opts...)
var headerMap propagation.MapCarrier
headerMap = make(map[string]string)
otel.GetTextMapPropagator().Inject(ctx, headerMap)
//set span 上下文到header中
for k, v := range headerMap {
task.Header[k] = v
}
defer span.End()
//... push task to server在获取到Task的地方回复上下文:
var headerMap propagation.MapCarrier
headerMap = make(map[string]string)
ctxRequest := context.Background()
//get task header
var task asynq.Task
for k, v := range task.Header {
headerMap[k] = v
}
xxCtx := otel.GetTextMapPropagator().Extract(ctxRequest, headerMap)
tracer := otel.GetTracerProvider().Tracer("")
opts := append([]trace.SpanStartOption{}, trace.WithSpanKind(trace.SpanKindServer))
_, span := tracer.Start(xxCtx, "Recv Task", opts...)
defer span
//... other通过上述手动传递的方式,实现了Span上下文的传递。
编译后需要打开应用探针的OpenTelemetry开关配置,重启即生效。