利用Zustand与Jest为基于消息队列的Saga分布式事务构建高可靠前端状态机


一个复杂业务流程的提交,比如包含主订单、库存扣减、积分增减的多服务协同操作,其后台处理绝不可能是瞬时完成的原子操作。在微服务架构下,这通常是一个由多个独立事务组成的分布式事务。如果采用传统的HTTP轮询方式来同步前端状态,不仅会造成服务端和网络的巨大浪费,更无法提供流畅、实时的用户体验。前端界面会长时间停留在“处理中”,用户无法得知事务执行到了哪一步,也无法在失败时获得清晰的上下文。

我们的技术挑战明确:为后端一个异步、长时运行的Saga分布式事务,在前端构建一个能够实时、精确、可靠地反映其完整生命周期的状态机,并且这个状态机本身必须是健壮的、可测试的。

定义问题:前端状态与后端事件的鸿沟

在后端,Saga模式通过一系列的本地事务和补偿操作来保证最终一致性。每个步骤的成功或失败都会以事件的形式发布到消息队列中。例如,一个订单流程可能产生如下事件流:ORDER_CREATED -> PAYMENT_PROCESSED -> INVENTORY_DEDUCTED -> POINTS_CREDITED -> ORDER_COMPLETED。如果任何一步失败,则会触发补偿事件流:POINTS_DEDUCTION_FAILED -> START_POINTS_COMPENSATION -> INVENTORY_RESTOCK_STARTED -> ORDER_FAILED

前端的挑战在于,如何跨越HTTP的瞬时性,订阅这个事件流,并将其转化为一个可预测的UI状态模型。用户不应该看到一连串杂乱的通知,而应该看到一个清晰的状态演进:正在处理 -> 正在支付 -> 正在锁定库存 -> 订单完成

架构决策:消息系统选型 NATS vs. Pulsar

要实现实时推送,后端事件必须通过一个中间件推送到前端。最常见的模式是后端服务 -> 消息队列 -> 网关服务 (WebSocket) -> 前端。这里的核心是消息队列的选择。

方案A: NATS JetStream

NATS以其极简和高性能著称。其核心是一个轻量级的发布/订阅系统。引入JetStream后,它获得了持久化、至少一次/精确一次送达保证等关键能力,使其能够胜任可靠消息传递的场景。

  • 优势:

    • 性能: 极高吞吐量,极低延迟。对于需要快速响应的事件通知非常理想。
    • 简单: 部署和运维相对简单,客户端API直观易用。
    • Stream/Consumer模型: JetStream的Stream可以持久化存储消息,Consumer可以按需消费,支持ACK机制,满足Saga事件的可靠传递。
  • 劣势:

    • 功能相对基础: 对于复杂的Saga场景,可能会缺少一些高级特性。例如,Saga模式中一个常见的需求是“超时触发补偿”,这需要消息队列具备延迟消息的能力。虽然可以通过在应用层实现定时器来模拟,但这增加了业务代码的复杂性。

方案B: Apache Pulsar

Pulsar在设计上是一个云原生的分布式消息平台,其分层架构(Broker + BookKeeper)带来了许多高级特性。

  • 优势:

    • 统一消息模型: 同时支持队列(Queue)和流(Stream)两种模式,能灵活适应不同消费场景。
    • 内置延迟消息: 这是Pulsar的一个杀手级特性。可以直接向Pulsar发送一个消息,并指定它在一段时间后才对消费者可见。这对于实现Saga的超时机制是天然的、可靠的解决方案,极大地简化了编排器的设计。
    • 多租户与分区: 内置的逻辑隔离(租户、命名空间)和物理扩展能力(分区),对于大型企业级系统非常友好。
  • 劣劣势:

    • 运维复杂性: 相较于NATS,Pulsar的组件(ZooKeeper/etcd, BookKeeper, Broker, Proxy)更多,部署和维护的成本更高。

最终选择:Pulsar

在真实项目中,分布式事务的可靠性压倒一切。一个Saga流程可能因为下游服务暂时不可用而需要等待和重试,也可能因为业务超时而必须触发补偿。Pulsar内置的延迟消息功能,让我们能够以一种非常优雅和健壮的方式来处理这类超时场景,而无需在业务代码中维护复杂的状态和定时器。因此,尽管运维成本更高,但为了架构的长期健壮性和简洁性,我们选择Pulsar作为Saga事件的载体。

