构建连接 Next.js 与 Apache Hudi 的 Feature Store 网关服务


团队的机器学习平台最近遇到了一个典型的“数据孤岛”问题。数据科学家们使用 Spark 在数据湖上构建了一套基于 Apache Hudi 的 Feature Store,工作流顺畅,模型迭代效率很高。Hudi 的 Copy-on-Write 和 Merge-on-Read 表类型为他们提供了近乎实时的数据更新和快照查询能力,非常适合特征工程。但问题在于,这些宝贵的特征数据被封闭在了大数据生态里。业务分析师、应用开发者甚至一些产品经理,都希望能有一个直观、快速的方式来探索、验证这些特征,而不是每次都需要数据科学家写一段 Spark SQL 导出 CSV。

最初的构想很简单:做一个 Web UI。但技术选型很快就暴露了挑战。Web 应用(我们偏好 Next.js/Node.js 技术栈)如何与一个基于 JVM、存储在 S3 上的 Hudi 数据湖进行高效、安全地交互?直接从 Node.js 服务连接 Spark Thrift Server 或者 Hive Metastore 过于笨重,且延迟对于一个交互式 Web 应用是致命的。

经过几轮讨论和原型验证,我们确定了最终架构:在 Web 前端和数据湖之间引入一个轻量级的“Feature Store 网关服务”。这个服务将作为翻译官和安全屏障,负责将前端的 HTTP 请求转换为对数据湖的 SQL 查询,并对结果进行缓存和格式化。

graph TD
    A[用户/浏览器] --> B(Next.js 前端);
    B --> C{Node.js Feature Store 网关};
    C --> D[Trino 查询引擎];
    D --> E[Hive Metastore];
    D --> F[S3 上的 Hudi 数据];
    subgraph "Web 应用层"
        B
        C
    end
    subgraph "数据湖查询层"
        D
        E
    end
    subgraph "存储层"
        F
    end
    C -- "查询缓存" --> G(Redis);

这个架构的核心是 Trino (前身为 PrestoSQL)。选择 Trino 的理由很明确:它被设计用于对异构数据源进行快速的交互式分析,其 Hudi Connector 性能优异,并且提供了标准的 JDBC/ODBC 接口,更重要的是,有成熟的 Node.js 客户端库。Node.js 服务通过 Trino 查询 Hudi 数据,Next.js 则与 Node.js 服务通过 REST API 通信。这样,复杂的数据湖交互被完全封装在了网关服务内部。

第一步:搭建 Node.js 网关服务

我们选择 Fastify 框架,因为它以高性能和低开销著称,非常适合构建 API 网关。核心依赖是 trino-client 用于和 Trino 集群通信。

项目结构初步设定如下:

feature-store-gateway/
├── src/
│   ├── app.ts          # Fastify 应用入口
│   ├── routes/
│   │   └── features.ts # 特征相关的 API 路由
│   ├── services/
│   │   ├── trino.ts    # Trino 客户端封装
│   │   └── cache.ts    # Redis 缓存服务
│   └── utils/
│       └── logger.ts   # 日志配置
├── package.json
└── tsconfig.json

首先是 Trino 服务的封装。在真实项目中,配置必须外部化,不能硬编码。这里使用环境变量进行管理,并包含必要的错误处理和连接池思想。

src/services/trino.ts

import { Trino, BasicAuth } from 'trino-client';
import pino from 'pino';

const logger = pino({ name: 'TrinoService' });

// 从环境变量中读取配置,这是生产实践的基础
const TRINO_HOST = process.env.TRINO_HOST || 'http://localhost:8080';
const TRINO_USER = process.env.TRINO_USER || 'admin';
const TRINO_CATALOG = process.env.TRINO_CATALOG || 'hudi';
const TRINO_SCHEMA = process.env.TRINO_SCHEMA || 'feature_store';

// 初始化 Trino 客户端实例
// 这里的配置应该包含超时、重试等生产级参数
const trinoClient: Trino = Trino.create({
  server: TRINO_HOST,
  auth: new BasicAuth(TRINO_USER),
  catalog: TRINO_CATALOG,
  schema: TRINO_SCHEMA,
  // 关键:为查询设置一个合理的超时,防止慢查询拖垮整个网关
  requestTimeout: '30s', 
});

/**
 * 执行一个只读的 Trino 查询。
 * 这是一个通用的查询执行器,包含了基本的日志和错误处理。
 * @param sql - 要执行的 SQL 语句
 * @returns 查询结果数组
 */
