本文介紹在Asynq v0.26.0以下的版本如何?不斷鏈。
解決方式
跟v0.26.0 不同,低版本在Task中沒有可以支援SpanContext 傳遞的載體,為了實現Trace的傳遞,需要在payload 中手動傳遞Trace上下文。
// Task represents a unit of work to be performed.
type Task struct {
// typename indicates the type of task to be performed.
typename string
// payload holds data needed to perform the task.
payload []byte
// opts holds options for the task.
opts []Option
// w is the ResultWriter for the task.
w *ResultWriter
}如Task的Payload 傳遞的資料結構體如下所示:
type WelcomeEmailPayload struct {
UserID int `json:"user_id"`
Email string `json:"email"`
Username string `json:"username"`
}需要給結構體增加一個欄位傳遞上線,如下所示:
type WelcomeEmailPayload struct {
UserID int `json:"user_id"`
Email string `json:"email"`
Username string `json:"username"`
Header map[string]string `json:"header"`
}在建立Task前建立一個Span,並將上下文寫入PayLoad中:
var task asynq.Task
tracer := otel.GetTracerProvider().Tracer("")
opts := append([]tracex.SpanStartOption{}, tracex.WithSpanKind(tracex.SpanKindClient))
//將建立的span的trace資訊寫入body中傳到服務端,這裡只是demo,可以根據代碼情況自行調整
ctx, span := tracer.Start(context.Background(), "Push Task", opts...)
var headerMap propagation.MapCarrier
headerMap = make(map[string]string)
otel.GetTextMapPropagator().Inject(ctx, headerMap)
//set span 上下文到header中
for k, v := range headerMap {
task.Header[k] = v
}
defer span.End()
//... push task to server在擷取到Task的地方回複上下文:
var headerMap propagation.MapCarrier
headerMap = make(map[string]string)
ctxRequest := context.Background()
//get task header
var task asynq.Task
for k, v := range task.Header {
headerMap[k] = v
}
xxCtx := otel.GetTextMapPropagator().Extract(ctxRequest, headerMap)
tracer := otel.GetTracerProvider().Tracer("")
opts := append([]trace.SpanStartOption{}, trace.WithSpanKind(trace.SpanKindServer))
_, span := tracer.Start(xxCtx, "Recv Task", opts...)
defer span
//... other通過上述手動傳遞的方式,實現了Span內容相關的傳遞。
編譯後需要開啟應用探針的OpenTelemetry開關配置,重啟即生效。