核心实现:从后端事件到前端状态

我们的系统由四部分组成:Saga编排器(Go)、Pulsar集群、WebSocket网关(Node.js)和前端(React + Zustand)。

1. 后端Saga编排器与Pulsar生产者 (Go)

Saga的每一步都会向Pulsar发送一个结构化的事件。这个事件不仅包含状态,还包含事务ID、用户ID以及其他上下文信息,以便网关服务进行路由。

package main

import (
	"context"
	"encoding/json"
	"log"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

// SagaEvent 定义了我们系统中事件的统一结构
type SagaEvent struct {
	TransactionID string `json:"transactionId"`
	UserID        string `json:"userId"`
	Step          string `json:"step"`      // e.g., "PAYMENT_PROCESSING", "INVENTORY_RESERVED"
	Status        string `json:"status"`    // "SUCCESS", "FAILURE", "COMPENSATING"
	Message       string `json:"message"`   // User-friendly message
	Timestamp     int64  `json:"timestamp"`
}

func main() {
	// 初始化 Pulsar Client
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://localhost:6650",
	})
	if err != nil {
		log.Fatalf("Could not instantiate Pulsar client: %v", err)
	}
	defer client.Close()

	// 为订单状态更新创建一个生产者
	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic: "saga-order-status",
	})
	if err != nil {
		log.Fatalf("Could not create producer: %v", err)
	}
	defer producer.Close()

	// --- 模拟Saga流程 ---
	transactionID := "txn-12345"
	userID := "user-abcde"

	// 步骤1: 支付处理中
	publishSagaEvent(producer, transactionID, userID, "PAYMENT_PROCESSING", "SUCCESS", "支付渠道已受理,处理中...")

	// 模拟耗时操作
	time.Sleep(2 * time.Second)

	// 步骤2: 库存锁定成功
	publishSagaEvent(producer, transactionID, userID, "INVENTORY_RESERVED", "SUCCESS", "商品库存已锁定")
    
    // 模拟耗时操作
	time.Sleep(1 * time.Second)

	// 步骤3: 订单完成
	publishSagaEvent(producer, transactionID, userID, "ORDER_COMPLETED", "SUCCESS", "订单已成功处理!")

    // --- 模拟一个失败并补偿的Saga ---
    failedTxnID := "txn-67890"
    publishSagaEvent(producer, transactionID, userID, "PAYMENT_PROCESSING", "SUCCESS", "支付渠道已受理,处理中...")
    time.Sleep(2 * time.Second)
    publishSagaEvent(producer, failedTxnID, userID, "INVENTORY_DEDUCTION", "FAILURE", "库存不足,操作失败")
    time.Sleep(1 * time.Second)
    publishSagaEvent(producer, failedTxnID, userID, "PAYMENT_COMPENSATING", "SUCCESS", "正在为您退款...")

}

func publishSagaEvent(producer pulsar.Producer, txnID, userID, step, status, message string) {
	event := SagaEvent{
		TransactionID: txnID,
		UserID:        userID,
		Step:          step,
		Status:        status,
		Message:       message,
		Timestamp:     time.Now().UnixMilli(),
	}

	payload, err := json.Marshal(event)
	if err != nil {
		log.Printf("Failed to marshal event: %v", err)
		return
	}

	// 发送消息,使用 UserID 作为 key,可以保证同一用户的所有事件都进入同一个Pulsar分区
	// 这对于需要严格顺序消费的场景非常重要
	_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
		Payload: payload,
		Key:     userID,
	})

	if err != nil {
		log.Fatalf("Failed to publish message: %v", err)
	}
	log.Printf("Published event: %s - %s", step, status)
}

2. WebSocket网关:Pulsar消费者与消息推送 (Node.js)

网关服务是连接后端和前端的桥梁。它消费Pulsar中的事件,并通过WebSocket将消息实时推送给对应的客户端。

// server.js
import { WebSocketServer } from 'ws';
import Pulsar from 'pulsar-client';

