团队的机器学习平台最近遇到了一个典型的“数据孤岛”问题。数据科学家们使用 Spark 在数据湖上构建了一套基于 Apache Hudi 的 Feature Store,工作流顺畅,模型迭代效率很高。Hudi 的 Copy-on-Write 和 Merge-on-Read 表类型为他们提供了近乎实时的数据更新和快照查询能力,非常适合特征工程。但问题在于,这些宝贵的特征数据被封闭在了大数据生态里。业务分析师、应用开发者甚至一些产品经理,都希望能有一个直观、快速的方式来探索、验证这些特征,而不是每次都需要数据科学家写一段 Spark SQL 导出 CSV。
最初的构想很简单:做一个 Web UI。但技术选型很快就暴露了挑战。Web 应用(我们偏好 Next.js/Node.js 技术栈)如何与一个基于 JVM、存储在 S3 上的 Hudi 数据湖进行高效、安全地交互?直接从 Node.js 服务连接 Spark Thrift Server 或者 Hive Metastore 过于笨重,且延迟对于一个交互式 Web 应用是致命的。
经过几轮讨论和原型验证,我们确定了最终架构:在 Web 前端和数据湖之间引入一个轻量级的“Feature Store 网关服务”。这个服务将作为翻译官和安全屏障,负责将前端的 HTTP 请求转换为对数据湖的 SQL 查询,并对结果进行缓存和格式化。
graph TD A[用户/浏览器] --> B(Next.js 前端); B --> C{Node.js Feature Store 网关}; C --> D[Trino 查询引擎]; D --> E[Hive Metastore]; D --> F[S3 上的 Hudi 数据]; subgraph "Web 应用层" B C end subgraph "数据湖查询层" D E end subgraph "存储层" F end C -- "查询缓存" --> G(Redis);
这个架构的核心是 Trino (前身为 PrestoSQL)。选择 Trino 的理由很明确:它被设计用于对异构数据源进行快速的交互式分析,其 Hudi Connector 性能优异,并且提供了标准的 JDBC/ODBC 接口,更重要的是,有成熟的 Node.js 客户端库。Node.js 服务通过 Trino 查询 Hudi 数据,Next.js 则与 Node.js 服务通过 REST API 通信。这样,复杂的数据湖交互被完全封装在了网关服务内部。
第一步:搭建 Node.js 网关服务
我们选择 Fastify 框架,因为它以高性能和低开销著称,非常适合构建 API 网关。核心依赖是 trino-client
用于和 Trino 集群通信。
项目结构初步设定如下:
feature-store-gateway/
├── src/
│ ├── app.ts # Fastify 应用入口
│ ├── routes/
│ │ └── features.ts # 特征相关的 API 路由
│ ├── services/
│ │ ├── trino.ts # Trino 客户端封装
│ │ └── cache.ts # Redis 缓存服务
│ └── utils/
│ └── logger.ts # 日志配置
├── package.json
└── tsconfig.json
首先是 Trino 服务的封装。在真实项目中,配置必须外部化,不能硬编码。这里使用环境变量进行管理,并包含必要的错误处理和连接池思想。
src/services/trino.ts
import { Trino, BasicAuth } from 'trino-client';
import pino from 'pino';
const logger = pino({ name: 'TrinoService' });
// 从环境变量中读取配置,这是生产实践的基础
const TRINO_HOST = process.env.TRINO_HOST || 'http://localhost:8080';
const TRINO_USER = process.env.TRINO_USER || 'admin';
const TRINO_CATALOG = process.env.TRINO_CATALOG || 'hudi';
const TRINO_SCHEMA = process.env.TRINO_SCHEMA || 'feature_store';
// 初始化 Trino 客户端实例
// 这里的配置应该包含超时、重试等生产级参数
const trinoClient: Trino = Trino.create({
server: TRINO_HOST,
auth: new BasicAuth(TRINO_USER),
catalog: TRINO_CATALOG,
schema: TRINO_SCHEMA,
// 关键:为查询设置一个合理的超时,防止慢查询拖垮整个网关
requestTimeout: '30s',
});
/**
* 执行一个只读的 Trino 查询。
* 这是一个通用的查询执行器,包含了基本的日志和错误处理。
* @param sql - 要执行的 SQL 语句
* @returns 查询结果数组
*/
export async function executeQuery<T>(sql: string): Promise<T[]> {
logger.info({ sql }, 'Executing Trino query');
const start = Date.now();
try {
const iterator = await trinoClient.query(sql);
const rows: T[] = [];
for await (const row of iterator) {
// trino-client 返回的是数组,我们需要将其转换为对象
// @ts-ignore - Assuming iterator yields results with `columns` and `data`
const columns = row.columns?.map(col => col.name) || [];
const data = row.data || [];
const rowObject = columns.reduce((obj, key, index) => {
obj[key] = data[index];
return obj;
}, {} as Record<string, any>);
rows.push(rowObject as T);
}
const duration = Date.now() - start;
logger.info({ sql, duration, rowCount: rows.length }, 'Trino query executed successfully');
return rows;
} catch (error: any) {
const duration = Date.now() - start;
logger.error(
{ sql, duration, error: error.message, stack: error.stack },
'Trino query failed'
);
// 向上抛出经过封装的错误,而不是原始的客户端错误
throw new Error(`Failed to execute query on Trino: ${error.message}`);
}
}
/**
* 获取所有特征组(在 Hudi 中表现为表)的列表。
* @returns 特征组名称数组
*/
export async function getFeatureGroups(): Promise<string[]> {
const sql = `SHOW TABLES FROM ${TRINO_SCHEMA}`;
const results = await executeQuery<{ Table: string }>(sql);
return results.map(row => row.Table);
}
/**
* 获取特定特征组的元数据(列名和类型)。
* @param groupName - 特征组名称
* @returns 字段元数据数组
*/
export async function getFeatureGroupSchema(groupName: string): Promise<any[]> {
// 注意:这里的 groupName 必须经过校验,防止 SQL 注入。
// 尽管 Trino 客户端可能处理了参数化,但在应用层进行校验是最佳实践。
if (!/^[a-zA-Z0-9_]+$/.test(groupName)) {
throw new Error('Invalid feature group name.');
}
const sql = `DESCRIBE ${TRINO_SCHEMA}.${groupName}`;
return await executeQuery(sql);
}
/**
* 获取特征组的样本数据。
* @param groupName - 特征组名称
* @param limit - 返回的行数
* @returns 样本数据
*/
export async function getFeatureGroupSample(groupName: string, limit: number = 50): Promise<any[]> {
if (!/^[a-zA-Z0-9_]+$/.test(groupName)) {
throw new Error('Invalid feature group name.');
}
const sql = `SELECT * FROM ${TRINO_SCHEMA}.${groupName} LIMIT ${limit}`;
return await executeQuery(sql);
}
接下来是 API 路由的定义。我们使用 Fastify 的 schema validation 功能,这不仅能保证入参的正确性,还能自动生成 Swagger/OpenAPI 文档,非常实用。
src/routes/features.ts
import { FastifyInstance, FastifyPluginOptions, FastifyRequest } from 'fastify';
import {
getFeatureGroups,
getFeatureGroupSchema,
getFeatureGroupSample,
} from '../services/trino';
import { getFromCache, setInCache } from '../services/cache';
// 定义 API 的 schema 以便进行验证和序列化
const featureGroupParamsSchema = {
type: 'object',
properties: {
groupName: { type: 'string', pattern: '^[a-zA-Z0-9_]+$' },
},
required: ['groupName'],
};
const sampleQuerySchema = {
type: 'object',
properties: {
limit: { type: 'integer', minimum: 1, maximum: 200, default: 50 },
},
};
export function featureRoutes(
fastify: FastifyInstance,
options: FastifyPluginOptions,
done: () => void
) {
// 获取所有特征组列表
fastify.get('/feature-groups', async (request, reply) => {
const cacheKey = 'feature-groups:list';
const cachedData = await getFromCache<string[]>(cacheKey);
if (cachedData) {
return reply.send(cachedData);
}
const data = await getFeatureGroups();
// 缓存特征组列表 5 分钟,因为这通常不频繁变动
await setInCache(cacheKey, data, 300);
return reply.send(data);
});
// 获取特定特征组的 schema
fastify.get(
'/feature-groups/:groupName/schema',
{ schema: { params: featureGroupParamsSchema } },
async (request: FastifyRequest<{ Params: { groupName: string } }>, reply) => {
const { groupName } = request.params;
const cacheKey = `feature-groups:schema:${groupName}`;
const cachedData = await getFromCache<any[]>(cacheKey);
if (cachedData) {
return reply.send(cachedData);
}
const data = await getFeatureGroupSchema(groupName);
// Schema 变更频率更低,可以缓存更久,例如 1 小时
await setInCache(cacheKey, data, 3600);
return reply.send(data);
}
);
// 获取特征组的样本数据
// 这个接口通常不建议缓存,因为它用于实时数据探索
fastify.get(
'/feature-groups/:groupName/sample',
{ schema: { params: featureGroupParamsSchema, querystring: sampleQuerySchema } },
async (request: FastifyRequest<{ Params: { groupName: string }, Querystring: { limit?: number } }>, reply) => {
const { groupName } = request.params;
const limit = request.query.limit || 50;
const data = await getFeatureGroupSample(groupName, limit);
return reply.send(data);
}
);
done();
}
注意,我们在获取列表和 Schema 的接口中加入了 Redis 缓存。这里的坑在于,数据湖查询本身是高延迟操作,即使 Trino 很快,也可能需要数秒。对于一个 Web UI,这种延迟是不可接受的。对于元数据这类不常变化的信息,缓存是必需的。
第二步:实现 Next.js 前端
前端使用 Next.js,利用其服务端渲染(SSR)或客户端渲染(CSR)能力来调用网关 API。对于特征组列表这种基本不变的页面,可以使用 getStaticProps
配合 ISR (Incremental Static Regeneration) 生成静态页面,性能体验最好。对于需要交互式查询样本数据的部分,则使用客户端请求。
一个简单的特征组浏览器页面可能长这样:
pages/features/index.tsx
import { useState, useEffect } from 'react';
import type { GetServerSideProps, NextPage } from 'next';
const API_BASE_URL = process.env.NEXT_PUBLIC_API_URL || 'http://localhost:3001';
interface FeatureGroupProps {
initialFeatureGroups: string[];
}
const FeatureExplorer: NextPage<FeatureGroupProps> = ({ initialFeatureGroups }) => {
const [featureGroups] = useState<string[]>(initialFeatureGroups);
const [selectedGroup, setSelectedGroup] = useState<string | null>(null);
const [schema, setSchema] = useState<any[]>([]);
const [sample, setSample] = useState<any[]>([]);
const [isLoading, setIsLoading] = useState(false);
const [error, setError] = useState<string | null>(null);
useEffect(() => {
if (!selectedGroup) return;
const fetchGroupDetails = async () => {
setIsLoading(true);
setError(null);
try {
const [schemaRes, sampleRes] = await Promise.all([
fetch(`${API_BASE_URL}/feature-groups/${selectedGroup}/schema`),
fetch(`${API_BASE_URL}/feature-groups/${selectedGroup}/sample?limit=20`),
]);
if (!schemaRes.ok || !sampleRes.ok) {
throw new Error('Failed to fetch feature group details');
}
setSchema(await schemaRes.json());
setSample(await sampleRes.json());
} catch (err: any) {
setError(err.message);
} finally {
setIsLoading(false);
}
};
fetchGroupDetails();
}, [selectedGroup]);
return (
<div style={{ display: 'flex', fontFamily: 'sans-serif' }}>
<aside style={{ width: '250px', borderRight: '1px solid #ccc', padding: '1rem' }}>
<h2>Feature Groups</h2>
<ul>
{featureGroups.map(group => (
<li key={group} onClick={() => setSelectedGroup(group)} style={{ cursor: 'pointer', fontWeight: selectedGroup === group ? 'bold' : 'normal' }}>
{group}
</li>
))}
</ul>
</aside>
<main style={{ flex: 1, padding: '1rem' }}>
{selectedGroup ? (
<>
<h3>{selectedGroup}</h3>
{isLoading && <p>Loading...</p>}
{error && <p style={{ color: 'red' }}>Error: {error}</p>}
{!isLoading && !error && (
<>
<h4>Schema</h4>
{/* A simple table rendering for schema */}
<table>
<thead>
<tr><th>Column</th><th>Type</th></tr>
</thead>
<tbody>
{schema.map(col => <tr key={col.Column}><td>{col.Column}</td><td>{col.Type}</td></tr>)}
</tbody>
</table>
<h4>Sample Data (First 20 rows)</h4>
{/* A simple table rendering for sample data */}
{sample.length > 0 && (
<table style={{width: '100%', borderCollapse: 'collapse'}}>
<thead>
<tr>{Object.keys(sample[0]).map(key => <th key={key} style={{border: '1px solid #ddd', padding: '8px'}}>{key}</th>)}</tr>
</thead>
<tbody>
{sample.map((row, i) => (
<tr key={i}>
{Object.values(row).map((val: any, j) => <td key={j} style={{border: '1px solid #ddd', padding: '8px'}}>{String(val)}</td>)}
</tr>
))}
</tbody>
</table>
)}
</>
)}
</>
) : (
<p>Select a feature group to explore.</p>
)}
</main>
</div>
);
};
// 使用 SSR 获取初始列表,确保页面加载时就有数据
export const getServerSideProps: GetServerSideProps = async () => {
try {
const res = await fetch(`${API_BASE_URL}/feature-groups`);
if (!res.ok) {
// 在生产环境中,这里应该记录错误
console.error('Failed to fetch feature groups list from gateway');
return { props: { initialFeatureGroups: [] } };
}
const initialFeatureGroups = await res.json();
return { props: { initialFeatureGroups } };
} catch (error) {
console.error('Error connecting to gateway:', error);
return { props: { initialFeatureGroups: [] } };
}
};
export default FeatureExplorer;
这段代码展示了一个基本的交互流程:页面加载时通过 SSR 获取所有特征组列表,用户点击某个特征组后,客户端发起两个并行的 API 请求获取其 Schema 和样本数据,并展示在界面上。
Hudi 与 Trino 的配置联调
要让整个流程跑通,Trino 必须能正确识别 Hudi 表。这需要在 Trino 的 etc/catalog/
目录下创建一个 Hudi catalog 配置文件,例如 hudi.properties
。
hudi.properties
connector.name=hudi
hive.metastore.uri=thrift://your-hive-metastore:9083
# 如果 Hudi 表未在 Metastore 中注册,可以配置 S3 路径
# hive.s3.aws-access-key=...
# hive.s3.aws-secret-key=...
一个常见的坑是 Hudi 表的同步。Hudi 表写入数据后,需要将元数据同步到 Hive Metastore,Trino 才能发现它。在使用 Spark 写入 Hudi 时,需要确保相关的配置项被设为 true
:
// Spark DataFrameWriter options
df.write.format("hudi")
.option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
// ... other hudi options
.option("hoodie.datasource.hive_sync.enable", "true")
.option("hoodie.datasource.hive_sync.table", "user_profile_features")
.option("hoodie.datasource.hive_sync.database", "feature_store")
.option("hoodie.datasource.hive_sync.metastore.uris", "thrift://your-hive-metastore:9083")
.mode(SaveMode.Append)
.save("/path/to/hudi/user_profile_features")
没有正确的 Hive Sync,Trino 将无法查询到最新的表或分区,导致网关服务返回空数据或陈旧数据。
遗留问题与未来优化路径
这个架构虽然解决了从无到有的问题,但在生产环境中还存在一些局限性。
首先是性能和成本。Trino 查询会扫描 S3 上的 Parquet 文件,即使有优化,对于大规模查询仍然会产生不小的 I/O 和计算开销。高频次的样本数据查询可能会对 Trino 集群造成压力,并增加云服务成本。一个优化方向是,对于频繁访问的特征组,可以设计一个预聚合或抽样的 ETL 任务,将样本数据推送到一个更快的存储(如 Elasticsearch 或另一个 Redis 实例)中,网关优先从高速缓存读取。
其次是安全性。目前的网关服务只是一个简单的代理,缺乏精细的访问控制。未来需要集成公司的统一认证授权体系(如 OAuth2/OIDC),实现基于用户角色的特征组访问权限控制(RBAC),例如,只有特定团队的成员才能查看敏感特征。
最后是功能的扩展。当前的 UI 只能浏览数据。一个完整的 Feature Store 管理平台还应包括特征血缘追踪、特征统计信息(如分布、缺失率)、特征版本管理(Hudi 的时间旅行能力可以支持这一点)以及在线/离线特征数据的一致性校验等功能。这些都需要在网关服务和前端上进行更深入的开发。