基于 C++ Raft 实现与 ActiveMQ 的分布式数据湖元数据管理日志


我们数据湖平台的元数据管理一直依赖于一个单点的、基于关系型数据库的 Metastore。随着业务规模的扩大,这个架构的瓶颈愈发明显:并发的 DDL 操作(例如,ETL 任务同时创建数百个分区)会造成严重的锁竞争,数据库的单点故障风险也成了悬在我们头顶的达摩克利斯之-剑。任何一次 Metastore 的抖动,都会直接影响到上游的数据写入和下游的查询分析引擎。我们需要一个高可用、高并发、强一致性的元数据状态管理服务。

技术痛点与初步构想

核心问题在于状态管理。数据湖的元数据本质上就是一个状态集合,包含库、表、分区、Schema 等信息。对元数据的任何修改,都是一次状态转移。在分布式环境中,要保证这个状态的一致性和高可用,最经典的模型就是**复制状态机 (Replicated State Machine)**。

我们的构想是构建一个独立的元数据管理集群。集群由多个节点组成,每个节点都维护一份完整的元数据内存快照。客户端的写请求(如 CREATE TABLE)被视为一个指令,这个指令通过共识协议被提交到所有节点的一个共享日志中。一旦指令被“提交”,每个节点就按顺序将日志中的指令应用到自己的内存状态机上,从而保证所有节点的状态最终一致。

技术选型决策

  • 为什么不用 ZooKeeper 或 etcd?
    这是最先被提出的方案。它们都是成熟的共识组件。但在我们的场景下,存在几个顾虑:首先,性能。我们的元数据查询 QPS 极高,任何一次 RPC 的延迟都会被放大。将核心组件寄托于一个外部系统,其性能黑盒和运维复杂性是我们不愿接受的。其次,技术栈与定制化。我们的核心团队是 C++ 背景,需要对系统有极致的掌控力,包括内存管理、线程模型和网络IO。从零开始构建一个专用的 Raft 实现,虽然初期投入大,但长期来看,可以与我们的业务逻辑深度耦合,获得最佳性能和可控性。

  • 为什么选择 Raft 协议?
    在 Paxos 和 Raft 之间,我们选择了 Raft。原因无他,Raft 协议的设计目标之一就是可理解性 (Understandability)。它将共识问题分解为领导者选举、日志复制和安全性三个相对独立的子问题,这对于工程实现和后期维护至关重要。

  • 为什么是 C++?
    性能。元数据服务是整个数据平台的咽喉,必须拥有纳秒级的内存访问和微秒级的网络响应。C++ 赋予我们直接操控内存布局、使用高效数据结构(如 std::unordered_map)、以及利用 epollio_uring 等底层 IO 模型的能力,这是实现极致性能的基础。

  • ActiveMQ 的角色是什么?
    这是一个架构解耦的关键决策。当元数据状态发生变更(例如一个新分区被添加),下游系统(如 Presto、Spark SQL、数据质量监控服务)如何感知?一种方式是它们定期轮询元数据服务,但这会带来延迟和大量的无效请求。更好的方式是事件驱动。当我们的 Raft 集群提交并应用一条日志后,领导者节点会立刻将这条变更作为一个事件发布到 ActiveMQ 的一个 Topic 中。所有关心元数据变化的下游系统,只需订阅这个 Topic 即可获得近乎实时的通知。ActiveMQ 在这里充当了一个可靠的、异步的变更数据捕获 (CDC) 管道。

架构与流程

整体架构分为几个核心部分:Raft 共识模块、内存状态机、持久化模块以及变更发布模块。

sequenceDiagram
    participant Client
    participant RaftNode_Leader as Leader
    participant RaftNode_Follower as Follower
    participant StateMachine as In-Memory State Machine
    participant ActiveMQ

    Client->>+RaftNode_Leader: Propose Command (e.g., ADD_PARTITION)
    RaftNode_Leader->>RaftNode_Leader: Append to its own log
    RaftNode_Leader->>+RaftNode_Follower: AppendEntries RPC (with command)
    RaftNode_Follower-->>-RaftNode_Leader: Acknowledge Success
    RaftNode_Leader->>RaftNode_Leader: Majority reached, Commit Entry
    RaftNode_Leader->>+StateMachine: Apply(Command)
    StateMachine->>StateMachine: Update internal state (e.g., add partition to map)
    StateMachine-->>-RaftNode_Leader: Apply Success
    RaftNode_Leader->>+ActiveMQ: Publish Change Event
    ActiveMQ-->>-RaftNode_Leader: Publish Ack
    RaftNode_Leader-->>-Client: Command Success
    
    Note right of ActiveMQ: Downstream consumers (Spark, Presto)