const wss = new WebSocketServer({ port: 8080 });
// 简单的连接管理,生产环境需要更健壮的方案,如Redis
const clients = new Map();

wss.on('connection', (ws, req) => {
  // 在真实场景中,这里会通过JWT等方式进行用户认证
  // 并从认证信息中获取 userID
  const urlParams = new URLSearchParams(req.url.slice(1));
  const userId = urlParams.get('userId');
  if (!userId) {
    ws.close(1008, "User ID is required");
    return;
  }

  console.log(`Client connected: ${userId}`);
  
  // 存储客户端连接,以便后续推送消息
  if (!clients.has(userId)) {
    clients.set(userId, []);
  }
  clients.get(userId).push(ws);

  ws.on('close', () => {
    console.log(`Client disconnected: ${userId}`);
    const userConnections = clients.get(userId);
    if (userConnections) {
      const index = userConnections.indexOf(ws);
      if (index > -1) {
        userConnections.splice(index, 1);
      }
      if (userConnections.length === 0) {
        clients.delete(userId);
      }
    }
  });

  ws.on('error', console.error);
});

async function runPulsarConsumer() {
  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });

  const consumer = await client.subscribe({
    topic: 'persistent://public/default/saga-order-status',
    subscription: 'websocket-gateway-subscription',
    subscriptionType: 'Shared', // 使用共享订阅,允许多个网关实例并行消费
    ackTimeoutMs: 10000,
  });

  console.log('Pulsar consumer started...');

  while (true) {
    try {
      const msg = await consumer.receive();
      const payload = msg.getData().toString();
      console.log(`Received message: ${payload}`);

      const event = JSON.parse(payload);
      const { userId } = event;

      if (clients.has(userId)) {
        // 将消息推送给该用户的所有活跃连接
        const userConnections = clients.get(userId);
        userConnections.forEach(ws => {
          if (ws.readyState === ws.OPEN) {
            ws.send(JSON.stringify(event));
          }
        });
      }
      
      await consumer.acknowledge(msg);
    } catch (error) {
      console.error('Failed to receive message:', error);
      // 在生产环境中,这里需要更完善的错误处理和重连逻辑
      await new Promise(resolve => setTimeout(resolve, 5000));
    }
  }
}

runPulsarConsumer().catch(console.error);
graph TD
    subgraph Backend
        A[Saga Orchestrator] -- Publishes Event --> B((Pulsar Topic: saga-order-status))
    end
    subgraph Gateway
        C[Pulsar Consumer] -- Subscribes --> B
        C -- Receives Event --> D{Message Routing}
        D -- UserID Match --> E[WebSocket Server]
    end
    subgraph Frontend
        F[React Component] -- Establishes Connection --> E
        E -- Pushes Event --> F
        F -- Updates --> G(Zustand Store)
        G -- Notifies --> H[UI Renders Update]
    end

3. 前端状态机:Zustand Store

Zustand以其极简的API和基于Hooks的模式,非常适合管理这种异步、流式的状态。我们创建一个Store来维护当前事务的状态、历史步骤和最终结果。

// store/transactionStore.js
import { create } from 'zustand';

// 定义Saga事务可能经历的所有状态
export const TransactionStatus = {
  IDLE: 'IDLE',
  INITIATED: 'INITIATED',
  PROCESSING: 'PROCESSING',
  COMPLETED: 'COMPLETED',
  FAILED: 'FAILED',
};

const initialState = {
  transactionId: null,
  status: TransactionStatus.IDLE,
  // 存储每一步的详细信息,用于展示进度
  steps: [],
  // 最终的错误信息
  error: null,
};

