我们的起点是一个棘手的工程问题:当可观测性数据——涵盖日志、追踪和指标——日增量达到TB级别,总存储迈向PB规模时,应该采用何种后端架构?单一的技术栈,无论是纯OLAP引擎还是HTAP数据库,都开始暴露出明显的短板。这不仅仅是存储问题,更是查询性能、数据关联和运营成本之间的艰难权衡。
定义问题:可观测性后端的三大挑战
一个现代化的可观测性平台必须应对三个核心挑战:
- 混合查询负载: 系统需要同时支持两种截然不同的查询模式。一种是基于TraceID或UserID的快速点查与小范围扫描,这类似于OLTP负载,要求低延迟响应。另一种则是对海量日志数据进行聚合、筛选和分析,这是典型的OLAP负载,要求高吞吐。
- 数据模型的统一与关联: 日志、追踪和指标是三种异构的数据。一个强大的平台必须能够将它们无缝关联。例如,从一个异常的指标图表下钻到相关的Trace,再从Trace定位到具体的错误日志。这就要求底层存储不仅能存储数据,还要能高效地处理跨模型的JOIN操作。
- 成本与性能的平衡: PB级别的数据存储和计算成本是无法忽视的。架构方案必须在保证查询性能的前提下,具备极高的压缩率和计算效率,并且能够对冷热数据进行有效分层,以控制长期存储成本。
方案A:纯ClickHouse架构——速度的诱惑与束缚
ClickHouse作为业界顶尖的OLAP引擎,其在日志和指标分析场景下的性能毋庸置疑。初步构想中,一个纯ClickHouse集群似乎是理所当然的选择。
优势分析:
- 极致的OLAP性能: 基于列式存储和向量化执行引擎,ClickHouse在处理大规模聚合查询时速度极快。对于日志的全文搜索、指标的聚合计算等场景,它几乎是无敌的。
- 惊人的数据压缩比: 列存的特性加上高效的压缩算法,使得ClickHouse的存储成本相对较低,对于海量日志数据非常友好。
劣势分析:
- JOIN操作的局限性: ClickHouse的JOIN能力相对较弱,尤其是对于大规模、高基数的分布式JOIN,性能和稳定性都存在挑战。将日志、追踪和元数据(例如服务信息、部署版本)进行关联查询,会变得非常笨拙和低效。
- 点查和更新的短板: ClickHouse不擅长高并发的点查和数据更新。在我们的场景中,管理告警规则、用户配置、服务元数据等需要频繁更新和查询的信息,用ClickHouse来做会非常痛苦。
- 事务能力的缺失: 平台自身的管理功能(如租户管理、权限控制)需要事务保证,这是ClickHouse无法提供的。
在真实项目中,强行用ClickHouse处理所有负载,最终会导致系统架构的扭曲。我们不得不引入外部的MySQL或PostgreSQL来管理元数据,从而在应用层逻辑中手动完成数据拼接,这违背了构建统一平台的初衷。
方案B:纯TiDB架构——HTAP的承诺与现实
TiDB作为一款优秀的HTAP(混合事务/分析处理)数据库,理论上可以同时处理OLTP和OLAP负载,这使其成为另一个有力的候选者。
优势分析:
- 统一的数据存储: TiDB可以同时存储结构化的元数据(OLTP)和海量的可观测性数据(OLAP),提供标准的SQL接口,简化了开发和运维。
- 强大的事务与一致性: 基于Raft协议,TiDB提供金融级的ACID事务保证,非常适合处理平台的配置管理和元数据。
- 水平扩展能力: TiDB的存储层(TiKV)和计算层(TiDB Server)都可以独立扩展,能够应对高并发的写入和查询请求。其列存引擎TiFlash为OLAP查询加速。
劣势分析:
- 极致OLAP性能与成本: 尽管TiFlash提供了不错的OLAP能力,但在面对PB级日志数据的极限聚合分析场景时,其性能和成本效益相较于ClickHouse这样的专用OLAP引擎,仍然存在差距。ClickHouse的物化视图、专用函数库等特性在可观测性领域更为贴合。
- 写入放大问题: 对于日志这种几乎只有追加写入(Append-Only)的场景,TiDB基于LSM-Tree的存储引擎(TiKV)在后台需要进行频繁的Compaction操作,这在高写入吞吐下可能会带来一定的性能开销和资源消耗。
一个常见的错误是,认为HTAP数据库是解决所有问题的银弹。在我们的规模下,虽然TiDB可以“完成”工作,但可能不是“最优”的选择,尤其是在日志分析的成本和性能上。
最终选择:TiDB + ClickHouse混合架构
权衡之后,我们决定采用一种混合架构,让两款数据库各司其职,发挥其最大优势。
graph TD subgraph 数据源 A[服务A] --> C B[服务B] --> C end subgraph 采集层 C[OTel Collector] --> D[Kafka] end subgraph 处理与分发层 D --> E{Go Ingestion Service} end subgraph 存储层 E -- 元数据/热数据/追踪 --> F[TiDB Cluster] E -- 日志/指标/冷追踪 --> G[ClickHouse Cluster] end subgraph 查询层 H[Query Federation Service] --> F H --> G end subgraph 应用层 I[API / Dashboard] --> H end
架构核心思想:
TiDB作为“控制平面”与“热数据中心”:
- 存储所有元数据:服务信息、告警规则、用户配置、租户信息等。
- 存储近期的、关联查询频繁的追踪数据(例如过去1小时的Traces)。Trace数据结构复杂,关联查询多,TiDB的事务和JOIN能力是完美匹配。
- 提供强一致性的OLTP服务。
ClickHouse作为“数据平面”与“冷/温数据湖”:
- 存储全量的、海量的结构化日志和指标数据。
- 存储归档的追踪数据(例如超过1小时的Traces)。
- 为大规模数据分析和聚合提供极致性能。
Query Federation Service(查询联邦服务): 这是整个架构的粘合剂。它是一个自研的后端服务,负责解析来自前端的查询请求,将其拆解成针对TiDB和ClickHouse的子查询,最后合并结果返回。这是整个方案的技术关键点。
核心实现概览
1. 统一的数据“样式方案”:结构化与模型定义
为了让异构数据能够在两个数据库中流转并被统一查询,我们定义了一套严格的数据模型。我们称之为“样式方案”,它本质上是一套跨系统的Schema规范。
以下是在Go中定义的核心日志结构,它将被用于写入Kafka,并最终映射到TiDB和ClickHouse的表中。
// file: internal/models/log.go
package models
import (
"time"
)
// UnifiedLog represents the standardized structure for all incoming logs.
// This struct serves as the "styling scheme" for our observability data.
// It's designed to be easily mapped to both ClickHouse and TiDB tables.
type UnifiedLog struct {
Timestamp time.Time `json:"timestamp"` // 精确到纳秒的时间戳
TraceID string `json:"trace_id"` // 关联追踪的TraceID
SpanID string `json:"span_id"` // 关联追踪的SpanID
Severity string `json:"severity"` // 日志级别 (e.g., INFO, ERROR)
ServiceName string `json:"service_name"` // 服务名称
HostName string `json:"host_name"` // 主机名
Body string `json:"body"` // 日志主体消息
Attributes map[string]string `json:"attributes"` // 结构化属性 (Key-Value)
Resource map[string]string `json:"resource"` // 资源属性 (e.g., k8s_pod_name)
}
// ClickHouse table schema for this struct
/*
CREATE TABLE observer.logs (
`Timestamp` DateTime64(9, 'Asia/Shanghai'),
`TraceID` String,
`SpanID` String,
`Severity` LowCardinality(String),
`ServiceName` LowCardinality(String),
`HostName` LowCardinality(String),
`Body` String,
`Attributes` Map(String, String),
`Resource` Map(String, String),
INDEX idx_trace_id TraceID TYPE bloom_filter,
INDEX idx_body Body TYPE tokenbf_v1(32768, 3, 0)
) ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(Timestamp)
ORDER BY (ServiceName, Severity, Timestamp);
*/
// TiDB would typically store metadata or recent traces, not high-volume logs.
// For example, a trace metadata table might look like this:
/*
CREATE TABLE observer.trace_metadata (
`trace_id` VARCHAR(36) PRIMARY KEY,
`service_name` VARCHAR(255),
`operation_name` VARCHAR(255),
`start_time` DATETIME(6),
`duration_ms` INT,
`has_error` BOOLEAN,
INDEX idx_service_time (service_name, start_time)
);
*/
这里的坑在于,LowCardinality
类型是ClickHouse优化的关键,对于服务名、日志级别这类基数不高的字段,能极大提升性能和压缩率。设计表结构时必须充分考虑目标数据库的特性。
2. 数据写入与分发服务
这个Go服务是数据流的咽喉。它消费Kafka中的原始数据,根据数据类型和时间戳进行路由决策。
// file: cmd/ingester/main.go
package main
import (
"context"
"encoding/json"
"log"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/segmentio/kafka-go"
"your_project/internal/db"
"your_project/internal/models"
)
const (
kafkaTopic = "otel-logs"
kafkaBroker = "kafka:9092"
consumerGroupID = "log-ingester-group"
clickhouseBatchSize = 10000
clickhouseFlushInterval = 5 * time.Second
)
func main() {
// 生产级的服务需要更健壮的配置管理
chConn, err := db.NewClickHouseClient("tcp://clickhouse-server:9000?database=observer")
if err != nil {
log.Fatalf("Failed to connect to ClickHouse: %v", err)
}
defer chConn.Close()
// TiDB client initialization would be here...
// tidbConn, err := db.NewTiDBClient("user:pass@tcp(tidb-server:4000)/observer")
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{kafkaBroker},
Topic: kafkaTopic,
GroupID: consumerGroupID,
MinBytes: 10e3, // 10KB
MaxBytes: 10e6, // 10MB
})
defer reader.Close()
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
// Buffer for batching writes to ClickHouse
logBuffer := make(chan *models.UnifiedLog, clickhouseBatchSize*2)
wg.Add(1)
go func() {
defer wg.Done()
processKafkaMessages(ctx, reader, logBuffer)
}()
wg.Add(1)
go func() {
defer wg.Done()
batchInsertToClickHouse(ctx, chConn, logBuffer)
}()
// Graceful shutdown
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
<-sigterm
log.Println("Shutdown signal received, draining buffers...")
cancel() // Notify goroutines to stop
wg.Wait()
log.Println("Ingester shut down gracefully.")
}
func processKafkaMessages(ctx context.Context, reader *kafka.Reader, logBuffer chan<- *models.UnifiedLog) {
for {
select {
case <-ctx.Done():
log.Println("Kafka processor shutting down.")
return
default:
// Use FetchMessage for better control over context cancellation
m, err := reader.FetchMessage(ctx)
if err != nil {
if err == context.Canceled {
return
}
log.Printf("Error fetching message: %v", err)
continue
}
var logEntry models.UnifiedLog
if err := json.Unmarshal(m.Value, &logEntry); err != nil {
log.Printf("Failed to unmarshal log: %v", err)
// In production, send to a dead-letter queue
reader.CommitMessages(ctx, m) // Commit even on error to avoid reprocessing
continue
}
// 路由逻辑:这里简化为所有日志都进ClickHouse
// 真实的场景会更复杂,比如根据logEntry.Type判断是trace还是log
// if isTrace(logEntry) && time.Since(logEntry.Timestamp) < time.Hour {
// // Write to TiDB for recent traces
// } else {
// logBuffer <- &logEntry
// }
logBuffer <- &logEntry
if err := reader.CommitMessages(ctx, m); err != nil {
log.Printf("Failed to commit message: %v", err)
}
}
}
}
func batchInsertToClickHouse(ctx context.Context, conn db.ClickHouseClient, logBuffer <-chan *models.UnifiedLog) {
ticker := time.NewTicker(clickhouseFlushInterval)
defer ticker.Stop()
batch := make([]*models.UnifiedLog, 0, clickhouseBatchSize)
for {
select {
case <-ctx.Done():
log.Println("ClickHouse writer shutting down. Flushing final batch...")
if len(batch) > 0 {
flushBatch(context.Background(), conn, batch) // Use a background context for final flush
}
return
case logEntry := <-logBuffer:
batch = append(batch, logEntry)
if len(batch) >= clickhouseBatchSize {
flushBatch(ctx, conn, batch)
batch = make([]*models.UnifiedLog, 0, clickhouseBatchSize) // Reset batch
}
case <-ticker.C:
if len(batch) > 0 {
flushBatch(ctx, conn, batch)
batch = make([]*models.UnifiedLog, 0, clickhouseBatchSize) // Reset batch
}
}
}
}
func flushBatch(ctx context.Context, conn db.ClickHouseClient, batch []*models.UnifiedLog) {
// 实际的db.ClickHouseClient会封装底层的 batch.Append 和 conn.Send
// 这里仅为示意
err := conn.BatchInsertLogs(ctx, batch)
if err != nil {
log.Printf("Failed to insert batch of %d logs to ClickHouse: %v", len(batch), err)
// Production code needs a robust retry mechanism with backoff,
// and potentially writing failed batches to a recovery location.
} else {
log.Printf("Successfully inserted batch of %d logs.", len(batch))
}
}
这段代码展示了一个生产级消费者的基本骨架:优雅停机、上下文管理、批量写入和定时刷新。一个常见的错误是在循环中逐条写入数据库,这会对数据库造成巨大压力。批量提交是必须的。
3. 查询联邦服务
这是架构的大脑。它需要解析查询,决定数据源,然后合并结果。一个简化的示例如下,假设我们要查询一个特定TraceID的所有相关日志。
// file: cmd/query-federator/handler.go
package main
import (
"context"
"encoding/json"
"net/http"
"sync"
"time"
"your_project/internal/db"
"your_project/internal/models"
)
type QueryService struct {
tidb *db.TiDBClient
ch *db.ClickHouseClient
}
// A query for a trace might involve fetching metadata from TiDB and logs from ClickHouse
func (s *QueryService) GetTraceDetails(w http.ResponseWriter, r *http.Request) {
traceID := r.URL.Query().Get("traceID")
if traceID == "" {
http.Error(w, "traceID is required", http.StatusBadRequest)
return
}
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
defer cancel()
var wg sync.WaitGroup
var traceMeta *models.TraceMetadata
var traceLogs []*models.UnifiedLog
var metaErr, logsErr error
wg.Add(1)
go func() {
defer wg.Done()
// TiDB is great for this kind of fast lookup by primary key
traceMeta, metaErr = s.tidb.GetTraceMetadata(ctx, traceID)
}()
wg.Add(1)
go func() {
defer wg.Done()
// ClickHouse is optimized for scanning large datasets based on an indexed column
traceLogs, logsErr = s.ch.GetLogsByTraceID(ctx, traceID)
}()
wg.Wait()
if metaErr != nil || logsErr != nil {
// In a real system, we'd provide more detailed error info
http.Error(w, "Failed to fetch trace data", http.StatusInternalServerError)
log.Printf("Error fetching trace %s. MetaErr: %v, LogsErr: %v", traceID, metaErr, logsErr)
return
}
// Combine the results from both databases
response := struct {
Metadata *models.TraceMetadata `json:"metadata"`
Logs []*models.UnifiedLog `json:"logs"`
}{
Metadata: traceMeta,
Logs: traceLogs,
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}
// main function to set up the server would be here...
这里的核心是通过goroutine并行查询多个数据源,然后聚合结果。这个模式可以扩展到更复杂的场景,例如:先从TiDB查询符合某个业务条件(如“VIP用户”)的TraceID列表,再用这个列表去ClickHouse中查询这些Trace在过去7天内的所有错误日志。
架构的扩展性与局限性
该混合架构具备良好的扩展性。当指标数据量变得异常庞大时,我们可以引入专门的时序数据库(TSDB)如VictoriaMetrics或M3DB,并在查询联邦层增加一个新的数据源适配器。联邦服务是整个系统的“可插拔”核心,确保了技术栈的灵活性。
然而,这个架构并非没有代价。最显著的局限性在于查询联邦服务的复杂性。我们实际上是在构建一个简化的分布式查询引擎,需要处理跨源JOIN、聚合下推、超时控制和错误处理等问题。这带来了额外的研发和维护成本。此外,维护TiDB和ClickHouse两个分布式集群也对SRE团队提出了更高的要求。数据在两个系统间的最终一致性也需要有监控和校验机制来保障。此方案适用于数据规模确实达到瓶颈、且团队具备相应技术深度的场景,对于中小型项目而言可能过于复杂。