构建基于 Knative、Quarkus 与 ScyllaDB 的异步高吞吐事件批处理管道


我们面临一个棘手的流量洪峰问题。一个核心业务系统每分钟会产生数百万条状态更新事件,这些事件必须被持久化以供后续分析。最初的架构简单粗暴:事件产生后,通过RPC直接写入数据库。在流量平稳时这套方案工作得很好,但在高峰期,数据库的连接数和事务开销会瞬间被打满,导致整个系统的响应延迟急剧上升,甚至出现雪崩。

我们尝试引入消息队列作为缓冲,但这只是把问题从数据库瞬间的写压力转移到了消费端的处理能力上。消费端服务如果还是来一条消息就写一次数据库,那么数据库的瓶颈依然存在。根本问题在于,单条写入的开销太高,而我们的场景并不要求事件的实时强一致性。真正的需求是:高吞吐、低延迟地接收事件,并以对下游存储最友好的方式进行持久化。

这个问题的本质是读写模式的异步化和批处理化。我们需要一个服务,它能像海绵一样快速吸收事件,然后在内部聚合成批,再以“整存整取”的方式一次性写入 ScyllaDB。这个服务本身还必须是无状态且能够瞬时弹性伸缩的,否则它自己就会成为新的瓶leneck。

技术选型决策

在真实项目中,技术选型从来不是单纯的比拼性能指标,而是对成本、维护性、技术栈契合度的综合考量。

  1. 持久化层:为什么是 ScyllaDB?
    我们评估了 Cassandra 和 ScyllaDB。两者都擅长处理大规模写密集型负载。但 ScyllaDB 基于 C++ 和 Seastar 框架重写,其 thread-per-core 架构避免了 JVM GC 停顿和上下文切换开销,提供了更可预测的低延迟和更高的单节点吞吐。在我们的压测中,同等硬件下 ScyllaDB 的 P99 延迟比 Cassandra 低了近60%。这意味着更少的节点、更低的运维成本。对于这个追求极致写入性能的场景,ScyllaDB 是更优解。

  2. 服务运行时:为什么是 Knative?
    事件流量具有极端的峰谷效应,可能在几分钟内从零飙升到峰值,然后又迅速回落。使用传统的 Kubernetes Deployment + HPA 方案,冷启动和扩容速度跟不上流量脉冲。Knative Serving 提供了请求驱动的自动伸缩,包括缩容至零。这完美契合我们的需求:平时不消耗任何资源,流量来了秒级拉起 N 个实例,流量一过自动回收。Knative Eventing 则提供了一套解耦的事件基础设施,我们可以轻松地将事件源(如 Kafka Topic、Webhook)与我们的处理服务连接起来,而无需在代码中硬编码任何连接逻辑。

  3. 开发框架:为什么是 Quarkus (Java)?
    团队技术栈以 Java 为主,但传统的 Spring Boot 框架对于 Serverless 场景来说过于笨重,启动慢、内存占用高。Quarkus 作为云原生 Java 框架,通过 AOT 编译(GraalVM)和构建时优化,生成启动速度极快、内存占用极低的本地可执行文件。这对于 Knative 缩容至零后的快速冷启动至关重要。更重要的是,Quarkus 内置了强大的响应式编程模型 Mutiny,这为我们实现一个高效的、非阻塞的内存批处理逻辑提供了完美的工具。

核心实现:响应式批处理逻辑

问题的核心在于如何优雅地实现事件的批处理。我们需要一个机制,它能持续接收单个事件,然后在满足“数量达到阈值”或“时间窗口到达”任一条件时,将累积的事件打包触发一次批量写入。

以下是我们的核心处理服务 EventBatchingService.java 的完整实现。

package org.example.pipeline;

import io.quarkus.runtime.StartupEvent;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.jboss.logging.Logger;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/**
 * 核心的事件批处理服务。
 * 它使用响应式流(Mutiny)来处理事件的聚合与分发。
 * 这个服务是单例的,在整个应用生命周期内持续运行。
 */
@ApplicationScoped
public class EventBatchingService {

    private static final Logger LOG = Logger.getLogger(EventBatchingService.class);

    // 从 application.properties 注入配置
    @ConfigProperty(name = "pipeline.batch.max-size", defaultValue = "1000")
    int maxBatchSize;

    @ConfigProperty(name = "pipeline.batch.max-wait-ms", defaultValue = "500")
    int maxWaitTimeMs;
    
    @Inject
    ScyllaDBWriter scyllaDBWriter;

    // UnicastProcessor 是一个 Mutiny 的 "Hot Stream",可以从外部向其推送数据。
    // 它充当了我们事件流的入口。
    private final UnicastProcessor<TelemetryEvent> eventProcessor = UnicastProcessor.create();
    