export async function executeQuery<T>(sql: string): Promise<T[]> {
  logger.info({ sql }, 'Executing Trino query');
  const start = Date.now();

  try {
    const iterator = await trinoClient.query(sql);
    const rows: T[] = [];

    for await (const row of iterator) {
      // trino-client 返回的是数组,我们需要将其转换为对象
      // @ts-ignore - Assuming iterator yields results with `columns` and `data`
      const columns = row.columns?.map(col => col.name) || [];
      const data = row.data || [];
      const rowObject = columns.reduce((obj, key, index) => {
        obj[key] = data[index];
        return obj;
      }, {} as Record<string, any>);
      rows.push(rowObject as T);
    }

    const duration = Date.now() - start;
    logger.info({ sql, duration, rowCount: rows.length }, 'Trino query executed successfully');

    return rows;
  } catch (error: any) {
    const duration = Date.now() - start;
    logger.error(
      { sql, duration, error: error.message, stack: error.stack },
      'Trino query failed'
    );
    // 向上抛出经过封装的错误,而不是原始的客户端错误
    throw new Error(`Failed to execute query on Trino: ${error.message}`);
  }
}

/**
 * 获取所有特征组(在 Hudi 中表现为表)的列表。
 * @returns 特征组名称数组
 */
export async function getFeatureGroups(): Promise<string[]> {
  const sql = `SHOW TABLES FROM ${TRINO_SCHEMA}`;
  const results = await executeQuery<{ Table: string }>(sql);
  return results.map(row => row.Table);
}

/**
 * 获取特定特征组的元数据(列名和类型)。
 * @param groupName - 特征组名称
 * @returns 字段元数据数组
 */
export async function getFeatureGroupSchema(groupName: string): Promise<any[]> {
  // 注意:这里的 groupName 必须经过校验,防止 SQL 注入。
  // 尽管 Trino 客户端可能处理了参数化,但在应用层进行校验是最佳实践。
  if (!/^[a-zA-Z0-9_]+$/.test(groupName)) {
    throw new Error('Invalid feature group name.');
  }
  const sql = `DESCRIBE ${TRINO_SCHEMA}.${groupName}`;
  return await executeQuery(sql);
}

/**
 * 获取特征组的样本数据。
 * @param groupName - 特征组名称
 * @param limit - 返回的行数
 * @returns 样本数据
 */
export async function getFeatureGroupSample(groupName: string, limit: number = 50): Promise<any[]> {
  if (!/^[a-zA-Z0-9_]+$/.test(groupName)) {
    throw new Error('Invalid feature group name.');
  }
  const sql = `SELECT * FROM ${TRINO_SCHEMA}.${groupName} LIMIT ${limit}`;
  return await executeQuery(sql);
}

接下来是 API 路由的定义。我们使用 Fastify 的 schema validation 功能,这不仅能保证入参的正确性,还能自动生成 Swagger/OpenAPI 文档,非常实用。

src/routes/features.ts

import { FastifyInstance, FastifyPluginOptions, FastifyRequest } from 'fastify';
import {
  getFeatureGroups,
  getFeatureGroupSchema,
  getFeatureGroupSample,
} from '../services/trino';
import { getFromCache, setInCache } from '../services/cache';

// 定义 API 的 schema 以便进行验证和序列化
const featureGroupParamsSchema = {
  type: 'object',
  properties: {
    groupName: { type: 'string', pattern: '^[a-zA-Z0-9_]+$' },
  },
  required: ['groupName'],
};

const sampleQuerySchema = {
  type: 'object',
  properties: {
    limit: { type: 'integer', minimum: 1, maximum: 200, default: 50 },
  },
};

