构建从UnoCSS前端到Go后端经由Pulsar的全链路可观测性上下文传播


技术痛点:断裂的追踪链

在一个典型的微服务项目中,我们遇到了一个棘手的可观测性问题。系统前端使用 UnoCSS 构建,交互简洁高效;后端是清一色的 Go 微服务,它们之间的同步通信由服务网格(Istio)接管,而异步通信则依赖 Apache Pulsar。问题在于,当一个用户在前台点击一个由 UnoCSS 驱动的按钮(例如 class="p-2 m-1 bg-blue-500 text-white rounded") 后,整个业务流程横跨了 HTTP 请求和 Pulsar 消息两个领域。

服务网格为我们漂亮地解决了 HTTP 服务间的调用追踪,但在 Jaeger UI 上,我们看到的链路是断开的:Trace A 从 Ingress Gateway 到达第一个 Go 服务(生产者),然后就中断了。几秒钟后,另一个完全独立的 Trace B 在消费端 Go 服务中凭空出现。两者之间通过 Pulsar 传递消息的关键环节,成了一个观测黑洞。在排查性能瓶颈或业务故障时,这种断裂的追踪链是致命的,我们无法确定消息在 Pulsar 中停留了多久,也无法将生产者的错误与消费者的异常行为关联起来。

初步构想:手动注入与标准化的权衡

最初的解决方案很直接:在生产者服务的业务逻辑中,手动从请求头中提取 x-request-idtrace-id,将其塞进 Pulsar 消息的 Properties 中;消费者再手动取出并设置到日志上下文中。

这个方案能用,但非常脆弱且不具备扩展性。它强依赖于特定的头信息格式,并且每个需要接入的开发者都必须清楚地知道这个“潜规则”。一旦追踪系统更换(比如从 Jaeger 换到 SkyWalking),或者 W3C Trace Context 成为标准,所有硬编码的逻辑都需要重构。我们需要一个标准化的、与具体实现解耦的方案。这自然地导向了 OpenTelemetry。目标是明确的:实现一个从 HTTP 请求上下文到 Pulsar 消息属性的无缝传播机制,让 OpenTelemetry SDK 自动完成这一切。

技术选型与实现路径

我们的技术栈是固定的:Go、Pulsar、Istio。挑战在于,Pulsar 的官方 Go客户端 apache/pulsar-client-go 并没有像 gRPC 或标准 net/http 库那样,提供开箱即用的 OpenTelemetry 中间件(Instrumentation)。这意味着我们需要自己动手,为 Pulsar 的生产者和消费者实现上下文的注入(Inject)与提取(Extract)。

OpenTelemetry 为此提供了标准接口:go.opentelemetry.io/otel/propagation。核心是实现一个 TextMapCarrier,它能让 SDK 知道如何读写特定传输介质的元数据。对于 Pulsar 而言,这个介质就是消息的 Properties map。

我们的实现将分为三步:

  1. 定义 Pulsar Carrier: 创建一个结构体,实现 propagation.TextMapCarrier 接口,适配 Pulsar 消息的 map[string]string 类型的 Properties
  2. 封装生产者: 创建一个 Pulsar 生产者的装饰器(Decorator)或者一个辅助函数,它在发送消息前,自动从当前 Go context.Context 中提取 Span 信息,并通过我们定义的 Carrier 将其注入到消息属性中。
  3. 封装消费者: 同样地,为消费者创建一个装饰器或中间件。在接收到消息后,它会检查消息属性,通过 Carrier 提取出上游的 Span 上下文,并用其创建一个新的、存在父子关系的子 Span。

步骤化实现:代码是最终的答案

1. 定义 Pulsar 消息的 TextMapCarrier

这是连接 OpenTelemetry SDK 和 Pulsar 消息的桥梁。实现非常直接,我们只需要告诉 SDK 如何在 map[string]string 上进行 GetSet 操作。

package otelcarrier

import (
	"go.opentelemetry.io/otel/propagation"
)

