项目里一个看板应用的数据刷新机制让我头疼了很久。它展示的是一些关键业务实体的状态,后端用的是一套标准的 Spring Boot + JPA 技术栈。最初的实现简单粗暴:前端每5秒轮询一次后端接口,拉取全量数据。数据量不大时相安无事,但随着业务增长,这个轮询请求的响应时间和数据库压力都变得不可忽视。这显然不是一个可扩展的方案。
我的第一反应是在应用层解决。当服务层方法修改了JPA实体并调用 repository.save()
后,立刻通过某种机制(比如Spring ApplicationEvent)发布一个变更事件,然后由一个专门的推送服务捕获这个事件,再通过WebSocket或SSE推给前端。这个方案看起来可行,但仔细一想,坑太多了。
- 事务耦合: 如果事件发布在事务提交之前,而事务最终回滚了,我就推送了一个“幽灵”更新。如果发布在事务提交之后,那我就需要在业务代码里引入
TransactionSynchronizationManager.registerSynchronization
这样的东西,这会让业务逻辑变得复杂且丑陋。 - 数据不一致: 推送的事件内容应该是什么?是整个实体对象吗?如果事务中多次修改同一个实体,我该推送哪个版本?
- 外部变更无法捕获: 这是最致命的。如果有人通过数据库客户端直接修改了数据,或者一个计划任务脚本跑了一批更新,我的应用层事件机制对此一无所知。数据不一致就产生了。
这个思路从根上就是错的。应用的内存状态不是事实的唯一来源(Source of Truth),数据库才是。因此,最可靠的变更来源应该是数据库本身。这让我把目光投向了变更数据捕获(Change Data Capture, CDC)。
技术选型决策:数据库日志驱动的实时管道
CDC的核心思想是,与其在应用层拦截变更,不如直接去“监听”数据库的事务日志(比如MySQL的binlog,PostgreSQL的WAL)。这是最精准、最全面、对业务代码侵入性最小的方案。Debezium是这个领域的佼佼者,它把自己伪装成一个数据库的从库,读取事务日志,并将行级别的INSERT
、UPDATE
、DELETE
操作解析成结构化的事件流,推送到消息队列(通常是Kafka)中。
最终我确定的架构如下:
graph TD subgraph "业务应用 (Spring Boot + JPA)" A[Controller/Service] -- save/update/delete --> B(JPA Repository) end B -- DML操作 --> C{MariaDB/MySQL}; C -- 写入Binlog --> D[Debezium Connector]; D -- 产生变更事件 --> E{Apache Kafka Topic}; subgraph "实时推送服务 (Spring Boot)" F[Kafka Consumer] -- 消费事件 --> G(SSE Bridge Service); G -- 管理连接 --> H(Active SSE Connections); end E --> F; I[前端浏览器] -- HTTP GET --> G; G -- SseEmitter.send() --> I;
这个方案的优势显而易见:
- 解耦: 业务应用只管操作数据库,完全不知道下游有CDC的存在。推送服务也只关心Kafka里的事件,不关心事件是谁产生的。
- 可靠性: 基于数据库日志,不会丢失任何变更,且能正确处理事务。
- 实时性: 延迟非常低,通常在毫秒级别。
- 可扩展性: Kafka作为中间的缓冲层,可以支撑海量的变更事件,并且可以有多个不同的消费方,而不仅仅是我们的SSE推送服务。
前端通信技术我选择了Server-Sent Events (SSE) 而不是WebSocket。对于这种服务器到客户端的单向数据流场景,SSE更轻量,它基于标准的HTTP协议,无需复杂的握手和协议升级,且支持自动重连,非常适合我们的看板刷新需求。
步骤化实现:搭建端到端的数据管道
我们从零开始,用Docker Compose搭建一套完整的本地开发环境。这套环境包含了所有必要的组件,确保方案的可复现性。
1. 环境准备: Docker Compose
这是我们的 docker-compose.yml
文件。它定义了数据库、Zookeeper、Kafka以及最重要的Kafka Connect(内置Debezium连接器)。
version: '3.8'
services:
mariadb:
image: mariadb:10.9
container_name: cdc-mariadb
restart: unless-stopped
ports:
- "3306:3306"
environment:
- MARIADB_ROOT_PASSWORD=root
- MARIADB_DATABASE=inventory
- MARIADB_USER=user
- MARIADB_PASSWORD=password
volumes:
- ./mariadb-conf:/etc/mysql/conf.d
healthcheck:
test: ["CMD", "mysqladmin" ,"ping", "-h", "localhost", "-u", "root", "-proot"]
interval: 5s
timeout: 5s
retries: 20
zookeeper:
image: confluentinc/cp-zookeeper:7.3.2
container_name: cdc-zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:7.3.2
container_name: cdc-kafka
ports:
- "9092:9092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
kafka-connect:
image: debezium/connect:2.1
container_name: cdc-kafka-connect
ports:
- "8083:8083"
depends_on:
- kafka
- mariadb
environment:
BOOTSTRAP_SERVERS: kafka:29092
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: my_connect_configs
OFFSET_STORAGE_TOPIC: my_connect_offsets
STATUS_STORAGE_TOPIC: my_connect_statuses
注意,MariaDB需要开启binlog。我们在 mariadb-conf/custom.cnf
中配置:
[mysqld]
log-bin=mysql-bin
binlog_format=ROW
binlog-do-db=inventory
启动所有服务: docker-compose up -d
。
2. JPA实体与业务应用
我们创建一个简单的Spring Boot应用作为数据源。首先是Product
实体:
// Product.java
import jakarta.persistence.*;
import java.math.BigDecimal;
@Entity
@Table(name = "products")
public class Product {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(nullable = false)
private String name;
private String description;
@Column(nullable = false)
private BigDecimal price;
// Getters and Setters
}
为了方便测试,我们创建一个简单的REST接口来增删改查Product
。这部分代码很常规,这里不再赘述。
3. 配置Debezium Connector
当所有Docker容器运行后,我们需要通过Kafka Connect的REST API来注册我们的MariaDB连接器。
创建一个 register-mariadb-connector.json
文件:
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mariadb",
"database.port": "3306",
"database.user": "root",
"database.password": "root",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.include.list": "inventory",
"table.include.list": "inventory.products",
"database.history.kafka.bootstrap.servers": "kafka:29092",
"database.history.kafka.topic": "schema-changes.inventory",
"topic.prefix": "dbserver1",
"decimal.handling.mode": "double",
"snapshot.mode": "initial"
}
}
通过curl
命令注册它:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
localhost:8083/connectors/ -d @register-mariadb-connector.json
现在,任何对inventory.products
表的变更都会被Debezium捕获,并发送到名为 dbserver1.inventory.products
的Kafka Topic中。
4. Kafka-to-SSE桥接服务
这是我们的核心逻辑所在。创建一个新的Spring Boot应用,它将作为SSE推送服务。
a. SSE连接管理器
首先,我们需要一个线程安全的组件来管理所有活跃的SSE客户端连接。一个常见的错误是直接在Controller里用一个List
来存SseEmitter
,这在并发环境下会出问题。
// SseConnectionManager.java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class SseConnectionManager {
private static final Logger log = LoggerFactory.getLogger(SseConnectionManager.class);
private final Map<String, SseEmitter> connections = new ConcurrentHashMap<>();
public SseEmitter add(String clientId) {
// 设置一个较长的超时时间,或依赖心跳来维持连接
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
emitter.onCompletion(() -> {
log.info("Client {} connection completed.", clientId);
connections.remove(clientId);
});
emitter.onTimeout(() -> {
log.warn("Client {} connection timed out.", clientId);
emitter.complete();
connections.remove(clientId);
});
emitter.onError(throwable -> {
log.error("Error on client {} connection: {}", clientId, throwable.getMessage());
emitter.complete();
connections.remove(clientId);
});
connections.put(clientId, emitter);
log.info("Client {} connected. Total connections: {}", clientId, connections.size());
// 连接建立后,立即发送一个心跳或欢迎消息
sendPing(emitter, clientId);
return emitter;
}
public void broadcast(Object data) {
connections.forEach((clientId, emitter) -> {
try {
emitter.send(SseEmitter.event().data(data));
} catch (IOException e) {
log.warn("Failed to send message to client {}: {}. Removing connection.", clientId, e.getMessage());
emitter.complete();
connections.remove(clientId);
}
});
}
private void sendPing(SseEmitter emitter, String clientId) {
try {
// :ping 这样的注释行可以作为心跳,防止代理服务器切断空闲连接
emitter.send(SseEmitter.event().comment("ping"));
} catch (IOException e) {
log.warn("Failed to send ping to client {}: {}", clientId, e.getMessage());
}
}
}
这个管理器使用了ConcurrentHashMap
来保证线程安全,并为每个SseEmitter
设置了完成、超时和错误的回调,以确保能清理掉无效的连接。
b. SSE Controller
Controller负责接受客户端的连接请求。
// ChangeEventController.java
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.UUID;
@RestController
public class ChangeEventController {
private final SseConnectionManager sseConnectionManager;
public ChangeEventController(SseConnectionManager sseConnectionManager) {
this.sseConnectionManager = sseConnectionManager;
}
@GetMapping(path = "/events/products", produces = "text/event-stream")
public SseEmitter streamProductChanges() {
String clientId = UUID.randomUUID().toString();
return sseConnectionManager.add(clientId);
}
}
c. 数据格式约定 (样式方案
)
Debezium产生的JSON事件结构非常复杂,包含了大量的元数据。直接把它扔给前端绝对是个灾难。我们需要定义一个清晰、简洁的数据传输对象(DTO),这就是所谓的“样式方案”——定义数据呈现的样式和结构。
// EntityChangeEvent.java
public class EntityChangeEvent<T> {
private String operation; // "CREATE", "UPDATE", "DELETE"
private String entityType;
private T before; // state before change (for UPDATE/DELETE)
private T after; // state after change (for CREATE/UPDATE)
// Constructors, Getters, Setters
}
// Operation.java
public enum Operation {
CREATE("c"),
UPDATE("u"),
DELETE("d"),
READ("r"); // Snapshot reads
public final String code;
Operation(String code) { this.code = code; }
public static Operation forCode(String code) {
for (Operation op : values()) {
if (op.code.equals(code)) {
return op;
}
}
throw new IllegalArgumentException("Unknown operation code: " + code);
}
}
d. Kafka 消费者与事件转换
这是将所有部分粘合起来的地方。我们监听Kafka Topic,解析Debezium事件,将其转换为我们定义的EntityChangeEvent
DTO,然后通过SseConnectionManager
广播出去。
// DebeziumEventConsumer.java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.util.Optional;
@Service
public class DebeziumEventConsumer {
private static final Logger log = LoggerFactory.getLogger(DebeziumEventConsumer.class);
private final SseConnectionManager sseConnectionManager;
private final ObjectMapper objectMapper;
public DebeziumEventConsumer(SseConnectionManager sseConnectionManager, ObjectMapper objectMapper) {
this.sseConnectionManager = sseConnectionManager;
this.objectMapper = objectMapper;
}
@KafkaListener(topics = "dbserver1.inventory.products")
public void handleProductChanges(String payload) {
try {
JsonNode payloadNode = objectMapper.readTree(payload);
JsonNode schema = payloadNode.get("schema");
// 我们只关心数据变更事件,忽略掉schema等其他消息
if (schema == null) {
log.warn("Received message without schema, skipping: {}", payload);
return;
}
JsonNode payloadData = payloadNode.get("payload");
if (payloadData == null) {
log.debug("Received message without payload (e.g., heartbeat), skipping.");
return;
}
Operation operation = Operation.forCode(payloadData.get("op").asText());
JsonNode beforeNode = payloadData.get("before");
JsonNode afterNode = payloadData.get("after");
// 这里只处理Product实体,实际项目中可能需要一个工厂或策略模式来处理不同实体
Product before = nodeToProduct(beforeNode);
Product after = nodeToProduct(afterNode);
EntityChangeEvent<Product> event = new EntityChangeEvent<>();
event.setOperation(operation.name());
event.setEntityType("Product");
event.setBefore(before);
event.setAfter(after);
log.info("Broadcasting event: {}", objectMapper.writeValueAsString(event));
sseConnectionManager.broadcast(event);
} catch (JsonProcessingException e) {
log.error("Failed to parse Debezium event: {}", payload, e);
} catch (Exception e) {
log.error("An unexpected error occurred while processing event: {}", payload, e);
}
}
// 辅助方法,将Debezium事件中的JsonNode转换为我们的JPA实体
private Product nodeToProduct(JsonNode node) {
if (node == null || node.isNull()) {
return null;
}
try {
return objectMapper.treeToValue(node, Product.class);
} catch (JsonProcessingException e) {
log.error("Failed to convert JsonNode to Product: {}", node.toString(), e);
return null;
}
}
}
这段代码是整个管道的关键。它消费原始的Debezium事件,提取op
(操作类型)、before
(变更前状态)、after
(变更后状态),然后将这些数据映射到我们干净的Product
实体和EntityChangeEvent
DTO中,最后进行广播。
测试端到端流程
- 确保
docker-compose up
启动了所有服务,并且Debezium连接器已成功注册。 - 启动作为SSE推送服务的Spring Boot应用。
- 在一个终端窗口中,使用
curl
连接到SSE端点:
你会看到一个curl -N http://localhost:8080/events/products
:ping
心跳注释,然后连接会保持打开状态。 - 在另一个终端或数据库客户端中,操作
products
表:- 插入数据:
INSERT INTO products (name, description, price) VALUES ('Laptop', 'A powerful laptop', 1200.00);
- 更新数据:
UPDATE products SET price = 1150.00 WHERE name = 'Laptop';
- 删除数据:
DELETE FROM products WHERE name = 'Laptop';
- 插入数据:
每执行一条SQL,你都会在curl
的终端中看到一个data:
行,后面跟着一个JSON对象,精确地描述了刚刚发生的变更。例如,更新操作的输出会是这样:
data:{"operation":"UPDATE","entityType":"Product","before":{"id":1,"name":"Laptop","description":"A powerful laptop","price":1200.00},"after":{"id":1,"name":"Laptop","description":"A powerful laptop","price":1150.00}}
这证明了我们的数据管道已经成功打通。
方案的局限性与未来迭代路径
这个方案虽然优雅且强大,但在生产环境中直接部署前,还需要考虑几个问题。
首先,SseConnectionManager
是单点状态。如果推送服务部署多个实例,一个客户端连接到实例A,而Kafka消息被实例B消费,那么这个客户端就收不到消息。要解决这个问题,需要将SSE连接状态外部化,或者在实例间进行消息广播。一种常见的做法是,每个实例依然消费Kafka消息,但不再直接广播给本地连接,而是将消息发布到Redis Pub/Sub的某个Channel上。所有实例都订阅这个Channel,收到消息后再广播给各自管理的本地SSE连接。这引入了额外的组件,但换来了水平扩展的能力。
其次,SSE本身不保证消息送达。如果客户端在两次推送之间断线重连,它会丢失中间的所有消息。对于某些要求严格的场景,可能需要在每个事件上附加一个唯一的、递增的ID(比如Kafka的offset),客户端记录下收到的最后一个ID。重连时,在HTTP头中带上这个ID (Last-Event-ID
),服务端可以据此从Kafka的某个位置重新消费,为这个客户端补发错过的消息。这大大增加了服务端的实现复杂度。
最后,当前的广播机制是一视同仁的,所有客户端收到所有变更。在复杂的应用中,客户端可能只关心某些特定实体的变更(例如,只关心ID为123
的产品的价格变化)。这就需要在订阅时提供过滤参数,服务端维护更复杂的订阅关系,并在广播时进行匹配。这会让SseConnectionManager
的逻辑变得复杂得多。