Building a Distributed CV Processing Flow on an Orchestration-Based Saga: From a SwiftUI Request to a Docker Swarm Deployment


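A user document upload and verification flow looks simple, but once the backend is split into several microservices, guaranteeing data consistency quickly becomes hard. The user submits an image from a SwiftUI client, and the request then has to trigger, in order: 1) a service that creates the application record; 2) a long-running CV (computer vision) analysis service that extracts information; 3) a compliance check service that works from the CV result. If any step fails, the whole flow must roll back and any resources already created must be cleaned up. Traditional two-phase commit (2PC) is not a workable option for this kind of long-lived, cross-service transaction, because its synchronous blocking sharply reduces system availability.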

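This is where the orchestration-based Saga pattern comes in. A central coordinator drives the whole flow: it calls each participant service and, depending on the result, either moves on to the next step or runs compensating operations. In this pattern the participants know nothing about the saga; they only expose a forward operation and a compensating operation, which keeps the business services much simpler.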

The system we will build looks like this:

sequenceDiagram
    participant SwiftUI App
    participant API Gateway
    participant Saga Orchestrator (Spring Boot)
    participant Verification Service (Spring Boot)
    participant CV Processing Service (Spring Boot)
    participant Compliance Service (Spring Boot)

    SwiftUI App->>+API Gateway: POST /api/verifications (multipart/form-data)
    API Gateway->>+Saga Orchestrator: startVerificationSaga(image)
    Saga Orchestrator->>+Verification Service: createVerificationRecord()
    Verification Service-->>-Saga Orchestrator: { recordId: "xyz" }
    Saga Orchestrator->>+CV Processing Service: processDocument(image) [Async]
    CV Processing Service-->>-Saga Orchestrator: { taskId: "cv-123" }
    Note right of Saga Orchestrator: Saga state: PENDING_CV, Orchestrator polls or receives callback
    CV Processing Service->>Saga Orchestrator: Callback: /task/complete/{taskId}
    Saga Orchestrator->>+Compliance Service: checkCompliance(recordId)
    Compliance Service-->>-Saga Orchestrator: { status: "PASSED" }
    Note right of Saga Orchestrator: Saga state: COMPLETED
    Saga Orchestrator-->>-SwiftUI App: Final Status: Success

Step 1: Define the Saga Flow and State Machine

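At the heart of the Saga Orchestrator service is a state machine that defines the lifecycle of the whole distributed transaction. In this walkthrough the state machine is implicit: a saga is an ordered list of steps, each pairing a forward action with its compensation, and the position in that list is the current state.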
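If the states need to be explicit, for logging or persistence for instance, a small enum can carry them. The following is only a sketch: PENDING_CV and COMPLETED come from the diagram above, while the other state names and the transition table are assumptions made for illustration.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical explicit saga states; only PENDING_CV and COMPLETED appear in the diagram.
enum SagaState {
    STARTED, PENDING_CV, PENDING_COMPLIANCE, COMPLETED, COMPENSATING, FAILED;

    // States that may legally follow this one.
    Set<SagaState> next() {
        switch (this) {
            case STARTED:            return EnumSet.of(PENDING_CV, COMPENSATING);
            case PENDING_CV:         return EnumSet.of(PENDING_COMPLIANCE, COMPENSATING);
            case PENDING_COMPLIANCE: return EnumSet.of(COMPLETED, COMPENSATING);
            case COMPENSATING:       return EnumSet.of(FAILED);
            default:                 return EnumSet.noneOf(SagaState.class); // terminal states
        }
    }

    // True when moving from this state to target is allowed by the table above.
    boolean canTransitionTo(SagaState target) {
        return next().contains(target);
    }

    // COMPLETED and FAILED have no successors.
    boolean isTerminal() {
        return next().isEmpty();
    }
}
```

An orchestrator could call canTransitionTo before every state update and reject anything the table does not allow, which makes illegal jumps such as COMPLETED to COMPENSATING easy to catch.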

SagaDefinition.java:

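import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SagaDefinition {

    // A single forward step in the saga
    @FunctionalInterface
    public interface SagaStep {
        // Returns true if the step succeeded
        boolean execute(SagaContext context);
    }
    
    // Compensating operation that undoes a step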
    @FunctionalInterface
    public interface CompensationStep {
        void compensate(SagaContext context);
    }
    
    private final List<StepDetails> steps = new ArrayList<>();

    public SagaDefinition addStep(SagaStep action, CompensationStep compensation) {
        steps.add(new StepDetails(action, compensation));
        return this;
    }

    public List<StepDetails> getSteps() {
        return Collections.unmodifiableList(steps);
    }
    
    // Holds a forward action together with its compensation
    public static class StepDetails {
        public final SagaStep action;
        public final CompensationStep compensation;

        public StepDetails(SagaStep action, CompensationStep compensation) {
            this.action = action;
            this.compensation = compensation;
        }
    }
}

SagaContext.java is the context object carried through the whole flow; it passes data between steps.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Carries state between saga steps
public class SagaContext {
    private final String sagaId;
    private final Map<String, Object> payload = new ConcurrentHashMap<>();

    public SagaContext(String sagaId) {
        this.sagaId = sagaId;
    }

    public String getSagaId() {
        return sagaId;
    }

    public void put(String key, Object value) {
        payload.put(key, value);
    }

    @SuppressWarnings("unchecked")
    public <T> T get(String key) {
        return (T) payload.get(key);
    }
}
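Two properties of this context design are easy to miss: a ConcurrentHashMap rejects null values, and the unchecked cast in get defers type errors to the caller. The sketch below reproduces both on a stripped-down copy of the class; ContextPitfalls and its two helper methods are hypothetical names used only for this illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Stripped-down copy of the context, used only to demonstrate two pitfalls.
class ContextPitfalls {
    private final Map<String, Object> payload = new ConcurrentHashMap<>();

    void put(String key, Object value) {
        payload.put(key, value);
    }

    @SuppressWarnings("unchecked")
    <T> T get(String key) {
        return (T) payload.get(key);
    }

    // True when storing a null value is rejected with NullPointerException.
    static boolean nullValueRejected() {
        try {
            new ContextPitfalls().put("recordId", null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    // True when reading a value as the wrong type fails at the call site.
    static boolean wrongTypeFailsAtCallSite() {
        ContextPitfalls ctx = new ContextPitfalls();
        ctx.put("recordId", 42);
        try {
            String id = ctx.get("recordId"); // compiles, but the stored value is an Integer
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }
}
```

In practice this means a step that may receive a missing field from a remote service should check for null before calling put.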

Step 2: Build the Saga Orchestrator Service

This is the core of the system: it drives the whole flow. We build it with Spring Boot.

Key dependencies in pom.xml:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-logging</artifactId>
    </dependency>
    <!-- RestTemplate is used for service-to-service calls -->
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.13</version>
    </dependency>
</dependencies>

application.yml:

server:
  port: 8080

# Service addresses; inside Docker Swarm the service name can be used directly as the hostname
services:
  verification: http://verification-service:8081
  cv-processing: http://cv-processing-service:8082
  compliance: http://compliance-service:8083

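SagaExecutor.java is the saga execution engine. In a real project it would be backed by persistence so that saga instances can be recovered after a service restart. To show the core logic, we simplify it to an in-memory implementation for now.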

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.util.Stack;

@Service
public class SagaExecutor {

    private static final Logger logger = LoggerFactory.getLogger(SagaExecutor.class);

    public void execute(SagaDefinition saga, SagaContext context) {
        Stack<SagaDefinition.StepDetails> completedSteps = new Stack<>();
        
        for (SagaDefinition.StepDetails step : saga.getSteps()) {
            try {
                logger.info("Saga [{}]: Executing step...", context.getSagaId());
                boolean success = step.action.execute(context);
                if (success) {
                    logger.info("Saga [{}]: Step executed successfully.", context.getSagaId());
                    completedSteps.push(step);
                } else {
                    logger.error("Saga [{}]: Step failed. Starting compensation.", context.getSagaId());
                    compensate(completedSteps, context);
                    return; // abort the saga
                }
            } catch (Exception e) {
                logger.error("Saga [{}] Exception during step execution. Starting compensation.", context.getSagaId(), e);
                compensate(completedSteps, context);
                return; // abort the saga
            }
        }
        logger.info("Saga [{}] completed successfully.", context.getSagaId());
    }

    private void compensate(Stack<SagaDefinition.StepDetails> completedSteps, SagaContext context) {
        while (!completedSteps.isEmpty()) {
            SagaDefinition.StepDetails stepToCompensate = completedSteps.pop();
            try {
                logger.warn("Saga [{}]: Compensating step...", context.getSagaId());
                stepToCompensate.compensation.compensate(context);
                logger.warn("Saga [{}]: Step compensated successfully.", context.getSagaId());
            } catch (Exception e) {
                // A failed compensation is a serious problem and usually needs manual intervention
                // In a real project, record the failure here and trigger an alert
                logger.error("Saga [{}] CRITICAL: Compensation failed for a step. Manual intervention required.", context.getSagaId(), e);
            }
        }
    }
}

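Next, we define the concrete saga flow in VerificationSaga.java. Spring Boot does not register a RestTemplate bean by default, so the application has to define one (for example through RestTemplateBuilder) for the constructor injection below to work.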

import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.*;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.UUID;

@Component
public class VerificationSaga {

    private static final Logger logger = LoggerFactory.getLogger(VerificationSaga.class);

    private final RestTemplate restTemplate;
    private final String verificationServiceUrl;
    private final String cvProcessingServiceUrl;
    private final String complianceServiceUrl;

    public VerificationSaga(RestTemplate restTemplate,
                            @Value("${services.verification}") String verificationServiceUrl,
                            @Value("${services.cv-processing}") String cvProcessingServiceUrl,
                            @Value("${services.compliance}") String complianceServiceUrl) {
        this.restTemplate = restTemplate;
        this.verificationServiceUrl = verificationServiceUrl;
        this.cvProcessingServiceUrl = cvProcessingServiceUrl;
        this.complianceServiceUrl = complianceServiceUrl;
    }

    public SagaDefinition build(byte[] document) {
        return new SagaDefinition()
            .addStep(
                // Step 1: Create Verification Record
                context -> {
                    logger.info("Saga [{}]: Step 1 - Creating verification record.", context.getSagaId());
                    String url = verificationServiceUrl + "/verifications";
                    HttpEntity<String> request = new HttpEntity<>("{}");
                    try {
                        ResponseEntity<Map> response = restTemplate.postForEntity(url, request, Map.class);
                        if (response.getStatusCode() == HttpStatus.CREATED && response.getBody() != null) {
                            String recordId = (String) response.getBody().get("id");
                            context.put("recordId", recordId);
                            return true;
                        }
                    } catch (Exception e) {
                        logger.error("Error calling verification service", e);
                    }
                    return false;
                },
                // Compensation for Step 1
                context -> {
                    String recordId = context.get("recordId");
                    if (recordId != null) {
                        logger.warn("Saga [{}]: Compensating - Deleting verification record {}.", context.getSagaId(), recordId);
                        String url = verificationServiceUrl + "/verifications/" + recordId;
                        restTemplate.delete(url);
                    }
                }
            )
            .addStep(
                // Step 2: Process Document with CV (long-running)
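                // The catch: this is a long-running operation, and a synchronous call blocks the saga executor.
                // Production code should use an async callback or polling. To keep things simple we call synchronously here; a short timeout should be configured on the RestTemplate.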
                context -> {
                    String recordId = context.get("recordId");
                    logger.info("Saga [{}]: Step 2 - Starting CV processing for record {}.", context.getSagaId(), recordId);
                    String url = cvProcessingServiceUrl + "/process";
                    // A real project would send the image data here
                    HttpEntity<String> request = new HttpEntity<>("{\"recordId\": \"" + recordId + "\"}");
                    try {
                        ResponseEntity<Map> response = restTemplate.postForEntity(url, request, Map.class);
                         if (response.getStatusCode() == HttpStatus.OK && "SUCCESS".equals(response.getBody().get("status"))) {
                             context.put("cvResult", response.getBody().get("data"));
                             return true;
                         }
                    } catch (Exception e) {
                        logger.error("Error calling CV processing service", e);
                    }
                    return false;
                },
                // Compensation for Step 2
                // Compensating CV processing can be complex, e.g. deleting intermediate artifacts. Here we only log.
                context -> {
                    String recordId = context.get("recordId");
                    logger.warn("Saga [{}]: Compensating - CV processing for record {} failed. No direct resource to delete.", context.getSagaId(), recordId);
                }
            )
            .addStep(
                // Step 3: Check Compliance
                context -> {
                    String recordId = context.get("recordId");
                    logger.info("Saga [{}]: Step 3 - Checking compliance for record {}.", context.getSagaId(), recordId);
                    String url = complianceServiceUrl + "/check";
                     HttpEntity<String> request = new HttpEntity<>("{\"recordId\": \"" + recordId + "\"}");
                    try {
                        ResponseEntity<Map> response = restTemplate.postForEntity(url, request, Map.class);
                        return response.getStatusCode() == HttpStatus.OK && "PASSED".equals(response.getBody().get("result"));
                    } catch (Exception e) {
                        logger.error("Error calling compliance service", e);
                    }
                    return false;
                },
                // Compensation for Step 3
                context -> {
                    String recordId = context.get("recordId");
                    logger.warn("Saga [{}]: Compensating - Compliance check for record {} failed. No compensation action needed.", context.getSagaId(), recordId);
                }
            );
    }
}

Step 3: Build the Participant Services

These are simple Spring Boot applications, each running on its own port.

Verification Service (port 8081)

@RestController
@RequestMapping("/verifications")
public class VerificationController {
    private final Map<String, String> records = new ConcurrentHashMap<>();

    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    public Map<String, String> create() {
        String id = UUID.randomUUID().toString();
        records.put(id, "PENDING");
        System.out.println("Created verification record: " + id);
        return Map.of("id", id);
    }

    @DeleteMapping("/{id}")
    @ResponseStatus(HttpStatus.NO_CONTENT)
    public void delete(@PathVariable String id) {
        records.remove(id);
        System.out.println("Compensated: Deleted verification record: " + id);
    }
}

CV Processing Service (port 8082)
This service simulates a slow operation that may fail.

@RestController
public class CvController {

    @PostMapping("/process")
    public Map<String, Object> process() throws InterruptedException {
        System.out.println("CV Service: Received processing request...");
        // Simulate a slow operation
        Thread.sleep(2000); 

        // Simulate random failure
        if (new Random().nextInt(10) < 3) { // 30% failure rate
            System.err.println("CV Service: Processing failed!");
            return Map.of("status", "FAILED");
        }
        
        System.out.println("CV Service: Processing successful.");
        return Map.of("status", "SUCCESS", "data", "Extracted text...");
    }
}

Compliance Service (port 8083)

@RestController
public class ComplianceController {

    @PostMapping("/check")
    public Map<String, String> check() {
        // Also simulate possible failure
        if (new Random().nextBoolean()) {
            System.out.println("Compliance Service: Check PASSED");
            return Map.of("result", "PASSED");
        } else {
            System.err.println("Compliance Service: Check FAILED");
            return Map.of("result", "FAILED");
        }
    }
}

Step 4: Implement the SwiftUI Client

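The client uploads the file and starts the saga. Because the backend transaction is long-running, the client should not wait synchronously for the result; it should fetch the status asynchronously instead. The demo code here still waits for the response, since the current saga implementation is synchronous.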

import SwiftUI

struct VerificationView: View {
    @State private var statusMessage = "Ready to upload document"
    @State private var isProcessing = false
    
    // URL of your Orchestrator service
    let orchestratorURL = URL(string: "http://localhost:8080/verifications")!

    var body: some View {
        VStack(spacing: 20) {
            Text("Document Verification")
                .font(.largeTitle)
            
            Text(statusMessage)
                .font(.body)
                .foregroundColor(isProcessing ? .blue : .gray)
            
            Button(action: startVerificationProcess) {
                Text("Upload and Verify")
                    .padding()
                    .background(isProcessing ? Color.gray : Color.blue)
                    .foregroundColor(.white)
                    .cornerRadius(10)
            }
            .disabled(isProcessing)
        }
        .padding()
    }

    func startVerificationProcess() {
        self.isProcessing = true
        self.statusMessage = "Uploading document..."

        // A real app would get the data from UIImagePickerController or a file
        let dummyImageData = "dummy-image-data".data(using: .utf8)!

        var request = URLRequest(url: orchestratorURL)
        request.httpMethod = "POST"
        // Building multipart/form-data is fairly involved; to show the core flow we send a plain POST body
        // Production code needs to build a proper multipart body
        request.setValue("application/octet-stream", forHTTPHeaderField: "Content-Type")
        request.httpBody = dummyImageData

        URLSession.shared.dataTask(with: request) { data, response, error in
            DispatchQueue.main.async {
                self.isProcessing = false
                if let error = error {
                    self.statusMessage = "Error: \(error.localizedDescription)"
                    return
                }
                
                guard let httpResponse = response as? HTTPURLResponse else {
                    self.statusMessage = "Invalid response from server"
                    return
                }

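                // Ideally the saga runs asynchronously and the backend returns 202 Accepted right away,
                // but our current synchronous saga implementation waits until the saga finishes
                if (200...299).contains(httpResponse.statusCode) {
                    self.statusMessage = "Verification process completed successfully!"
                } else {
                    // A saga step failed and compensation has already run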
                    self.statusMessage = "Verification failed. Please try again. (Status code: \(httpResponse.statusCode))"
                }
            }
        }.resume()
    }
}
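The comment in the client code notes that answering 202 Accepted right away would be the better practice. On the server side this needs some bookkeeping: start the saga in the background, hand back an id at once, and let the client poll for status. The sketch below shows that bookkeeping in plain Java without Spring; SagaLauncher, its method names, and the status strings are assumptions made for illustration, and the BooleanSupplier stands in for running the saga's steps.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical helper: runs a saga in the background and lets callers poll its status.
class SagaLauncher {
    private final Map<String, String> statusById = new ConcurrentHashMap<>();
    // Daemon threads so a forgotten shutdown() does not keep the JVM alive.
    private final ExecutorService pool = Executors.newFixedThreadPool(4, r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });

    // Returns at once with a saga id; a controller would answer 202 Accepted with it.
    String start(BooleanSupplier saga) {
        String sagaId = UUID.randomUUID().toString();
        statusById.put(sagaId, "RUNNING");
        pool.submit(() -> {
            boolean ok;
            try {
                ok = saga.getAsBoolean(); // stands in for executing the saga's steps
            } catch (RuntimeException e) {
                ok = false;
            }
            statusById.put(sagaId, ok ? "COMPLETED" : "FAILED");
        });
        return sagaId;
    }

    // What a status endpoint would return; UNKNOWN for ids that were never issued.
    String status(String sagaId) {
        return statusById.getOrDefault(sagaId, "UNKNOWN");
    }

    // Polls until the saga leaves RUNNING or the timeout elapses, as a client would.
    String awaitStatus(String sagaId, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while ("RUNNING".equals(status(sagaId)) && System.currentTimeMillis() < deadline) {
            try {
                TimeUnit.MILLISECONDS.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return status(sagaId);
    }

    void shutdown() {
        pool.shutdown();
    }
}
```

With something like this in place, the SwiftUI client would treat the first response as an acknowledgement only and poll a status endpoint until it sees COMPLETED or FAILED.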

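Unit testing approach:

  1. SagaExecutor unit test: build a SagaDefinition with several steps, make one action throw an exception or return false, and assert that the compensations of the steps completed so far are invoked, in reverse order.
  2. VerificationSaga integration test: use MockRestServiceServer to mock the HTTP calls to the other services. Test that the saga runs to completion when every service returns success, and that the record-creation service's compensation endpoint is called when the CV service returns a failure.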
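The first test idea can be sketched without any test framework. The block below is a self-contained stand-in, not the article's SagaExecutor: MiniSaga reduces the executor to the ordering logic, and failingThirdStepLog is a hypothetical helper that records each call so the order can be asserted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.BooleanSupplier;

// Minimal stand-in for SagaExecutor, reduced to what the ordering test needs.
class MiniSaga {
    static final class Step {
        final BooleanSupplier action;
        final Runnable compensation;

        Step(BooleanSupplier action, Runnable compensation) {
            this.action = action;
            this.compensation = compensation;
        }
    }

    // Runs steps in order; on the first failure, compensates completed steps in reverse.
    static boolean run(List<Step> steps) {
        Deque<Step> completed = new ArrayDeque<>();
        for (Step step : steps) {
            boolean ok;
            try {
                ok = step.action.getAsBoolean();
            } catch (RuntimeException e) {
                ok = false;
            }
            if (!ok) {
                while (!completed.isEmpty()) {
                    completed.pop().compensation.run(); // most recently completed first
                }
                return false;
            }
            completed.push(step);
        }
        return true;
    }

    // Builds a three-step saga whose last step fails and records every call in a log.
    static List<String> failingThirdStepLog() {
        List<String> log = new ArrayList<>();
        List<Step> steps = new ArrayList<>();
        steps.add(new Step(() -> { log.add("do-1"); return true; }, () -> log.add("undo-1")));
        steps.add(new Step(() -> { log.add("do-2"); return true; }, () -> log.add("undo-2")));
        steps.add(new Step(() -> { log.add("do-3"); return false; }, () -> log.add("undo-3")));
        run(steps);
        return log;
    }
}
```

As in the executor above, the failed step itself is not compensated, because only steps that returned success are pushed onto the stack; a test against the real class would assert the same sequence.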

Step 5: Deploy with Docker Swarm

Docker Swarm offers a lightweight container orchestration option. We create a docker-compose.yml file that defines our service stack.

docker-compose.yml:

version: '3.8'

services:
  orchestrator:
    image: my-saga-orchestrator:latest # build this image first
    ports:
      - "8080:8080"
    networks:
      - saga-net
    deploy:
      replicas: 2
      update_config:
        parallelism: 1
        delay: 10s
      restart_policy:
        condition: on-failure

  verification-service:
    image: my-verification-service:latest
    networks:
      - saga-net
    deploy:
      replicas: 1

  cv-processing-service:
    image: my-cv-processing-service:latest
    networks:
      - saga-net
    deploy:
      replicas: 1
      # In a real deployment the CV service may be resource-hungry; it can be pinned to specific nodes
      # placement:
      #   constraints:
      #     - "node.labels.has_gpu == true"

  compliance-service:
    image: my-compliance-service:latest
    networks:
      - saga-net
    deploy:
      replicas: 1

networks:
  saga-net:
    driver: overlay

The Dockerfile for each service is standard:

FROM openjdk:17-slim
WORKDIR /app
COPY target/*.jar app.jar
ENTRYPOINT ["java", "-jar", "app.jar"]

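Build and deployment steps:

  1. Build the JAR for each Spring Boot application: mvn clean package
  2. Build the Docker image for each application: docker build -t my-saga-orchestrator:latest . (run in each module's directory, with that module's image name)
  3. Initialize the Swarm cluster: docker swarm init (on the manager node)
  4. Deploy the stack: docker stack deploy -c docker-compose.yml saga-stack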

Once deployed, Swarm's built-in DNS lets services reach each other by service name (for example http://verification-service:8081), which is exactly how application.yml is configured.

Limitations of the Current Approach and Ways to Improve It

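This implementation shows the core idea of an orchestration-based Saga, but it still falls short for production use.

First, the saga executor blocks synchronously, most notably while waiting for the long-running CV task. That ties up executor threads and lowers throughput. A more robust implementation would persist saga state to a database and update it after every step. For asynchronous tasks the executor can release the thread and let a callback or polling drive the saga into its next state.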
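The callback-driven variant can be sketched as follows. This is an illustration under stated assumptions, not a production design: AsyncSagaStore is a hypothetical name, its in-memory maps stand in for a saga table in a database, and the state names other than PENDING_CV are invented here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical store: in-memory maps standing in for a persistent saga table.
class AsyncSagaStore {
    enum State { PENDING_CV, PENDING_COMPLIANCE, COMPENSATING }

    private final Map<String, State> stateBySagaId = new ConcurrentHashMap<>();
    private final Map<String, String> sagaIdByTaskId = new ConcurrentHashMap<>();

    // Called after the CV service accepts the job; no thread waits for the result.
    void parkForCv(String sagaId, String taskId) {
        stateBySagaId.put(sagaId, State.PENDING_CV);
        sagaIdByTaskId.put(taskId, sagaId);
    }

    // Called by the callback endpoint. Returns the new state, or null when the
    // task id is unknown or the callback is a duplicate.
    State onCvCallback(String taskId, boolean success) {
        String sagaId = sagaIdByTaskId.remove(taskId);
        if (sagaId == null) {
            return null;
        }
        State next = success ? State.PENDING_COMPLIANCE : State.COMPENSATING;
        // replace(key, expected, new) only succeeds while the saga still waits on CV
        return stateBySagaId.replace(sagaId, State.PENDING_CV, next) ? next : null;
    }

    State stateOf(String sagaId) {
        return stateBySagaId.get(sagaId);
    }
}
```

The conditional replace makes a repeated callback harmless, which matters because callbacks delivered over HTTP may be retried by the sender.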

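Second, service-to-service communication relies entirely on synchronous REST calls. If a service is briefly unavailable, the whole saga fails immediately and rolls back. Introducing a message queue (such as RabbitMQ or Kafka) between services decouples them and improves resilience: the orchestrator sends commands and receives replies as messages, so a temporarily unavailable participant only delays a step. Driving steps by messages also fits an event-driven architecture, and it is a step toward the choreography-based Saga, in which participants react to each other's events without a central coordinator.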
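The decoupling effect can be illustrated with in-memory queues. The sketch below does not use RabbitMQ or Kafka; two BlockingQueue instances stand in for a command queue and a reply queue, and QueueBackedStep with its message formats is a hypothetical construct for this example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// In-memory queues standing in for broker queues; no real broker is involved.
class QueueBackedStep {
    final BlockingQueue<String> commands = new LinkedBlockingQueue<>();
    final BlockingQueue<String> replies = new LinkedBlockingQueue<>();

    // Orchestrator side: enqueue the command and return; nothing fails if no consumer is up yet.
    void sendCommand(String recordId) {
        commands.add("PROCESS:" + recordId);
    }

    // Participant side: starts a worker that handles one command and replies.
    void startWorker() {
        Thread worker = new Thread(() -> {
            try {
                String command = commands.take();
                String recordId = command.substring("PROCESS:".length());
                replies.add("DONE:" + recordId);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    // Demo helper: waits up to timeoutMillis for a reply; null means none arrived.
    // A real orchestrator would react to the reply in a listener instead of waiting.
    String awaitReply(long timeoutMillis) {
        try {
            return replies.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

The point of the sketch is the ordering: sendCommand succeeds before any worker exists, and the command is processed once startWorker is called, whereas the equivalent REST call would have failed outright.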

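Finally, our compensation logic is very simple. In the real world a compensating operation can itself fail, so compensations must be designed to be idempotent and need a retry mechanism. If compensation keeps failing, an alert must notify someone to step in manually, so that data is not left in an inconsistent intermediate state.

Docker Swarm simplifies deployment, but compared with Kubernetes it offers less in observability, autoscaling, and ecosystem. A more complex microservice system may need to move to a more full-featured platform.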
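Returning to the compensation requirements above, idempotency and bounded retry can be sketched together. CompensationRetry, its parameters, and the doubling backoff are assumptions chosen for illustration; the in-memory Set plays the role of the verification service's record store.

```java
import java.util.Set;

// Hypothetical helper illustrating idempotent compensation plus bounded retry.
class CompensationRetry {
    // Retries the compensation up to maxAttempts times, doubling the delay each time.
    // Returns false when every attempt failed, which is the cue to raise an alert.
    static boolean runWithRetry(Runnable compensation, int maxAttempts, long initialDelayMillis) {
        long delay = initialDelayMillis;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                compensation.run();
                return true;
            } catch (RuntimeException e) {
                if (attempt == maxAttempts) {
                    return false;
                }
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return false;
                }
                delay *= 2; // exponential backoff
            }
        }
        return false;
    }

    // Idempotent delete: removing an id that is already gone is not an error.
    static void deleteRecord(Set<String> records, String recordId) {
        records.remove(recordId);
    }
}
```

Because the delete is idempotent, a retry that follows a timed-out but actually successful attempt does no harm, which is what makes blind retrying safe in the first place.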

