A request timed out. The frontend showed nothing but a vague "task submission failed" message. The Scala service's API log says HTTP 202 Accepted: the request was handled successfully and the task was delivered to the message queue. Yet the Python Celery worker downstream, the one that should process that task, logged nothing at all. So where is the problem? Was the message lost inside RabbitMQ? Did the worker hit an uncaught exception while consuming it? Or did a network partition keep the worker from ever receiving it?
This kind of cross-language, cross-process, asynchronous architecture is everywhere in real projects, and it makes diagnosis genuinely hard. Manually threading a correlation_id through every layer's logs is the most primitive form of tracing: tedious, error-prone, and incapable of deeper insights such as latency analysis or a service topology. What we need is an automated tracing system that spans the whole call chain. That is why we brought in OpenTelemetry: not as a fashionable buzzword, but as a tool for a concrete engineering pain point.
Our stack:
- Frontend: React (TypeScript), which issues the task-creation request.
- API gateway / core service: Scala with Akka HTTP, which receives the request, validates it, and pushes an asynchronous task to RabbitMQ.
- Asynchronous task processor: Python Celery, which subscribes to the RabbitMQ queue and runs the long computations.
The goal is a single trace that starts when the user clicks a button in the browser, passes through the Scala service, and ends when the Celery worker finishes the task.
Step 1: Instrumenting the Scala service
Scala runs on the JVM, so the OpenTelemetry Java Agent gives us most of the automatic instrumentation for free. That alone is not enough here, though: we also have to inject the trace context manually into the headers of the messages we send to RabbitMQ.
First, the build.sbt dependencies. Besides the Akka libraries, the essentials are the OpenTelemetry API and the RabbitMQ Java client.
// build.sbt
val AkkaVersion = "2.6.20"
val AkkaHttpVersion = "10.2.12"
val OpenTelemetryVersion = "1.32.0"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor-typed" % AkkaVersion,
"com.typesafe.akka" %% "akka-stream" % AkkaVersion,
"com.typesafe.akka" %% "akka-http" % AkkaHttpVersion,
"com.rabbitmq" % "amqp-client" % "5.17.0",
// OpenTelemetry API for manual instrumentation
"io.opentelemetry" % "opentelemetry-api" % OpenTelemetryVersion,
"io.opentelemetry" % "opentelemetry-context" % OpenTelemetryVersion,
// Logging
"ch.qos.logback" % "logback-classic" % "1.2.11",
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.5"
)
The interesting part is the TaskPublisher service. When it publishes a message to RabbitMQ, it must extract the current trace context and attach it as message headers. The W3C Trace Context specification defines the standard propagation headers, traceparent and tracestate, and we follow that standard.
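To make the header concrete: traceparent packs four dash-separated, lowercase-hex fields — version, a 16-byte trace id, an 8-byte parent (span) id, and trace flags. A stdlib-only sketch of pulling it apart, using the example value from the W3C spec:

```python
def parse_traceparent(header: str) -> dict:
    """Split a W3C traceparent header into its four fields:
    version-traceid-parentid-flags, all lowercase hex."""
    version, trace_id, parent_id, flags = header.split("-")
    assert len(trace_id) == 32 and len(parent_id) == 16
    return {
        "version": version,
        "trace_id": trace_id,      # shared by every span in the trace
        "parent_id": parent_id,    # span id of the caller
        "sampled": int(flags, 16) & 0x01 == 1,
    }

# example value taken from the W3C Trace Context spec
tp = parse_traceparent("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01")
```

The exact string the Scala publisher writes into the AMQP headers is what the Celery instrumentation later reads back out.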
// src/main/scala/com/example/TaskPublisher.scala
package com.example

import com.rabbitmq.client.{AMQP, ConnectionFactory}
import com.typesafe.scalalogging.LazyLogging
import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.trace.StatusCode
import io.opentelemetry.context.Context
import io.opentelemetry.context.propagation.TextMapSetter

import java.nio.charset.StandardCharsets
import scala.jdk.CollectionConverters._
import scala.collection.mutable

class TaskPublisher extends LazyLogging {
  private val EXCHANGE_NAME = "tasks_exchange"
  private val ROUTING_KEY = "heavy.computation"
  private val QUEUE_NAME = "celery"

  // Connection and channel management should be more robust in production,
  // e.g. backed by a connection pool.
  private val factory = new ConnectionFactory()
  factory.setHost("localhost") // Use your RabbitMQ host
  private val connection = factory.newConnection()
  private val channel = connection.createChannel()
  channel.exchangeDeclare(EXCHANGE_NAME, "direct", true)
  channel.queueDeclare(QUEUE_NAME, true, false, false, null)
  channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY)

  // The OTel key piece: a TextMapSetter that writes context entries into the
  // AMQP headers map.
  private val setter = new TextMapSetter[mutable.Map[String, Object]] {
    override def set(carrier: mutable.Map[String, Object], key: String, value: String): Unit = {
      // AMQP header values must be of supported types; String is fine.
      // Overwrite unconditionally: inject() must always win.
      carrier.put(key, value)
    }
  }

  def publishTask(taskPayload: String): Unit = {
    val tracer = GlobalOpenTelemetry.getTracer("scala-task-publisher")
    val span = tracer.spanBuilder("publish-to-rabbitmq").startSpan()
    // Make the span current for the duration of the publish. Scope is
    // AutoCloseable; Scala has no Kotlin-style `use`, so close it in `finally`.
    val scope = span.makeCurrent()
    try {
      val headers = mutable.Map[String, Object]()
      // The core step: inject the current OpenTelemetry context into our headers map.
      GlobalOpenTelemetry.getPropagators.getTextMapPropagator.inject(
        Context.current(),
        headers,
        setter
      )
      logger.info(s"Injecting trace context into AMQP headers: ${headers.toString}")
      val props = new AMQP.BasicProperties.Builder()
        .contentType("application/json")
        .contentEncoding("utf-8")
        .headers(headers.asJava)
        .deliveryMode(2) // persistent message
        .build()
      channel.basicPublish(
        EXCHANGE_NAME,
        ROUTING_KEY,
        props,
        taskPayload.getBytes(StandardCharsets.UTF_8)
      )
      span.setAttribute("messaging.system", "rabbitmq")
      span.setAttribute("messaging.destination", EXCHANGE_NAME)
      span.setAttribute("messaging.routing_key", ROUTING_KEY)
      span.setAttribute("message.payload_size", taskPayload.length.toLong)
      logger.info(s"Task published successfully for payload: $taskPayload")
    } catch {
      case e: Exception =>
        span.recordException(e)
        span.setStatus(StatusCode.ERROR, e.getMessage)
        logger.error("Failed to publish task to RabbitMQ", e)
        throw e // Re-throw to let the caller handle it
    } finally {
      scope.close()
      span.end()
    }
  }

  def close(): Unit = {
    channel.close()
    connection.close()
  }
}
The Akka HTTP route calls this TaskPublisher. With the Java Agent attached, the Akka HTTP server creates the entry span automatically; all we have to do in the route is invoke our publish method.
// src/main/scala/com/example/WebServer.scala
package com.example

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Directives._
import com.typesafe.scalalogging.LazyLogging
import io.opentelemetry.api.GlobalOpenTelemetry
import scala.io.StdIn

object WebServer extends LazyLogging {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem(Behaviors.empty, "my-system")
    implicit val executionContext = system.executionContext
    val publisher = new TaskPublisher()

    val route =
      path("create-task") {
        post {
          entity(as[String]) { payload =>
            // The Java Agent creates the server span automatically; here we add
            // an internal span for the business logic.
            val tracer = GlobalOpenTelemetry.getTracer("akka-http-server")
            val internalSpan = tracer.spanBuilder("process-and-dispatch-task").startSpan()
            val scope = internalSpan.makeCurrent()
            try {
              logger.info("Received task creation request.")
              // A real project would run its business logic here.
              internalSpan.setAttribute("task.payload.preview", payload.take(50))
              publisher.publishTask(s"""{"data": "$payload"}""")
              // 202: the task is accepted for processing, not yet done.
              complete(StatusCodes.Accepted -> "Task accepted")
            } finally {
              scope.close()
              internalSpan.end()
            }
          }
        }
      }

    val bindingFuture = Http().newServerAt("localhost", 8080).bind(route)
    logger.info(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
    StdIn.readLine()
    bindingFuture
      .flatMap(_.unbind())
      .onComplete { _ =>
        publisher.close()
        system.terminate()
      }
  }
}
To run it, download the OpenTelemetry Java Agent jar. Note that -javaagent and the -Dotel.* settings are JVM options, not program arguments, so they cannot simply be appended to sbt "runMain ..." as in a naive attempt. The simplest working approach is to package the application (for example with sbt-assembly; the jar path below depends on your own assembly settings) and launch it with java directly:
# assuming opentelemetry-javaagent.jar is in the current directory
sbt assembly
java -javaagent:./opentelemetry-javaagent.jar \
  -Dotel.service.name=scala-task-service \
  -Dotel.traces.exporter=otlp \
  -Dotel.exporter.otlp.endpoint=http://localhost:4317 \
  -Dotel.instrumentation.akka-http.enabled=true \
  -jar target/scala-2.13/app-assembly.jar  # jar name depends on your build
At this point, every request to /create-task generates trace data inside the Scala service, and an AMQP message carrying a traceparent header goes out to RabbitMQ. The trace is cut off there; the next step is to have the Celery worker pick it up.
Step 2: Having the Celery worker recognize and continue the trace
The Celery ecosystem has good OpenTelemetry integration; we need the opentelemetry-instrumentation-celery package. The key is configuring it so that it correctly extracts the context from the AMQP message headers.
The requirements.txt file:
celery==5.3.6
redis==5.0.1 # result-backend client
pika==1.3.2 # optional, only for direct RabbitMQ access; Celery itself talks AMQP via kombu
opentelemetry-api==1.22.0
opentelemetry-sdk==1.22.0
opentelemetry-exporter-otlp==1.22.0
opentelemetry-instrumentation-celery==0.43b0
Now we configure the Celery application and OpenTelemetry. All tracing-related initialization is centralized in a tracing.py file.
# tracing.py
import logging

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.instrumentation.celery import CeleryInstrumentor

def setup_tracing():
    """Configures OpenTelemetry tracing for the Celery worker."""
    # Make sure we only initialize once
    if getattr(setup_tracing, "initialized", False):
        return
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    # Resource attributes such as the service name
    resource = Resource.create({
        "service.name": "python-celery-worker"
    })
    # Create a TracerProvider
    provider = TracerProvider(resource=resource)
    # OTLP exporter pointed at the collector
    otlp_exporter = OTLPSpanExporter(endpoint="http://localhost:4317", insecure=True)
    # BatchSpanProcessor exports asynchronously
    processor = BatchSpanProcessor(otlp_exporter)
    provider.add_span_processor(processor)
    # Install the global TracerProvider
    trace.set_tracer_provider(provider)
    logger.info("OpenTelemetry tracing configured.")
    # The key step: instrument Celery. This patches Celery's signals and task
    # execution so spans are created and contexts extracted automatically.
    CeleryInstrumentor().instrument()
    logger.info("Celery instrumentation applied.")
    setup_tracing.initialized = True
Next comes the Celery application itself. In tasks.py we create the Celery instance and define our task. Crucially, setup_tracing is called after the app is created and before any task runs.
# tasks.py
import time
import random
import logging

from celery import Celery
from tracing import setup_tracing

logger = logging.getLogger(__name__)

# RabbitMQ as the broker
BROKER_URL = 'amqp://guest:guest@localhost:5672//'
# Redis as the result backend (optional)
RESULT_BACKEND = 'redis://localhost:6379/0'

celery_app = Celery(
    'tasks',
    broker=BROKER_URL,
    backend=RESULT_BACKEND
)

# Set up tracing right after the Celery app is instantiated, so Celery's
# signals and task base classes are patched before any task runs.
# NOTE: with the prefork pool you may prefer to call setup_tracing() from the
# worker_process_init signal instead, so the span processor's export thread
# is created after the fork.
setup_tracing()

@celery_app.task(name='heavy.computation')
def heavy_computation(payload: dict):
    """
    A task simulating an expensive computation.
    """
    logger.info(f"Received task with payload: {payload}")
    # This code runs under a new span that is a child of the publish span
    # created in the Scala service.
    try:
        # Simulate some work
        base_duration = 2
        random_jitter = random.uniform(0.5, 2.0)
        total_duration = base_duration + random_jitter
        logger.info(f"Starting heavy computation, will take approx {total_duration:.2f}s")
        time.sleep(total_duration)
        if random.random() < 0.1:  # fail 10% of the time
            raise ValueError("A random error occurred during computation!")
        result = {"status": "success", "processed_data": f"processed_{payload.get('data')}"}
        logger.info("Heavy computation finished successfully.")
        return result
    except Exception as e:
        logger.error(f"Task failed: {e}", exc_info=True)
        # The exception is automatically recorded on the span by OpenTelemetry
        raise
Start the Celery worker:
celery -A tasks worker --loglevel=info
The trick behind CeleryInstrumentor is that it hooks Celery's signals: before_task_publish/after_task_publish on the producing side, and task_prerun/task_postrun (plus the failure and retry signals) around task execution. When a task is consumed, it looks for a traceparent entry in the message headers and uses it to start a new span whose parent is the publish span we created in the Scala service. That is how the trace connects seamlessly across the queue.
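One caveat the code above glosses over: a hand-rolled AMQP publisher such as our Scala TaskPublisher must also produce a body and headers that Celery's own message protocol (v2) recognizes, or the worker will reject the delivery before any task runs. A hedged sketch of that envelope in Python — field names follow the Celery message-protocol documentation, and celery_v2_message is our own illustrative helper, not a Celery API:

```python
import json
import uuid

def celery_v2_message(task_name, args, kwargs, trace_headers):
    """Build the AMQP headers and body a raw publisher must send for a Celery
    worker speaking message protocol v2 to accept the task. The W3C trace
    headers simply ride alongside Celery's own headers."""
    task_id = str(uuid.uuid4())
    headers = {
        "lang": "py",          # protocol v2 marker
        "task": task_name,     # must match the worker-side task name
        "id": task_id,
        "root_id": task_id,
        "parent_id": None,
        **trace_headers,       # e.g. {"traceparent": "00-...-01"}
    }
    # The body is a JSON array: [args, kwargs, embed]
    body = json.dumps([args, kwargs,
                       {"callbacks": None, "errbacks": None,
                        "chain": None, "chord": None}])
    # The AMQP properties should also carry content_type="application/json",
    # content_encoding="utf-8" and correlation_id=task_id.
    return headers, body

headers, body = celery_v2_message(
    "heavy.computation",
    [{"data": "hello"}], {},
    {"traceparent": "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"},
)
```

In our setup this means the Scala publisher would add these Celery headers next to the injected traceparent before calling basicPublish.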
Step 3: Starting from the frontend to close the loop
The final step is to have the frontend request carry trace information too, using @opentelemetry/sdk-trace-web and related packages.
First, install the dependencies:
npm install @opentelemetry/api @opentelemetry/sdk-trace-web @opentelemetry/context-zone \
  @opentelemetry/instrumentation @opentelemetry/instrumentation-fetch \
  @opentelemetry/exporter-trace-otlp-http \
  @opentelemetry/resources @opentelemetry/semantic-conventions
Then configure OpenTelemetry at the application entry point (for example in index.tsx, or a dedicated tracing.ts file).
// src/tracing.ts
import { WebTracerProvider } from '@opentelemetry/sdk-trace-web';
import { registerInstrumentations } from '@opentelemetry/instrumentation';
import { FetchInstrumentation } from '@opentelemetry/instrumentation-fetch';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http';
import { BatchSpanProcessor } from '@opentelemetry/sdk-trace-base';
import { ZoneContextManager } from '@opentelemetry/context-zone';
import { Resource } from '@opentelemetry/resources';
import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions';

const resource = new Resource({
  [SemanticResourceAttributes.SERVICE_NAME]: 'react-frontend-app',
});

const provider = new WebTracerProvider({
  resource: resource,
});

// OTLP over HTTP: note the port is 4318, not the gRPC port 4317
const exporter = new OTLPTraceExporter({
  url: 'http://localhost:4318/v1/traces',
});
provider.addSpanProcessor(new BatchSpanProcessor(exporter));

// ZoneContextManager is required to propagate context across async operations
provider.register({
  contextManager: new ZoneContextManager(),
});

// Instrument the fetch API. Instrumentations must be registered explicitly,
// otherwise they never patch anything.
registerInstrumentations({
  instrumentations: [
    new FetchInstrumentation({
      propagateTraceHeaderCorsUrls: [
        /http:\/\/localhost:8080\/.*/, // allow trace headers to our Scala backend
      ],
      clearTimingResources: true,
    }),
  ],
});

console.log("OpenTelemetry Web Tracer initialized.");
In the React entry file, index.tsx, import and run this initialization script before anything renders.
// src/index.tsx
import './tracing'; // must run before the app renders
import React from 'react';
import ReactDOM from 'react-dom/client';
import App from './App';
const root = ReactDOM.createRoot(
document.getElementById('root') as HTMLElement
);
root.render(
<React.StrictMode>
<App />
</React.StrictMode>
);
Now any call to the fetch API in our React components is intercepted automatically. The fetch instrumentation creates a new span and injects the traceparent header into the HTTP requests going to http://localhost:8080.
// src/App.tsx
import React, { useState } from 'react';
function App() {
const [status, setStatus] = useState<string>('Ready to submit task.');
const handleSubmitTask = async () => {
setStatus('Submitting task...');
try {
const response = await fetch('http://localhost:8080/create-task', {
method: 'POST',
headers: {
'Content-Type': 'text/plain',
},
body: `task_payload_${Date.now()}`,
});
if (!response.ok) {
throw new Error(`Server responded with ${response.status}`);
}
const text = await response.text();
setStatus(`Task submission successful: ${text}`);
} catch (error) {
console.error('Failed to submit task', error);
      // This error is also captured by OpenTelemetry and attached to the fetch span
setStatus(`Error: ${error instanceof Error ? error.message : 'Unknown error'}`);
}
};
return (
<div>
<h1>Task Submitter</h1>
<button onClick={handleSubmitTask}>Create a Heavy Task</button>
<p>Status: {status}</p>
</div>
);
}
export default App;
Verifying the complete trace
Now we wire all the pieces together:
- Start the local OpenTelemetry Collector, Jaeger (for visualization), RabbitMQ, and Redis.
- Run the Scala application with the Java Agent attached.
- Run the Celery worker:
celery -A tasks worker ...
- Run the React frontend:
npm start
Open the frontend page in a browser and click the "Create a Heavy Task" button. Then open the Jaeger UI: we should see a single, complete trace that lays out the whole flow:
sequenceDiagram
    participant Browser as React Frontend
    participant Scala as Scala Service
    participant RabbitMQ
    participant Python as Celery Worker
    Browser->>+Scala: POST /create-task (with traceparent header)
    Note over Scala: OTel Agent creates a server span.
    Scala->>Scala: process-and-dispatch-task (child span)
    Scala->>+RabbitMQ: basic.publish (with traceparent in headers)
    Note over Scala: publish-to-rabbitmq (child span)
    Scala-->>-Browser: HTTP 202 Accepted
    RabbitMQ-->>-Python: Delivers message
    Note over Python: OTel Celery instrumentor extracts context.
    Python->>Python: heavy.computation (new span, child of publish span)
    Note over Python: Executes the task...
This trace graph is more than a substitute for logs. It gives us precise timings for every stage and makes the dependencies between services explicit. If a Celery task fails, the exception is attached directly to the corresponding span. And if a message lingers too long in the queue, we can read that off the gap between the end of the publish span and the start of the heavy.computation span.
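For instance, the queue delay falls straight out of the exported timestamps. The nanosecond values below are invented for illustration:

```python
# Timestamps as OTLP reports them: unix epoch nanoseconds (made-up values).
publish_end_ns = 1_700_000_000_004_000_000    # end of "publish-to-rabbitmq"
consume_start_ns = 1_700_000_000_150_000_000  # start of "heavy.computation"

# The gap between the two spans is (approximately) how long the message
# sat in RabbitMQ before a worker picked it up.
queue_latency_ms = (consume_start_ns - publish_end_ns) / 1e6
print(f"approx. time in queue: {queue_latency_ms:.0f} ms")  # approx. time in queue: 146 ms
```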
Limitations and future iterations
This setup solves the core problem of tracing across a heterogeneous system, but it is no silver bullet. Several aspects still need attention in production:
Sampling strategy: as configured, we likely record 100% of traces. In a high-traffic system that means enormous storage and performance overhead. A sensible sampling strategy is needed, such as rate-based sampling or the more elaborate tail-based sampling, to balance capturing what matters against cost.
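To illustrate the idea, here is a stdlib sketch in the spirit of OpenTelemetry's TraceIdRatioBased sampler (not its exact code): the keep/drop decision is a pure function of the trace id, so every service in the chain reaches the same verdict and traces are kept or dropped whole.

```python
def should_sample(trace_id_hex: str, ratio: float) -> bool:
    """Head-sampling decision derived only from the trace id: compare the
    low 8 bytes of the 16-byte id against ratio * 2**64. Deterministic, so
    the frontend, the Scala service and the Celery worker all agree."""
    bound = int(ratio * (1 << 64))
    return int(trace_id_hex[16:], 16) < bound

tid = "0af7651916cd43dd8448eb211c80319c"
keep_all = should_sample(tid, 1.0)   # ratio 1.0 keeps every trace
keep_none = should_sample(tid, 0.0)  # ratio 0.0 drops every trace
```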
Robust context propagation: W3C Trace Context is the standard, but not every middleware or library supports it natively. In more complex systems, spanning multiple message queues, RPC frameworks, or databases, you may need to write custom propagators for some components.
Correlating logs with traces: tracing alone is not enough; automatically injecting the trace ID into all structured logs is essential. Then, when a suspicious span shows up in Jaeger, its trace ID can be used to retrieve every detailed log line along that request path from the log system (ELK Stack, Loki, etc.), drilling down from the macro view to the micro view.
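A minimal, stdlib-only sketch of that idea: a logging filter stamps every record with the trace id in the same 32-char hex form Jaeger displays. Here the id is passed in as a constant; in the real worker it would come from the active span via opentelemetry.trace.get_current_span().get_span_context().trace_id.

```python
import io
import logging

class TraceIdFilter(logging.Filter):
    """Stamp every log record with a trace id so log lines can be searched
    by the same hex id shown in the Jaeger UI."""
    def __init__(self, trace_id: int):
        super().__init__()
        self.trace_id = trace_id

    def filter(self, record: logging.LogRecord) -> bool:
        record.trace_id = f"{self.trace_id:032x}"  # lowercase 32-char hex
        return True

buf = io.StringIO()
handler = logging.StreamHandler(buf)
handler.setFormatter(logging.Formatter("%(trace_id)s %(message)s"))

logger = logging.getLogger("demo-worker")
logger.addHandler(handler)
logger.addFilter(TraceIdFilter(0x0AF7651916CD43DD8448EB211C80319C))
logger.propagate = False  # keep the demo output out of the root logger
logger.warning("task failed")

print(buf.getvalue(), end="")  # 0af7651916cd43dd8448eb211c80319c task failed
```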
Business metric integration: beyond purely technical tracing, attaching key business events ("order placed", "report generated") to the trace as span events gives both business analysis and incident response much richer context.