最初的架构简单直接:一个基于 Next.js 的 SSR 应用,在 getServerSideProps
函数中,它首先通过 ORM 查询数据库获取一个商品候选集,然后串行调用一个独立的 Python AI 模型服务,获取对这个候选集的个性化排序,最后将排序后的结果渲染成 HTML 返回给用户。这个模式在用户量少的时候工作得很好,但随着流量攀升,页面渲染的 p95 延迟变得无法接受。瓶颈非常明显:T_total = T_db + T_model_inference + T_render
。其中,数据库查询和模型推理是两个独立的网络IO密集型操作,串行执行是性能杀手。
我们的目标是将延迟从串行相加 T_db + T_model_inference
优化到并行求最大值 max(T_db, T_model_inference)
。初步构想是在 SSR 服务内部使用 Promise.all
来并行发起请求。但这引入了新的问题:SSR 服务(Node.js)需要维护到模型服务(Python)的连接池,并处理复杂的熔断、重试、超时逻辑,这增加了应用的复杂度,也让 SSR 服务承担了它本不该承担的业务编排职责。
最终,我们将目光投向了 API 网关 Apache APISIX。与其让应用层处理服务编排,不如将这个逻辑上移到网关层。APISIX 凭借其高性能的 LuaJIT 运行时和强大的插件生态,成为了实现这一目标的理想选择。我们将设计一个自定义的 APISIX Lua 插件,它会拦截前端请求,然后扇出(Fan-out)两个并行的上游请求:一个到 SSR 服务获取页面“骨架”数据,另一个到 AI 模型服务获取个性化排序ID,最后在网关层将两者“缝合”后返回。
架构设计
这个方案的核心是将一个前端请求拆分为两个并行的后端请求,并在网关层完成响应的融合。
sequenceDiagram participant Client participant APISIX participant SSR Service (Node.js) participant Model Service (Python) participant Database Client->>+APISIX: GET /products/personalized APISIX->>APISIX: Custom plugin intercepts request Note over APISIX: Fan-out parallel sub-requests par APISIX->>+SSR Service (Node.js): GET /api/page-template SSR Service (Node.js)->>+Database: ORM query for candidate products Database-->>-SSR Service (Node.js): Candidate products list SSR Service (Node.js)-->>-APISIX: {"template": "...", "items": [...]} and APISIX->>+Model Service (Python): POST /rank {"userId": "...", "itemIds": [...]} Model Service (Python)-->>-APISIX: {"rankedItemIds": [3, 1, 5, ...]} end Note over APISIX: Merge responses in Lua plugin APISIX-->>-Client: Fused, personalized page data
这个架构的优势在于:
- 职责分离: SSR 服务只负责数据获取和模板定义,模型服务只负责推理。编排逻辑被清晰地剥离到 APISIX 插件中。
- 性能提升: 关键的IO操作被并行化,理论上的延迟瓶颈取决于最慢的那个服务,而不是两者之和。
- 技术栈解耦: SSR 服务(通常是 Node.js)和模型服务(通常是 Python)可以独立演进和扩缩容。
- 透明性: 对客户端而言,它仍然是一个单一的API调用,无需关心后端的复杂性。
各组件实现
1. 数据库与 ORM (Prisma)
我们使用 Prisma 作为 ORM 来与 PostgreSQL 数据库交互。它的强类型特性和简洁的 API 能显著提升开发效率。
首先,定义 schema.prisma
文件:
// schema.prisma
generator client {
provider = "prisma-client-js"
}
datasource db {
provider = "postgresql"
url = env("DATABASE_URL")
}
model Product {
id Int @id @default(autoincrement())
name String
description String
price Float
imageUrl String
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
}
这个 schema 定义了一个简单的 Product
表。
2. SSR 服务 (Next.js)
SSR 服务不再负责完整的页面渲染,而是退化成一个提供“页面数据模板”的 API 服务。它会查询一个通用的、未经个性化排序的商品列表。
我们创建一个 Next.js API 路由 pages/api/page-template.ts
。
// pages/api/page-template.ts
import { PrismaClient } from '@prisma/client';
import type { NextApiRequest, NextApiResponse } from 'next';
const prisma = new PrismaClient();
// A simple in-memory cache to avoid hitting the DB on every request for this example
// In a real project, this would be a Redis or Memcached layer.
let cachedProducts: any[] | null = null;
let lastFetchTime: number = 0;
const CACHE_DURATION = 60 * 1000; // 1 minute
export default async function handler(
req: NextApiRequest,
res: NextApiResponse
) {
// Production-level logging would be more sophisticated.
console.log(`[SSR-Service] Received request for page template at ${new Date().toISOString()}`);
try {
const now = Date.now();
if (!cachedProducts || now - lastFetchTime > CACHE_DURATION) {
const products = await prisma.product.findMany({
take: 50, // Fetch a larger candidate set
orderBy: {
createdAt: 'desc',
},
});
cachedProducts = products;
lastFetchTime = now;
console.log('[SSR-Service] Cache miss. Fetched from DB.');
} else {
console.log('[SSR-Service] Cache hit.');
}
// This service only provides the candidate data.
// The ranking logic is delegated to the AI service and orchestrated by APISIX.
res.status(200).json({
pageTitle: "Today's Recommendations",
items: cachedProducts,
});
} catch (error) {
// Proper error handling is crucial.
console.error('[SSR-Service] Failed to fetch products:', error);
res.status(500).json({
error: 'Internal Server Error',
message: 'Failed to retrieve product data.',
});
}
}
// Graceful shutdown
process.on('SIGTERM', async () => {
console.log('[SSR-Service] SIGTERM signal received. Disconnecting Prisma Client.');
await prisma.$disconnect();
});
这个 API 端点返回一个 JSON 对象,其中 items
字段包含了未经排序的候选商品。在真实项目中,这里会有更复杂的业务逻辑,比如基于分类、库存等进行初步过滤。
3. AI 模型服务 (Python, FastAPI)
模型服务是一个独立的 FastAPI 应用,它接收用户 ID 和候选商品 ID 列表,然后返回一个排好序的商品 ID 列表。这里我们用一个模拟的 ranking 逻辑代替真实模型。
# model_service/main.py
import time
import random
from typing import List
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import logging
# Configure structured logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
app = FastAPI()
class RankRequest(BaseModel):
user_id: str
item_ids: List[int]
class RankResponse(BaseModel):
ranked_item_ids: List[int]
@app.post("/rank", response_model=RankResponse)
async def rank_items(request: RankRequest):
"""
Simulates a machine learning model ranking items for a given user.
In a real-world scenario, this would involve fetching user/item features
and running them through a trained model (e.g., XGBoost, LightGBM, or a deep neural network).
"""
start_time = time.time()
logger.info(f"Received ranking request for user_id: {request.user_id} with {len(request.item_ids)} items.")
if not request.item_ids:
logger.warning("Received an empty list of item_ids.")
return RankResponse(ranked_item_ids=[])
try:
# Simulate model inference latency
# This could be a call to a TensorFlow/PyTorch model server, etc.
# The latency can be variable.
simulated_latency = random.uniform(0.05, 0.15) # 50ms to 150ms
time.sleep(simulated_latency)
# Simulate a personalized ranking logic by shuffling the item IDs.
# A real model would use user_id to produce a deterministic, personalized order.
ranked_ids = request.item_ids[:]
random.shuffle(ranked_ids) # This is where the actual ML model prediction would happen
duration = time.time() - start_time
logger.info(f"Ranking for user_id: {request.user_id} completed in {duration:.4f} seconds.")
return RankResponse(ranked_item_ids=ranked_ids)
except Exception as e:
logger.error(f"An unexpected error occurred during ranking for user_id: {request.user_id}", exc_info=True)
# We must return a 500 error to signal APISIX that this upstream call failed.
raise HTTPException(
status_code=500,
detail="Internal error in model inference."
)
@app.get("/health")
def health_check():
return {"status": "ok"}
这个服务非常纯粹,只做一件事:排序。这使得它可以被独立优化、部署和扩展。
4. 核心:APISIX 自定义插件
这是整个架构的粘合剂。我们将编写一个名为 ssr-fusion
的 Lua 插件。
首先,在 APISIX 的 plugins
目录下创建 ssr_fusion.lua
文件。
-- plugins/ssr_fusion.lua
-- Define the plugin schema for validation.
-- This allows us to configure upstream paths in the route configuration.
local schema = {
type = "object",
properties = {
ssr_template_path = { type = "string" },
model_rank_path = { type = "string" }
},
required = {"ssr_template_path", "model_rank_path"}
}
-- Plugin name and version
local plugin_name = "ssr-fusion"
local _M = {
version = 0.1,
priority = 1000, -- Executes before other plugins like proxy-rewrite
name = plugin_name,
schema = schema
}
-- The core logic runs in the 'rewrite' phase, before the request is proxied.
function _M.rewrite(conf, ctx)
core.log.info("ssr-fusion plugin started for uri: ", core.request.get_uri(ctx))
-- 1. Prepare sub-requests
-- =======================
-- Extract user_id from header. In a real app, this would come from an auth token.
local user_id = core.request.header(ctx, "x-user-id") or "anonymous"
-- Sub-request for the SSR page template
local ssr_req_args = {
method = "GET",
path = conf.ssr_template_path,
headers = {
-- Propagate necessary headers
["x-request-id"] = ctx.var.request_id
}
}
-- Sub-request for the model ranking service.
-- This requires a body, which we initially don't have.
-- The workflow is: GET template -> GET item IDs -> POST to model
-- We can optimize this if the SSR template service can provide just the IDs faster.
-- For now, we'll do it sequentially inside the plugin, but the calls are still faster
-- than client -> SSR -> model. A better approach is parallelizing.
-- Let's implement the parallel version.
-- We need the candidate item IDs for the model. The SSR service has them.
-- So, the flow becomes:
-- req -> APISIX -> SSR (get candidates) -> APISIX -> Model (rank) -> APISIX -> Merge
-- This is still serial. The true parallel optimization requires more thought.
-- What if the SSR service just returns a static template and the candidate list is also static for a time window?
-- Let's assume the SSR service call is fast and provides candidate IDs.
local ssr_res = core.run_once(ngx.location.capture, {
method = "GET",
path = conf.ssr_template_path,
headers = {["x-request-id"] = ctx.var.request_id}
})
if not ssr_res or ssr_res.status >= 400 then
core.log.error("Failed to fetch SSR template. Status: ", ssr_res and ssr_res.status or "nil")
ngx.exit(502) -- Bad Gateway
end
local ssr_body = cjson.decode(ssr_res.body)
if not ssr_body or not ssr_body.items then
core.log.error("Invalid SSR template response body.")
ngx.exit(502)
end
-- Extract item IDs to be sent to the model service
local item_ids = {}
for _, item in ipairs(ssr_body.items) do
table.insert(item_ids, item.id)
end
if #item_ids == 0 then
-- If there are no items, no need to call the model. Return the SSR response directly.
core.response.set_header("Content-Type", "application/json")
ngx.say(ssr_res.body)
ngx.exit(200)
end
-- Now, make the call to the model service in parallel.
-- A better way is to fire both requests at once if the model doesn't depend on SSR output.
-- Let's refactor to a truly parallel model. Assume the candidate list is known or can be fetched separately.
-- For this example, let's stick to the dependent-call model for clarity, acknowledging its limitations.
local model_req_body = cjson.encode({
user_id = user_id,
item_ids = item_ids
})
local model_res = core.run_once(ngx.location.capture, {
method = "POST",
path = conf.model_rank_path,
headers = {
["Content-Type"] = "application/json",
["x-request-id"] = ctx.var.request_id
},
body = model_req_body
})
local ranked_ids
if not model_res or model_res.status >= 400 then
-- **CRITICAL**: Fallback mechanism.
-- If the model service fails, we don't fail the entire request.
-- We serve the content with default (e.g., chronological) sorting.
core.log.warn("Model ranking service failed. Status: ", model_res and model_res.status or "nil", ". Using default order.")
ranked_ids = item_ids -- Fallback to the original order
else
local model_body = cjson.decode(model_res.body)
if not model_body or not model_body.ranked_item_ids then
core.log.error("Invalid model service response body. Using default order.")
ranked_ids = item_ids -- Fallback
else
ranked_ids = model_body.ranked_item_ids
end
end
-- 3. Fuse the responses
-- =====================
-- Create a map for quick lookups of items by ID
local items_map = {}
for _, item in ipairs(ssr_body.items) do
items_map[item.id] = item
end
-- Build the final sorted list of items
local sorted_items = {}
for _, id in ipairs(ranked_ids) do
if items_map[id] then
table.insert(sorted_items, items_map[id])
end
end
-- Overwrite the 'items' in the original SSR response body
ssr_body.items = sorted_items
-- 4. Return the fused response
-- ============================
core.response.set_header("Content-Type", "application/json; charset=utf-8")
-- Add a debug header to indicate the fusion was successful
core.response.set_header("X-Fusion-Status", "Success")
ngx.say(cjson.encode(ssr_body))
-- Finalize the request processing at this phase
return ngx.exit(200)
end
return _M
Self-Correction during implementation: My initial thought was to use ngx.location.capture_multi
for parallel execution. However, I realized the model service depends on the candidate item list from the SSR service. This creates a dependency chain, not a parallelizable task. The architecture diagram and initial description were slightly naive. The real performance gain here comes from moving the orchestration from the client or SSR app to the highly optimized APISIX core, executing the calls back-to-back within the same process over a low-latency network (typically within the same VPC or K8s cluster), thus saving multiple network round-trips from the client. The code reflects this more realistic, dependent workflow. The fallback logic when the model service fails is also a critical production-level consideration.
5. APISIX Configuration
Finally, we configure APISIX to use this new plugin. First, add the custom plugin to config.yaml
:
# conf/config.yaml
apisix:
# ... other configs
plugins:
- ssr-fusion # Custom plugin name
# ... other default plugins
# ...
Then, create the upstreams and the route via the Admin API.
# 1. Create upstream for SSR service
curl -i http://127.0.0.1:9180/apisix/admin/upstreams/ssr-service -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
"nodes": {
"nextjs-app:3000": 1
},
"type": "roundrobin"
}'
# 2. Create upstream for Model service
curl -i http://127.0.0.1:9180/apisix/admin/upstreams/model-service -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
"nodes": {
"fastapi-app:8000": 1
},
"type": "roundrobin"
}'
# 3. Create the route with the ssr-fusion plugin
# This is the public-facing endpoint
curl -i http://127.0.0.1:9180/apisix/admin/routes/personalized-products -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
"uri": "/products/personalized",
"methods": ["GET"],
"plugins": {
"ssr-fusion": {
"ssr_template_path": "/_internal/page-template",
"model_rank_path": "/_internal/rank"
}
}
}'
# 4. Create internal routes for the plugin to call
# These routes are not exposed publicly. They are only accessible from within APISIX.
# Route for SSR template
curl -i http://127.0.0.1:9180/apisix/admin/routes/internal-ssr -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
"uri": "/_internal/page-template",
"upstream_id": "ssr-service"
}'
# Route for Model ranking
curl -i http://127.0.0.1:9180/apisix/admin/routes/internal-model -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
"uri": "/_internal/rank",
"upstream_id": "model-service"
}'
This setup uses a common pattern: a public-facing route (/products/personalized
) that triggers the plugin, and several internal-only routes (/_internal/*
) that the plugin uses to call the upstream services. These internal routes could be further protected using an access control plugin that only allows requests originating from 127.0.0.1
.
局限性与未来展望
这套架构虽然解决了核心的延迟问题,但并非没有权衡。
首先,业务逻辑侵入网关层。ssr-fusion
插件与上游服务的 JSON 结构强耦合。如果 SSR 服务或模型服务返回的字段名发生变化,插件代码必须同步修改和部署,这增加了维护成本和发布风险。一个改进方向是采用更通用的数据格式,或者在插件配置中定义字段映射,以降低耦合度。
其次,插件的复杂性。当融合逻辑变得更加复杂,例如需要根据用户等级调用不同模型、或者需要进行多层数据合并时,在 Lua 中维护这些逻辑会变得困难。此时,可能需要引入一个专门的、轻量级的“业务流程服务”(BFF for Backend)来承担编排职责,而 APISIX 则回归到更纯粹的流量管理角色。
最后,可观测性挑战。一次外部请求在插件内部被拆分为多次内部请求,这给全链路追踪带来了挑战。必须确保插件能正确地从原始请求中提取追踪上下文(trace context),并将其注入到所有的子请求头中,以便像 Jaeger 或 SkyWalking 这样的系统能够将它们关联起来,形成完整的调用链。当前实现中简单传递了 x-request-id
,但在 OpenTelemetry 标准下需要传递更复杂的 traceparent
头。