任何一个SaaS产品演进到一定阶段,都会不可避免地撞上多租户这堵墙。我们团队遇到的挑战则更为棘手:产品需要支持地理分布式部署,以满足不同国家的数据主权法规(如GDPR),同时又要保证严格的数据隔离。早期的设计简单粗暴,在每个业务表中都加入了tenant_id
字段,然后在应用层逻辑中手动拼接WHERE
子句。这种方式在敏捷开发初期跑得很快,但随着业务逻辑的膨胀,它变成了一场灾难。忘记添加tenant_id
过滤条件的查询偶有发生,每一次都意味着一次潜在的数据泄露事故,这在生产环境中是不可接受的。
我们需要一个从架构层面解决问题的方案:一个能对应用开发者近乎透明,但在底层却坚不可摧的数据隔离层。这次复盘,就是记录我们如何一步步,通过几个敏捷迭代,利用OAuth 2.0、CockroachDB的行级安全(RLS)和OpenSearch的查询重写,构建出这套隔离体系的过程。
迭代零:确立原则与技术选型
在动手之前,我们首先定义了核心目标:
- 身份驱动隔离: 数据的访问权限必须与用户的身份绑定,而不是依赖于应用层代码的自觉性。
- 多存储系统一致性: 隔离策略必须同时在主事务数据库(CockroachDB)和搜索引擎(OpenSearch)中生效。
- 对开发者透明: 业务代码开发者应该专注于业务逻辑,而不是在每个数据访问点都关心租户隔离的实现细节。
- 地理亲和性: 数据应能根据租户的地理位置被固定在特定的区域。
基于这些原则,技术选型变得清晰起来:
- 敏捷开发: 这个问题太复杂,不可能一次性完美解决。我们决定采用迭代的方式,先构建核心骨架,再逐步增强其安全性与功能。
- OAuth 2.0 & JWT: 这是现代身份认证的事实标准。JWT的自定义声明(Custom Claims)是传递租户上下文(如
tenant_id
,user_role
,data_region
)的完美载体。 - CockroachDB: 它的两个特性击中了我们的痛点。首先是原生的地理分区能力,可以将数据行固定在特定地理位置的节点上。其次是它实现了PostgreSQL兼容的行级安全策略(Row-Level Security, RLS),这正是实现数据库层面透明隔离的关键。
- OpenSearch: 作为我们的搜索引擎,它同样需要实现租户隔离。虽然它没有像SQL数据库那样的RLS,但其强大的查询DSL和安全插件为我们实现隔离提供了通路。
迭代一:身份先行,构建带有租户上下文的请求管道
一切的起点是身份认证。当用户登录时,认证服务器必须在其签发的JWT中嵌入租户信息。一个典型的JWT Payload可能长这样:
{
"sub": "user-uuid-12345",
"exp": 1678886400,
"iat": 1678882800,
"iss": "https://auth.our-saas.com",
"aud": "our-saas-api",
"tid": "tenant-abc-xyz",
"region": "eu-central-1",
"roles": ["admin", "editor"]
}
这里的tid
和region
就是我们植入的自定义声明。
接下来,我们需要在API网关或服务自身的中间件层来解析这个JWT,并将租户上下文注入到请求的生命周期中。在Go中,我们利用context.Context
来实现这一点。
// pkg/auth/middleware.go
package auth
import (
"context"
"net/http"
"strings"
"github.com/golang-jwt/jwt/v4"
"github.com/sirupsen/logrus"
)
// TenantContextKey is a custom type to avoid context key collisions.
type TenantContextKey string
const (
// KeyTenantID holds the tenant ID in the context.
KeyTenantID TenantContextKey = "tenant_id"
// KeyRegion holds the tenant's data region in the context.
KeyRegion TenantContextKey = "region"
)
// Claims represents the custom JWT claims for our application.
type Claims struct {
TenantID string `json:"tid"`
Region string `json:"region"`
Roles []string `json:"roles"`
jwt.RegisteredClaims
}
// JWTMiddleware validates the JWT and injects tenant info into the context.
func JWTMiddleware(jwtSecret []byte) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
authHeader := r.Header.Get("Authorization")
if authHeader == "" {
http.Error(w, "Authorization header required", http.StatusUnauthorized)
return
}
tokenString := strings.TrimPrefix(authHeader, "Bearer ")
if tokenString == authHeader { // No "Bearer " prefix
http.Error(w, "Invalid token format", http.StatusUnauthorized)
return
}
token, err := jwt.ParseWithClaims(tokenString, &Claims{}, func(token *jwt.Token) (interface{}, error) {
// Basic validation: ensure signing algorithm is what we expect.
if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
logrus.WithField("alg", token.Header["alg"]).Warn("Unexpected signing method")
return nil, http.ErrAbortHandler
}
return jwtSecret, nil
})
if err != nil {
http.Error(w, "Invalid token: "+err.Error(), http.StatusUnauthorized)
return
}
if claims, ok := token.Claims.(*Claims); ok && token.Valid {
if claims.TenantID == "" {
http.Error(w, "Token is missing tenant ID", http.StatusForbidden)
return
}
// Inject tenant info into the request context.
ctx := context.WithValue(r.Context(), KeyTenantID, claims.TenantID)
ctx = context.WithValue(ctx, KeyRegion, claims.Region)
// Here's a real-world logging practice: add context to all subsequent logs.
reqLogger := logrus.WithFields(logrus.Fields{
"tenant_id": claims.TenantID,
"user_id": claims.Subject,
})
// It's tricky to pass a logger through context, but this illustrates the goal.
// A better way is to pull info from context within the logger itself.
next.ServeHTTP(w, r.WithContext(ctx))
} else {
http.Error(w, "Invalid token claims", http.StatusUnauthorized)
}
})
}
}
这个中间件是整个体系的入口。它确保了任何一个通过认证的请求,其context
中都必然携带着tenant_id
。这是一个关键的约定,下游所有的数据访问逻辑都将依赖于此。这个迭代的产出看似简单,但它建立了一个贯穿整个调用链的、可靠的上下文传递机制。
迭代二:在CockroachDB中强制隔离
这是最核心的一步。我们的目标是,当业务代码执行SELECT * FROM invoices WHERE amount > 1000
时,数据库能自动将其转换为SELECT * FROM invoices WHERE amount > 1000 AND tenant_id = 'current_tenant_id'
。
首先,在CockroachDB中定义表结构和RLS策略。
-- 数据库表结构,包含租户ID和区域信息
-- The crdb_region column is special in CockroachDB for geo-partitioning
CREATE TABLE invoices (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id STRING NOT NULL,
amount DECIMAL(10, 2) NOT NULL,
created_at TIMESTAMPTZ DEFAULT now(),
crdb_region crdb_internal_region NOT NULL
);
-- 创建索引,tenant_id是绝大多数查询的入口,必须高效
CREATE INDEX ON invoices (tenant_id, created_at);
-- 启用行级安全
ALTER TABLE invoices ENABLE ROW LEVEL SECURITY;
-- 定义一个会话变量来存储当前租户ID
-- CockroachDB/Postgres doesn't have globally defined variables, so we use a rarely used config parameter.
-- The 'app.' prefix is a convention to avoid conflicts.
-- We must make sure this is set per-transaction.
-- 创建RLS策略
-- 这个策略规定,对于SELECT, UPDATE, DELETE操作,只有当行的tenant_id
-- 与当前会话变量'app.tenant_id'匹配时,该行才可见或可操作。
CREATE POLICY tenant_isolation_policy ON invoices
FOR ALL -- Applies to SELECT, INSERT, UPDATE, DELETE
USING (tenant_id = current_setting('app.tenant_id'));
这里的魔法是current_setting('app.tenant_id')
。它读取一个特定于当前数据库会话的变量。这意味着,在执行任何查询之前,我们必须先为当前会сил话设置这个变量。
接下来是Go的数据访问层代码。这里的坑在于,数据库连接池会复用连接。如果在一个被污染(设置了其他租户ID)的连接上执行操作,就会导致数据错乱。因此,app.tenant_id
的设置必须与事务或单个请求的生命周期严格绑定。
// pkg/storage/invoices.go
package storage
import (
"context"
"database/sql"
"fmt"
"github.com/you/your-project/pkg/auth" // our context key package
"github.com/jackc/pgx/v4/pgxpool"
"github.com/sirupsen/logrus"
)
// InvoiceRepository handles DB operations for invoices.
type InvoiceRepository struct {
pool *pgxpool.Pool
}
func NewInvoiceRepository(dbPool *pgxpool.Pool) *InvoiceRepository {
return &InvoiceRepository{pool: dbPool}
}
// GetInvoicesForCurrentUser fetches invoices for the tenant in the context.
// Notice the business logic here has NO knowledge of tenant_id filtering.
func (r *InvoiceRepository) GetInvoicesForCurrentUser(ctx context.Context) ([]string, error) {
// 1. Extract tenant_id from context. This is non-negotiable.
tenantID, ok := ctx.Value(auth.KeyTenantID).(string)
if !ok || tenantID == "" {
logrus.Error("Tenant ID not found in context. Denying data access.")
return nil, fmt.Errorf("access denied: missing tenant context")
}
conn, err := r.pool.Acquire(ctx)
if err != nil {
return nil, fmt.Errorf("failed to acquire connection: %w", err)
}
defer conn.Release()
// 2. THIS IS THE CRITICAL STEP. Set the session variable for the acquired connection.
// Using 'LOCAL' ensures the setting is only for the current transaction.
// This prevents connection pool pollution.
setVarQuery := "SET LOCAL app.tenant_id = $1"
if _, err := conn.Exec(ctx, setVarQuery, tenantID); err != nil {
return nil, fmt.Errorf("failed to set tenant context for db session: %w", err)
}
// 3. Now, execute the business query. It's clean and unaware of multi-tenancy.
query := "SELECT id FROM invoices ORDER BY created_at DESC LIMIT 10"
rows, err := conn.Query(ctx, query)
if err != nil {
// Proper error logging in production is vital.
logrus.WithFields(logrus.Fields{
"tenant_id": tenantID,
"error": err,
}).Error("Failed to execute invoice query")
return nil, fmt.Errorf("database query failed: %w", err)
}
defer rows.Close()
var ids []string
for rows.Next() {
var id string
if err := rows.Scan(&id); err != nil {
return nil, err
}
ids = append(ids, id)
}
return ids, nil
}
通过这种方式,我们把安全策略下沉到了数据库层面。应用代码变得干净,并且从根本上杜绝了因忘记WHERE tenant_id
而导致的数据泄露。单元测试这个逻辑也变得直接:提供一个带有tenant_id
的context,断言能查到数据;提供一个空的context,断言返回错误;提供一个错误的tenant_id
,断言查询结果为空。
迭代三:将隔离策略同步到OpenSearch
事务数据隔离后,搜索数据成了新的短板。用户在搜索框里输入内容,我们不希望A租户的员工搜到B租户的机密发票。
OpenSearch没有与CockroachDB RLS直接等价的功能,但我们可以通过组合其现有能力来模拟。我们放弃了“一个租户一个索引”的方案,因为它会导致“索引地狱”,管理和资源成本极高。我们选择共享索引的策略,即所有租户的发票数据都存在一个名为invoices
的索引中,但每个文档都必须包含tenant_id
字段。
隔离的关键在于,在执行搜索前,对用户的查询DSL(Domain Specific Language)进行“包装”,强制注入一个filter
子句。
// 用户原始的查询DSL
{
"query": {
"match": {
"description": "urgent project"
}
}
}
// 经过我们的数据访问层包装后的查询DSL
{
"query": {
"bool": {
"must": [
{
"match": {
"description": "urgent project"
}
}
],
"filter": [
{
"term": {
"tenant_id": "tenant-abc-xyz" // This part is injected automatically
}
}
]
}
}
}
filter
子句在OpenSearch中性能很高,因为它不计算相关性得分且可以被缓存。
下面是实现这个查询包装器的Go代码:
// pkg/search/client.go
package search
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"github.com/opensearch-project/opensearch-go"
"github.com/opensearch-project/opensearch-go/opensearchapi"
"github.com/you/your-project/pkg/auth"
"github.com/sirupsen/logrus"
)
// SearchClient is a wrapper around the OpenSearch client to enforce tenancy.
type SearchClient struct {
client *opensearch.Client
}
func NewSearchClient(client *opensearch.Client) *SearchClient {
return &SearchClient{client: client}
}
// SearchInvoices safely searches the invoices index.
func (c *SearchClient) SearchInvoices(ctx context.Context, userQuery map[string]interface{}) (map[string]interface{}, error) {
tenantID, ok := ctx.Value(auth.KeyTenantID).(string)
if !ok || tenantID == "" {
return nil, fmt.Errorf("access denied: missing tenant context for search")
}
// The core logic: build the final query by wrapping the user query.
finalQuery := map[string]interface{}{
"query": map[string]interface{}{
"bool": map[string]interface{}{
"must": userQuery["query"], // Assume userQuery has a "query" key
"filter": []map[string]interface{}{
{
"term": map[string]interface{}{
"tenant_id": tenantID,
},
},
},
},
},
}
// Add other top-level keys like "size", "sort", etc. if they exist in userQuery
if size, ok := userQuery["size"]; ok {
finalQuery["size"] = size
}
queryBytes, err := json.Marshal(finalQuery)
if err != nil {
return nil, fmt.Errorf("failed to marshal final query: %w", err)
}
// For debugging in development, this is invaluable.
logrus.WithField("query", string(queryBytes)).Debug("Executing OpenSearch query")
req := opensearchapi.SearchRequest{
Index: []string{"invoices"},
Body: bytes.NewReader(queryBytes),
}
res, err := req.Do(ctx, c.client)
if err != nil {
return nil, fmt.Errorf("opensearch search failed: %w", err)
}
defer res.Body.Close()
if res.IsError() {
bodyBytes, _ := io.ReadAll(res.Body)
return nil, fmt.Errorf("opensearch error response: %s", string(bodyBytes))
}
var result map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("failed to decode opensearch response: %w", err)
}
return result, nil
}
现在,我们有了一个SearchClient
,业务开发者只能通过它来执行搜索。它扮演了“安全网关”的角色,确保任何发往OpenSearch的查询都被正确地限定在当前租户的数据范围内。
迭代四:打通数据同步的任督二脉
隔离的最后一环,是如何保证从CockroachDB到OpenSearch的数据同步过程也是租户安全的。我们使用CockroachDB的CDC(Change Data Capture)功能,它能以事件流的形式捕获所有的数据变更。
-- Create a changefeed that emits JSON for every change in the invoices table.
-- The changefeed data will be sent to a Kafka topic.
CREATE CHANGEFEED FOR TABLE invoices
INTO 'kafka://kafka-broker:9092?topic_name=invoices_cdc'
WITH updated, resolved, format = json;
这条命令会创建一个持续的作业,将invoices
表的INSERT
, UPDATE
, DELETE
操作打包成JSON消息发送到Kafka。
然后,我们编写一个消费者服务来处理这些消息,并将其写入OpenSearch。
graph TD A[CockroachDB] -- 1. Data Change --> A subgraph CDC Process A -- 2. Changefeed Captures --> B(Kafka Topic: invoices_cdc) end C[CDC Consumer Service] -- 3. Consumes Message --> B C -- 4. Processes & Transforms --> D[OpenSearch]
这个消费者的实现很简单,但必须正确处理消息格式。CDC消息体包含了变更前后的完整数据行,其中自然也包括tenant_id
。
// cmd/cdc-consumer/main.go
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/opensearch-project/opensearch-go"
"github.com/segmentio/kafka-go"
// ... other imports
)
// CDCMessage represents the structure of a message from CockroachDB changefeed.
type CDCMessage struct {
After map[string]interface{} `json:"after"`
Key []interface{} `json:"primary_key"` // CockroachDB sends primary key as a JSON array
Topic string `json:"topic"`
Updated string `json:"updated"`
}
func main() {
// ... Setup for Kafka reader and OpenSearch client ...
reader := kafka.NewReader(...)
osClient, _ := opensearch.NewDefaultClient()
ctx := context.Background()
for {
msg, err := reader.ReadMessage(ctx)
if err != nil {
// Handle error, maybe break or log and continue
continue
}
var cdcMsg CDCMessage
if err := json.Unmarshal(msg.Value, &cdcMsg); err != nil {
// Log malformed message and skip
continue
}
// If 'after' is null, it's a DELETE operation.
if cdcMsg.After == nil {
// Handle delete in OpenSearch
docID := fmt.Sprintf("%v", cdcMsg.Key[0]) // Assuming single-column UUID primary key
// ... code to delete document by ID from OpenSearch ...
} else {
// It's an INSERT or UPDATE.
docID := fmt.Sprintf("%v", cdcMsg.After["id"])
docBody, _ := json.Marshal(cdcMsg.After)
// Index the document into OpenSearch using its primary key as the _id.
// This makes inserts and updates idempotent.
// ... code to index docBody into OpenSearch with ID docID ...
}
// Acknowledge the message in Kafka
reader.CommitMessages(ctx, msg)
}
}
这个CDC管道完成了数据闭环。任何对主数据库的写入,都会近乎实时地、且带着正确的tenant_id
反映到搜索引擎中。
当前方案的局限与未来展望
我们通过四个敏捷迭代,成功构建了一套健壮、对开发者友好的多租户数据隔离层。但这套系统并非银弹,它依然存在一些需要正视的局限和待优化的点:
- 性能开销: CockroachDB的RLS并非零成本,它会为每个查询增加策略检查的开销。同样,OpenSearch的查询包装也引入了额外的JSON处理。我们需要建立精细的性能基线监控,确保这些开销在可接受范围内。
- 跨租户管理: 当前模型严格禁止了跨租户查询。但对于内部运营、数据分析等后台系统,这种需求是真实存在的。未来的迭代需要设计一套独立的、高权限的“超级用户”访问路径,该路径必须绕过RLS,并辅以极其严格的审计日志。
- 测试复杂性: 端到端测试这套体系变得更加复杂。测试用例需要模拟来自不同租户、不同角色的JWT,并验证数据库和搜索引擎的返回结果是否都符合预期。这需要对我们的集成测试框架进行大幅升级。
- OpenSearch的深度安全: 当前对OpenSearch的保护依赖于应用层的查询包装器。一个更安全的做法是利用OpenSearch Security插件的文档级安全(Document Level Security)功能,它允许定义基于用户角色的查询过滤器。但这会引入额外的配置和管理复杂性,是我们计划在下个季度探索的优化方向。