receive the event asynchronously.

步骤化实现:核心代码与陷阱

1. Raft 核心逻辑 (C++)

我们首先要定义 Raft 的核心数据结构。一个常见的错误是过早地考虑网络序列化,导致数据结构设计臃肿。在真实项目中,应先专注于逻辑本身。

#include <iostream>
#include <vector>
#include <string>
#include <chrono>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <atomic>
#include <unordered_map>

// Raft 日志条目
struct LogEntry {
    long long term;
    std::string command; // 序列化后的状态机指令
};

// 节点状态
enum class NodeState {
    Follower,
    Candidate,
    Leader
};

class RaftNode {
public:
    RaftNode(int id, int peer_count);
    void start();
    
private:
    // Raft 核心状态 (持久化)
    std::atomic<long long> current_term_{0};
    std::atomic<int> voted_for_{-1};
    std::vector<LogEntry> log_;
    std::mutex log_mutex_;

    // 易失性状态
    std::atomic<long long> commit_index_{0};
    std::atomic<long long> last_applied_{0};
    NodeState state_{NodeState::Follower};

    // Leader 特有状态
    std::vector<long long> next_index_;
    std::vector<long long> match_index_;

    // 节点信息
    const int node_id_;
    const int peer_count_;
    const int majority_;

    // 定时器
    std::atomic<bool> stop_flag_{false};
    std::chrono::milliseconds election_timeout_;
    std::chrono::steady_clock::time_point last_heartbeat_time_;
    std::thread election_thread_;

    void election_loop();
    void reset_election_timer();
    void become_candidate();
    void become_leader();
    void send_heartbeats();

    // 模拟 RPC 存根
    struct RequestVoteArgs { long long term; int candidate_id; long long last_log_index; long long last_log_term; };
    struct RequestVoteReply { long long term; bool vote_granted; };
    RequestVoteReply handle_request_vote(const RequestVoteArgs& args);
    
    struct AppendEntriesArgs { /* ... */ };
    struct AppendEntriesReply { /* ... */ };
    AppendEntriesReply handle_append_entries(const AppendEntriesArgs& args);
};

// 构造函数:初始化节点信息和随机选举超时
RaftNode::RaftNode(int id, int peer_count)
    : node_id_(id), peer_count_(peer_count), majority_((peer_count + 1) / 2 + 1) {
    // 生产环境中,这个超时时间需要仔细调优,通常在 150-300ms 之间
    reset_election_timer();
    std::cout << "Node " << node_id_ << " created. Majority is " << majority_ << std::endl;
}

void RaftNode::start() {
    last_heartbeat_time_ = std::chrono::steady_clock::now();
    election_thread_ = std::thread(&RaftNode::election_loop, this);
}

void RaftNode::reset_election_timer() {
    // 增加随机性以防止选举冲突
    auto random_millis = std::rand() % 150 + 150; // 150-300ms
    election_timeout_ = std::chrono::milliseconds(random_millis);
    last_heartbeat_time_ = std::chrono::steady_clock::now();
}

// 选举循环是 Raft 的脉搏
void RaftNode::election_loop() {
    while (!stop_flag_) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        
        if (state_ == NodeState::Leader) {
            // Leader 不需要选举超时,而是发送心跳
            // 这里为了简化,我们假设心跳逻辑在别处触发
            continue;
        }

        auto now = std::chrono::steady_clock::now();
        if (std::chrono::duration_cast<std::chrono::milliseconds>(now - last_heartbeat_time_) > election_timeout_) {
            std::cout << "Node " << node_id_ << " election timeout, becoming candidate." << std::endl;
            become_candidate();
        }
    }
}