// PulsarMessageCarrier 实现了 go.opentelemetry.io/otel/propagation.TextMapCarrier 接口
// 它使得 OpenTelemetry 的上下文能够被注入和从 Pulsar 消息的属性中提取。
// 这里的关键是让 SDK 知道如何与 map[string]string 类型的元数据进行交互。
type PulsarMessageCarrier struct {
	Properties map[string]string
}

// 确保 PulsarMessageCarrier 实现了 TextMapCarrier 接口
var _ propagation.TextMapCarrier = (*PulsarMessageCarrier)(nil)

// Get 返回与给定键关联的值。
// OpenTelemetry Propagator 会调用这个方法来读取追踪头,例如 'traceparent'。
func (c *PulsarMessageCarrier) Get(key string) string {
	if c.Properties == nil {
		return ""
	}
	return c.Properties[key]
}

// Set 为给定的键设置值。
// OpenTelemetry Propagator 会调用这个方法来写入追踪头。
func (c *PulsarMessageCarrier) Set(key, value string) {
	if c.Properties == nil {
		c.Properties = make(map[string]string)
	}
	c.Properties[key] = value
}

// Keys 返回 carrier 中所有的键。
// 这对于某些 Propagator 可能是必需的。
func (c *PulsarMessageCarrier) Keys() []string {
	if c.Properties == nil {
		return []string{}
	}
	keys := make([]string, 0, len(c.Properties))
	for k := range c.Properties {
		keys = append(keys, k)
	}
	return keys
}

这个小小的工具类是整个方案的核心,它解耦了 OpenTelemetry 的传播逻辑和 Pulsar 的具体实现。

2. 注入上下文的 Pulsar 生产者

现在我们来改造消息生产者。我们不直接修改业务代码中调用 producer.Send() 的地方,而是提供一个封装好的函数 SendWithTrace。这个函数接受业务代码的 context,这至关重要,因为服务网格(或上游服务)注入的追踪信息就存在于这个 context 中。

package main

import (
	"context"
	"log"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"

	// 引入我们自己定义的 carrier
	"your_project/internal/otelcarrier"
)

// InstrumentedProducer 封装了 pulsar.Producer 以提供自动的追踪注入
type InstrumentedProducer struct {
	producer pulsar.Producer
	tracer   trace.Tracer
}

func NewInstrumentedProducer(client pulsar.Client, topic string) (*InstrumentedProducer, error) {
	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic: topic,
	})
	if err != nil {
		return nil, err
	}

	// 通常从一个集中的地方获取 tracer provider
	tracer := otel.Tracer("pulsar-producer")

	return &InstrumentedProducer{
		producer: producer,
		tracer:   tracer,
	}, nil
}

// SendWithTrace 是核心的封装方法
// 它创建了一个代表消息生产过程的 Span,并将上下文注入到消息中
func (p *InstrumentedProducer) SendWithTrace(ctx context.Context, msg *pulsar.ProducerMessage) (pulsar.MessageID, error) {
	// 1. 创建一个新的 Span,它会自动成为 ctx 中当前 Span 的子 Span
	// Span 的名称清晰地表明了操作的目的地和类型
	spanName := p.producer.Topic() + " send"
	opts := []trace.SpanStartOption{
		trace.WithSpanKind(trace.SpanKindProducer),
		trace.WithAttributes(
			attribute.String("messaging.system", "pulsar"),
			attribute.String("messaging.destination", p.producer.Topic()),
			attribute.String("messaging.destination_kind", "topic"),
		),
	}

	spanCtx, span := p.tracer.Start(ctx, spanName, opts...)
	defer span.End()

	// 2. 注入上下文到消息属性中
	// 如果业务代码已经设置了 Properties,我们需要合并而不是覆盖
	if msg.Properties == nil {
		msg.Properties = make(map[string]string)
	}
	carrier := &otelcarrier.PulsarMessageCarrier{Properties: msg.Properties}
	
	// 使用全局的 TextMapPropagator (通常是 W3C TraceContext) 进行注入
	propagator := otel.GetTextMapPropagator()
	propagator.Inject(spanCtx, carrier)

	// 3. 实际发送消息
	msgID, err := p.producer.Send(spanCtx, msg)
	if err != nil {
		// 在 Span 中记录错误信息,这在追踪系统中会高亮显示
		span.RecordError(err)
		span.SetStatus(codes.Error, err.Error())
	}

	// 可以在 Span 中添加事件或属性,记录消息ID
	span.SetAttributes(attribute.String("messaging.message_id", msgID.String()))

	return msgID, err
}

