实现地理分布式多租户架构的数据隔离层 一次结合OAuth 2.0, CockroachDB与OpenSearch的实践复盘


任何一个SaaS产品演进到一定阶段,都会不可避免地撞上多租户这堵墙。我们团队遇到的挑战则更为棘手:产品需要支持地理分布式部署,以满足不同国家的数据主权法规(如GDPR),同时又要保证严格的数据隔离。早期的设计简单粗暴,在每个业务表中都加入了tenant_id字段,然后在应用层逻辑中手动拼接WHERE子句。这种方式在敏捷开发初期跑得很快,但随着业务逻辑的膨胀,它变成了一场灾难。忘记添加tenant_id过滤条件的查询偶有发生,每一次都意味着一次潜在的数据泄露事故,这在生产环境中是不可接受的。

我们需要一个从架构层面解决问题的方案:一个能对应用开发者近乎透明,但在底层却坚不可摧的数据隔离层。这次复盘,就是记录我们如何一步步,通过几个敏捷迭代,利用OAuth 2.0、CockroachDB的行级安全(RLS)和OpenSearch的查询重写,构建出这套隔离体系的过程。

迭代零:确立原则与技术选型

在动手之前,我们首先定义了核心目标:

  1. 身份驱动隔离: 数据的访问权限必须与用户的身份绑定,而不是依赖于应用层代码的自觉性。
  2. 多存储系统一致性: 隔离策略必须同时在主事务数据库(CockroachDB)和搜索引擎(OpenSearch)中生效。
  3. 对开发者透明: 业务代码开发者应该专注于业务逻辑,而不是在每个数据访问点都关心租户隔离的实现细节。
  4. 地理亲和性: 数据应能根据租户的地理位置被固定在特定的区域。

基于这些原则,技术选型变得清晰起来:

  • 敏捷开发: 这个问题太复杂,不可能一次性完美解决。我们决定采用迭代的方式,先构建核心骨架,再逐步增强其安全性与功能。
  • OAuth 2.0 & JWT: 这是现代身份认证的事实标准。JWT的自定义声明(Custom Claims)是传递租户上下文(如tenant_id, user_role, data_region)的完美载体。
  • CockroachDB: 它的两个特性击中了我们的痛点。首先是原生的地理分区能力,可以将数据行固定在特定地理位置的节点上。其次是它实现了PostgreSQL兼容的行级安全策略(Row-Level Security, RLS),这正是实现数据库层面透明隔离的关键。
  • OpenSearch: 作为我们的搜索引擎,它同样需要实现租户隔离。虽然它没有像SQL数据库那样的RLS,但其强大的查询DSL和安全插件为我们实现隔离提供了通路。

迭代一:身份先行,构建带有租户上下文的请求管道

一切的起点是身份认证。当用户登录时,认证服务器必须在其签发的JWT中嵌入租户信息。一个典型的JWT Payload可能长这样:

{
  "sub": "user-uuid-12345",
  "exp": 1678886400,
  "iat": 1678882800,
  "iss": "https://auth.our-saas.com",
  "aud": "our-saas-api",
  "tid": "tenant-abc-xyz",
  "region": "eu-central-1",
  "roles": ["admin", "editor"]
}

这里的tidregion就是我们植入的自定义声明。

接下来,我们需要在API网关或服务自身的中间件层来解析这个JWT,并将租户上下文注入到请求的生命周期中。在Go中,我们利用context.Context来实现这一点。

// pkg/auth/middleware.go
package auth

import (
	"context"
	"net/http"
	"strings"

	"github.com/golang-jwt/jwt/v4"
	"github.com/sirupsen/logrus"
)

// TenantContextKey is a custom type to avoid context key collisions.
type TenantContextKey string

const (
	// KeyTenantID holds the tenant ID in the context.
	KeyTenantID TenantContextKey = "tenant_id"
	// KeyRegion holds the tenant's data region in the context.
	KeyRegion TenantContextKey = "region"
)

