构建PB级可观测性数据后端 TiDB与ClickHouse的混合架构选型与实现


我们的起点是一个棘手的工程问题:当可观测性数据——涵盖日志、追踪和指标——日增量达到TB级别,总存储迈向PB规模时,应该采用何种后端架构?单一的技术栈,无论是纯OLAP引擎还是HTAP数据库,都开始暴露出明显的短板。这不仅仅是存储问题,更是查询性能、数据关联和运营成本之间的艰难权衡。

定义问题:可观测性后端的三大挑战

一个现代化的可观测性平台必须应对三个核心挑战:

  1. 混合查询负载: 系统需要同时支持两种截然不同的查询模式。一种是基于TraceID或UserID的快速点查与小范围扫描,这类似于OLTP负载,要求低延迟响应。另一种则是对海量日志数据进行聚合、筛选和分析,这是典型的OLAP负载,要求高吞吐。
  2. 数据模型的统一与关联: 日志、追踪和指标是三种异构的数据。一个强大的平台必须能够将它们无缝关联。例如,从一个异常的指标图表下钻到相关的Trace,再从Trace定位到具体的错误日志。这就要求底层存储不仅能存储数据,还要能高效地处理跨模型的JOIN操作。
  3. 成本与性能的平衡: PB级别的数据存储和计算成本是无法忽视的。架构方案必须在保证查询性能的前提下,具备极高的压缩率和计算效率,并且能够对冷热数据进行有效分层,以控制长期存储成本。

方案A:纯ClickHouse架构——速度的诱惑与束缚

ClickHouse作为业界顶尖的OLAP引擎,其在日志和指标分析场景下的性能毋庸置疑。初步构想中,一个纯ClickHouse集群似乎是理所当然的选择。

优势分析:

  • 极致的OLAP性能: 基于列式存储和向量化执行引擎,ClickHouse在处理大规模聚合查询时速度极快。对于日志的全文搜索、指标的聚合计算等场景,它几乎是无敌的。
  • 惊人的数据压缩比: 列存的特性加上高效的压缩算法,使得ClickHouse的存储成本相对较低,对于海量日志数据非常友好。

劣势分析:

  • JOIN操作的局限性: ClickHouse的JOIN能力相对较弱,尤其是对于大规模、高基数的分布式JOIN,性能和稳定性都存在挑战。将日志、追踪和元数据(例如服务信息、部署版本)进行关联查询,会变得非常笨拙和低效。
  • 点查和更新的短板: ClickHouse不擅长高并发的点查和数据更新。在我们的场景中,管理告警规则、用户配置、服务元数据等需要频繁更新和查询的信息,用ClickHouse来做会非常痛苦。
  • 事务能力的缺失: 平台自身的管理功能(如租户管理、权限控制)需要事务保证,这是ClickHouse无法提供的。

在真实项目中,强行用ClickHouse处理所有负载,最终会导致系统架构的扭曲。我们不得不引入外部的MySQL或PostgreSQL来管理元数据,从而在应用层逻辑中手动完成数据拼接,这违背了构建统一平台的初衷。

方案B:纯TiDB架构——HTAP的承诺与现实

TiDB作为一款优秀的HTAP(混合事务/分析处理)数据库,理论上可以同时处理OLTP和OLAP负载,这使其成为另一个有力的候选者。

优势分析:

  • 统一的数据存储: TiDB可以同时存储结构化的元数据(OLTP)和海量的可观测性数据(OLAP),提供标准的SQL接口,简化了开发和运维。
  • 强大的事务与一致性: 基于Raft协议,TiDB提供金融级的ACID事务保证,非常适合处理平台的配置管理和元数据。
  • 水平扩展能力: TiDB的存储层(TiKV)和计算层(TiDB Server)都可以独立扩展,能够应对高并发的写入和查询请求。其列存引擎TiFlash为OLAP查询加速。

劣势分析:

  • 极致OLAP性能与成本: 尽管TiFlash提供了不错的OLAP能力,但在面对PB级日志数据的极限聚合分析场景时,其性能和成本效益相较于ClickHouse这样的专用OLAP引擎,仍然存在差距。ClickHouse的物化视图、专用函数库等特性在可观测性领域更为贴合。
  • 写入放大问题: 对于日志这种几乎只有追加写入(Append-Only)的场景,TiDB基于LSM-Tree的存储引擎(TiKV)在后台需要进行频繁的Compaction操作,这在高写入吞吐下可能会带来一定的性能开销和资源消耗。