// 成为候选人并发起选举
void RaftNode::become_candidate() {
    state_ = NodeState::Candidate;
    current_term_++;
    voted_for_ = node_id_;
    reset_election_timer();

    int votes_received = 1; // Vote for self

    // 向所有其他节点发送 RequestVote RPC
    // 这里的坑在于:必须并行发送请求,串行发送会导致选举过程过长
    // 在真实项目中,这里会用一个线程池来分发 RPC 调用
    for (int i = 0; i < peer_count_ + 1; ++i) {
        if (i == node_id_) continue;

        // --- 模拟 RPC 调用 ---
        long long last_log_idx, last_log_term_val;
        {
            std::lock_guard<std::mutex> lock(log_mutex_);
            last_log_idx = log_.size() - 1;
            last_log_term_val = (last_log_idx >= 0) ? log_[last_log_idx].term : 0;
        }
        
        RequestVoteArgs args = {current_term_.load(), node_id_, last_log_idx, last_log_term_val};
        // other_node[i]->handle_request_vote(args);
        // 假设 RPC 调用是同步阻塞的
        RequestVoteReply reply = {0, false}; // 模拟收到回复
        // --- 模拟结束 ---

        if (reply.term > current_term_) {
            // 发现有更高任期的节点,立刻变回 Follower
            current_term_ = reply.term;
            state_ = NodeState::Follower;
            voted_for_ = -1;
            return;
        }

        if (reply.vote_granted) {
            votes_received++;
        }
    }

    if (votes_received >= majority_) {
        std::cout << "Node " << node_id_ << " received majority votes and becomes leader for term " << current_term_ << std::endl;
        become_leader();
    }
}

// Raft 协议的安全性核心:投票约束
RaftNode::RequestVoteReply RaftNode::handle_request_vote(const RequestVoteArgs& args) {
    RequestVoteReply reply;
    long long current_term_val = current_term_.load();

    if (args.term < current_term_val) {
        reply.term = current_term_val;
        reply.vote_granted = false;
        return reply;
    }

    if (args.term > current_term_val) {
        // 发现更高任期,无论如何都更新自己的任期并转为 Follower
        current_term_ = args.term;
        state_ = NodeState::Follower;
        voted_for_ = -1;
    }

    reply.term = current_term_.load();
    bool already_voted = voted_for_ != -1 && voted_for_ != args.candidate_id;
    
    if (already_voted) {
        reply.vote_granted = false;
        return reply;
    }

    // 核心安全约束:候选人的日志必须至少和自己一样“新”
    long long my_last_log_index, my_last_log_term;
    {
        std::lock_guard<std::mutex> lock(log_mutex_);
        my_last_log_index = log_.size() > 0 ? log_.size() - 1 : -1;
        my_last_log_term = my_last_log_index != -1 ? log_[my_last_log_index].term : 0;
    }

    bool log_is_ok = (args.last_log_term > my_last_log_term) || 
                     (args.last_log_term == my_last_log_term && args.last_log_index >= my_last_log_index);

    if (log_is_ok) {
        voted_for_ = args.candidate_id;
        reply.vote_granted = true;
        reset_election_timer(); // 投票给别人也算是收到了“心跳”,重置定时器
    } else {
        reply.vote_granted = false;
    }
    
    return reply;
}

// 其他方法...
void RaftNode::become_leader() { state_ = NodeState::Leader; /* ... */ }
void RaftNode::send_heartbeats() { /* ... */ }

实现陷阱与思考:

  1. 并发与锁: current_term_voted_for_ 等状态会被多个线程(RPC处理线程、选举线程)访问,必须使用 std::atomicstd::mutex 保护。日志 log_ 的读写更是需要细粒度的锁控制。一个常见的错误是使用一个全局大锁,这会严重影响性能。
  2. 定时器: Raft 的活性(liveness)严重依赖于定时器。定时器的实现必须高效且准确。在 C++ 中,使用 std::condition_variable 配合 wait_for 是比 sleep 更好的选择,因为它可以被提前唤醒。
  3. RPC: 上述代码中 RPC 是伪代码。在实际项目中,我们会基于 Boost.Asio 构建一个异步TCP通信层,并自定义简单的二进制协议(如 [4字节长度][1字节类型][protobuf/flatbuffer负载]),以避免 HTTP 或 gRPC 带来的开销。

2. 元数据状态机与持久化

状态机是 Raft 的应用层。它接收已提交的日志条目并将其应用到业务状态上。

#include <map>
#include <string>
#include <vector>
#include <fstream>
#include <mutex>

// 简化版的表结构定义
struct TableSchema {
    std::string name;
    std::vector<std::string> columns;
    // ... 其他元信息
};

// 状态机接口
class MetaStateMachine {
public:
    // 应用指令,这是状态机的唯一入口
    void apply(const std::string& command);

    // 查询接口
    std::optional<TableSchema> get_table(const std::string& table_name);