// Claims represents the custom JWT claims for our application.
type Claims struct {
	TenantID string   `json:"tid"`
	Region   string   `json:"region"`
	Roles    []string `json:"roles"`
	jwt.RegisteredClaims
}

// JWTMiddleware validates the JWT and injects tenant info into the context.
func JWTMiddleware(jwtSecret []byte) func(http.Handler) http.Handler {
	return func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			authHeader := r.Header.Get("Authorization")
			if authHeader == "" {
				http.Error(w, "Authorization header required", http.StatusUnauthorized)
				return
			}

			tokenString := strings.TrimPrefix(authHeader, "Bearer ")
			if tokenString == authHeader { // No "Bearer " prefix
				http.Error(w, "Invalid token format", http.StatusUnauthorized)
				return
			}

			token, err := jwt.ParseWithClaims(tokenString, &Claims{}, func(token *jwt.Token) (interface{}, error) {
				// Basic validation: ensure signing algorithm is what we expect.
				if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
					logrus.WithField("alg", token.Header["alg"]).Warn("Unexpected signing method")
					return nil, http.ErrAbortHandler
				}
				return jwtSecret, nil
			})

			if err != nil {
				http.Error(w, "Invalid token: "+err.Error(), http.StatusUnauthorized)
				return
			}

			if claims, ok := token.Claims.(*Claims); ok && token.Valid {
				if claims.TenantID == "" {
					http.Error(w, "Token is missing tenant ID", http.StatusForbidden)
					return
				}
				// Inject tenant info into the request context.
				ctx := context.WithValue(r.Context(), KeyTenantID, claims.TenantID)
				ctx = context.WithValue(ctx, KeyRegion, claims.Region)
				
				// Here's a real-world logging practice: add context to all subsequent logs.
				reqLogger := logrus.WithFields(logrus.Fields{
					"tenant_id": claims.TenantID,
					"user_id":   claims.Subject,
				})
				// It's tricky to pass a logger through context, but this illustrates the goal.
				// A better way is to pull info from context within the logger itself.

				next.ServeHTTP(w, r.WithContext(ctx))
			} else {
				http.Error(w, "Invalid token claims", http.StatusUnauthorized)
			}
		})
	}
}

这个中间件是整个体系的入口。它确保了任何一个通过认证的请求,其context中都必然携带着tenant_id。这是一个关键的约定,下游所有的数据访问逻辑都将依赖于此。这个迭代的产出看似简单,但它建立了一个贯穿整个调用链的、可靠的上下文传递机制。

迭代二:在CockroachDB中强制隔离

这是最核心的一步。我们的目标是,当业务代码执行SELECT * FROM invoices WHERE amount > 1000时,数据库能自动将其转换为SELECT * FROM invoices WHERE amount > 1000 AND tenant_id = 'current_tenant_id'

首先,在CockroachDB中定义表结构和RLS策略。

-- 数据库表结构,包含租户ID和区域信息
-- The crdb_region column is special in CockroachDB for geo-partitioning
CREATE TABLE invoices (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id STRING NOT NULL,
    amount DECIMAL(10, 2) NOT NULL,
    created_at TIMESTAMPTZ DEFAULT now(),
    crdb_region crdb_internal_region NOT NULL
);

-- 创建索引,tenant_id是绝大多数查询的入口,必须高效
CREATE INDEX ON invoices (tenant_id, created_at);

-- 启用行级安全
ALTER TABLE invoices ENABLE ROW LEVEL SECURITY;

-- 定义一个会话变量来存储当前租户ID
-- CockroachDB/Postgres doesn't have globally defined variables, so we use a rarely used config parameter.
-- The 'app.' prefix is a convention to avoid conflicts.
-- We must make sure this is set per-transaction.