一个常见的错误是,认为HTAP数据库是解决所有问题的银弹。在我们的规模下,虽然TiDB可以“完成”工作,但可能不是“最优”的选择,尤其是在日志分析的成本和性能上。

最终选择:TiDB + ClickHouse混合架构

权衡之后,我们决定采用一种混合架构,让两款数据库各司其职,发挥其最大优势。

graph TD
    subgraph 数据源
        A[服务A] --> C
        B[服务B] --> C
    end

    subgraph 采集层
        C[OTel Collector] --> D[Kafka]
    end

    subgraph 处理与分发层
        D --> E{Go Ingestion Service}
    end

    subgraph 存储层
        E -- 元数据/热数据/追踪 --> F[TiDB Cluster]
        E -- 日志/指标/冷追踪 --> G[ClickHouse Cluster]
    end
    
    subgraph 查询层
        H[Query Federation Service] --> F
        H --> G
    end

    subgraph 应用层
        I[API / Dashboard] --> H
    end

架构核心思想:

  • TiDB作为“控制平面”与“热数据中心”:

    • 存储所有元数据:服务信息、告警规则、用户配置、租户信息等。
    • 存储近期的、关联查询频繁的追踪数据(例如过去1小时的Traces)。Trace数据结构复杂,关联查询多,TiDB的事务和JOIN能力是完美匹配。
    • 提供强一致性的OLTP服务。
  • ClickHouse作为“数据平面”与“冷/温数据湖”:

    • 存储全量的、海量的结构化日志和指标数据。
    • 存储归档的追踪数据(例如超过1小时的Traces)。
    • 为大规模数据分析和聚合提供极致性能。
  • Query Federation Service(查询联邦服务): 这是整个架构的粘合剂。它是一个自研的后端服务,负责解析来自前端的查询请求,将其拆解成针对TiDB和ClickHouse的子查询,最后合并结果返回。这是整个方案的技术关键点。

核心实现概览

1. 统一的数据“样式方案”:结构化与模型定义

为了让异构数据能够在两个数据库中流转并被统一查询,我们定义了一套严格的数据模型。我们称之为“样式方案”,它本质上是一套跨系统的Schema规范。

以下是在Go中定义的核心日志结构,它将被用于写入Kafka,并最终映射到TiDB和ClickHouse的表中。

// file: internal/models/log.go

package models

import (
	"time"
)

// UnifiedLog represents the standardized structure for all incoming logs.
// This struct serves as the "styling scheme" for our observability data.
// It's designed to be easily mapped to both ClickHouse and TiDB tables.
type UnifiedLog struct {
	Timestamp      time.Time         `json:"timestamp"`       // 精确到纳秒的时间戳
	TraceID        string            `json:"trace_id"`        // 关联追踪的TraceID
	SpanID         string            `json:"span_id"`         // 关联追踪的SpanID
	Severity       string            `json:"severity"`        // 日志级别 (e.g., INFO, ERROR)
	ServiceName    string            `json:"service_name"`    // 服务名称
	HostName       string            `json:"host_name"`       // 主机名
	Body           string            `json:"body"`            // 日志主体消息
	Attributes     map[string]string `json:"attributes"`      // 结构化属性 (Key-Value)
	Resource       map[string]string `json:"resource"`        // 资源属性 (e.g., k8s_pod_name)
}

// ClickHouse table schema for this struct
/*
CREATE TABLE observer.logs (
    `Timestamp` DateTime64(9, 'Asia/Shanghai'),
    `TraceID` String,
    `SpanID` String,
    `Severity` LowCardinality(String),
    `ServiceName` LowCardinality(String),
    `HostName` LowCardinality(String),
    `Body` String,
    `Attributes` Map(String, String),
    `Resource` Map(String, String),
    INDEX idx_trace_id TraceID TYPE bloom_filter,
    INDEX idx_body Body TYPE tokenbf_v1(32768, 3, 0)
) ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(Timestamp)
ORDER BY (ServiceName, Severity, Timestamp);
*/

// TiDB would typically store metadata or recent traces, not high-volume logs.
// For example, a trace metadata table might look like this:
/*
CREATE TABLE observer.trace_metadata (
    `trace_id` VARCHAR(36) PRIMARY KEY,
    `service_name` VARCHAR(255),
    `operation_name` VARCHAR(255),
    `start_time` DATETIME(6),
    `duration_ms` INT,
    `has_error` BOOLEAN,
    INDEX idx_service_time (service_name, start_time)
);
*/

