Building End-to-End Distributed Tracing Across a Frontend, Scala, and Celery


A request timed out. The frontend showed nothing but a vague "task submission failed" message. The Scala service's API log showed HTTP 202 Accepted: the request was handled and the task was delivered to the message queue. Yet the logs of the downstream Python Celery worker that should have processed the task were completely silent, with no related entries at all. Where did it go wrong? Was the message lost in RabbitMQ? Did the Celery worker hit an uncaught exception while consuming it? Or did a network partition keep the worker from ever receiving the message?

In real projects this cross-language, cross-process, asynchronous architecture is very common, and it makes diagnosis painful. Manually planting a correlation_id in every layer's logs is the most primitive way to trace a request, but it is tedious, error-prone, and offers no deeper insight such as latency analysis or a service topology. What we need is an automated tracing system that spans the entire call chain. That is why we decided to adopt OpenTelemetry — not as a buzzword, but as a tool for a concrete engineering pain point.

Our stack:

  • Frontend: React (with TypeScript), responsible for issuing the task-creation request.
  • API gateway / core service: Scala with Akka HTTP, which receives the request, validates it, and pushes the asynchronous task to RabbitMQ.
  • Asynchronous task processor: Python Celery, which subscribes to the RabbitMQ queue and executes the long-running computation.

The goal is a complete trace that starts when the user clicks a button in the browser, continues through the Scala service, and ends when the Celery worker finishes the task.

Step 1: Instrumenting the Scala Service

Scala runs on the JVM, so the OpenTelemetry Java Agent covers most of the automatic instrumentation. In our scenario that is not enough, however: we also need to manually inject the trace context into the headers of the messages sent to RabbitMQ.

First, the build.sbt dependencies. Besides the Akka libraries, the key entries are the OpenTelemetry API and the RabbitMQ Java client.

// build.sbt

val AkkaVersion = "2.6.20"
val AkkaHttpVersion = "10.2.12"
val OpenTelemetryVersion = "1.32.0"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor-typed" % AkkaVersion,
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
  "com.typesafe.akka" %% "akka-http" % AkkaHttpVersion,
  "com.rabbitmq" % "amqp-client" % "5.17.0",
  
  // OpenTelemetry API for manual instrumentation
  "io.opentelemetry" % "opentelemetry-api" % OpenTelemetryVersion,
  "io.opentelemetry" % "opentelemetry-context" % OpenTelemetryVersion,

  // Logging
  "ch.qos.logback" % "logback-classic" % "1.2.11",
  "com.typesafe.scala-logging" %% "scala-logging" % "3.9.5"
)

The core challenge lives in the TaskPublisher service. When it publishes a message to RabbitMQ, it must extract the current trace context and attach it to the message headers. The W3C Trace Context specification defines the standard propagation headers, traceparent and tracestate, and we follow that standard.
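As a quick illustration of what ends up in those headers, here is the traceparent layout (the IDs below are the example values from the W3C spec, not from a real trace):

```python
# Hypothetical traceparent value (the example from the W3C Trace Context spec):
# version "00" - 16-byte trace-id - 8-byte parent span-id - trace flags
traceparent = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"

version, trace_id, span_id, flags = traceparent.split("-")

assert version == "00"       # only version 00 is defined today
assert len(trace_id) == 32   # 16 bytes, lowercase hex
assert len(span_id) == 16    # 8 bytes, lowercase hex

# Bit 0 of the flags byte is the "sampled" flag
sampled = int(flags, 16) & 0x01 == 1
print(f"trace_id={trace_id}, parent_span_id={span_id}, sampled={sampled}")
```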

// src/main/scala/com/example/TaskPublisher.scala

package com.example

import com.rabbitmq.client.{AMQP, ConnectionFactory, MessageProperties}
import com.typesafe.scalalogging.LazyLogging
import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.context.Context
import io.opentelemetry.context.propagation.TextMapSetter

import java.nio.charset.StandardCharsets
import scala.jdk.CollectionConverters._
import scala.collection.mutable

class TaskPublisher extends LazyLogging {

  private val EXCHANGE_NAME = "tasks_exchange"
  private val ROUTING_KEY = "heavy.computation"
  private val QUEUE_NAME = "celery"

  // Connection and channel management should be more robust in production, e.g. via a connection pool
  private val factory = new ConnectionFactory()
  factory.setHost("localhost") // Use your RabbitMQ host
  private val connection = factory.newConnection()
  private val channel = connection.createChannel()

  channel.exchangeDeclare(EXCHANGE_NAME, "direct", true)
  channel.queueDeclare(QUEUE_NAME, true, false, false, null)
  channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY)

  // The OTel key piece: a TextMapSetter implementation that writes the
  // propagation fields into the AMQP message headers.
  private val setter = new TextMapSetter[mutable.Map[String, Object]] {
    override def set(carrier: mutable.Map[String, Object], key: String, value: String): Unit = {
      // AMQP header values must be of supported types; String is fine.
      // Always overwrite — the propagator owns these keys.
      carrier.update(key, value)
    }
  }

  def publishTask(taskPayload: String): Unit = {
    val tracer = GlobalOpenTelemetry.getTracer("scala-task-publisher")
    val span = tracer.spanBuilder("publish-to-rabbitmq").startSpan()
    
    // Make the span current so the propagator can see it. Scope is
    // AutoCloseable, but there is no `.use` on it in plain Scala 2,
    // so close it in `finally`.
    val scope = span.makeCurrent()
    try {
      val headers = mutable.Map[String, Object]()

      // The core step: inject the current OpenTelemetry context into our headers map
      GlobalOpenTelemetry.getPropagators.getTextMapPropagator.inject(
        Context.current(),
        headers,
        setter
      )

      logger.info(s"Injecting trace context into AMQP headers: ${headers.toString}")

      // Celery only accepts messages that follow its message protocol (v2):
      // a 'task' header naming the registered task, a unique 'id', and a
      // JSON body of the form [args, kwargs, embed].
      val taskId = java.util.UUID.randomUUID().toString
      headers.update("task", ROUTING_KEY) // task name; matches @celery_app.task(name=...)
      headers.update("id", taskId)
      val body = s"""[[$taskPayload], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]"""

      val props = new AMQP.BasicProperties.Builder()
        .contentType("application/json")
        .contentEncoding("utf-8")
        .headers(headers.asJava)
        .correlationId(taskId)
        .deliveryMode(2) // persistent message
        .build()

      channel.basicPublish(
        EXCHANGE_NAME,
        ROUTING_KEY,
        props,
        body.getBytes(StandardCharsets.UTF_8)
      )

      span.setAttribute("messaging.system", "rabbitmq")
      span.setAttribute("messaging.destination", EXCHANGE_NAME)
      span.setAttribute("messaging.routing_key", ROUTING_KEY)
      span.setAttribute("message.payload_size", body.length.toLong)

      logger.info(s"Task published successfully for payload: $taskPayload")
    } catch {
      case e: Exception =>
        span.recordException(e)
        span.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getMessage)
        logger.error("Failed to publish task to RabbitMQ", e)
        throw e // Re-throw to let the caller handle it
    } finally {
      scope.close()
      span.end()
    }
  }

  def close(): Unit = {
    channel.close()
    connection.close()
  }
}

In the Akka HTTP route we call this TaskPublisher. With the Java Agent attached, the Akka HTTP server creates the incoming server span automatically; all we have to do in the route logic is call our publish method.

// src/main/scala/com/example/WebServer.scala

package com.example

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import scala.io.StdIn
import com.typesafe.scalalogging.LazyLogging
import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.trace.Span

object WebServer extends LazyLogging {

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem(Behaviors.empty, "my-system")
    implicit val executionContext = system.executionContext

    val publisher = new TaskPublisher()

    val route =
      path("create-task") {
        post {
          entity(as[String]) { payload =>
            // The Java Agent creates the server span automatically; here we
            // add an internal span for the business logic.
            val tracer = GlobalOpenTelemetry.getTracer("akka-http-server")
            val internalSpan = tracer.spanBuilder("process-and-dispatch-task").startSpan()
            val scope = internalSpan.makeCurrent()
            try {
              logger.info("Received task creation request.")
              // A real project would run its business logic here
              internalSpan.setAttribute("task.payload.preview", payload.take(50))
              // Note: real code should JSON-encode the payload instead of interpolating it
              publisher.publishTask(s"""{"data": "$payload"}""")
              // Respond 202: the task is accepted for asynchronous processing
              complete(akka.http.scaladsl.model.StatusCodes.Accepted -> "Task accepted")
            } finally {
              scope.close()
              internalSpan.end()
            }
          }
        }
      }

    val bindingFuture = Http().newServerAt("localhost", 8080).bind(route)

    logger.info(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
    StdIn.readLine()
    bindingFuture
      .flatMap(_.unbind())
      .onComplete { _ =>
        publisher.close()
        system.terminate()
      }
  }
}

To run it, download the OpenTelemetry Java Agent jar and start the application with the agent attached to the JVM:

# Assuming opentelemetry-javaagent.jar is in the current directory.
# Note: -javaagent and -D are JVM flags, so they cannot be passed as program
# arguments to runMain. Fork the JVM and set them in build.sbt instead:
#
#   fork := true
#   javaOptions ++= Seq(
#     "-javaagent:./opentelemetry-javaagent.jar",
#     "-Dotel.service.name=scala-task-service",
#     "-Dotel.traces.exporter=otlp",
#     "-Dotel.exporter.otlp.endpoint=http://localhost:4317"
#   )

sbt "runMain com.example.WebServer"

At this point any request to /create-task generates trace data inside the Scala service, and AMQP messages carrying a traceparent header are published to RabbitMQ. The trace breaks off here; the next step is to have the Celery worker pick it up.

Step 2: Having the Celery Worker Continue the Trace

The Celery ecosystem has good OpenTelemetry integration through the opentelemetry-instrumentation-celery package. The key is configuring it so that it correctly extracts the context from the AMQP message headers.

The requirements.txt file:

celery==5.3.6
redis==5.0.1 # result-backend client
pika==1.3.2 # optional: direct RabbitMQ access; Celery itself speaks AMQP via kombu
opentelemetry-api==1.22.0
opentelemetry-sdk==1.22.0
opentelemetry-exporter-otlp==1.22.0
opentelemetry-instrumentation-celery==0.43b0

Now we configure the Celery application and OpenTelemetry. We create a tracing.py file that centralizes all tracing-related initialization.

# tracing.py

import logging
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.instrumentation.celery import CeleryInstrumentor

def setup_tracing():
    """Configures OpenTelemetry tracing for the Celery worker."""
    
    # Ensure initialization happens only once
    if getattr(setup_tracing, "initialized", False):
        return

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    # Resource attributes such as the service name
    resource = Resource(attributes={
        "service.name": "python-celery-worker"
    })

    # Create a TracerProvider
    provider = TracerProvider(resource=resource)
    
    # Configure the OTLP exporter, pointing at the collector
    otlp_exporter = OTLPSpanExporter(endpoint="http://localhost:4317", insecure=True)
    
    # Export asynchronously via a BatchSpanProcessor
    processor = BatchSpanProcessor(otlp_exporter)
    provider.add_span_processor(processor)

    # Install the global TracerProvider
    trace.set_tracer_provider(provider)
    
    logger.info("OpenTelemetry tracing configured.")

    # Key step: instrument Celery. Note that with the default prefork pool,
    # the instrumentation docs recommend calling instrument() from the
    # worker_process_init signal so each forked child process is covered too.
    CeleryInstrumentor().instrument()
    logger.info("Celery instrumentation applied.")
    
    setup_tracing.initialized = True

Next comes the Celery application itself. In tasks.py we create the Celery instance and define our task. Crucially, setup_tracing is called after the app is created and before any task is defined.

# tasks.py

import time
import random
import logging
from celery import Celery
from tracing import setup_tracing

logger = logging.getLogger(__name__)

# RabbitMQ as the broker
BROKER_URL = 'amqp://guest:guest@localhost:5672//'
# Redis as the result backend (optional)
RESULT_BACKEND = 'redis://localhost:6379/0'

celery_app = Celery(
    'tasks',
    broker=BROKER_URL,
    backend=RESULT_BACKEND
)

# Set up tracing right after the Celery app is instantiated, so Celery's
# signals and task base classes are patched before any task is defined.
setup_tracing()


@celery_app.task(name='heavy.computation')
def heavy_computation(payload: dict):
    """
    一个模拟耗时计算的任务
    """
    logger.info(f"Received task with payload: {payload}")
    
    # 这里的代码会自动在一个新的span下执行,
    # 该span是Scala服务中publish span的子span
    try:
        # 模拟一些工作
        base_duration = 2
        random_jitter = random.uniform(0.5, 2.0)
        total_duration = base_duration + random_jitter
        
        logger.info(f"Starting heavy computation, will take approx {total_duration:.2f}s")
        time.sleep(total_duration)

        if random.random() < 0.1:  # fail with 10% probability
            raise ValueError("A random error occurred during computation!")

        result = {"status": "success", "processed_data": f"processed_{payload.get('data')}"}
        logger.info("Heavy computation finished successfully.")
        return result
    except Exception as e:
        logger.error(f"Task failed: {e}", exc_info=True)
        # The exception is captured automatically by OpenTelemetry and recorded on the span
        raise

Start the Celery worker:

celery -A tasks worker --loglevel=info

The magic of CeleryInstrumentor is that it hooks Celery's signals — before_task_publish and after_task_publish on the producer side — and wraps task execution on the consumer side. When a task is consumed, it inspects the message headers, finds the traceparent entry, and uses it to create a new span whose parent is the span we created in the Scala service. That is how the trace is stitched together seamlessly.

Step 3: Closing the Loop from the Frontend

The last step is to have frontend requests carry trace information as well. We use @opentelemetry/sdk-trace-web and related packages.

First, install the dependencies:

npm install @opentelemetry/api @opentelemetry/sdk-trace-web @opentelemetry/sdk-trace-base \
@opentelemetry/context-zone @opentelemetry/instrumentation @opentelemetry/instrumentation-fetch \
@opentelemetry/exporter-trace-otlp-http @opentelemetry/resources @opentelemetry/semantic-conventions

Then configure OpenTelemetry at the application entry point (for example index.tsx or a dedicated tracing.ts file).

// src/tracing.ts

import { WebTracerProvider } from '@opentelemetry/sdk-trace-web';
import { registerInstrumentations } from '@opentelemetry/instrumentation';
import { FetchInstrumentation } from '@opentelemetry/instrumentation-fetch';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http';
import { BatchSpanProcessor } from '@opentelemetry/sdk-trace-base';
import { ZoneContextManager } from '@opentelemetry/context-zone';
import { Resource } from '@opentelemetry/resources';
import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions';

const resource = new Resource({
  [SemanticResourceAttributes.SERVICE_NAME]: 'react-frontend-app',
});

const provider = new WebTracerProvider({
  resource: resource,
});

// OTLP over HTTP — note the port is 4318 (HTTP), not 4317 (gRPC)
const exporter = new OTLPTraceExporter({
  url: 'http://localhost:4318/v1/traces',
});

provider.addSpanProcessor(new BatchSpanProcessor(exporter));

// ZoneContextManager is required to propagate context correctly across async operations
provider.register({
  contextManager: new ZoneContextManager(),
});

// Instrument the fetch API; instrumentations only take effect once registered
registerInstrumentations({
  instrumentations: [
    new FetchInstrumentation({
      propagateTraceHeaderCorsUrls: [
        /http:\/\/localhost:8080\/.*/, // allow trace headers toward our Scala backend
      ],
      clearTimingResources: true,
    }),
  ],
});

console.log("OpenTelemetry Web Tracer initialized.");

In the React entry file, such as index.tsx, import and run this initialization script.

// src/index.tsx
import './tracing'; // make sure tracing initializes before the app renders
import React from 'react';
import ReactDOM from 'react-dom/client';
import App from './App';

const root = ReactDOM.createRoot(
  document.getElementById('root') as HTMLElement
);
root.render(
  <React.StrictMode>
    <App />
  </React.StrictMode>
);

From now on, every fetch call in our React components is intercepted automatically. instrumentation-fetch creates a new span and injects a traceparent header into the HTTP requests sent to http://localhost:8080.

// src/App.tsx
import React, { useState } from 'react';

function App() {
  const [status, setStatus] = useState<string>('Ready to submit task.');

  const handleSubmitTask = async () => {
    setStatus('Submitting task...');
    try {
      const response = await fetch('http://localhost:8080/create-task', {
        method: 'POST',
        headers: {
          'Content-Type': 'text/plain',
        },
        body: `task_payload_${Date.now()}`,
      });

      if (!response.ok) {
        throw new Error(`Server responded with ${response.status}`);
      }
      
      const text = await response.text();
      setStatus(`Task submission successful: ${text}`);

    } catch (error) {
      console.error('Failed to submit task', error);
      // This error is also captured automatically by OpenTelemetry and attached to the fetch span
      setStatus(`Error: ${error instanceof Error ? error.message : 'Unknown error'}`);
    }
  };

  return (
    <div>
      <h1>Task Submitter</h1>
      <button onClick={handleSubmitTask}>Create a Heavy Task</button>
      <p>Status: {status}</p>
    </div>
  );
}

export default App;

Verifying the Full Trace

Now we wire everything together:

  1. Start the local OpenTelemetry Collector, Jaeger (for visualization), RabbitMQ, and Redis.
  2. Run the Scala application: sbt "runMain ..."
  3. Run the Celery worker: celery -A tasks worker ...
  4. Run the React frontend: npm start

Open the frontend in the browser and click the "Create a Heavy Task" button. Then open the Jaeger UI: a single complete trace should appear, laying out the whole flow:

sequenceDiagram
    participant Browser as React Frontend
    participant Scala as Scala Service
    participant RabbitMQ
    participant Python as Celery Worker

    Browser->>+Scala: POST /create-task (with traceparent header)
    Note over Scala: OTel Agent creates a server span.
    Scala->>Scala: process-and-dispatch-task (child span)
    Scala->>+RabbitMQ: basic.publish (with traceparent in headers)
    Note over Scala: publish-to-rabbitmq (child span)
    Scala-->>-Browser: HTTP 202 Accepted
    RabbitMQ-->>-Python: Delivers message
    Note over Python: OTel Celery instrumentor extracts context.
    Python->>Python: heavy.computation (new span, child of publish span)
    Note over Python: Executes the task...

This trace is far more than a replacement for logs. It gives us precise timings for every stage and makes the dependencies between services explicit. If a Celery task fails, the exception is attached directly to its span. If a message sits too long in the queue, the gap between the end of the publish span and the start of the heavy.computation span reveals it.
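That queue-wait calculation is simple span-timestamp arithmetic (hypothetical values below; OTel exports span times as epoch nanoseconds):

```python
# Hypothetical span timestamps in epoch nanoseconds, as exported by OTel
publish_span_end_ns = 1_700_000_000_500_000_000    # Scala publish-to-rabbitmq ends
consume_span_start_ns = 1_700_000_002_300_000_000  # Celery heavy.computation starts

# The gap between the two is the time the message spent waiting in RabbitMQ
queue_wait_ms = (consume_span_start_ns - publish_span_end_ns) / 1_000_000
print(f"queue wait: {queue_wait_ms:.0f} ms")  # queue wait: 1800 ms
```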

Limitations and Future Iterations

This setup solves the core problem of tracing across a heterogeneous system, but it is no silver bullet. Several aspects still need attention in production:

  1. Sampling strategy: Right now we likely record 100% of traces. In a high-traffic system that carries a significant storage and performance cost. A sensible sampling strategy is needed — rate-based sampling, or more sophisticated tail-based sampling — to balance capturing the important traces against cost.

  2. Robust context propagation: W3C Trace Context is the standard, but not every middleware or library supports it natively. In more complex systems that cross multiple message queues, RPC frameworks, or databases, custom propagators may be required for some components.

  3. Correlating logs with traces: Tracing alone is not enough; automatically injecting the trace ID into all structured logs is essential. When a problematic span turns up in Jaeger, its trace ID can then be used to pull every detailed log line for that request from the log system (the ELK Stack or Loki, for instance), enabling a fast drill-down from the macro view to the micro view.

  4. Business metric integration: Beyond purely technical tracing, attaching key business events (such as "order placed" or "report generated") to the trace as span events provides richer context for business analysis and troubleshooting.

