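A seemingly simple flow for uploading and verifying a user's identity document becomes hard to keep consistent once the backend is split into several microservices. The user submits an image from a SwiftUI client, and the request must then trigger, in order: 1) a service that creates the application record; 2) a long-running CV (computer vision) analysis service that extracts information; 3) a compliance check service that works from the CV result. If any step fails, the whole flow has to be rolled back and the resources already created must be cleaned up. Traditional two-phase commit (2PC) is not a viable option for this kind of long-running, cross-service transaction: its synchronous blocking sharply reduces system availability.
That is our starting point for adopting the orchestration-based Saga pattern. We need a central coordinator that drives the whole flow: it calls the participant services and, depending on each result, decides whether to continue with the next step or run compensating operations. In this pattern the participant services know nothing about the Saga; they only need to expose a forward operation and a compensating operation, which greatly reduces the complexity of the business services.
The system we are going to build looks like this: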
sequenceDiagram
    participant App as SwiftUI App
    participant Gateway as API Gateway
    participant Orchestrator as Saga Orchestrator (Spring Boot)
    participant Verification as Verification Service (Spring Boot)
    participant CV as CV Processing Service (Spring Boot)
    participant Compliance as Compliance Service (Spring Boot)
    App->>+Gateway: POST /api/verifications (multipart/form-data)
    Gateway->>+Orchestrator: startVerificationSaga(image)
    Orchestrator->>+Verification: createVerificationRecord()
    Verification-->>-Orchestrator: { recordId: "xyz" }
    Orchestrator->>+CV: processDocument(image) [Async]
    CV-->>-Orchestrator: { taskId: "cv-123" }
    Note right of Orchestrator: Saga state: PENDING_CV<br/>Orchestrator polls or receives callback
    CV->>Orchestrator: Callback: /task/complete/{taskId}
    Orchestrator->>+Compliance: checkCompliance(recordId)
    Compliance-->>-Orchestrator: { status: "PASSED" }
    Note right of Orchestrator: Saga state: COMPLETED
    Orchestrator-->>-App: Final Status: Success
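Step 1: Define the Saga Flow and State Machine
At the heart of the Saga Orchestrator service is a state machine that defines the lifecycle of the distributed transaction. The stages could be modelled as a simple Java enum; the code in this walkthrough keeps things simpler still and describes the Saga as an ordered list of steps, each pairing a forward action with its compensation.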
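For reference, the enum view of the lifecycle could look like the following minimal sketch. Only PENDING_CV and COMPLETED are named in the sequence diagram; every other state name, and the transition table itself, is an assumption made for illustration and is not used by the code in the rest of this article.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical lifecycle states. Only PENDING_CV and COMPLETED appear in the
// sequence diagram; the remaining names are assumptions made for this sketch.
enum SagaState {
    STARTED, RECORD_CREATED, PENDING_CV, PENDING_COMPLIANCE,
    COMPLETED, COMPENSATING, COMPENSATED;

    // States that can directly follow this one.
    Set<SagaState> next() {
        switch (this) {
            case STARTED:            return EnumSet.of(RECORD_CREATED, COMPENSATING);
            case RECORD_CREATED:     return EnumSet.of(PENDING_CV, COMPENSATING);
            case PENDING_CV:         return EnumSet.of(PENDING_COMPLIANCE, COMPENSATING);
            case PENDING_COMPLIANCE: return EnumSet.of(COMPLETED, COMPENSATING);
            case COMPENSATING:       return EnumSet.of(COMPENSATED);
            default:                 return EnumSet.noneOf(SagaState.class); // terminal states
        }
    }

    // True if moving from this state to target is a legal transition.
    boolean canTransitionTo(SagaState target) {
        return next().contains(target);
    }

    // COMPLETED and COMPENSATED have no successors.
    boolean isTerminal() {
        return next().isEmpty();
    }
}
```

Checking canTransitionTo before every state change is one way to reject out-of-order callbacks, for example a CV callback arriving after compensation has already started.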
SagaDefinition.java:
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SagaDefinition {

    // One forward step of the Saga.
    @FunctionalInterface
    public interface SagaStep {
        // Returns true if the step succeeded.
        boolean execute(SagaContext context);
    }

    // The compensating operation for a step.
    @FunctionalInterface
    public interface CompensationStep {
        void compensate(SagaContext context);
    }

    private final List<StepDetails> steps = new ArrayList<>();

    public SagaDefinition addStep(SagaStep action, CompensationStep compensation) {
        steps.add(new StepDetails(action, compensation));
        return this;
    }

    public List<StepDetails> getSteps() {
        return Collections.unmodifiableList(steps);
    }

    // Pairs a forward action with its compensation.
    public static class StepDetails {
        public final SagaStep action;
        public final CompensationStep compensation;

        public StepDetails(SagaStep action, CompensationStep compensation) {
            this.action = action;
            this.compensation = compensation;
        }
    }
}
SagaContext.java is the context object that travels through the whole flow and carries data from one step to the next.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
// Carries state between Saga steps.
public class SagaContext {

    private final String sagaId;
    // Note: ConcurrentHashMap rejects null keys and values, so put() throws
    // NullPointerException when given a null value.
    private final Map<String, Object> payload = new ConcurrentHashMap<>();

    public SagaContext(String sagaId) {
        this.sagaId = sagaId;
    }

    public String getSagaId() {
        return sagaId;
    }

    public void put(String key, Object value) {
        payload.put(key, value);
    }

    @SuppressWarnings("unchecked")
    public <T> T get(String key) {
        return (T) payload.get(key);
    }
}
Step 2: Build the Saga Orchestrator Service
This is the core of the system: it drives the whole flow. We build it with Spring Boot.
Key dependencies in pom.xml:
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-logging</artifactId>
    </dependency>
    <!-- Optional: Apache HttpClient 4.x as the HTTP client behind RestTemplate
         (RestTemplate itself ships with spring-web).
         Spring Framework 6 / Spring Boot 3 expect httpclient5 instead. -->
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.13</version>
    </dependency>
</dependencies>
application.yml:
server:
  port: 8080

# Service addresses; in Docker Swarm the service name can be used directly as the host
services:
  verification: http://verification-service:8081
  cv-processing: http://cv-processing-service:8082
  compliance: http://compliance-service:8083
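SagaExecutor.java is the Saga execution engine. In a real project it would be combined with persistence so that Saga instances can be recovered after a service restart. To show the core logic, we keep it in memory for now.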
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.util.ArrayDeque;
import java.util.Deque;

@Service
public class SagaExecutor {

    private static final Logger logger = LoggerFactory.getLogger(SagaExecutor.class);

    public void execute(SagaDefinition saga, SagaContext context) {
        // Completed steps, most recent first, so compensation runs in reverse order.
        Deque<SagaDefinition.StepDetails> completedSteps = new ArrayDeque<>();

        for (SagaDefinition.StepDetails step : saga.getSteps()) {
            try {
                logger.info("Saga [{}]: Executing step...", context.getSagaId());
                boolean success = step.action.execute(context);
                if (success) {
                    logger.info("Saga [{}]: Step executed successfully.", context.getSagaId());
                    completedSteps.push(step);
                } else {
                    logger.error("Saga [{}]: Step failed. Starting compensation.", context.getSagaId());
                    compensate(completedSteps, context);
                    return; // abort the Saga
                }
            } catch (Exception e) {
                logger.error("Saga [{}] Exception during step execution. Starting compensation.", context.getSagaId(), e);
                compensate(completedSteps, context);
                return; // abort the Saga
            }
        }
        logger.info("Saga [{}] completed successfully.", context.getSagaId());
    }

    private void compensate(Deque<SagaDefinition.StepDetails> completedSteps, SagaContext context) {
        while (!completedSteps.isEmpty()) {
            SagaDefinition.StepDetails stepToCompensate = completedSteps.pop();
            try {
                logger.warn("Saga [{}]: Compensating step...", context.getSagaId());
                stepToCompensate.compensation.compensate(context);
                logger.warn("Saga [{}]: Step compensated successfully.", context.getSagaId());
            } catch (Exception e) {
                // A failed compensation is serious and usually needs manual intervention.
                // A real project would record the failure and raise an alert here.
                logger.error("Saga [{}] CRITICAL: Compensation failed for a step. Manual intervention required.", context.getSagaId(), e);
            }
        }
    }
}
Next, we define the concrete Saga flow in VerificationSaga.java.
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.*;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.UUID;
@Component
public class VerificationSaga {

    private static final Logger logger = LoggerFactory.getLogger(VerificationSaga.class);

    private final RestTemplate restTemplate;
    private final String verificationServiceUrl;
    private final String cvProcessingServiceUrl;
    private final String complianceServiceUrl;

    // Spring Boot does not register a RestTemplate bean by default; one has to be declared
    // (for example via RestTemplateBuilder) for this constructor injection to work.
    public VerificationSaga(RestTemplate restTemplate,
                            @Value("${services.verification}") String verificationServiceUrl,
                            @Value("${services.cv-processing}") String cvProcessingServiceUrl,
                            @Value("${services.compliance}") String complianceServiceUrl) {
        this.restTemplate = restTemplate;
        this.verificationServiceUrl = verificationServiceUrl;
        this.cvProcessingServiceUrl = cvProcessingServiceUrl;
        this.complianceServiceUrl = complianceServiceUrl;
    }

    public SagaDefinition build(byte[] document) {
        return new SagaDefinition()
            .addStep(
                // Step 1: Create Verification Record
                context -> {
                    logger.info("Saga [{}]: Step 1 - Creating verification record.", context.getSagaId());
                    String url = verificationServiceUrl + "/verifications";
                    HttpEntity<String> request = new HttpEntity<>("{}");
                    try {
                        ResponseEntity<Map> response = restTemplate.postForEntity(url, request, Map.class);
                        if (response.getStatusCode() == HttpStatus.CREATED && response.getBody() != null) {
                            String recordId = (String) response.getBody().get("id");
                            context.put("recordId", recordId);
                            return true;
                        }
                    } catch (Exception e) {
                        logger.error("Error calling verification service", e);
                    }
                    return false;
                },
                // Compensation for Step 1
                context -> {
                    String recordId = context.get("recordId");
                    if (recordId != null) {
                        logger.warn("Saga [{}]: Compensating - Deleting verification record {}.", context.getSagaId(), recordId);
                        String url = verificationServiceUrl + "/verifications/" + recordId;
                        restTemplate.delete(url);
                    }
                }
            )
            .addStep(
                // Step 2: Process Document with CV (long-running)
                // Pitfall: this is a long-running operation, and a synchronous call blocks the Saga executor.
                // In production, use an asynchronous callback or polling. To keep things simple we simulate it
                // with a synchronous call; a short timeout should be set on the RestTemplate (not configured in this demo).
                context -> {
                    String recordId = context.get("recordId");
                    logger.info("Saga [{}]: Step 2 - Starting CV processing for record {}.", context.getSagaId(), recordId);
                    String url = cvProcessingServiceUrl + "/process";
                    // In a real project the image data would be passed here.
                    HttpEntity<String> request = new HttpEntity<>("{\"recordId\": \"" + recordId + "\"}");
                    try {
                        ResponseEntity<Map> response = restTemplate.postForEntity(url, request, Map.class);
                        if (response.getStatusCode() == HttpStatus.OK
                                && response.getBody() != null
                                && "SUCCESS".equals(response.getBody().get("status"))) {
                            context.put("cvResult", response.getBody().get("data"));
                            return true;
                        }
                    } catch (Exception e) {
                        logger.error("Error calling CV processing service", e);
                    }
                    return false;
                },
                // Compensation for Step 2
                // Compensating CV processing can be complex (for example deleting intermediate artifacts).
                // Here it is reduced to a log statement.
                context -> {
                    String recordId = context.get("recordId");
                    logger.warn("Saga [{}]: Compensating - Rolling back CV processing for record {}. No direct resource to delete.", context.getSagaId(), recordId);
                }
            )
            .addStep(
                // Step 3: Check Compliance
                context -> {
                    String recordId = context.get("recordId");
                    logger.info("Saga [{}]: Step 3 - Checking compliance for record {}.", context.getSagaId(), recordId);
                    String url = complianceServiceUrl + "/check";
                    HttpEntity<String> request = new HttpEntity<>("{\"recordId\": \"" + recordId + "\"}");
                    try {
                        ResponseEntity<Map> response = restTemplate.postForEntity(url, request, Map.class);
                        return response.getStatusCode() == HttpStatus.OK
                                && response.getBody() != null
                                && "PASSED".equals(response.getBody().get("result"));
                    } catch (Exception e) {
                        logger.error("Error calling compliance service", e);
                    }
                    return false;
                },
                // Compensation for Step 3
                // As the last step, this is never triggered by SagaExecutor (a failed step is not compensated itself).
                context -> {
                    String recordId = context.get("recordId");
                    logger.warn("Saga [{}]: Compensating - Compliance check for record {} failed. No compensation action needed.", context.getSagaId(), recordId);
                }
            );
    }
}
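The comment on Step 2 calls for a short timeout, yet the RestTemplate calls above never configure one. As a minimal sketch of what bounding the call could look like, the example below uses the JDK's java.net.http.HttpClient instead of RestTemplate, a swap made only so the snippet has no Spring dependency. The class and method names (BoundedCvCall, newClient, processRequest) are hypothetical, and suitable timeout values depend on how long the CV service really takes.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.time.Duration;

// Hypothetical helper; not part of the article's code base.
final class BoundedCvCall {

    // The connect timeout limits how long establishing the connection may take.
    static HttpClient newClient(Duration connectTimeout) {
        return HttpClient.newBuilder()
                .connectTimeout(connectTimeout)
                .build();
    }

    // The request timeout limits how long we wait for the response to arrive.
    static HttpRequest processRequest(String baseUrl, String recordId, Duration timeout) {
        String body = "{\"recordId\": \"" + recordId + "\"}";
        return HttpRequest.newBuilder(URI.create(baseUrl + "/process"))
                .timeout(timeout)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }
}
```

Sending such a request with client.send(request, HttpResponse.BodyHandlers.ofString()) throws java.net.http.HttpTimeoutException when no response arrives in time. A Saga step would catch it and return false, so that the executor starts compensation.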
Step 3: Build the Participant Services
These are simple Spring Boot applications, each running on its own port.
Verification Service (port 8081)
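import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

@RestController
@RequestMapping("/verifications")
public class VerificationController {

    // In-memory store standing in for a database.
    private final Map<String, String> records = new ConcurrentHashMap<>();

    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    public Map<String, String> create() {
        String id = UUID.randomUUID().toString();
        records.put(id, "PENDING");
        System.out.println("Created verification record: " + id);
        return Map.of("id", id);
    }

    // Compensation endpoint: deleting an unknown id is a no-op, so repeated calls are safe.
    @DeleteMapping("/{id}")
    @ResponseStatus(HttpStatus.NO_CONTENT)
    public void delete(@PathVariable String id) {
        records.remove(id);
        System.out.println("Compensated: Deleted verification record: " + id);
    }
}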
CV Processing Service (port 8082)
This service simulates a slow operation that may fail.
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Map;
import java.util.Random;

@RestController
public class CvController {

    @PostMapping("/process")
    public Map<String, Object> process() throws InterruptedException {
        System.out.println("CV Service: Received processing request...");
        // Simulate a slow operation.
        Thread.sleep(2000);
        // Simulate random failures.
        if (new Random().nextInt(10) < 3) { // 30% failure rate
            System.err.println("CV Service: Processing failed!");
            return Map.of("status", "FAILED");
        }
        System.out.println("CV Service: Processing successful.");
        return Map.of("status", "SUCCESS", "data", "Extracted text...");
    }
}
Compliance Service (port 8083)
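import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Map;
import java.util.Random;

@RestController
public class ComplianceController {

    @PostMapping("/check")
    public Map<String, String> check() {
        // Again, simulate the possibility of failure (50% here).
        if (new Random().nextBoolean()) {
            System.out.println("Compliance Service: Check PASSED");
            return Map.of("result", "PASSED");
        } else {
            System.err.println("Compliance Service: Check FAILED");
            return Map.of("result", "FAILED");
        }
    }
}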
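Step 4: The SwiftUI Client
The client uploads the file and starts the Saga. Because the backend transaction is long-running, the client should not wait synchronously for the result; it should obtain the status asynchronously instead.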
import SwiftUI
struct VerificationView: View {
    @State private var statusMessage = "Ready to upload document"
    @State private var isProcessing = false

    // URL of your Orchestrator service
    let orchestratorURL = URL(string: "http://localhost:8080/verifications")!

    var body: some View {
        VStack(spacing: 20) {
            Text("Document Verification")
                .font(.largeTitle)

            Text(statusMessage)
                .font(.body)
                .foregroundColor(isProcessing ? .blue : .gray)

            Button(action: startVerificationProcess) {
                Text("Upload and Verify")
                    .padding()
                    .background(isProcessing ? Color.gray : Color.blue)
                    .foregroundColor(.white)
                    .cornerRadius(10)
            }
            .disabled(isProcessing)
        }
        .padding()
    }

    func startVerificationProcess() {
        self.isProcessing = true
        self.statusMessage = "Uploading document..."

        // In a real app the data would come from UIImagePickerController or a file.
        let dummyImageData = "dummy-image-data".data(using: .utf8)!

        var request = URLRequest(url: orchestratorURL)
        request.httpMethod = "POST"
        // Building multipart/form-data is fairly involved; to show the core flow this is a plain POST.
        // Production code needs to build a correct multipart body.
        request.setValue("application/octet-stream", forHTTPHeaderField: "Content-Type")
        request.httpBody = dummyImageData

        URLSession.shared.dataTask(with: request) { data, response, error in
            DispatchQueue.main.async {
                self.isProcessing = false

                if let error = error {
                    self.statusMessage = "Error: \(error.localizedDescription)"
                    return
                }

                guard let httpResponse = response as? HTTPURLResponse else {
                    self.statusMessage = "Invalid response from server"
                    return
                }

                // If the Saga ran asynchronously, returning 202 Accepted immediately would be better practice,
                // but the current synchronous Saga implementation waits until the flow has finished.
                if (200...299).contains(httpResponse.statusCode) {
                    self.statusMessage = "Verification process completed successfully!"
                } else {
                    // A step in the Saga failed and has been compensated.
                    self.statusMessage = "Verification failed. Please try again. (Status code: \(httpResponse.statusCode))"
                }
            }
        }.resume()
    }
}
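The client comments point out that returning 202 Accepted right away would be the better practice. On the orchestrator side, that could be backed by something like the sketch below: the Saga is handed to an executor, an id is returned immediately, and a status lookup serves the client's polling requests. AsyncSagaLauncher and its Status enum are hypothetical names, and the in-memory map is an assumption that only works with a single orchestrator instance; with replicas, the status would have to live in a shared store.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.BooleanSupplier;

// Hypothetical helper; a controller would return the id with 202 Accepted
// and expose status(...) through a GET endpoint that the client polls.
final class AsyncSagaLauncher {

    enum Status { RUNNING, SUCCEEDED, FAILED }

    private final Map<String, Status> statuses = new ConcurrentHashMap<>();
    private final Executor executor;

    AsyncSagaLauncher(Executor executor) {
        this.executor = executor;
    }

    // Registers the Saga as RUNNING, schedules it, and returns without waiting for it.
    // The supplier returns true when the Saga completed, false when it was rolled back.
    String start(BooleanSupplier saga) {
        String sagaId = UUID.randomUUID().toString();
        statuses.put(sagaId, Status.RUNNING);
        executor.execute(() -> {
            boolean ok;
            try {
                ok = saga.getAsBoolean();
            } catch (RuntimeException e) {
                ok = false;
            }
            statuses.put(sagaId, ok ? Status.SUCCEEDED : Status.FAILED);
        });
        return sagaId;
    }

    // Returns null for an unknown id.
    Status status(String sagaId) {
        return statuses.get(sagaId);
    }
}
```

The SagaExecutor shown earlier returns void, so wiring it in would first require it to report the outcome, for example by returning a boolean.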
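Unit testing approach:
- SagaExecutor unit test: build a SagaDefinition with several steps, make one action throw an exception or return false, and assert that the compensations of the already completed steps are invoked, in reverse order.
- VerificationSaga integration test: use MockRestServiceServer to simulate the HTTP calls to the other services. Verify that the Saga runs to completion when every service returns success, and that the verification service's compensation endpoint is called when the CV service reports a failure.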
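The first test idea can be sketched without any test framework. The example below re-creates the executor's control flow with plain JDK types so that it is self-contained; Step and run are stand-ins for SagaDefinition.StepDetails and SagaExecutor.execute, not the article's classes, and a real test would exercise those classes directly (for instance with JUnit).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.BooleanSupplier;

final class CompensationOrderCheck {

    // Stand-in for SagaDefinition.StepDetails: a forward action plus its compensation.
    static final class Step {
        final BooleanSupplier action;
        final Runnable compensation;

        Step(BooleanSupplier action, Runnable compensation) {
            this.action = action;
            this.compensation = compensation;
        }
    }

    // Same control flow as SagaExecutor.execute: on failure, compensate completed steps newest first.
    static boolean run(List<Step> steps) {
        Deque<Step> completed = new ArrayDeque<>();
        for (Step step : steps) {
            boolean ok;
            try {
                ok = step.action.getAsBoolean();
            } catch (RuntimeException e) {
                ok = false;
            }
            if (!ok) {
                while (!completed.isEmpty()) {
                    completed.pop().compensation.run();
                }
                return false;
            }
            completed.push(step);
        }
        return true;
    }

    // Scenario: steps 1 and 2 succeed, step 3 fails. Returns the compensations in the order they ran.
    static List<String> compensationsWhenThirdStepFails() {
        List<String> log = new ArrayList<>();
        run(List.of(
                new Step(() -> true, () -> log.add("undo-1")),
                new Step(() -> true, () -> log.add("undo-2")),
                new Step(() -> false, () -> log.add("undo-3"))));
        return log;
    }
}
```

In this scenario the recorded order is undo-2 followed by undo-1. The failed step's own compensation never runs, because a step is only registered for compensation once it has succeeded.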
Step 5: Deploy with Docker Swarm
Docker Swarm offers a lightweight approach to container orchestration. We create a docker-compose.yml file to define our service stack.
docker-compose.yml:
version: '3.8'

services:
  orchestrator:
    image: my-saga-orchestrator:latest # you need to build this image first
    ports:
      - "8080:8080"
    networks:
      - saga-net
    deploy:
      replicas: 2
      update_config:
        parallelism: 1
        delay: 10s
      restart_policy:
        condition: on-failure

  verification-service:
    image: my-verification-service:latest
    networks:
      - saga-net
    deploy:
      replicas: 1

  cv-processing-service:
    image: my-cv-processing-service:latest
    networks:
      - saga-net
    deploy:
      replicas: 1
      # In a real deployment the CV service may be resource-intensive and can be pinned to specific nodes
      # placement:
      #   constraints:
      #     - "node.labels.has_gpu == true"

  compliance-service:
    image: my-compliance-service:latest
    networks:
      - saga-net
    deploy:
      replicas: 1

networks:
  saga-net:
    driver: overlay
The Dockerfile for each service is standard:
FROM openjdk:17-slim
WORKDIR /app
COPY target/*.jar app.jar
ENTRYPOINT ["java", "-jar", "app.jar"]
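Build and deployment steps:
- Build the JAR for each Spring Boot application: mvn clean package
- Build the Docker image for each application (run in the respective module directory): docker build -t my-saga-orchestrator:latest .
- Initialize the Swarm cluster (on the manager node): docker swarm init
- Deploy the service stack: docker stack deploy -c docker-compose.yml saga-stack
Once deployed, Swarm's built-in DNS lets the services reach each other by service name (for example http://verification-service:8081), which is exactly how application.yml is configured.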
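Limitations of the Current Approach and Paths for Improvement
This implementation shows the core idea of an orchestration-based Saga, but it still falls short in several ways for production use.
First, the Saga executor blocks synchronously, most noticeably while waiting for the slow CV task. This ties up executor threads and lowers throughput. A more robust implementation would persist the Saga state to a database and update it after every step. For asynchronous tasks, the executor can then release the thread and let a callback or polling drive the Saga into its next state.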
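A rough sketch of that callback-driven variant follows. The map stands in for a database table, CallbackDrivenSaga and its State enum are hypothetical names, and the actual service calls and compensation logic are left out; the point is only that no thread waits between submitting the CV task and receiving its result.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch; the map is an in-memory stand-in for persisted Saga state.
final class CallbackDrivenSaga {

    enum State { PENDING_CV, PENDING_COMPLIANCE, COMPLETED, COMPENSATING }

    private final Map<String, State> store = new ConcurrentHashMap<>();

    // Called once the record exists and the CV task has been submitted; no thread is blocked afterwards.
    void cvSubmitted(String sagaId) {
        store.put(sagaId, State.PENDING_CV);
    }

    // Handler for the CV service callback.
    State onCvResult(String sagaId, boolean success) {
        return advance(sagaId, State.PENDING_CV,
                success ? State.PENDING_COMPLIANCE : State.COMPENSATING);
    }

    // Handler for the compliance result.
    State onComplianceResult(String sagaId, boolean passed) {
        return advance(sagaId, State.PENDING_COMPLIANCE,
                passed ? State.COMPLETED : State.COMPENSATING);
    }

    // Compare-and-set: the state only changes if the Saga is in the expected state,
    // so duplicate or late callbacks leave it untouched. Returns the resulting state,
    // or null if the Saga id is unknown.
    private State advance(String sagaId, State expected, State next) {
        store.replace(sagaId, expected, next);
        return store.get(sagaId);
    }
}
```

With a real database the same compare-and-set could be expressed as a conditional update (update the row only where the state still equals the expected value), which also protects against two orchestrator replicas handling the same callback.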
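Second, communication between services relies entirely on synchronous REST calls. If a service is temporarily unavailable, the whole Saga fails immediately and rolls back. Introducing a message queue (such as RabbitMQ or Kafka) for inter-service communication decouples the services and improves resilience. Saga steps can then be driven by messages, which fits an event-driven architecture. Taken further, with no central coordinator and each service reacting to the events of the others, this becomes the choreography-based Saga.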
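Finally, our compensation logic is very simple. In the real world a compensating operation can itself fail, so compensations must be designed to be idempotent and need a retry mechanism. If compensation keeps failing, an alert must notify a human to step in, so that data is not left in an inconsistent intermediate state. Docker Swarm simplifies deployment, but compared with Kubernetes it offers less in observability, autoscaling, and ecosystem; for a more complex microservice system, migrating to a more full-featured platform may be worth considering.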
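The retry requirement for compensations can be sketched as a small helper. RetryingCompensation and runWithRetry are hypothetical names, and the linear backoff is an assumption; production code would more likely rely on a retry library or a scheduled job. Retrying is only safe because the compensation is idempotent, which holds for the DELETE endpoint of the Verification Service shown earlier: removing a record that is already gone is a no-op.

```java
// Hypothetical helper; not part of the article's code base.
final class RetryingCompensation {

    // Runs the compensation up to maxAttempts times, sleeping backoffMillis * attempt
    // between attempts. Returns true as soon as one attempt completes without throwing,
    // false if all attempts failed or the thread was interrupted while waiting.
    static boolean runWithRetry(Runnable compensation, int maxAttempts, long backoffMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                compensation.run();
                return true;
            } catch (RuntimeException e) {
                if (attempt == maxAttempts) {
                    break; // out of attempts
                }
                try {
                    Thread.sleep(backoffMillis * attempt); // linear backoff
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }
}
```

A false return value is the point at which the executor would log the critical failure and raise the alert for manual intervention.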