这里的坑在于,LowCardinality类型是ClickHouse优化的关键,对于服务名、日志级别这类基数不高的字段,能极大提升性能和压缩率。设计表结构时必须充分考虑目标数据库的特性。

2. 数据写入与分发服务

这个Go服务是数据流的咽喉。它消费Kafka中的原始数据,根据数据类型和时间戳进行路由决策。

// file: cmd/ingester/main.go

package main

import (
    "context"
    "encoding/json"
    "log"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"

    "github.com/segmentio/kafka-go"
    "your_project/internal/db"
    "your_project/internal/models"
)

const (
    kafkaTopic        = "otel-logs"
    kafkaBroker       = "kafka:9092"
    consumerGroupID   = "log-ingester-group"
    clickhouseBatchSize = 10000
    clickhouseFlushInterval = 5 * time.Second
)

func main() {
    // 生产级的服务需要更健壮的配置管理
    chConn, err := db.NewClickHouseClient("tcp://clickhouse-server:9000?database=observer")
    if err != nil {
        log.Fatalf("Failed to connect to ClickHouse: %v", err)
    }
    defer chConn.Close()
    
    // TiDB client initialization would be here...
    // tidbConn, err := db.NewTiDBClient("user:pass@tcp(tidb-server:4000)/observer")

    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers:  []string{kafkaBroker},
        Topic:    kafkaTopic,
        GroupID:  consumerGroupID,
        MinBytes: 10e3, // 10KB
        MaxBytes: 10e6, // 10MB
    })
    defer reader.Close()

    ctx, cancel := context.WithCancel(context.Background())
    var wg sync.WaitGroup

    // Buffer for batching writes to ClickHouse
    logBuffer := make(chan *models.UnifiedLog, clickhouseBatchSize*2)

    wg.Add(1)
    go func() {
        defer wg.Done()
        processKafkaMessages(ctx, reader, logBuffer)
    }()
    
    wg.Add(1)
    go func() {
        defer wg.Done()
        batchInsertToClickHouse(ctx, chConn, logBuffer)
    }()

    // Graceful shutdown
    sigterm := make(chan os.Signal, 1)
    signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
    <-sigterm
    
    log.Println("Shutdown signal received, draining buffers...")
    cancel() // Notify goroutines to stop
    wg.Wait()
    log.Println("Ingester shut down gracefully.")
}

func processKafkaMessages(ctx context.Context, reader *kafka.Reader, logBuffer chan<- *models.UnifiedLog) {
    for {
        select {
        case <-ctx.Done():
            log.Println("Kafka processor shutting down.")
            return
        default:
            // Use FetchMessage for better control over context cancellation
            m, err := reader.FetchMessage(ctx)
            if err != nil {
                if err == context.Canceled {
                    return
                }
                log.Printf("Error fetching message: %v", err)
                continue
            }

            var logEntry models.UnifiedLog
            if err := json.Unmarshal(m.Value, &logEntry); err != nil {
                log.Printf("Failed to unmarshal log: %v", err)
                // In production, send to a dead-letter queue
                reader.CommitMessages(ctx, m) // Commit even on error to avoid reprocessing
                continue
            }
            
            // 路由逻辑:这里简化为所有日志都进ClickHouse
            // 真实的场景会更复杂,比如根据logEntry.Type判断是trace还是log
            // if isTrace(logEntry) && time.Since(logEntry.Timestamp) < time.Hour {
            //     // Write to TiDB for recent traces
            // } else {
            //     logBuffer <- &logEntry
            // }
            logBuffer <- &logEntry

            if err := reader.CommitMessages(ctx, m); err != nil {
                log.Printf("Failed to commit message: %v", err)
            }
        }
    }
}

func batchInsertToClickHouse(ctx context.Context, conn db.ClickHouseClient, logBuffer <-chan *models.UnifiedLog) {
    ticker := time.NewTicker(clickhouseFlushInterval)
    defer ticker.Stop()
    
    batch := make([]*models.UnifiedLog, 0, clickhouseBatchSize)

    for {
        select {
        case <-ctx.Done():
            log.Println("ClickHouse writer shutting down. Flushing final batch...")
            if len(batch) > 0 {
                flushBatch(context.Background(), conn, batch) // Use a background context for final flush
            }
            return
        case logEntry := <-logBuffer:
            batch = append(batch, logEntry)
            if len(batch) >= clickhouseBatchSize {
                flushBatch(ctx, conn, batch)
                batch = make([]*models.UnifiedLog, 0, clickhouseBatchSize) // Reset batch
            }
        case <-ticker.C:
            if len(batch) > 0 {
                flushBatch(ctx, conn, batch)
                batch = make([]*models.UnifiedLog, 0, clickhouseBatchSize) // Reset batch
            }
        }
    }
}