-- 创建RLS策略
-- 这个策略规定,对于SELECT, UPDATE, DELETE操作,只有当行的tenant_id
-- 与当前会话变量'app.tenant_id'匹配时,该行才可见或可操作。
CREATE POLICY tenant_isolation_policy ON invoices
    FOR ALL -- Applies to SELECT, INSERT, UPDATE, DELETE
    USING (tenant_id = current_setting('app.tenant_id'));

这里的魔法是current_setting('app.tenant_id')。它读取一个特定于当前数据库会话的变量。这意味着,在执行任何查询之前,我们必须先为当前会сил话设置这个变量。

接下来是Go的数据访问层代码。这里的坑在于,数据库连接池会复用连接。如果在一个被污染(设置了其他租户ID)的连接上执行操作,就会导致数据错乱。因此,app.tenant_id的设置必须与事务或单个请求的生命周期严格绑定。

// pkg/storage/invoices.go
package storage

import (
	"context"
	"database/sql"
	"fmt"

	"github.com/you/your-project/pkg/auth" // our context key package
	"github.com/jackc/pgx/v4/pgxpool"
	"github.com/sirupsen/logrus"
)

// InvoiceRepository handles DB operations for invoices.
type InvoiceRepository struct {
	pool *pgxpool.Pool
}

func NewInvoiceRepository(dbPool *pgxpool.Pool) *InvoiceRepository {
	return &InvoiceRepository{pool: dbPool}
}

// GetInvoicesForCurrentUser fetches invoices for the tenant in the context.
// Notice the business logic here has NO knowledge of tenant_id filtering.
func (r *InvoiceRepository) GetInvoicesForCurrentUser(ctx context.Context) ([]string, error) {
	// 1. Extract tenant_id from context. This is non-negotiable.
	tenantID, ok := ctx.Value(auth.KeyTenantID).(string)
	if !ok || tenantID == "" {
		logrus.Error("Tenant ID not found in context. Denying data access.")
		return nil, fmt.Errorf("access denied: missing tenant context")
	}

	conn, err := r.pool.Acquire(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to acquire connection: %w", err)
	}
	defer conn.Release()

	// 2. THIS IS THE CRITICAL STEP. Set the session variable for the acquired connection.
	// Using 'LOCAL' ensures the setting is only for the current transaction.
	// This prevents connection pool pollution.
	setVarQuery := "SET LOCAL app.tenant_id = $1"
	if _, err := conn.Exec(ctx, setVarQuery, tenantID); err != nil {
		return nil, fmt.Errorf("failed to set tenant context for db session: %w", err)
	}

	// 3. Now, execute the business query. It's clean and unaware of multi-tenancy.
	query := "SELECT id FROM invoices ORDER BY created_at DESC LIMIT 10"
	rows, err := conn.Query(ctx, query)
	if err != nil {
		// Proper error logging in production is vital.
		logrus.WithFields(logrus.Fields{
			"tenant_id": tenantID,
			"error":     err,
		}).Error("Failed to execute invoice query")
		return nil, fmt.Errorf("database query failed: %w", err)
	}
	defer rows.Close()

	var ids []string
	for rows.Next() {
		var id string
		if err := rows.Scan(&id); err != nil {
			return nil, err
		}
		ids = append(ids, id)
	}

	return ids, nil
}

通过这种方式,我们把安全策略下沉到了数据库层面。应用代码变得干净,并且从根本上杜绝了因忘记WHERE tenant_id而导致的数据泄露。单元测试这个逻辑也变得直接:提供一个带有tenant_id的context,断言能查到数据;提供一个空的context,断言返回错误;提供一个错误的tenant_id,断言查询结果为空。

迭代三:将隔离策略同步到OpenSearch

事务数据隔离后,搜索数据成了新的短板。用户在搜索框里输入内容,我们不希望A租户的员工搜到B租户的机密发票。

OpenSearch没有与CockroachDB RLS直接等价的功能,但我们可以通过组合其现有能力来模拟。我们放弃了“一个租户一个索引”的方案,因为它会导致“索引地狱”,管理和资源成本极高。我们选择共享索引的策略,即所有租户的发票数据都存在一个名为invoices的索引中,但每个文档都必须包含tenant_id字段。