export function featureRoutes(
  fastify: FastifyInstance,
  options: FastifyPluginOptions,
  done: () => void
) {
  // 获取所有特征组列表
  fastify.get('/feature-groups', async (request, reply) => {
    const cacheKey = 'feature-groups:list';
    const cachedData = await getFromCache<string[]>(cacheKey);
    if (cachedData) {
      return reply.send(cachedData);
    }

    const data = await getFeatureGroups();
    // 缓存特征组列表 5 分钟,因为这通常不频繁变动
    await setInCache(cacheKey, data, 300); 
    return reply.send(data);
  });

  // 获取特定特征组的 schema
  fastify.get(
    '/feature-groups/:groupName/schema',
    { schema: { params: featureGroupParamsSchema } },
    async (request: FastifyRequest<{ Params: { groupName: string } }>, reply) => {
      const { groupName } = request.params;
      const cacheKey = `feature-groups:schema:${groupName}`;
      
      const cachedData = await getFromCache<any[]>(cacheKey);
      if (cachedData) {
        return reply.send(cachedData);
      }

      const data = await getFeatureGroupSchema(groupName);
      // Schema 变更频率更低,可以缓存更久,例如 1 小时
      await setInCache(cacheKey, data, 3600);
      return reply.send(data);
    }
  );

  // 获取特征组的样本数据
  // 这个接口通常不建议缓存,因为它用于实时数据探索
  fastify.get(
    '/feature-groups/:groupName/sample',
    { schema: { params: featureGroupParamsSchema, querystring: sampleQuerySchema } },
    async (request: FastifyRequest<{ Params: { groupName: string }, Querystring: { limit?: number } }>, reply) => {
      const { groupName } = request.params;
      const limit = request.query.limit || 50;
      const data = await getFeatureGroupSample(groupName, limit);
      return reply.send(data);
    }
  );

  done();
}

注意,我们在获取列表和 Schema 的接口中加入了 Redis 缓存。这里的坑在于,数据湖查询本身是高延迟操作,即使 Trino 很快,也可能需要数秒。对于一个 Web UI,这种延迟是不可接受的。对于元数据这类不常变化的信息,缓存是必需的。

第二步:实现 Next.js 前端

前端使用 Next.js,利用其服务端渲染(SSR)或客户端渲染(CSR)能力来调用网关 API。对于特征组列表这种基本不变的页面,可以使用 getStaticProps 配合 ISR (Incremental Static Regeneration) 生成静态页面,性能体验最好。对于需要交互式查询样本数据的部分,则使用客户端请求。

一个简单的特征组浏览器页面可能长这样:

pages/features/index.tsx

import { useState, useEffect } from 'react';
import type { GetServerSideProps, NextPage } from 'next';

const API_BASE_URL = process.env.NEXT_PUBLIC_API_URL || 'http://localhost:3001';

interface FeatureGroupProps {
  initialFeatureGroups: string[];
}

const FeatureExplorer: NextPage<FeatureGroupProps> = ({ initialFeatureGroups }) => {
  const [featureGroups] = useState<string[]>(initialFeatureGroups);
  const [selectedGroup, setSelectedGroup] = useState<string | null>(null);
  const [schema, setSchema] = useState<any[]>([]);
  const [sample, setSample] = useState<any[]>([]);
  const [isLoading, setIsLoading] = useState(false);
  const [error, setError] = useState<string | null>(null);

  useEffect(() => {
    if (!selectedGroup) return;

    const fetchGroupDetails = async () => {
      setIsLoading(true);
      setError(null);
      try {
        const [schemaRes, sampleRes] = await Promise.all([
          fetch(`${API_BASE_URL}/feature-groups/${selectedGroup}/schema`),
          fetch(`${API_BASE_URL}/feature-groups/${selectedGroup}/sample?limit=20`),
        ]);

        if (!schemaRes.ok || !sampleRes.ok) {
          throw new Error('Failed to fetch feature group details');
        }

        setSchema(await schemaRes.json());
        setSample(await sampleRes.json());
      } catch (err: any) {
        setError(err.message);
      } finally {
        setIsLoading(false);
      }
    };

    fetchGroupDetails();
  }, [selectedGroup]);

  return (
    <div style={{ display: 'flex', fontFamily: 'sans-serif' }}>
      <aside style={{ width: '250px', borderRight: '1px solid #ccc', padding: '1rem' }}>
        <h2>Feature Groups</h2>
        <ul>
          {featureGroups.map(group => (
            <li key={group} onClick={() => setSelectedGroup(group)} style={{ cursor: 'pointer', fontWeight: selectedGroup === group ? 'bold' : 'normal' }}>
              {group}
            </li>
          ))}
        </ul>
      </aside>
      <main style={{ flex: 1, padding: '1rem' }}>
        {selectedGroup ? (
          <>
            <h3>{selectedGroup}</h3>
            {isLoading && <p>Loading...</p>}
            {error && <p style={{ color: 'red' }}>Error: {error}</p>}
            {!isLoading && !error && (
              <>
                <h4>Schema</h4>
                {/* A simple table rendering for schema */}
                <table>
                  <thead>
                    <tr><th>Column</th><th>Type</th></tr>
                  </thead>
                  <tbody>
                    {schema.map(col => <tr key={col.Column}><td>{col.Column}</td><td>{col.Type}</td></tr>)}
                  </tbody>
                </table>
                <h4>Sample Data (First 20 rows)</h4>
                {/* A simple table rendering for sample data */}
                {sample.length > 0 && (
                  <table style={{width: '100%', borderCollapse: 'collapse'}}>
                    <thead>
                      <tr>{Object.keys(sample[0]).map(key => <th key={key} style={{border: '1px solid #ddd', padding: '8px'}}>{key}</th>)}</tr>
                    </thead>
                    <tbody>
                      {sample.map((row, i) => (
                        <tr key={i}>
                          {Object.values(row).map((val: any, j) => <td key={j} style={{border: '1px solid #ddd', padding: '8px'}}>{String(val)}</td>)}
                        </tr>
                      ))}
                    </tbody>
                  </table>
                )}
              </>
            )}
          </>
        ) : (
          <p>Select a feature group to explore.</p>
        )}
      </main>
    </div>
  );
};