func flushBatch(ctx context.Context, conn db.ClickHouseClient, batch []*models.UnifiedLog) {
    // 实际的db.ClickHouseClient会封装底层的 batch.Append 和 conn.Send
    // 这里仅为示意
    err := conn.BatchInsertLogs(ctx, batch) 
    if err != nil {
        log.Printf("Failed to insert batch of %d logs to ClickHouse: %v", len(batch), err)
        // Production code needs a robust retry mechanism with backoff,
        // and potentially writing failed batches to a recovery location.
    } else {
        log.Printf("Successfully inserted batch of %d logs.", len(batch))
    }
}

这段代码展示了一个生产级消费者的基本骨架:优雅停机、上下文管理、批量写入和定时刷新。一个常见的错误是在循环中逐条写入数据库,这会对数据库造成巨大压力。批量提交是必须的。

3. 查询联邦服务

这是架构的大脑。它需要解析查询,决定数据源,然后合并结果。一个简化的示例如下,假设我们要查询一个特定TraceID的所有相关日志。

// file: cmd/query-federator/handler.go
package main

import (
    "context"
    "encoding/json"
    "net/http"
    "sync"
    "time"

    "your_project/internal/db"
    "your_project/internal/models"
)

type QueryService struct {
    tidb *db.TiDBClient
    ch   *db.ClickHouseClient
}

// A query for a trace might involve fetching metadata from TiDB and logs from ClickHouse
func (s *QueryService) GetTraceDetails(w http.ResponseWriter, r *http.Request) {
    traceID := r.URL.Query().Get("traceID")
    if traceID == "" {
        http.Error(w, "traceID is required", http.StatusBadRequest)
        return
    }

    ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
    defer cancel()

    var wg sync.WaitGroup
    var traceMeta *models.TraceMetadata
    var traceLogs []*models.UnifiedLog
    var metaErr, logsErr error

    wg.Add(1)
    go func() {
        defer wg.Done()
        // TiDB is great for this kind of fast lookup by primary key
        traceMeta, metaErr = s.tidb.GetTraceMetadata(ctx, traceID)
    }()

    wg.Add(1)
    go func() {
        defer wg.Done()
        // ClickHouse is optimized for scanning large datasets based on an indexed column
        traceLogs, logsErr = s.ch.GetLogsByTraceID(ctx, traceID)
    }()

    wg.Wait()

    if metaErr != nil || logsErr != nil {
        // In a real system, we'd provide more detailed error info
        http.Error(w, "Failed to fetch trace data", http.StatusInternalServerError)
        log.Printf("Error fetching trace %s. MetaErr: %v, LogsErr: %v", traceID, metaErr, logsErr)
        return
    }

    // Combine the results from both databases
    response := struct {
        Metadata *models.TraceMetadata `json:"metadata"`
        Logs     []*models.UnifiedLog  `json:"logs"`
    }{
        Metadata: traceMeta,
        Logs:     traceLogs,
    }

    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(response)
}

// main function to set up the server would be here...

这里的核心是通过goroutine并行查询多个数据源,然后聚合结果。这个模式可以扩展到更复杂的场景,例如:先从TiDB查询符合某个业务条件(如“VIP用户”)的TraceID列表,再用这个列表去ClickHouse中查询这些Trace在过去7天内的所有错误日志。

架构的扩展性与局限性

该混合架构具备良好的扩展性。当指标数据量变得异常庞大时,我们可以引入专门的时序数据库(TSDB)如VictoriaMetrics或M3DB,并在查询联邦层增加一个新的数据源适配器。联邦服务是整个系统的“可插拔”核心,确保了技术栈的灵活性。

然而,这个架构并非没有代价。最显著的局限性在于查询联邦服务的复杂性。我们实际上是在构建一个简化的分布式查询引擎,需要处理跨源JOIN、聚合下推、超时控制和错误处理等问题。这带来了额外的研发和维护成本。此外,维护TiDB和ClickHouse两个分布式集群也对SRE团队提出了更高的要求。数据在两个系统间的最终一致性也需要有监控和校验机制来保障。此方案适用于数据规模确实达到瓶颈、且团队具备相应技术深度的场景,对于中小型项目而言可能过于复杂。


  目录