export const useTransactionStore = create((set, get) => ({
  ...initialState,
  
  // Action: 开始一个新的事务
  startTransaction: (transactionId) => {
    set({
      ...initialState,
      transactionId,
      status: TransactionStatus.INITIATED,
      steps: [{ name: 'Transaction Initiated', status: 'SUCCESS', message: '请求已提交,等待后端处理...' }],
    });
  },

  // Action: 处理从WebSocket接收到的Saga事件
  handleSagaEvent: (event) => {
    // 确保事件属于当前正在跟踪的事务
    if (get().transactionId !== event.transactionId) {
      console.warn('Received event for an untracked transaction:', event);
      return;
    }

    set((state) => {
      const newSteps = [...state.steps, { name: event.step, status: event.status, message: event.message }];
      let newStatus = TransactionStatus.PROCESSING;
      let newError = null;

      // 根据事件更新顶层状态
      if (event.step.endsWith('_COMPLETED') && event.status === 'SUCCESS') {
        newStatus = TransactionStatus.COMPLETED;
      } else if (event.status === 'FAILURE' || event.step.endsWith('_FAILED')) {
        newStatus = TransactionStatus.FAILED;
        newError = event.message || 'An unknown error occurred.';
      }

      return {
        steps: newSteps,
        status: newStatus,
        error: newError,
      };
    });
  },

  // Action: 重置状态以便开始新的事务
  reset: () => set(initialState),
}));

4. React组件与测试 (Jest)

组件负责连接WebSocket,并将收到的消息派发给Zustand store。UI则声明式地从store中读取状态并渲染。测试的重点在于模拟WebSocket的行为,验证组件和store是否能正确响应事件流。

React组件 (OrderTracker.jsx):

import React, { useEffect } from 'react';
import { useTransactionStore, TransactionStatus } from './store/transactionStore';

const WEBSOCKET_URL = 'ws://localhost:8080?userId=user-abcde';

export const OrderTracker = () => {
  const { status, steps, error, handleSagaEvent, startTransaction } = useTransactionStore();

  useEffect(() => {
    const ws = new WebSocket(WEBSOCKET_URL);

    ws.onopen = () => console.log('WebSocket connected');
    ws.onclose = () => console.log('WebSocket disconnected');
    ws.onerror = (err) => console.error('WebSocket error:', err);

    ws.onmessage = (event) => {
      try {
        const data = JSON.parse(event.data);
        // 调用 store action 来处理事件
        handleSagaEvent(data);
      } catch (e) {
        console.error('Failed to parse WebSocket message:', e);
      }
    };

    // 清理函数
    return () => {
      ws.close();
    };
  }, [handleSagaEvent]);

  const handlePlaceOrder = () => {
    // 在真实应用中,这里会发起一个API请求
    // API请求成功后会返回一个transactionId
    const mockTransactionId = `txn-${Date.now()}`;
    startTransaction(mockTransactionId);
    console.log(`Mock order placed, tracking transaction: ${mockTransactionId}`);
  };

  return (
    <div>
      <h1>Saga Transaction Tracker</h1>
      <button onClick={handlePlaceOrder} disabled={status !== TransactionStatus.IDLE}>
        Place New Order
      </button>
      
      <div>
        <h2>Status: <span data-testid="status-display">{status}</span></h2>
        {status === TransactionStatus.FAILED && <p data-testid="error-message">Error: {error}</p>}
      </div>

      <h3>Progress:</h3>
      <ul data-testid="steps-list">
        {steps.map((step, index) => (
          <li key={index}>
            <strong>{step.name}</strong> [{step.status}]: {step.message}
          </li>
        ))}
      </ul>
    </div>
  );
};

Jest测试 (OrderTracker.test.js):

测试这种异步UI的关键在于控制WebSocket的模拟行为,并使用 @testing-library/react 的异步工具(如 waitFor)来等待UI更新。

import React from 'react';
import { render, screen, fireEvent, waitFor } from '@testing-library/react';
import '@testing-library/jest-dom';
import { WS } from 'jest-websocket-mock';
import { useTransactionStore } from './store/transactionStore';
import { OrderTracker } from './OrderTracker';

const WEBSOCKET_URL = 'ws://localhost:8080?userId=user-abcde';
const originalState = useTransactionStore.getState();