func (p *InstrumentedProducer) Close() {
	p.producer.Close()
}

在真实项目中,main 函数或者服务初始化的地方会设置好 OpenTelemetry 的全局 Provider 和 Propagator。业务逻辑现在只需要调用 instrumentedProducer.SendWithTrace(ctx, ...) 即可。

3. 提取上下文的 Pulsar 消费者

消费端的逻辑是生产端的逆操作。我们需要一个消费循环的中间件,它在处理每条消息前,先从消息属性中提取追踪上下文。

package main

import (
	"context"
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/trace"
	"go.opentelemetry.io/otel/codes"

	"your_project/internal/otelcarrier"
)

// MessageHandler 是业务逻辑处理函数签名
type MessageHandler func(ctx context.Context, msg pulsar.Message) error

// TraceMiddleware 是一个消费者中间件,用于包裹业务处理器
func TraceMiddleware(handler MessageHandler) MessageHandler {
	tracer := otel.Tracer("pulsar-consumer")

	return func(ctx context.Context, msg pulsar.Message) error {
		// 1. 从消息属性中提取上下文
		propagator := otel.GetTextMapPropagator()
		carrier := &otelcarrier.PulsarMessageCarrier{Properties: msg.Properties()}
		
		// Extract 会解析 'traceparent' 等头信息,并返回一个包含远程 Span 上下文的 Go context
		remoteCtx := propagator.Extract(ctx, carrier)

		// 2. 创建一个新的 Span,并建立与生产者 Span 的父子关系
		spanName := msg.Topic() + " receive"
		opts := []trace.SpanStartOption{
			trace.WithSpanKind(trace.SpanKindConsumer),
			trace.WithAttributes(
				attribute.String("messaging.system", "pulsar"),
				attribute.String("messaging.destination", msg.Topic()),
				attribute.String("messaging.destination_kind", "topic"),
				attribute.String("messaging.message_id", msg.ID().String()),
			),
		}
		
		// remoteCtx 中包含了从消息中提取的父 Span 信息,
		// tracer.Start 会自动将其链接起来。
		spanCtx, span := tracer.Start(remoteCtx, spanName, opts...)
		defer span.End()

		// 3. 使用包含新 Span 的上下文调用真正的业务处理器
		err := handler(spanCtx, msg)
		if err != nil {
			span.RecordError(err)
			span.SetStatus(codes.Error, err.Error())
		}

		return err
	}
}

// 示例业务处理器
func myBusinessLogic(ctx context.Context, msg pulsar.Message) error {
	log.Printf("Received message: %s", string(msg.Payload()))
	// 这里的 ctx 已经包含了正确的追踪信息
	// 在这个函数内部进行的任何数据库调用、HTTP 请求等,如果也接入了 OTEL,
	// 它们的 Span 都会自动成为消费端 Span 的子 Span。
	
	// 模拟一些工作
	_, childSpan := otel.Tracer("business-logic").Start(ctx, "process-payload")
	defer childSpan.End()
	time.Sleep(100 * time.Millisecond)

	return nil
}

func consumeMessages(client pulsar.Client) {
	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:            "my-traced-topic",
		SubscriptionName: "my-sub",
		Type:             pulsar.Shared,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	// 使用中间件包裹业务逻辑
	wrappedHandler := TraceMiddleware(myBusinessLogic)

	for {
		msg, err := consumer.Receive(context.Background())
		if err != nil {
			log.Printf("Failed to receive message: %v", err)
			continue
		}

		// 调用包裹后的处理器
		err = wrappedHandler(context.Background(), msg)
		if err == nil {
			consumer.Ack(msg)
		} else {
			consumer.Nack(msg)
			log.Printf("Failed to process message: %v", err)
		}
	}
}