    private final AtomicLong receivedCounter = new AtomicLong(0);
    private final AtomicLong processedBatchCounter = new AtomicLong(0);

    /**
     * 在应用启动时,订阅并构建整个响应式处理管道。
     * 这是一个声明式的构建过程,只有当事件流入时,管道才会真正执行。
     */
    void onStart(@Observes StartupEvent ev) {
        LOG.infof("Initializing event batching pipeline with batchSize=%d, maxWaitTime=%dms", maxBatchSize, maxWaitTimeMs);

        eventProcessor
            // 这是一个关键的背压策略。如果下游处理不过来,我们会缓存最多 2 倍批处理大小的事件。
            // 超出部分会根据 OVERFLOW_BUFFER_STRATEGY 策略处理,这里默认是抛出异常。
            // 在真实生产中,可能需要更复杂的策略,例如丢弃或发送到死信队列。
            .onOverflow().buffer(maxBatchSize * 2)
            
            // 核心批处理逻辑:
            // .group().intoLists() 创建一个分组操作。
            // .of(maxBatchSize) 表示当分组内的元素达到 maxBatchSize 时,就将这个分组(一个List)向下游发送。
            // .every(Duration.ofMillis(maxWaitTimeMs)) 表示无论数量是否达到,只要超过了指定时间窗口,也立即发送。
            // 这两个条件的组合确保了低流量时不会无限期等待,高流量时能高效成批。
            .group().intoLists().of(maxBatchSize).every(Duration.ofMillis(maxWaitTimeMs))
            
            // onTermination().call(...) 确保即使在上游流正常关闭时,也能处理掉最后一个未满的批次。
            .onTermination().call(list -> !list.isEmpty() ? handleBatch(list) : io.smallrye.mutiny.Uni.createFrom().voidItem())

            // .emitOn(Infrastructure.getDefaultWorkerPool()) 将批处理操作切换到工作线程池执行。
            // 避免了 I/O 操作(写入ScyllaDB)阻塞事件循环线程。
            .emitOn(Infrastructure.getDefaultWorkerPool())
            
            // 订阅流,并对每个批次执行 handleBatch 方法。
            // .subscribe() 是触发整个流开始处理的动作。
            .subscribe().with(
                this::handleBatch,
                failure -> LOG.error("Critical failure in event pipeline stream. Pipeline terminated.", failure)
            );
    }
    
    /**
     * 外部调用此方法将单个事件推入处理管道。
     * 这个方法必须是线程安全的,并且执行速度极快。
     * @param event 遥测事件
     */
    public void processEvent(TelemetryEvent event) {
        if (event == null || event.getDeviceId() == null) {
            LOG.warn("Received null or invalid event. Skipping.");
            return;
        }
        // .onNext() 是向 Hot Stream 推送数据的方法。
        eventProcessor.onNext(event);
        receivedCounter.incrementAndGet();
    }

    /**
     * 处理一个批次的事件。这里会调用 ScyllaDBWriter 进行持久化。
     * @param batch 一个事件列表
     */
    private io.smallrye.mutiny.Uni<Void> handleBatch(List<TelemetryEvent> batch) {
        if (batch.isEmpty()) {
            return io.smallrye.mutiny.Uni.createFrom().voidItem();
        }

        long batchNumber = processedBatchCounter.incrementAndGet();
        LOG.infof("Processing batch #%d with %d events.", batchNumber, batch.size());
        
        // 调用 ScyllaDBWriter 的异步写入方法。
        // 返回一个 Uni<Void> 代表异步操作的结果。
        return scyllaDBWriter.writeBatchAsync(batch)
            .onFailure().invoke(failure -> LOG.errorf(failure, "Failed to write batch #%d to ScyllaDB.", batchNumber))
            .onFailure().retry().withBackoff(Duration.ofMillis(100), Duration.ofSeconds(1)).atMost(3)
            .onFailure().recoverWithItem(() -> {
                // 如果重试3次后仍然失败,我们记录严重错误,并可以选择将数据推送到死信队列。
                // 这里为了简化,只打印日志。
                LOG.errorf("Batch #%d failed after 3 retries. Events in this batch might be lost.", batchNumber);
                return null; // 恢复流,使其不被终止
            });
    }

    // 用于监控的简单 getter 方法
    public long getReceivedCount() {
        return receivedCounter.get();
    }
}

这个服务的精髓在于 onStart 方法中对响应式流的声明式构建。代码可读性强,且完美地描述了我们的业务逻辑:将事件流聚合成列表流,聚合规则是数量或时间。所有操作都是非阻塞的,能够最大化利用 CPU 资源。

ScyllaDB 写入器与性能优化

仅仅有批处理逻辑是不够的,写入数据库的方式同样重要。对 ScyllaDB 这种高性能数据库,客户端的写入方式直接决定了能否发挥出其全部潜力。