describe('OrderTracker', () => {
  let server;

  beforeEach(() => {
    // 在每个测试前重置 Zustand store
    useTransactionStore.setState(originalState);
    // 启动模拟WebSocket服务器
    server = new WS(WEBSOCKET_URL, { jsonProtocol: true });
  });

  afterEach(() => {
    // 关闭模拟服务器
    WS.clean();
  });

  test('should correctly process a successful saga event stream', async () => {
    render(<OrderTracker />);
    const MOCK_TRANSACTION_ID = 'txn-success-123';

    // 模拟API调用后启动事务跟踪
    // 直接调用store的action来模拟组件的启动行为
    act(() => {
        useTransactionStore.getState().startTransaction(MOCK_TRANSACTION_ID);
    });
    
    // 等待WebSocket连接
    await server.connected;

    // 初始状态应为 INITIATED
    expect(screen.getByTestId('status-display')).toHaveTextContent('INITIATED');
    expect(screen.getByTestId('steps-list').children.length).toBe(1);

    // 模拟服务器推送第一个事件
    server.send({
      transactionId: MOCK_TRANSACTION_ID,
      step: 'PAYMENT_PROCESSED',
      status: 'SUCCESS',
      message: 'Payment successful',
    });

    // 等待UI更新
    await waitFor(() => {
      expect(screen.getByTestId('status-display')).toHaveTextContent('PROCESSING');
      expect(screen.getByTestId('steps-list')).toHaveTextContent('PAYMENT_PROCESSED');
      expect(screen.getByTestId('steps-list').children.length).toBe(2);
    });

    // 模拟服务器推送最后一个完成事件
    server.send({
      transactionId: MOCK_TRANSACTION_ID,
      step: 'ORDER_COMPLETED',
      status: 'SUCCESS',
      message: 'Order is complete!',
    });

    await waitFor(() => {
      expect(screen.getByTestId('status-display')).toHaveTextContent('COMPLETED');
      expect(screen.getByTestId('steps-list')).toHaveTextContent('ORDER_COMPLETED');
    });
  });

  test('should correctly handle a failed saga event stream', async () => {
    render(<OrderTracker />);
    const MOCK_TRANSACTION_ID = 'txn-failed-456';
    
    act(() => {
        useTransactionStore.getState().startTransaction(MOCK_TRANSACTION_ID);
    });

    await server.connected;

    // 模拟一个失败事件
    server.send({
      transactionId: MOCK_TRANSACTION_ID,
      step: 'INVENTORY_DEDUCTION',
      status: 'FAILURE',
      message: 'Item out of stock',
    });

    // 验证UI是否正确显示失败状态和错误信息
    await waitFor(() => {
      expect(screen.getByTestId('status-display')).toHaveTextContent('FAILED');
      expect(screen.getByTestId('error-message')).toHaveTextContent('Error: Item out of stock');
    });
  });

  test('should ignore events for a different transaction', async () => {
     render(<OrderTracker />);
    const CURRENT_TRANSACTION_ID = 'txn-current-789';
    const OTHER_TRANSACTION_ID = 'txn-other-000';
    
    act(() => {
        useTransactionStore.getState().startTransaction(CURRENT_TRANSACTION_ID);
    });
    
    await server.connected;

    const initialStepCount = screen.getByTestId('steps-list').children.length;

    // 发送一个不相关的事务ID事件
    server.send({
      transactionId: OTHER_TRANSACTION_ID,
      step: 'SOME_OTHER_STEP',
      status: 'SUCCESS',
      message: 'This should be ignored',
    });
    
    // 等待一小段时间,确保没有状态更新
    await new Promise(r => setTimeout(r, 100));

    // 步骤列表不应该改变
    expect(screen.getByTestId('steps-list').children.length).toBe(initialStepCount);
    expect(screen.getByTestId('status-display')).toHaveTextContent('INITIATED');
  });

});

架构的局限性与未来展望

这套架构解决了前端实时感知后端长事务状态的核心问题,但它并非没有缺点。

首先,WebSocket网关是一个关键节点。它必须是高可用的、可水平扩展的。管理大量持久连接对服务器资源也是一个考验。在生产环境中,需要对网关进行细致的性能调优和监控。

其次,从网关到客户端的推送本质上是“最多一次”的。如果用户在事件推送期间网络短暂中断,可能会丢失状态更新。一个改进方案是在WebSocket重连时,客户端主动向后端请求一次当前事务的最新状态,以确保数据同步。

最后,整个系统的复杂性显著增加。端到端的调试变得困难,需要依赖强大的分布式追踪系统(如OpenTelemetry),将后端的Saga流程、Pulsar消息传递、网关处理直到前端的状态更新串联起来,才能在出现问题时快速定位。这要求团队具备更高的技术驾驭能力。


  目录