基于TDD构建由Haskell与Flask驱动的CockroachDB分布式事务内核


一个项目的核心账本模块,最初用Python实现,在并发量上升后开始频繁出现数据不一致的幽灵事件。尽管使用了数据库的SELECT ... FOR UPDATE行锁,但在复杂的业务逻辑分支和重试机制下,死锁和更新丢失问题依然层出不穷。问题的根源在于,动态语言的灵活性无法在编译期保证状态转换的严谨性,而传统的数据库锁机制在分布式环境中又引入了新的复杂性。我们需要一个方案,能为这颗系统的“心脏”提供数学级别的严谨性和金融级的事务一致性。

这个困境迫使我们重新思考技术选型。我们决定将核心的事务处理逻辑剥离出来,构建一个独立的、高可用的微服务。而对于这个服务的技术栈,我们做出了一个非主流但目标明确的选择:使用Haskell实现核心业务逻辑,通过gRPC暴露接口,外部的Web API层继续由Flask框架负责,底层数据库则替换为支持分布式事务的CockroachDB。整个开发过程,将由测试驱动开发(TDD)方法论全程引导。

第一阶段:契约先行,用Protobuf定义不可变的服务边界

在TDD中,第一个“测试”并非代码,而是对系统行为的清晰定义。在微服务架构中,这意味着定义服务间的通信契约。我们使用Protocol Buffers来定义账本服务(LedgerService)的接口,这是我们整个系统交互的基石。

这个契約必须精确地描述所有操作,包括输入、输出以及可能的错误。

// proto/ledger.proto

syntax = "proto3";

package ledger;

// LedgerService 定义了所有与账本交互的操作
service LedgerService {
  // 创建一个新账户
  rpc CreateAccount(CreateAccountRequest) returns (Account) {}

  // 提交一笔双向交易
  // 必须保证原子性:要么全部成功,要么全部失败
  rpc PostTransaction(PostTransactionRequest) returns (TransactionResult) {}

  // 获取账户余额
  rpc GetAccountBalance(GetAccountBalanceRequest) returns (AccountBalance) {}
}

// 金额使用string类型来表示高精度小数,避免浮点数精度问题
// 服务端将使用Decimal类型处理
message Money {
  string currency = 1; // e.g., "USD"
  string amount = 2;   // e.g., "100.00"
}

message Account {
  string account_id = 1;
  Money balance = 2;
  string owner_id = 3;
}

message CreateAccountRequest {
  string owner_id = 1;
  Money initial_balance = 2;
}

// 交易分录,遵循复式记账法
message TransactionLeg {
  string account_id = 1;
  Money amount = 2;      // 正数表示贷方(Credit), 负数表示借方(Debit)
  string description = 3;
}

message PostTransactionRequest {
  string transaction_id = 1; // 客户端提供的幂等性ID
  repeated TransactionLeg legs = 2;
}

message TransactionResult {
  string transaction_id = 1;
  bool success = 2;
  string message = 3; // 成功或失败的信息
}

message GetAccountBalanceRequest {
  string account_id = 1;
}

message AccountBalance {
  string account_id = 1;
  Money balance = 2;
}

这份.proto文件就是我们的第一个交付物。它明确了:

  1. 数据模型:使用string表示金额以规避浮点数陷阱。
  2. 核心操作PostTransaction接受一个TransactionLeg数组,强制要求调用方遵循复式记账法(所有分录金额之和必须为零),这是保证账本平衡的第一道防线。
  3. 幂等性PostTransactionRequest包含一个transaction_id,要求服务实现时必须考虑幂等性,防止网络重试导致重复记账。

第二阶段:TDD驱动Haskell核心逻辑的纯函数实现

在接触数据库和网络之前,TDD的精髓在于首先验证纯粹的业务逻辑。Haskell的强类型系统和纯函数特性在这里展现出巨大优势。我们可以将核心的交易验证逻辑建模为纯函数,其正确性仅依赖于输入,不产生任何副作用。

我们的测试框架选择hspec。第一个测试文件test/Ledger/ValidationSpec.hs将验证交易的内部一致性。

-- test/Ledger/ValidationSpec.hs
module Ledger.ValidationSpec (spec) where