隔离的关键在于,在执行搜索前,对用户的查询DSL(Domain Specific Language)进行“包装”,强制注入一个filter子句。

// 用户原始的查询DSL
{
  "query": {
    "match": {
      "description": "urgent project"
    }
  }
}

// 经过我们的数据访问层包装后的查询DSL
{
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "description": "urgent project"
          }
        }
      ],
      "filter": [
        {
          "term": {
            "tenant_id": "tenant-abc-xyz" // This part is injected automatically
          }
        }
      ]
    }
  }
}

filter子句在OpenSearch中性能很高,因为它不计算相关性得分且可以被缓存。

下面是实现这个查询包装器的Go代码:

// pkg/search/client.go
package search

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"

	"github.com/opensearch-project/opensearch-go"
	"github.com/opensearch-project/opensearch-go/opensearchapi"
	"github.com/you/your-project/pkg/auth"
	"github.com/sirupsen/logrus"
)

// SearchClient is a wrapper around the OpenSearch client to enforce tenancy.
type SearchClient struct {
	client *opensearch.Client
}

func NewSearchClient(client *opensearch.Client) *SearchClient {
	return &SearchClient{client: client}
}

// SearchInvoices safely searches the invoices index.
func (c *SearchClient) SearchInvoices(ctx context.Context, userQuery map[string]interface{}) (map[string]interface{}, error) {
	tenantID, ok := ctx.Value(auth.KeyTenantID).(string)
	if !ok || tenantID == "" {
		return nil, fmt.Errorf("access denied: missing tenant context for search")
	}

	// The core logic: build the final query by wrapping the user query.
	finalQuery := map[string]interface{}{
		"query": map[string]interface{}{
			"bool": map[string]interface{}{
				"must":   userQuery["query"], // Assume userQuery has a "query" key
				"filter": []map[string]interface{}{
					{
						"term": map[string]interface{}{
							"tenant_id": tenantID,
						},
					},
				},
			},
		},
	}
    
    // Add other top-level keys like "size", "sort", etc. if they exist in userQuery
    if size, ok := userQuery["size"]; ok {
        finalQuery["size"] = size
    }

	queryBytes, err := json.Marshal(finalQuery)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal final query: %w", err)
	}

	// For debugging in development, this is invaluable.
	logrus.WithField("query", string(queryBytes)).Debug("Executing OpenSearch query")

	req := opensearchapi.SearchRequest{
		Index: []string{"invoices"},
		Body:  bytes.NewReader(queryBytes),
	}

	res, err := req.Do(ctx, c.client)
	if err != nil {
		return nil, fmt.Errorf("opensearch search failed: %w", err)
	}
	defer res.Body.Close()

	if res.IsError() {
		bodyBytes, _ := io.ReadAll(res.Body)
		return nil, fmt.Errorf("opensearch error response: %s", string(bodyBytes))
	}

	var result map[string]interface{}
	if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
		return nil, fmt.Errorf("failed to decode opensearch response: %w", err)
	}

	return result, nil
}

现在,我们有了一个SearchClient,业务开发者只能通过它来执行搜索。它扮演了“安全网关”的角色,确保任何发往OpenSearch的查询都被正确地限定在当前租户的数据范围内。

迭代四:打通数据同步的任督二脉

隔离的最后一环,是如何保证从CockroachDB到OpenSearch的数据同步过程也是租户安全的。我们使用CockroachDB的CDC(Change Data Capture)功能,它能以事件流的形式捕获所有的数据变更。

-- Create a changefeed that emits JSON for every change in the invoices table.
-- The changefeed data will be sent to a Kafka topic.
CREATE CHANGEFEED FOR TABLE invoices
    INTO 'kafka://kafka-broker:9092?topic_name=invoices_cdc'
    WITH updated, resolved, format = json;

这条命令会创建一个持续的作业,将invoices表的INSERT, UPDATE, DELETE操作打包成JSON消息发送到Kafka。

