一个复杂业务流程的提交,比如包含主订单、库存扣减、积分增减的多服务协同操作,其后台处理绝不可能是瞬时完成的原子操作。在微服务架构下,这通常是一个由多个独立事务组成的分布式事务。如果采用传统的HTTP轮询方式来同步前端状态,不仅会造成服务端和网络的巨大浪费,更无法提供流畅、实时的用户体验。前端界面会长时间停留在“处理中”,用户无法得知事务执行到了哪一步,也无法在失败时获得清晰的上下文。
我们的技术挑战明确:为后端一个异步、长时运行的Saga分布式事务,在前端构建一个能够实时、精确、可靠地反映其完整生命周期的状态机,并且这个状态机本身必须是健壮的、可测试的。
定义问题:前端状态与后端事件的鸿沟
在后端,Saga模式通过一系列的本地事务和补偿操作来保证最终一致性。每个步骤的成功或失败都会以事件的形式发布到消息队列中。例如,一个订单流程可能产生如下事件流:ORDER_CREATED
-> PAYMENT_PROCESSED
-> INVENTORY_DEDUCTED
-> POINTS_CREDITED
-> ORDER_COMPLETED
。如果任何一步失败,则会触发补偿事件流:POINTS_DEDUCTION_FAILED
-> START_POINTS_COMPENSATION
-> INVENTORY_RESTOCK_STARTED
-> ORDER_FAILED
。
前端的挑战在于,如何跨越HTTP的瞬时性,订阅这个事件流,并将其转化为一个可预测的UI状态模型。用户不应该看到一连串杂乱的通知,而应该看到一个清晰的状态演进:正在处理 -> 正在支付 -> 正在锁定库存 -> 订单完成。
架构决策:消息系统选型 NATS vs. Pulsar
要实现实时推送,后端事件必须通过一个中间件推送到前端。最常见的模式是后端服务 -> 消息队列 -> 网关服务 (WebSocket) -> 前端
。这里的核心是消息队列的选择。
方案A: NATS JetStream
NATS以其极简和高性能著称。其核心是一个轻量级的发布/订阅系统。引入JetStream后,它获得了持久化、至少一次/精确一次送达保证等关键能力,使其能够胜任可靠消息传递的场景。
优势:
- 性能: 极高吞吐量,极低延迟。对于需要快速响应的事件通知非常理想。
- 简单: 部署和运维相对简单,客户端API直观易用。
- Stream/Consumer模型: JetStream的Stream可以持久化存储消息,Consumer可以按需消费,支持ACK机制,满足Saga事件的可靠传递。
劣势:
- 功能相对基础: 对于复杂的Saga场景,可能会缺少一些高级特性。例如,Saga模式中一个常见的需求是“超时触发补偿”,这需要消息队列具备延迟消息的能力。虽然可以通过在应用层实现定时器来模拟,但这增加了业务代码的复杂性。
方案B: Apache Pulsar
Pulsar在设计上是一个云原生的分布式消息平台,其分层架构(Broker + BookKeeper)带来了许多高级特性。
优势:
- 统一消息模型: 同时支持队列(Queue)和流(Stream)两种模式,能灵活适应不同消费场景。
- 内置延迟消息: 这是Pulsar的一个杀手级特性。可以直接向Pulsar发送一个消息,并指定它在一段时间后才对消费者可见。这对于实现Saga的超时机制是天然的、可靠的解决方案,极大地简化了编排器的设计。
- 多租户与分区: 内置的逻辑隔离(租户、命名空间)和物理扩展能力(分区),对于大型企业级系统非常友好。
劣劣势:
- 运维复杂性: 相较于NATS,Pulsar的组件(ZooKeeper/etcd, BookKeeper, Broker, Proxy)更多,部署和维护的成本更高。
最终选择:Pulsar
在真实项目中,分布式事务的可靠性压倒一切。一个Saga流程可能因为下游服务暂时不可用而需要等待和重试,也可能因为业务超时而必须触发补偿。Pulsar内置的延迟消息功能,让我们能够以一种非常优雅和健壮的方式来处理这类超时场景,而无需在业务代码中维护复杂的状态和定时器。因此,尽管运维成本更高,但为了架构的长期健壮性和简洁性,我们选择Pulsar作为Saga事件的载体。
核心实现:从后端事件到前端状态
我们的系统由四部分组成:Saga编排器(Go)、Pulsar集群、WebSocket网关(Node.js)和前端(React + Zustand)。
1. 后端Saga编排器与Pulsar生产者 (Go)
Saga的每一步都会向Pulsar发送一个结构化的事件。这个事件不仅包含状态,还包含事务ID、用户ID以及其他上下文信息,以便网关服务进行路由。
package main
import (
"context"
"encoding/json"
"log"
"time"
"github.com/apache/pulsar-client-go/pulsar"
)
// SagaEvent 定义了我们系统中事件的统一结构
type SagaEvent struct {
TransactionID string `json:"transactionId"`
UserID string `json:"userId"`
Step string `json:"step"` // e.g., "PAYMENT_PROCESSING", "INVENTORY_RESERVED"
Status string `json:"status"` // "SUCCESS", "FAILURE", "COMPENSATING"
Message string `json:"message"` // User-friendly message
Timestamp int64 `json:"timestamp"`
}
func main() {
// 初始化 Pulsar Client
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
if err != nil {
log.Fatalf("Could not instantiate Pulsar client: %v", err)
}
defer client.Close()
// 为订单状态更新创建一个生产者
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "saga-order-status",
})
if err != nil {
log.Fatalf("Could not create producer: %v", err)
}
defer producer.Close()
// --- 模拟Saga流程 ---
transactionID := "txn-12345"
userID := "user-abcde"
// 步骤1: 支付处理中
publishSagaEvent(producer, transactionID, userID, "PAYMENT_PROCESSING", "SUCCESS", "支付渠道已受理,处理中...")
// 模拟耗时操作
time.Sleep(2 * time.Second)
// 步骤2: 库存锁定成功
publishSagaEvent(producer, transactionID, userID, "INVENTORY_RESERVED", "SUCCESS", "商品库存已锁定")
// 模拟耗时操作
time.Sleep(1 * time.Second)
// 步骤3: 订单完成
publishSagaEvent(producer, transactionID, userID, "ORDER_COMPLETED", "SUCCESS", "订单已成功处理!")
// --- 模拟一个失败并补偿的Saga ---
failedTxnID := "txn-67890"
publishSagaEvent(producer, transactionID, userID, "PAYMENT_PROCESSING", "SUCCESS", "支付渠道已受理,处理中...")
time.Sleep(2 * time.Second)
publishSagaEvent(producer, failedTxnID, userID, "INVENTORY_DEDUCTION", "FAILURE", "库存不足,操作失败")
time.Sleep(1 * time.Second)
publishSagaEvent(producer, failedTxnID, userID, "PAYMENT_COMPENSATING", "SUCCESS", "正在为您退款...")
}
func publishSagaEvent(producer pulsar.Producer, txnID, userID, step, status, message string) {
event := SagaEvent{
TransactionID: txnID,
UserID: userID,
Step: step,
Status: status,
Message: message,
Timestamp: time.Now().UnixMilli(),
}
payload, err := json.Marshal(event)
if err != nil {
log.Printf("Failed to marshal event: %v", err)
return
}
// 发送消息,使用 UserID 作为 key,可以保证同一用户的所有事件都进入同一个Pulsar分区
// 这对于需要严格顺序消费的场景非常重要
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
Payload: payload,
Key: userID,
})
if err != nil {
log.Fatalf("Failed to publish message: %v", err)
}
log.Printf("Published event: %s - %s", step, status)
}
2. WebSocket网关:Pulsar消费者与消息推送 (Node.js)
网关服务是连接后端和前端的桥梁。它消费Pulsar中的事件,并通过WebSocket将消息实时推送给对应的客户端。
// server.js
import { WebSocketServer } from 'ws';
import Pulsar from 'pulsar-client';
const wss = new WebSocketServer({ port: 8080 });
// 简单的连接管理,生产环境需要更健壮的方案,如Redis
const clients = new Map();
wss.on('connection', (ws, req) => {
// 在真实场景中,这里会通过JWT等方式进行用户认证
// 并从认证信息中获取 userID
const urlParams = new URLSearchParams(req.url.slice(1));
const userId = urlParams.get('userId');
if (!userId) {
ws.close(1008, "User ID is required");
return;
}
console.log(`Client connected: ${userId}`);
// 存储客户端连接,以便后续推送消息
if (!clients.has(userId)) {
clients.set(userId, []);
}
clients.get(userId).push(ws);
ws.on('close', () => {
console.log(`Client disconnected: ${userId}`);
const userConnections = clients.get(userId);
if (userConnections) {
const index = userConnections.indexOf(ws);
if (index > -1) {
userConnections.splice(index, 1);
}
if (userConnections.length === 0) {
clients.delete(userId);
}
}
});
ws.on('error', console.error);
});
async function runPulsarConsumer() {
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
});
const consumer = await client.subscribe({
topic: 'persistent://public/default/saga-order-status',
subscription: 'websocket-gateway-subscription',
subscriptionType: 'Shared', // 使用共享订阅,允许多个网关实例并行消费
ackTimeoutMs: 10000,
});
console.log('Pulsar consumer started...');
while (true) {
try {
const msg = await consumer.receive();
const payload = msg.getData().toString();
console.log(`Received message: ${payload}`);
const event = JSON.parse(payload);
const { userId } = event;
if (clients.has(userId)) {
// 将消息推送给该用户的所有活跃连接
const userConnections = clients.get(userId);
userConnections.forEach(ws => {
if (ws.readyState === ws.OPEN) {
ws.send(JSON.stringify(event));
}
});
}
await consumer.acknowledge(msg);
} catch (error) {
console.error('Failed to receive message:', error);
// 在生产环境中,这里需要更完善的错误处理和重连逻辑
await new Promise(resolve => setTimeout(resolve, 5000));
}
}
}
runPulsarConsumer().catch(console.error);
graph TD subgraph Backend A[Saga Orchestrator] -- Publishes Event --> B((Pulsar Topic: saga-order-status)) end subgraph Gateway C[Pulsar Consumer] -- Subscribes --> B C -- Receives Event --> D{Message Routing} D -- UserID Match --> E[WebSocket Server] end subgraph Frontend F[React Component] -- Establishes Connection --> E E -- Pushes Event --> F F -- Updates --> G(Zustand Store) G -- Notifies --> H[UI Renders Update] end
3. 前端状态机:Zustand Store
Zustand以其极简的API和基于Hooks的模式,非常适合管理这种异步、流式的状态。我们创建一个Store来维护当前事务的状态、历史步骤和最终结果。
// store/transactionStore.js
import { create } from 'zustand';
// 定义Saga事务可能经历的所有状态
export const TransactionStatus = {
IDLE: 'IDLE',
INITIATED: 'INITIATED',
PROCESSING: 'PROCESSING',
COMPLETED: 'COMPLETED',
FAILED: 'FAILED',
};
const initialState = {
transactionId: null,
status: TransactionStatus.IDLE,
// 存储每一步的详细信息,用于展示进度
steps: [],
// 最终的错误信息
error: null,
};
export const useTransactionStore = create((set, get) => ({
...initialState,
// Action: 开始一个新的事务
startTransaction: (transactionId) => {
set({
...initialState,
transactionId,
status: TransactionStatus.INITIATED,
steps: [{ name: 'Transaction Initiated', status: 'SUCCESS', message: '请求已提交,等待后端处理...' }],
});
},
// Action: 处理从WebSocket接收到的Saga事件
handleSagaEvent: (event) => {
// 确保事件属于当前正在跟踪的事务
if (get().transactionId !== event.transactionId) {
console.warn('Received event for an untracked transaction:', event);
return;
}
set((state) => {
const newSteps = [...state.steps, { name: event.step, status: event.status, message: event.message }];
let newStatus = TransactionStatus.PROCESSING;
let newError = null;
// 根据事件更新顶层状态
if (event.step.endsWith('_COMPLETED') && event.status === 'SUCCESS') {
newStatus = TransactionStatus.COMPLETED;
} else if (event.status === 'FAILURE' || event.step.endsWith('_FAILED')) {
newStatus = TransactionStatus.FAILED;
newError = event.message || 'An unknown error occurred.';
}
return {
steps: newSteps,
status: newStatus,
error: newError,
};
});
},
// Action: 重置状态以便开始新的事务
reset: () => set(initialState),
}));
4. React组件与测试 (Jest)
组件负责连接WebSocket,并将收到的消息派发给Zustand store。UI则声明式地从store中读取状态并渲染。测试的重点在于模拟WebSocket的行为,验证组件和store是否能正确响应事件流。
React组件 (OrderTracker.jsx
):
import React, { useEffect } from 'react';
import { useTransactionStore, TransactionStatus } from './store/transactionStore';
const WEBSOCKET_URL = 'ws://localhost:8080?userId=user-abcde';
export const OrderTracker = () => {
const { status, steps, error, handleSagaEvent, startTransaction } = useTransactionStore();
useEffect(() => {
const ws = new WebSocket(WEBSOCKET_URL);
ws.onopen = () => console.log('WebSocket connected');
ws.onclose = () => console.log('WebSocket disconnected');
ws.onerror = (err) => console.error('WebSocket error:', err);
ws.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
// 调用 store action 来处理事件
handleSagaEvent(data);
} catch (e) {
console.error('Failed to parse WebSocket message:', e);
}
};
// 清理函数
return () => {
ws.close();
};
}, [handleSagaEvent]);
const handlePlaceOrder = () => {
// 在真实应用中,这里会发起一个API请求
// API请求成功后会返回一个transactionId
const mockTransactionId = `txn-${Date.now()}`;
startTransaction(mockTransactionId);
console.log(`Mock order placed, tracking transaction: ${mockTransactionId}`);
};
return (
<div>
<h1>Saga Transaction Tracker</h1>
<button onClick={handlePlaceOrder} disabled={status !== TransactionStatus.IDLE}>
Place New Order
</button>
<div>
<h2>Status: <span data-testid="status-display">{status}</span></h2>
{status === TransactionStatus.FAILED && <p data-testid="error-message">Error: {error}</p>}
</div>
<h3>Progress:</h3>
<ul data-testid="steps-list">
{steps.map((step, index) => (
<li key={index}>
<strong>{step.name}</strong> [{step.status}]: {step.message}
</li>
))}
</ul>
</div>
);
};
Jest测试 (OrderTracker.test.js
):
测试这种异步UI的关键在于控制WebSocket的模拟行为,并使用 @testing-library/react
的异步工具(如 waitFor
)来等待UI更新。
import React from 'react';
import { render, screen, fireEvent, waitFor } from '@testing-library/react';
import '@testing-library/jest-dom';
import { WS } from 'jest-websocket-mock';
import { useTransactionStore } from './store/transactionStore';
import { OrderTracker } from './OrderTracker';
const WEBSOCKET_URL = 'ws://localhost:8080?userId=user-abcde';
const originalState = useTransactionStore.getState();
describe('OrderTracker', () => {
let server;
beforeEach(() => {
// 在每个测试前重置 Zustand store
useTransactionStore.setState(originalState);
// 启动模拟WebSocket服务器
server = new WS(WEBSOCKET_URL, { jsonProtocol: true });
});
afterEach(() => {
// 关闭模拟服务器
WS.clean();
});
test('should correctly process a successful saga event stream', async () => {
render(<OrderTracker />);
const MOCK_TRANSACTION_ID = 'txn-success-123';
// 模拟API调用后启动事务跟踪
// 直接调用store的action来模拟组件的启动行为
act(() => {
useTransactionStore.getState().startTransaction(MOCK_TRANSACTION_ID);
});
// 等待WebSocket连接
await server.connected;
// 初始状态应为 INITIATED
expect(screen.getByTestId('status-display')).toHaveTextContent('INITIATED');
expect(screen.getByTestId('steps-list').children.length).toBe(1);
// 模拟服务器推送第一个事件
server.send({
transactionId: MOCK_TRANSACTION_ID,
step: 'PAYMENT_PROCESSED',
status: 'SUCCESS',
message: 'Payment successful',
});
// 等待UI更新
await waitFor(() => {
expect(screen.getByTestId('status-display')).toHaveTextContent('PROCESSING');
expect(screen.getByTestId('steps-list')).toHaveTextContent('PAYMENT_PROCESSED');
expect(screen.getByTestId('steps-list').children.length).toBe(2);
});
// 模拟服务器推送最后一个完成事件
server.send({
transactionId: MOCK_TRANSACTION_ID,
step: 'ORDER_COMPLETED',
status: 'SUCCESS',
message: 'Order is complete!',
});
await waitFor(() => {
expect(screen.getByTestId('status-display')).toHaveTextContent('COMPLETED');
expect(screen.getByTestId('steps-list')).toHaveTextContent('ORDER_COMPLETED');
});
});
test('should correctly handle a failed saga event stream', async () => {
render(<OrderTracker />);
const MOCK_TRANSACTION_ID = 'txn-failed-456';
act(() => {
useTransactionStore.getState().startTransaction(MOCK_TRANSACTION_ID);
});
await server.connected;
// 模拟一个失败事件
server.send({
transactionId: MOCK_TRANSACTION_ID,
step: 'INVENTORY_DEDUCTION',
status: 'FAILURE',
message: 'Item out of stock',
});
// 验证UI是否正确显示失败状态和错误信息
await waitFor(() => {
expect(screen.getByTestId('status-display')).toHaveTextContent('FAILED');
expect(screen.getByTestId('error-message')).toHaveTextContent('Error: Item out of stock');
});
});
test('should ignore events for a different transaction', async () => {
render(<OrderTracker />);
const CURRENT_TRANSACTION_ID = 'txn-current-789';
const OTHER_TRANSACTION_ID = 'txn-other-000';
act(() => {
useTransactionStore.getState().startTransaction(CURRENT_TRANSACTION_ID);
});
await server.connected;
const initialStepCount = screen.getByTestId('steps-list').children.length;
// 发送一个不相关的事务ID事件
server.send({
transactionId: OTHER_TRANSACTION_ID,
step: 'SOME_OTHER_STEP',
status: 'SUCCESS',
message: 'This should be ignored',
});
// 等待一小段时间,确保没有状态更新
await new Promise(r => setTimeout(r, 100));
// 步骤列表不应该改变
expect(screen.getByTestId('steps-list').children.length).toBe(initialStepCount);
expect(screen.getByTestId('status-display')).toHaveTextContent('INITIATED');
});
});
架构的局限性与未来展望
这套架构解决了前端实时感知后端长事务状态的核心问题,但它并非没有缺点。
首先,WebSocket网关是一个关键节点。它必须是高可用的、可水平扩展的。管理大量持久连接对服务器资源也是一个考验。在生产环境中,需要对网关进行细致的性能调优和监控。
其次,从网关到客户端的推送本质上是“最多一次”的。如果用户在事件推送期间网络短暂中断,可能会丢失状态更新。一个改进方案是在WebSocket重连时,客户端主动向后端请求一次当前事务的最新状态,以确保数据同步。
最后,整个系统的复杂性显著增加。端到端的调试变得困难,需要依赖强大的分布式追踪系统(如OpenTelemetry),将后端的Saga流程、Pulsar消息传递、网关处理直到前端的状态更新串联起来,才能在出现问题时快速定位。这要求团队具备更高的技术驾驭能力。