import Test.Hspec
import Ledger.Types -- 我们将要创建的类型定义模块
import Ledger.Validation -- 我们将要创建的验证模块
import Data.Decimal

spec :: Spec
spec = describe "Ledger.Validation" $ do
  describe "validateTransactionLegs" $ do
    it "should succeed for a balanced two-leg transaction" $ do
      let legs = [ Leg "acc_1" (Decimal 2 (-1000)) "debit"
                 , Leg "acc_2" (Decimal 2 1000) "credit"
                 ]
      validateTransactionLegs legs `shouldBe` Right ()

    it "should fail for an unbalanced transaction" $ do
      let legs = [ Leg "acc_1" (Decimal 2 (-1000)) "debit"
                 , Leg "acc_2" (Decimal 2 999) "credit" -- 不平衡
                 ]
      validateTransactionLegs legs `shouldBe` Left "Transaction legs do not sum to zero."

    it "should fail if any leg has a zero amount" $ do
      let legs = [ Leg "acc_1" (Decimal 2 (-1000)) "debit"
                 , Leg "acc_2" (Decimal 2 0) "zero"
                 , Leg "acc_3" (Decimal 2 1000) "credit"
                 ]
      validateTransactionLegs legs `shouldBe` Left "Transaction leg cannot have zero amount."

    it "should fail with no legs" $ do
      validateTransactionLegs [] `shouldBe` Left "Transaction must have at least two legs."

这个测试驱动我们创建相应的数据类型和验证函数。

-- src/Ledger/Types.hs
module Ledger.Types where

import Data.Decimal (Decimal)
import Data.Text (Text)

type AccountID = Text
type OwnerID = Text

data Leg = Leg
  { legAccountId :: AccountID
  , legAmount    :: Decimal  -- 使用高精度Decimal
  , legDescription :: Text
  } deriving (Show, Eq)

-- src/Ledger/Validation.hs
module Ledger.Validation where

import Ledger.Types
import Data.Decimal (Decimal, decimalPlaces)
import qualified Data.Text as T

type ValidationError = T.Text

validateTransactionLegs :: [Leg] -> Either ValidationError ()
validateTransactionLegs [] = Left "Transaction must have at least two legs."
validateTransactionLegs [_] = Left "Transaction must have at least two legs."
validateTransactionLegs legs
  | any (\l -> legAmount l == 0) legs = Left "Transaction leg cannot have zero amount."
  | sum (map legAmount legs) /= 0 = Left "Transaction legs do not sum to zero."
  | otherwise = Right ()

通过先写测试,我们被迫思考所有边界情况:空交易、单边交易、金额为零、收支不平衡。Haskell的编译器会确保我们的函数签名正确,而hspec则验证了逻辑的正确性。这个阶段,我们完全没有考虑数据库或API,只关注业务规则本身。

第三阶段:集成CockroachDB,处理真实世界的副作用

纯函数逻辑验证通过后,我们引入数据库。CockroachDB因其兼容PostgreSQL协议和内置的SERIALIZABLE隔离级别成为理想选择。SERIALIZABLE提供了最强的隔离保证,它确保并发事务的执行结果等同于它们以某种顺序串行执行的结果,这对于金融账本至关重要。

首先,定义数据库表结构。