    // 持久化与恢复
    void take_snapshot(const std::string& path);
    void restore_from_snapshot(const std::string& path);

private:
    std::map<std::string, TableSchema> tables_;
    std::mutex state_mutex_;
};

void MetaStateMachine::apply(const std::string& command) {
    // 这里的 command 是从 Raft log entry 中获取的序列化字符串
    // 在生产环境中,我们会使用 Protobuf 或类似的格式
    // 示例: "CREATE_TABLE;my_table;id,name,value"
    std::lock_guard<std::mutex> lock(state_mutex_);
    
    // 解析指令
    // ... 解析逻辑 ...
    std::string op = "CREATE_TABLE"; // 假设解析结果
    std::string table_name = "my_table";
    
    if (op == "CREATE_TABLE") {
        if (tables_.find(table_name) == tables_.end()) {
            TableSchema new_table;
            new_table.name = table_name;
            // ... 解析 columns ...
            tables_[table_name] = new_table;
            std::cout << "[StateMachine] Applied CREATE_TABLE for " << table_name << std::endl;
        }
    } else if (op == "ADD_PARTITION") {
        // ... 其他操作 ...
    }
    // 错误处理:如果指令无效,我们应该记录日志,但不能抛出异常中断状态机
}

// 快照是防止 Raft 日志无限增长的关键
void MetaStateMachine::take_snapshot(const std::string& path) {
    std::lock_guard<std::mutex> lock(state_mutex_);
    std::ofstream ofs(path, std::ios::binary);
    if (!ofs) {
        // 关键错误,必须记录日志并告警
        std::cerr << "Failed to open snapshot file for writing: " << path << std::endl;
        return;
    }
    // 序列化整个 `tables_` map
    // 生产级代码会用 Protobuf, 这里用一个简单的文本格式代替
    for (const auto& [name, schema] : tables_) {
        ofs << name << ";" << schema.columns.size() << ";"; // 伪序列化
        for(const auto& col : schema.columns) {
            ofs << col << ",";
        }
        ofs << "\n";
    }
    std::cout << "[StateMachine] Snapshot taken to " << path << std::endl;
}

void MetaStateMachine::restore_from_snapshot(const std::string& path) {
    std::lock_guard<std::mutex> lock(state_mutex_);
    std::ifstream ifs(path, std::ios::binary);
    if (!ifs) {
        std::cerr << "Failed to open snapshot file for reading: " << path << std::endl;
        return;
    }
    // 清空当前状态,从快照恢复
    tables_.clear();
    // ... 反序列化逻辑 ...
    std::cout << "[StateMachine] Restored from snapshot " << path << std::endl;
}

实现陷阱与思考:

  1. 确定性: 状态机的 apply 函数必须是完全确定性的。给定相同的初始状态和相同的指令序列,它必须产生完全相同的最终状态。这意味着函数内部不能依赖任何不确定的输入,如随机数、时间戳等。
  2. 快照与日志压缩: 当日志增长到一定大小时,领导者需要创建状态机的快照,并丢弃快照点之前的所有日志。这是一个复杂但必要的过程,涉及到与 Raft 日志模块的协调。
  3. 线程安全: 查询操作(get_table)也需要通过锁来保护,以防止在状态更新过程中读取到不一致的数据。对于读多写少的场景,可以使用读写锁 (std::shared_mutex) 来优化性能。

3. 集成 ActiveMQ 进行变更发布

这是将我们的共识核心与外部世界连接起来的桥梁。我们使用 ActiveMQ-CPP 客户端库。

#include <activemq/library/ActiveMQCPP.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/Topic.h>
#include <cms/MessageProducer.h>
#include <memory>

class ChangePublisher {
public:
    ChangePublisher(const std::string& broker_uri, const std::string& topic_name) {
        try {
            activemq::library::ActiveMQCPP::initializeLibrary();
            auto connectionFactory = std::make_unique<activemq::core::ActiveMQConnectionFactory>(broker_uri);
            
            connection_ = connectionFactory->createConnection();
            connection_->start();
            
            // 使用事务性会话或客户端确认可以提高可靠性,但会牺牲一些性能
            session_ = connection_->createSession(cms::Session::AUTO_ACKNOWLEDGE);
            
            topic_ = session_->createTopic(topic_name);
            producer_ = session_->createProducer(topic_);
            producer_->setDeliveryMode(cms::DeliveryMode::NON_PERSISTENT); // 元数据变更通知,允许丢失
            
            std::cout << "[Publisher] Connected to ActiveMQ at " << broker_uri << std::endl;
        } catch (cms::CMSException& e) {
            // 在生产环境中,连接失败必须有重试逻辑
            std::cerr << "CMSException caught: " << e.getMessage() << std::endl;
            // throw;
        }
    }