4. 系统整合与可视化

sequenceDiagram
    participant User
    participant Frontend (UnoCSS)
    participant Istio Gateway
    participant Go Producer Service
    participant Pulsar Topic
    participant Go Consumer Service

    User->>Frontend (UnoCSS): 点击 "提交订单" 按钮
    Frontend (UnoCSS)->>Istio Gateway: 发起 POST /orders 请求 (携带 Trace Headers)
    Note over Istio Gateway: Istio (Envoy) 自动生成/转发
W3C Trace Context Headers Istio Gateway->>Go Producer Service: 转发请求 Go Producer Service->>Go Producer Service: ctx 包含父 Span 信息 Go Producer Service->>+Pulsar Topic: SendWithTrace(ctx, msg) Note over Go Producer Service, Pulsar Topic: 1. 创建 Producer Span
2. 将 ctx 注入 msg.Properties Pulsar Topic-->>-Go Consumer Service: 投递消息 Go Consumer Service->>Go Consumer Service: TraceMiddleware 启动 Note over Go Consumer Service: 1. 从 msg.Properties 提取 ctx
2. 创建 Consumer Span (链接 Producer Span) Go Consumer Service->>Go Consumer Service: 调用 myBusinessLogic(spanCtx) Note over Go Consumer Service: 业务逻辑中的所有操作
都成为 Consumer Span 的子 Span

当这套机制部署后,我们在 Jaeger 中终于看到了完整的链路。一条 Trace 从前端的 API 调用开始,经过生产者服务,清晰地展示了 my-traced-topic send 这个 Span,然后通过一个 Link 关系连接到消费者服务的 my-traced-topic receive Span。两个 Span 的时间差准确地反映了消息在 Pulsar 中的端到端延迟。生产者和消费者的日志,因为都包含了相同的 trace_idspan_id,可以在日志系统中轻松地关联查询。

前端与 UnoCSS 的关联虽然是间接的,但却是整个流程的起点。我们的前端团队在全局的 API 请求拦截器中集成了 OpenTelemetry JS SDK。当用户点击一个带有特定 data-trace-name 属性的、由 UnoCSS 样式化的按钮时,拦截器会以这个属性值为名,创建一个顶层 Span。例如:

<!-- UnoCSS 让我们可以快速构建交互元素 -->
<button 
  class="py-2 px-4 font-semibold rounded-lg shadow-md text-white bg-green-500 hover:bg-green-700"
  data-trace-name="create-order-click"
  onclick="submitOrder()">
  提交订单
</button>

这样,追踪的源头就有了业务语义,而不仅仅是一个 POST /orders

遗留问题与未来迭代

这个方案完美解决了我们的核心痛点,但并非没有权衡。

首先,我们编写的 InstrumentedProducerTraceMiddleware 属于自定义埋点,需要团队内部维护。如果 Pulsar 的 Go 客户端未来提供了官方的 OpenTelemetry Instrumentation,我们应该第一时间迁移过去,以减少维护成本并获得更全面的支持。

其次,服务网格对于 Pulsar 这种非 HTTP 的 TCP 协议,通常只能做到 L4 级别的代理和指标收集,无法解析协议内容来自动完成追踪上下文的传播。我们的方案是在应用层实现的,绕过了服务网格的限制。更理想的未来是,Envoy 拥有 Pulsar 过滤器(Filter),能够像处理 HTTP 和 gRPC 那样,自动在网络层面完成上下文的注入和提取,这样应用代码将完全无需关心追踪逻辑。

最后,当前的实现只覆盖了生产和消费两个基本操作。对于 Pulsar 的一些高级特性,如 Reader 接口、事务性消息等,如果需要纳入追踪,则需要进一步扩展我们的埋点代码。这提醒我们,任何自定义的观测性解决方案都需要随着业务和技术的演进而持续迭代。


  目录