-- schema.sql
CREATE TABLE accounts (
    account_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    owner_id VARCHAR(255) NOT NULL,
    currency CHAR(3) NOT NULL,
    balance DECIMAL(19, 4) NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE TABLE transactions (
    transaction_id UUID PRIMARY KEY,
    idempotency_key VARCHAR(255) UNIQUE NOT NULL, -- 用于幂等性控制
    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE TABLE transaction_legs (
    leg_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    transaction_id UUID NOT NULL REFERENCES transactions(transaction_id),
    account_id UUID NOT NULL REFERENCES accounts(account_id),
    amount DECIMAL(19, 4) NOT NULL,
    description VARCHAR(255),
    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- CockroachDB推荐使用索引来优化查询性能
CREATE INDEX ON accounts (owner_id);

接下来,我们编写Haskell代码与数据库交互。这里使用postgresql-simple库。我们的核心函数runTransaction将在一个数据库事务中执行所有操作。

-- src/Ledger/DB.hs
module Ledger.DB where

import Database.PostgreSQL.Simple
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Exception (throwIO, Exception, bracket)
import Ledger.Types

-- ... (数据库连接池等配置) ...

data InsufficientFundsException = InsufficientFundsException AccountID deriving (Show, Eq)
instance Exception InsufficientFundsException

-- 核心事务处理逻辑
-- 这里的MonadIO m => ... m () 结构允许我们在不同monad中复用数据库逻辑
runTransaction :: MonadIO m => Connection -> Text -> [Leg] -> m ()
runTransaction conn idempotencyKey legs = liftIO $ withTransactionSerializable conn $ do
    -- 1. 幂等性检查
    [Only count] <- query conn "SELECT count(*) FROM transactions WHERE idempotency_key = ?" (Only idempotencyKey)
    if count > (0 :: Int)
    then pure () -- 已处理,直接返回成功
    else do
        -- 2. 为所有涉及的账户加上行锁
        -- CockroachDB的SERIALIZABLE会自动处理,但显式锁定有助于理解意图
        let accountIds = map legAccountId legs
        _ <- execute conn "SELECT balance FROM accounts WHERE account_id = ANY(?) FOR UPDATE" (Only (In accountIds))
        
        -- 3. 检查所有借方账户的余额是否充足
        mapM_ (checkBalance conn) (filter (\l -> legAmount l < 0) legs)

        -- 4. 插入交易主记录和分录
        [Only transId] <- query conn "INSERT INTO transactions (idempotency_key) VALUES (?) RETURNING transaction_id" (Only idempotencyKey)
        let legData = map (\l -> (transId, legAccountId l, legAmount l, legDescription l)) legs
        _ <- executeMany conn "INSERT INTO transaction_legs (transaction_id, account_id, amount, description) VALUES (?, ?, ?, ?)" legData

        -- 5. 更新所有账户余额
        mapM_ (updateBalance conn) legs
  where
    checkBalance c (Leg accId amount) = do
      [Only bal] <- query c "SELECT balance FROM accounts WHERE account_id = ?" (Only accId)
      if bal + amount < 0
      then throwIO $ InsufficientFundsException accId
      else pure ()

    updateBalance c (Leg accId amount) = do
      _ <- execute c "UPDATE accounts SET balance = balance + ? WHERE account_id = ?" (amount, accId)
      pure ()

这段代码是系统的核心,有几个关键点:

  1. withTransactionSerializable: 使用postgresql-simple提供的函数来确保整个代码块在CockroachDB的SERIALIZABLE事务中运行。如果发生写冲突,CockroachDB会自动重试事务,或在多次失败后返回序列化错误,这些都需要上层逻辑(gRPC服务)来处理。
  2. 幂等性: 首先检查idempotency_key是否存在,如果存在则直接返回成功,这是保证客户端重试安全的关键。
  3. 锁定与检查: 尽管SERIALIZABLE提供了保证,但在事务开始时就用FOR UPDATE锁定相关账户,可以更早地发现冲突,减少无效计算。
  4. 原子性: 所有数据库操作(检查、插入、更新)都在一个事务块中。任何一步失败,比如checkBalance抛出InsufficientFundsException异常,都会导致整个事务回滚,数据库状态保持不变。

第四阶段:实现gRPC服务,连接纯逻辑与副作用

现在我们可以将gRPC接口、纯粹的验证逻辑和数据库操作串联起来。这里使用grpc-haskell库。

-- app/Main.hs
import Network.GRPC.Server
import Proto.Ledger
import Proto.Ledger_Fields
import Ledger.Validation (validateTransactionLegs)
import Ledger.DB (runTransaction)
-- ... other imports ...

ledgerService :: Connection -> LedgerService ServerRequest ServerResponse
ledgerService conn = LedgerService
  { ledgerServiceCreateAccount = undefined -- 省略实现
  , ledgerServiceGetAccountBalance = undefined -- 省略实现
  , ledgerServicePostTransaction = postTransactionHandler conn
  }

postTransactionHandler :: Connection -> ServerRequest 'Normal PostTransactionRequest TransactionResult -> IO (ServerResponse 'Normal TransactionResult)
postTransactionHandler conn (ServerNormalRequest _meta req) = do
    let protoLegs = req ^. legs
    let idempotencyKey = req ^. transactionId

    -- 1. Protobuf数据到Haskell类型的转换
    let eitherDomainLegs = mapM fromProtoLeg protoLegs

    case eitherDomainLegs of
        Left err -> pure $ ServerNormalResponse (mkErrorResult idempotencyKey err) [] StatusInvalidArgument (Details err)
        Right domainLegs -> do
            -- 2. 调用纯函数验证逻辑
            case validateTransactionLegs domainLegs of
                Left validationErr -> pure $ ServerNormalResponse (mkErrorResult idempotencyKey validationErr) [] StatusInvalidArgument (Details validationErr)
                Right () -> do
                    -- 3. 执行数据库事务
                    result <- try (runTransaction conn idempotencyKey domainLegs) :: IO (Either SomeException ())
                    case result of
                        Right () -> pure $ ServerNormalResponse (mkSuccessResult idempotencyKey) [] StatusOk (Details "Transaction successful")
                        Left e
                            | Just (err :: InsufficientFundsException) <- fromException e ->
                                pure $ ServerNormalResponse (mkErrorResult idempotencyKey "Insufficient funds") [] StatusFailedPrecondition (Details $ T.pack $ show err)
                            | Just (err :: P.SqlError) <- fromException e ->
                                -- 处理数据库序列化冲突等错误
                                pure $ ServerNormalResponse (mkErrorResult idempotencyKey "Database conflict, please retry.") [] StatusAborted (Details $ T.pack $ show err)
                            | otherwise ->
                                -- 其他未知错误
                                pure $ ServerNormalResponse (mkErrorResult idempotencyKey "Internal server error") [] StatusUnknown (Details $ T.pack $ show e)

-- ... (辅助函数 mkSuccessResult, mkErrorResult, fromProtoLeg 等) ...

main :: IO ()
main = do
  -- 初始化数据库连接池等
  let dbConn = ...
  runServer (serverOptions somePort) [serviceHandler (ledgerService dbConn)]

这个Handler的逻辑清晰地分成了三步:

  1. 解码: 将gRPC请求中的数据转换为我们内部的Haskell领域模型。如果转换失败(例如,金额格式错误),立即返回InvalidArgument错误。
  2. 验证: 调用已经过TDD测试的validateTransactionLegs纯函数。如果验证失败,返回InvalidArgument
  3. 执行: 只有在前两步都通过后,才调用runTransaction与数据库交互。这里通过try捕获所有可能的异常,并将它们映射到合适的gRPC状态码。例如,InsufficientFundsException映射到FailedPrecondition,数据库冲突映射到Aborted,并建议客户端重试。

第五阶段:为Flask网关编写集成测试

现在,Haskell服务已经就绪,我们需要在Python(Flask)端消费它。同样,我们采用TDD方法。

首先,在Python项目中,使用grpcio-tools.proto文件生成客户端代码。

python -m grpc_tools.protoc -I./proto --python_out=./generated --grpc_python_out=./generated ./proto/ledger.proto

然后,我们为Flask API端点编写一个集成测试。这个测试将启动一个真实的Flask应用实例,并向其发送HTTP请求。在测试环境中,Flask应用会连接到正在运行的Haskell gRPC服务。

# tests/test_api.py
import pytest
from flask import Flask
import grpc
import json

from generated import ledger_pb2, ledger_pb2_grpc
from app import create_app # 我们的Flask app工厂

@pytest.fixture(scope="module")
def app():
    app = create_app(testing=True)
    # 可以在这里配置gRPC channel指向测试环境的Haskell服务
    return app

@pytest.fixture
def client(app):
    return app.test_client()

def test_post_valid_transaction(client):
    """
    一个端到端的集成测试,覆盖从HTTP API到Haskell服务再到CockroachDB的完整流程
    """
    # 假设我们已经通过某种方式创建了 acc_sender 和 acc_receiver
    # 并且它们有足够的余额
    tx_id = f"test-tx-{uuid.uuid4()}"
    payload = {
        "transaction_id": tx_id,
        "legs": [
            {"account_id": "acc_sender", "amount": "-10.00", "currency": "USD", "description": "Payment"},
            {"account_id": "acc_receiver", "amount": "10.00", "currency": "USD", "description": "Receipt"}
        ]
    }

    response = client.post('/transactions', data=json.dumps(payload), content_type='application/json')

    assert response.status_code == 200
    data = response.get_json()
    assert data['success'] is True
    assert data['transaction_id'] == tx_id

    # 这里可以添加一步,直接查询数据库或调用GetBalance接口,验证账户余额确实发生了变化
    # ...

def test_post_unbalanced_transaction(client):
    tx_id = f"test-tx-{uuid.uuid4()}"
    payload = {
        "transaction_id": tx_id,
        "legs": [
            {"account_id": "acc_sender", "amount": "-10.00", "currency": "USD"},
            {"account_id": "acc_receiver", "amount": "9.99", "currency": "USD"} # 不平衡
        ]
    }
    response = client.post('/transactions', data=json.dumps(payload), content_type='application/json')
    
    # 我们期望API网关将gRPC的InvalidArgument错误转换为HTTP 400 Bad Request
    assert response.status_code == 400
    data = response.get_json()
    assert "Transaction legs do not sum to zero" in data['message']

这个测试驱动我们实现Flask的API端点。

# app/routes.py
from flask import Blueprint, request, jsonify
import grpc

from generated import ledger_pb2, ledger_pb2_grpc

bp = Blueprint('api', __name__)

# 在应用启动时创建gRPC channel
# 实际项目中应使用更健壮的连接管理
channel = grpc.insecure_channel('localhost:50051') 
stub = ledger_pb2_grpc.LedgerServiceStub(channel)

@bp.route('/transactions', methods=['POST'])
def post_transaction():
    data = request.get_json()
    if not data:
        return jsonify({"message": "Invalid JSON"}), 400

    legs = [
        ledger_pb2.TransactionLeg(
            account_id=leg['account_id'],
            amount=ledger_pb2.Money(currency=leg['currency'], amount=leg['amount']),
            description=leg.get('description', '')
        ) for leg in data.get('legs', [])
    ]

    req = ledger_pb2.PostTransactionRequest(
        transaction_id=data.get('transaction_id'),
        legs=legs
    )

    try:
        result = stub.PostTransaction(req, timeout=5.0) # 设置超时
        if result.success:
            return jsonify({"transaction_id": result.transaction_id, "success": True}), 200
        else:
            # 这种情况比较少见,通常错误会通过gRPC异常抛出
            return jsonify({"message": result.message}), 500
            
    except grpc.RpcError as e:
        # 将gRPC的错误码映射到HTTP状态码
        if e.code() == grpc.StatusCode.INVALID_ARGUMENT:
            return jsonify({"message": e.details()}), 400
        elif e.code() == grpc.StatusCode.FAILED_PRECONDITION:
            return jsonify({"message": e.details()}), 409 # Conflict
        elif e.code() == grpc.StatusCode.ABORTED:
            # 建议客户端重试
            return jsonify({"message": "Concurrent modification, please retry", "details": e.details()}), 429
        else:
            # 记录未知gRPC错误
            # logger.error(...)
            return jsonify({"message": "Internal service error"}), 503

这个Flask端点现在是一个轻量级的网关。它只负责协议转换(JSON/HTTP -> gRPC)和错误码映射,所有复杂的、关键的业务逻辑都被安全地封装在Haskell服务中。

方案局限与迭代方向

这套架构并非银弹。首先,引入Haskell和gRPC显著增加了系统的运维复杂度和团队的技术栈要求。调试跨语言、跨进程的调用链也比单体应用更具挑战,需要完善的分布式追踪系统支持。

其次,虽然gRPC性能优越,但网络调用始终存在开销。对于需要极低延迟的场景,服务间的通信成本可能成为瓶颈。此外,CockroachDB的SERIALIZABLE事务在冲突率高的负载下可能会出现性能下降和重试增多的情况,需要对业务场景和数据访问模式进行仔细评估。

未来的迭代可以探索几个方向。一方面是优化Haskell服务的性能,例如使用更高效的数据库连接池,或者对热点数据进行应用层缓存。另一方面,可以考虑引入服务网格(Service Mesh)来标准化服务间的重试、超时和熔断策略,将这些基础设施层面的关注点从业务代码中剥离出去,让Flask和Haskell服务更专注于自身的核心职责。


  目录