    ~ChangePublisher() {
        try {
            producer_->close();
            session_->close();
            connection_->close();
            activemq::library::ActiveMQCPP::shutdownLibrary();
        } catch (cms::CMSException& e) {
            // ignore
        }
    }

    void publish(const std::string& message) {
        if (!producer_) return;
        try {
            auto textMessage = std::unique_ptr<cms::TextMessage>(session_->createTextMessage(message));
            producer_->send(textMessage.get());
        } catch (cms::CMSException& e) {
            // 发送失败处理,例如记录日志,触发告警
            std::cerr << "Failed to publish message: " << e.getMessage() << std::endl;
        }
    }

private:
    std::unique_ptr<cms::Connection> connection_;
    std::unique_ptr<cms::Session> session_;
    std::unique_ptr<cms::Topic> topic_;
    std::unique_ptr<cms::MessageProducer> producer_;
};


// --- 在 Raft 节点中的集成 ---
// class RaftNode {
// ...
// private:
//     MetaStateMachine state_machine_;
//     std::unique_ptr<ChangePublisher> publisher_;
//
// public:
//     RaftNode(...) {
//         if (/* is leader */) { // 发布器仅由 leader 初始化
//             publisher_ = std::make_unique<ChangePublisher>("tcp://127.0.0.1:61616", "DATALAKE.METADATA.CHANGES");
//         }
//     }
//
//     void apply_committed_entries() {
//         while(commit_index_ > last_applied_) {
//             last_applied_++;
//             const auto& entry = log_[last_applied_];
//             state_machine_.apply(entry.command);
//             
//             // 如果是 Leader,就发布变更
//             if (state_ == NodeState::Leader && publisher_) {
//                 // 这里的序列化格式应与下游消费者约定好,JSON 是个不错的选择
//                 std::string event_message = "{\"term\":" + std::to_string(entry.term) + ", \"command\":\"" + entry.command + "\"}";
//                 publisher_->publish(event_message);
//             }
//         }
//     }
// };

实现陷阱与思考:

  1. 发布时机: 必须在指令被成功应用到状态机之后再发布。如果在应用前发布,可能会出现消息已发出但本节点宕机、状态未能更新的局面。
  2. Leader Only: 只有 Leader 节点可以发布消息。当 Leader 身份转移时,新的 Leader 需要创建自己的 ChangePublisher 实例,旧的 Leader 必须停止发布。
  3. 可靠性权衡: DeliveryMode 设置为 NON_PERSISTENT 意味着消息不会在 ActiveMQ Broker 端持久化,性能最高,但 Broker 重启会丢消息。对于元数据变更这种场景,如果下游系统有自己的状态核对机制,短暂的消息丢失通常是可以接受的。如果要求绝对可靠,则需要使用持久化消息和事务性会日志。

局限性与未来迭代

这个从零构建的方案给了我们极大的性能和控制力,但也并非没有缺点。当前的实现有几个明确的局限性:

  1. 集群成员变更: 我们没有实现 Raft 协议中关于动态增加或删除节点的逻辑。这是一个复杂但对生产环境至关重要的功能,需要通过 AddServerRemoveServer 这样的特殊日志条目来安全地改变集群配置。
  2. 客户端交互: 我们没有讨论客户端如何与 Raft 集群交互。一个完整的方案需要一个客户端库,它能自动发现 Leader,并在请求失败或收到重定向指令时,自动重试到正确的 Leader 节点。
  3. 状态机性能: 当前状态机是单线程应用的,所有写操作都是串行的。对于元数据这种场景,这通常足够了。但如果指令之间没有依赖关系(例如,对不同表的操作),理论上可以通过对状态进行分片(Sharding),实现并行应用,但这会极大地增加状态机逻辑的复杂度。
  4. 通信层健壮性: 我们自研的 RPC 层需要处理各种网络异常,如连接断开、超时、消息乱序或重复。一个生产级的网络库需要考虑完整的错误处理和重连机制。

下一步的迭代方向很明确:实现动态成员变更,开发一个智能客户端,并对状态机的持久化和快照机制进行更深入的性能优化。


  目录