ScyllaDBWriter.java 负责与数据库交互:

package org.example.pipeline;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BatchType;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.jboss.logging.Logger;

import java.util.List;
import java.util.stream.Collectors;

@ApplicationScoped
public class ScyllaDBWriter {

    private static final Logger LOG = Logger.getLogger(ScyllaDBWriter.class);

    @Inject
    CqlSession session;

    private PreparedStatement insertStatement;

    // CQL 语句。在真实项目中,keyspace 和 table 名称应该来自配置。
    private static final String INSERT_CQL = "INSERT INTO telemetry_keyspace.device_events (device_id, event_time, payload_json) VALUES (?, ?, ?)";

    /**
     * PostConstruct 注解确保在注入 CqlSession 后,立即准备 PreparedStatement。
     * PreparedStatement 只编译一次,后续可以高效复用,这是 CQL 性能优化的第一步。
     */
    @PostConstruct
    void initialize() {
        LOG.info("Preparing ScyllaDB insert statement...");
        try {
            // Keyspace 和 Table 的创建逻辑。在生产环境中,这通常由数据库迁移工具(如 Liquibase)管理。
            // 这里为了演示的完整性,将其包含进来。
            session.execute(SimpleStatement.builder("CREATE KEYSPACE IF NOT EXISTS telemetry_keyspace WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1};").build());
            session.execute(SimpleStatement.builder("CREATE TABLE IF NOT EXISTS telemetry_keyspace.device_events (device_id text, event_time timestamp, payload_json text, PRIMARY KEY (device_id, event_time));").build());
            
            this.insertStatement = session.prepare(INSERT_CQL);
            LOG.info("ScyllaDB insert statement prepared successfully.");
        } catch (Exception e) {
            LOG.error("Failed to prepare ScyllaDB statement. Application might not function correctly.", e);
            // 在启动时失败,应该让应用启动失败。
            throw new RuntimeException(e);
        }
    }

    /**
     * 异步写入一个批次的事件。
     * @param batch 事件列表
     * @return Uni<Void> 代表异步操作完成。
     */
    public Uni<Void> writeBatchAsync(List<TelemetryEvent> batch) {
        if (batch == null || batch.isEmpty()) {
            return Uni.createFrom().voidItem();
        }

        // 关键优化点:使用 UNLOGGED BATCH。
        // 对于需要高性能导入大量数据的场景,UNLOGGED BATCH 可以显著减少协调器的开销。
        // 它绕过了 batchlog 机制,代价是原子性不再保证(部分语句可能成功,部分失败)。
        // 在我们的场景中,事件是独立的,批次的原子性不重要,性能是首要目标。
        BatchStatementBuilder batchBuilder = new BatchStatementBuilder(BatchType.UNLOGGED);

        // 将每个事件绑定到 PreparedStatement 并添加到批次中。
        batch.forEach(event -> 
            batchBuilder.addStatement(
                insertStatement.bind(event.getDeviceId(), event.getEventTime().toInstant(), event.getPayloadJson())
            )
        );

        BatchStatement batchStatement = batchBuilder.build();
        
        // session.executeReactive 返回一个 Mutiny 的 Uni,完美融入我们的响应式管道。
        return Uni.createFrom().completionStage(session.executeAsync(batchStatement))
                .onItem().ignore().andSwitchTo(Uni.createFrom().voidItem())
                .emitOn(Infrastructure.getDefaultWorkerPool()); // 确保后续操作在工作线程上
    }
}

这里的关键优化是使用了 UNLOGGED BATCH。在 Cassandra/ScyllaDB 中,默认的 LOGGED BATCH 是为了保证一个批次内所有操作的原子性,这会带来额外的性能开销。而我们的场景是批量导入独立的遥测数据,即使批次中某几条写入失败,也不会影响其他数据。使用 UNLOGGED BATCH 可以跳过保证原子性的协调过程,将批次中的语句直接分发到对应的副本节点,极大提升写入吞吐。

对外暴露 API 与 Knative 集成

服务需要一个入口来接收事件。在 Knative Eventing 中,事件最终会以 HTTP POST 请求的形式发送到我们的服务。因此,我们创建一个简单的 JAX-RS Endpoint。

package org.example.pipeline;

import jakarta.inject.Inject;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import org.jboss.logging.Logger;

@Path("/")
public class EventReceiverResource {

    private static final Logger LOG = Logger.getLogger(EventReceiverResource.class);

    @Inject
    EventBatchingService batchingService;