然后,我们编写一个消费者服务来处理这些消息,并将其写入OpenSearch。

graph TD
    A[CockroachDB] -- 1. Data Change --> A
    subgraph CDC Process
        A -- 2. Changefeed Captures --> B(Kafka Topic: invoices_cdc)
    end
    C[CDC Consumer Service] -- 3. Consumes Message --> B
    C -- 4. Processes & Transforms --> D[OpenSearch]

这个消费者的实现很简单,但必须正确处理消息格式。CDC消息体包含了变更前后的完整数据行,其中自然也包括tenant_id

// cmd/cdc-consumer/main.go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	
	"github.com/opensearch-project/opensearch-go"
	"github.com/segmentio/kafka-go"
	// ... other imports
)

// CDCMessage represents the structure of a message from CockroachDB changefeed.
type CDCMessage struct {
	After  map[string]interface{} `json:"after"`
	Key    []interface{} `json:"primary_key"` // CockroachDB sends primary key as a JSON array
	Topic  string `json:"topic"`
	Updated string `json:"updated"`
}

func main() {
	// ... Setup for Kafka reader and OpenSearch client ...
	
	reader := kafka.NewReader(...)
	osClient, _ := opensearch.NewDefaultClient()
	
	ctx := context.Background()
	for {
		msg, err := reader.ReadMessage(ctx)
		if err != nil {
			// Handle error, maybe break or log and continue
			continue
		}
		
		var cdcMsg CDCMessage
		if err := json.Unmarshal(msg.Value, &cdcMsg); err != nil {
			// Log malformed message and skip
			continue
		}

		// If 'after' is null, it's a DELETE operation.
		if cdcMsg.After == nil {
			// Handle delete in OpenSearch
			docID := fmt.Sprintf("%v", cdcMsg.Key[0]) // Assuming single-column UUID primary key
			// ... code to delete document by ID from OpenSearch ...
		} else {
			// It's an INSERT or UPDATE.
			docID := fmt.Sprintf("%v", cdcMsg.After["id"])
			docBody, _ := json.Marshal(cdcMsg.After)
			
			// Index the document into OpenSearch using its primary key as the _id.
			// This makes inserts and updates idempotent.
			// ... code to index docBody into OpenSearch with ID docID ...
		}
		
		// Acknowledge the message in Kafka
		reader.CommitMessages(ctx, msg)
	}
}

这个CDC管道完成了数据闭环。任何对主数据库的写入,都会近乎实时地、且带着正确的tenant_id反映到搜索引擎中。

当前方案的局限与未来展望

我们通过四个敏捷迭代,成功构建了一套健壮、对开发者友好的多租户数据隔离层。但这套系统并非银弹,它依然存在一些需要正视的局限和待优化的点:

  1. 性能开销: CockroachDB的RLS并非零成本,它会为每个查询增加策略检查的开销。同样,OpenSearch的查询包装也引入了额外的JSON处理。我们需要建立精细的性能基线监控,确保这些开销在可接受范围内。
  2. 跨租户管理: 当前模型严格禁止了跨租户查询。但对于内部运营、数据分析等后台系统,这种需求是真实存在的。未来的迭代需要设计一套独立的、高权限的“超级用户”访问路径,该路径必须绕过RLS,并辅以极其严格的审计日志。
  3. 测试复杂性: 端到端测试这套体系变得更加复杂。测试用例需要模拟来自不同租户、不同角色的JWT,并验证数据库和搜索引擎的返回结果是否都符合预期。这需要对我们的集成测试框架进行大幅升级。
  4. OpenSearch的深度安全: 当前对OpenSearch的保护依赖于应用层的查询包装器。一个更安全的做法是利用OpenSearch Security插件的文档级安全(Document Level Security)功能,它允许定义基于用户角色的查询过滤器。但这会引入额外的配置和管理复杂性,是我们计划在下个季度探索的优化方向。

  目录