// 使用 SSR 获取初始列表,确保页面加载时就有数据
export const getServerSideProps: GetServerSideProps = async () => {
  try {
    const res = await fetch(`${API_BASE_URL}/feature-groups`);
    if (!res.ok) {
      // 在生产环境中,这里应该记录错误
      console.error('Failed to fetch feature groups list from gateway');
      return { props: { initialFeatureGroups: [] } };
    }
    const initialFeatureGroups = await res.json();
    return { props: { initialFeatureGroups } };
  } catch (error) {
    console.error('Error connecting to gateway:', error);
    return { props: { initialFeatureGroups: [] } };
  }
};

export default FeatureExplorer;

这段代码展示了一个基本的交互流程:页面加载时通过 SSR 获取所有特征组列表,用户点击某个特征组后,客户端发起两个并行的 API 请求获取其 Schema 和样本数据,并展示在界面上。

Hudi 与 Trino 的配置联调

要让整个流程跑通,Trino 必须能正确识别 Hudi 表。这需要在 Trino 的 etc/catalog/ 目录下创建一个 Hudi catalog 配置文件,例如 hudi.properties

hudi.properties

connector.name=hudi
hive.metastore.uri=thrift://your-hive-metastore:9083
# 如果 Hudi 表未在 Metastore 中注册,可以配置 S3 路径
# hive.s3.aws-access-key=...
# hive.s3.aws-secret-key=...

一个常见的坑是 Hudi 表的同步。Hudi 表写入数据后,需要将元数据同步到 Hive Metastore,Trino 才能发现它。在使用 Spark 写入 Hudi 时,需要确保相关的配置项被设为 true

// Spark DataFrameWriter options
df.write.format("hudi")
  .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
  // ... other hudi options
  .option("hoodie.datasource.hive_sync.enable", "true")
  .option("hoodie.datasource.hive_sync.table", "user_profile_features")
  .option("hoodie.datasource.hive_sync.database", "feature_store")
  .option("hoodie.datasource.hive_sync.metastore.uris", "thrift://your-hive-metastore:9083")
  .mode(SaveMode.Append)
  .save("/path/to/hudi/user_profile_features")

没有正确的 Hive Sync,Trino 将无法查询到最新的表或分区,导致网关服务返回空数据或陈旧数据。

遗留问题与未来优化路径

这个架构虽然解决了从无到有的问题,但在生产环境中还存在一些局限性。

首先是性能和成本。Trino 查询会扫描 S3 上的 Parquet 文件,即使有优化,对于大规模查询仍然会产生不小的 I/O 和计算开销。高频次的样本数据查询可能会对 Trino 集群造成压力,并增加云服务成本。一个优化方向是,对于频繁访问的特征组,可以设计一个预聚合或抽样的 ETL 任务,将样本数据推送到一个更快的存储(如 Elasticsearch 或另一个 Redis 实例)中,网关优先从高速缓存读取。

其次是安全性。目前的网关服务只是一个简单的代理,缺乏精细的访问控制。未来需要集成公司的统一认证授权体系(如 OAuth2/OIDC),实现基于用户角色的特征组访问权限控制(RBAC),例如,只有特定团队的成员才能查看敏感特征。

最后是功能的扩展。当前的 UI 只能浏览数据。一个完整的 Feature Store 管理平台还应包括特征血缘追踪、特征统计信息(如分布、缺失率)、特征版本管理(Hudi 的时间旅行能力可以支持这一点)以及在线/离线特征数据的一致性校验等功能。这些都需要在网关服务和前端上进行更深入的开发。


  目录