    @POST
    @Consumes(MediaType.APPLICATION_JSON)
    @Produces(MediaType.TEXT_PLAIN)
    public Response receiveEvent(TelemetryEvent event) {
        // Knative Eventing 要求服务在接收到事件后快速响应 2xx 状态码。
        // 我们的 processEvent 方法是异步的,会立刻返回,不会阻塞 HTTP 请求线程。
        // 这满足了 Knative 的要求。
        try {
            batchingService.processEvent(event);
            return Response.accepted().entity("Event accepted.").build();
        } catch (Exception e) {
            // 这里的异常通常是队列满了(背压),返回 503 Service Unavailable 是合适的。
            // Knative Eventing 的 Broker 会根据配置进行重试。
            LOG.error("Failed to accept event due to internal queue full or other error.", e);
            return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("Service busy, please retry.").build();
        }
    }
}

该端点非常轻量,它唯一的工作就是将接收到的事件快速丢给 EventBatchingService 的处理队列,然后立即返回 202 Accepted。这种“收下即走”的模式对于高吞吐事件接收至关重要。

Knative 部署清单

最后一步是部署。我们需要两个 Knative 清单:一个是 Service,用于定义我们的应用容器和伸缩规则;另一个是 Trigger,用于订阅事件并将其路由到我们的服务。

service.yaml:

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: event-aggregator-service
spec:
  template:
    metadata:
      annotations:
        # 关键的伸缩配置
        autoscaling.knative.dev/window: "20s"       # 稳定窗口,防止因流量抖动频繁扩缩容
        autoscaling.knative.dev/scale-down-delay: "60s" # 流量停止后等待60秒再缩容
        autoscaling.knative.dev/target: "100"       # 每个 Pod 实例的目标并发请求数
        autoscaling.knative.dev/min-scale: "0"      # 允许缩容至零
        autoscaling.knative.dev/max-scale: "20"     # 最大实例数,防止失控
    spec:
      containers:
        - image: your-registry/event-aggregator:latest # 替换为你的镜像
          ports:
            - containerPort: 8080
          env:
            # ScyllaDB 连接信息等配置应通过环境变量或 Secret 注入
            - name: QUARKUS_DATASOURCE_CQL_HOSTS
              value: "scylla-service:9042"
            - name: QUARKUS_DATASOURCE_CQL_LOCAL_DATACENTER
              value: "datacenter1"
          resources:
            requests:
              memory: "128Mi"
              cpu: "250m"
            limits:
              memory: "256Mi"
              cpu: "1"

trigger.yaml:

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: aggregator-trigger
spec:
  # 假设环境中已存在一个名为 'default' 的 Broker
  broker: default
  # 可以设置过滤条件,只接收特定类型的事件
  # filter:
  #   attributes:
  #     type: com.example.telemetry
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-aggregator-service

通过这两个简单的 YAML 文件,我们就将整个高弹性的事件处理管道部署到了 Kubernetes 集群中。

graph TD
    A[外部事件源] -->|发送事件| B(Knative Broker);
    B -->|CloudEvent| C(Knative Trigger);
    C -->|HTTP POST| D{Event Aggregator Service};
    subgraph Knative Serving
        D -- scale 0..N --> D1[Pod 1];
        D -- ... --> Dn[Pod N];
    end
    subgraph Pod 内部
        D1 -->|1. JAX-RS Endpoint| E1[EventBatchingService];
        E1 -->|2. Mutiny Pipeline| F1(内存批处理);
        F1 -->|3. ScyllaDBWriter| G1[UNLOGGED BATCH];
    end
    G1 --> H((ScyllaDB Cluster));

局限性与未来迭代路径

当前这套方案并非银弹。它的核心设计取舍是用一定的风险换取极致的性能和成本效益。

首先,内存批处理意味着在 Pod 崩溃或被强制重启时,正在聚合但尚未写入数据库的那个批次的数据会丢失。对于我们的遥测场景,这种微小的数据丢失是可以接受的。但如果业务要求更严格的 at-least-once 交付保证,那么就需要在 EventBatchingService 之前引入一个持久化的队列,例如 Kafka。此时,我们的服务角色就从“接收并聚合”变成了“消费并聚合”,Knative 的角色则可以通过 KEDA(Kubernetes-based Event Driven Autoscaling)与 Kafka 的消费延迟进行集成,实现更精准的伸缩。

其次,当前的背压机制相对简单,依赖于 Knative 的请求队列和我们代码中的 onOverflow().buffer()。在极端流量冲击下,如果 ScyllaDB 出现写入延迟,可能会导致内存 buffer 耗尽。更健壮的方案可以引入动态调整批处理大小和等待时间的逻辑,或者实现一个更明确的、能向上游传递的背压信号。

最后,虽然我们使用了 UNLOGGED BATCH 提升性能,但这要求分区键(device_id)分布足够均匀。如果出现热点设备,大量事件涌向 ScyllaDB 的同一个分区,依然可能造成瓶颈。对此,需要对数据进行监控,并可能在业务层面或通过更复杂的分区策略来解决数据倾斜问题。


  目录