使用 Swift 和 SQLite 构建离线向量搜索引擎并以 Chef 实现数据管道自动化


为一个纯离线的 iOS 应用实现语义搜索,同时要保证用户数据绝对不离开设备,这是我们面临的第一个挑战。传统的方案,无论是调用云端的 Embedding API 还是将查询发送到托管的向量数据库,都直接与项目的核心隐私原则相悖。因此,整个 AI 推理、数据存储和查询过程必须完全在用户设备上闭环。这意味着我们需要一个能在 Swift 环境中高效运行的、嵌入式的向量数据库。

最初的构想是在应用中集成一个现成的移动端向量库。然而,这会引入新的、庞大的依赖,增加应用体积,并且可能与我们现有的基于 GRDB.swift 和 SQLite 的持久化方案产生冲突。一个更具吸引力的方案是:能否在现有的 SQLite 数据库上直接实现向量存储和相似度搜索?这不仅能复用我们已经非常熟悉的技术栈,还能最大限度地控制应用的复杂性。

但真正的难题不止于此。为这个离线搜索引擎提供数据的预处理管道——包括文档清洗、分块、向量化,并最终打包成一个优化过的 SQLite 文件——这个过程本身就极为复杂且容易出错。任何手动操作都将是灾难的开始。我们需要一个工业级的、可复现的、自动化的流程来生产这个核心数据构件。这正是 Chef 进入我们技术选型视野的原因,尽管将一个基础设施即代码工具用于数据预处理流水线听起来有些非主流,但其声明式、幂等的特性恰好解决了我们的痛点。

我们的整体架构因此成型:

graph TD
    A[原始文档源 / S3 Bucket] --> B{Chef Recipe: 数据管道};
    subgraph "CI/CD 环境 或 开发者本地"
        B -- 1. 同步数据 --> C[下载并清洗文档];
        C -- 2. 运行脚本 --> D[Python脚本: 生成Embeddings];
        D -- 3. 构建数据库 --> E[生成 pre-warmed.sqlite 文件];
    end
    E -- 打包进 App Bundle --> F[iOS 应用];

    subgraph "用户设备 (Offline)"
        G[用户输入查询] --> H{Swift App};
        H -- 1. 调用 CoreML --> I[将查询文本向量化];
        I -- 2. 查询 SQLite --> J[在 pre-warmed.sqlite 中执行向量搜索];
        J -- 3. 返回结果 --> H;
        H -- 显示结果 --> K[UI];
    end

这个方案将复杂的AI数据预处理过程(一个典型的数据科学任务)封装在一个 Chef Cookbook 中,使其成为一个可版本化、可自动执行的工程任务。最终产物是一个 SQLite 文件,可以像任何其他资源一样被 iOS 应用直接使用。

第一阶段:使用 Chef 自动化数据构件生产

我们的目标是创建一个 Chef Recipe,它能完成以下所有工作:

  1. 从指定源拉取原始文本文档。
  2. 安装必要的 Python 环境和依赖库 (如 transformers, torch)。
  3. 执行 Python 脚本,将文档转换为向量。
  4. 将文本内容、元数据和向量数据写入一个结构化的 SQLite 数据库。

在真实项目中,这个流程通常在 CI/CD 服务器上运行。

Chef Cookbook 结构

chef-repo/
└── cookbooks/
    └── offline_search_pipeline/
        ├── attributes/
        │   └── default.rb
        ├── files/
        │   └── default/
        │       └── process_documents.py
        ├── recipes/
        │   └── default.rb
        └── metadata.rb

Attributes (attributes/default.rb)

我们用 attributes 来定义所有可配置的参数,比如数据源、模型名称和输出路径。这使得在不同环境(例如,开发环境使用小数据集,生产环境使用完整数据)中复用此 Cookbook 变得简单。

# attributes/default.rb

# 数据源配置
default['offline_search_pipeline']['data_source_url'] = 'https://your-data-source.com/documents.zip'
default['offline_search_pipeline']['data_archive_path'] = '/tmp/documents.zip'
default['offline_search_pipeline']['data_extract_path'] = '/tmp/documents_raw'

# Python 虚拟环境配置
default['offline_search_pipeline']['venv_path'] = '/opt/search_venv'

# AI 模型配置
default['offline_search_pipeline']['embedding_model'] = 'sentence-transformers/all-MiniLM-L6-v2'

# 输出构件配置
default['offline_search_pipeline']['output_dir'] = '/var/chef/outputs'
default['offline_search_pipeline']['sqlite_db_name'] = 'knowledge_base.sqlite'

核心处理脚本 (files/default/process_documents.py)

这个 Python 脚本是数据处理的核心。它使用 sentence-transformers 库将文本转换为向量,并使用 sqlite3 将它们存入数据库。这里的坑在于向量的存储格式。直接存储为 TEXT 会有性能问题,存储为 BLOB 是更好的选择。我们还需要确保以原子方式构建数据库,避免生成不完整的文件。

# files/default/process_documents.py

import os
import sys
import sqlite3
import numpy as np
from sentence_transformers import SentenceTransformer
import logging
import json

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# 从命令行参数获取路径
SOURCE_DIR = sys.argv[1]
DB_PATH = sys.argv[2]
MODEL_NAME = sys.argv[3]

# 确保输出目录存在
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)

# 确保在开始前删除旧数据库,保证幂等性
if os.path.exists(DB_PATH):
    os.remove(DB_PATH)
    logging.info(f"Removed existing database at {DB_PATH}")

def create_database_schema(conn):
    """创建数据库表结构"""
    cursor = conn.cursor()
    # FTS5 用于全文搜索,documents 表存储核心数据
    cursor.execute("""
        CREATE TABLE documents (
            id INTEGER PRIMARY KEY,
            title TEXT NOT NULL,
            content TEXT NOT NULL,
            metadata TEXT, -- JSON blob for extra info
            embedding BLOB NOT NULL -- Store vector as a BLOB
        );
    """)
    cursor.execute("""
        CREATE VIRTUAL TABLE documents_fts USING fts5(
            title,
            content,
            content='documents',
            content_rowid='id'
        );
    """)
    # 创建触发器,在 documents 表插入、更新、删除时同步 FTS 索引
    cursor.execute("""
        CREATE TRIGGER documents_ai AFTER INSERT ON documents BEGIN
            INSERT INTO documents_fts(rowid, title, content) VALUES (new.id, new.title, new.content);
        END;
    """)
    # 省略 UPDATE 和 DELETE 触发器...
    conn.commit()
    logging.info("Database schema and FTS5 index created.")

def process_and_embed_documents(source_path, model):
    """遍历文档,生成 embeddings,并以生成器形式返回"""
    for filename in os.listdir(source_path):
        if filename.endswith(".txt"):
            file_path = os.path.join(source_path, filename)
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()
                    
                # 假设文件名即标题
                title = os.path.splitext(filename)[0]
                
                # 生成 embedding
                vector = model.encode(content, convert_to_numpy=True)
                
                # 将 numpy array 转换为 bytes (BLOB)
                vector_blob = vector.astype(np.float32).tobytes()
                
                # 附加元数据
                metadata = json.dumps({"source_file": filename, "char_count": len(content)})
                
                yield (title, content, metadata, vector_blob)
            except Exception as e:
                logging.error(f"Failed to process {filename}: {e}")

def main():
    logging.info(f"Loading embedding model: {MODEL_NAME}")
    model = SentenceTransformer(MODEL_NAME)
    
    # 获取向量维度,之后在 Swift 端需要用到
    embedding_dim = model.get_sentence_embedding_dimension()
    logging.info(f"Model embedding dimension: {embedding_dim}")

    try:
        conn = sqlite3.connect(DB_PATH)
        create_database_schema(conn)
        
        cursor = conn.cursor()
        
        logging.info(f"Processing documents from {SOURCE_DIR}...")
        docs_generator = process_and_embed_documents(SOURCE_DIR, model)
        
        cursor.executemany(
            "INSERT INTO documents (title, content, metadata, embedding) VALUES (?, ?, ?, ?)",
            docs_generator
        )
        
        conn.commit()
        logging.info(f"Successfully inserted all documents into {DB_PATH}")

    except sqlite3.Error as e:
        logging.error(f"Database error: {e}")
    except Exception as e:
        logging.error(f"An unexpected error occurred: {e}")
    finally:
        if conn:
            conn.close()

if __name__ == '__main__':
    if len(sys.argv) != 4:
        print("Usage: python process_documents.py <source_dir> <db_path> <model_name>")
        sys.exit(1)
    main()

Chef Recipe (recipes/default.rb)

这是将所有步骤粘合在一起的“胶水”。它负责环境设置、文件分发和命令执行。

# recipes/default.rb

# 定义变量以提高可读性
venv_path = node['offline_search_pipeline']['venv_path']
python_bin = "#{venv_path}/bin/python"
pip_bin = "#{venv_path}/bin/pip"
script_path = "#{Chef::Config[:file_cache_path]}/process_documents.py"
output_db_path = ::File.join(node['offline_search_pipeline']['output_dir'], node['offline_search_pipeline']['sqlite_db_name'])

# 1. 确保系统依赖已安装
package 'unzip'
package 'python3-venv' do
  action :install
end

# 2. 创建 Python 虚拟环境
execute 'create_python_venv' do
  command "python3 -m venv #{venv_path}"
  creates venv_path # 幂等性:如果目录已存在,则不执行
end

# 3. 安装 Python 依赖库
# 使用 requirements 文件是更佳实践,此处为简化示例
%w(sentence-transformers torch numpy).each do |pkg|
  execute "pip_install_#{pkg}" do
    command "#{pip_bin} install #{pkg}"
    # 只有在库未安装时才执行
    not_if "#{pip_bin} freeze | grep -q #{pkg}"
  end
end

# 4. 下载并解压数据源
remote_file node['offline_search_pipeline']['data_archive_path'] do
  source node['offline_search_pipeline']['data_source_url']
  mode '0644'
  notifies :run, 'execute[extract_data_archive]', :immediately
end

execute 'extract_data_archive' do
  command "unzip -o #{node['offline_search_pipeline']['data_archive_path']} -d #{node['offline_search_pipeline']['data_extract_path']}"
  action :nothing # 只有在 remote_file 下载新文件后才触发
end

# 5. 分发处理脚本
cookbook_file script_path do
  source 'process_documents.py'
  mode '0755'
end

# 6. 执行数据处理流水线
# 这是一个关键步骤,它将所有部分联系起来
execute 'run_embedding_pipeline' do
  command <<-EOH
    #{python_bin} #{script_path} \
    #{node['offline_search_pipeline']['data_extract_path']} \
    #{output_db_path} \
    "#{node['offline_search_pipeline']['embedding_model']}"
  EOH
  # 设置超时,因为模型下载和处理可能需要很长时间
  timeout 3600 
  # 只有在源数据或脚本更新时才重新运行,这是Chef幂等性的核心
  # 我们可以通过订阅 `remote_file` 和 `cookbook_file` 来实现这一点
  # 为简化,这里每次 chef-client 运行时都会检查
  # 在真实场景中,会使用更复杂的 guard 条件
  action :run
end

运行 chef-client -z -o offline_search_pipeline 后,我们就会在 /var/chef/outputs 目录下得到一个 knowledge_base.sqlite 文件。这个文件现在是我们的“数据构件”,可以被版本化控制,并打包到 iOS 应用中。

第二阶段:在 Swift 中实现离线向量搜索

现在我们有了一个包含向量数据的 SQLite 文件。下一步是在 iOS 应用中查询它。我们将使用 GRDB.swift 库来安全、高效地与 SQLite 交互。

数据模型与数据库访问

首先,我们需要定义一个与数据库表结构匹配的 Swift struct

// Document.swift

import Foundation
import GRDB

struct Document: Codable, FetchableRecord, PersistableRecord {
    var id: Int64?
    var title: String
    var content: String
    var metadata: String // Store as JSON string
    var embedding: Data // Store blob as Data
    
    // 我们需要一个非数据库字段来存储计算出的相似度分数
    var similarity: Float? = nil
}

// 定义数据库访问服务
class DatabaseService {
    private var dbQueue: DatabaseQueue
    
    // 向量维度必须与 Python 脚本中使用的模型一致
    static let embeddingDimension = 384

    init(databasePath: String) throws {
        dbQueue = try DatabaseQueue(path: databasePath)
    }

    // 将数据库从 App Bundle 复制到可写目录
    static func setupDatabase() throws -> String {
        let fileManager = FileManager.default
        let dbName = "knowledge_base.sqlite"
        let appSupportURL = try fileManager.url(for: .applicationSupportDirectory, in: .userDomainMask, appropriateFor: nil, create: true)
        let destinationURL = appSupportURL.appendingPathComponent(dbName)

        // 如果数据库已存在,则不复制,除非需要更新
        if !fileManager.fileExists(atPath: destinationURL.path) {
            guard let sourceURL = Bundle.main.url(forResource: "knowledge_base", withExtension: "sqlite") else {
                fatalError("Fatal Error: knowledge_base.sqlite not found in bundle.")
            }
            try fileManager.copyItem(at: sourceURL, to: destinationURL)
        }
        
        return destinationURL.path
    }
}

实现向量相似度计算

SQLite 本身没有内置的向量函数,所以我们必须在 Swift 中实现。余弦相似度是常用的度量。我们需要一个函数,将存储为 Data (BLOB) 的向量转换回浮点数数组,并计算它们与查询向量的相似度。

// VectorUtils.swift

import Foundation
import Accelerate

enum VectorError: Error {
    case invalidDataSize(expected: Int, actual: Int)
}

final class VectorUtils {
    
    // 将 Data (BLOB) 转换为 [Float]
    static func dataToFloatArray(_ data: Data, dimension: Int) throws -> [Float] {
        let expectedSize = dimension * MemoryLayout<Float32>.size
        guard data.count == expectedSize else {
            throw VectorError.invalidDataSize(expected: expectedSize, actual: data.count)
        }
        
        return data.withUnsafeBytes { (pointer: UnsafeRawBufferPointer) -> [Float] in
            let floatBuffer = pointer.bindMemory(to: Float32.self)
            return Array(floatBuffer)
        }
    }
    
    // 使用 Accelerate 框架高效计算余弦相似度
    static func cosineSimilarity(a: [Float], b: [Float]) -> Float {
        guard a.count == b.count, !a.isEmpty else { return 0.0 }
        
        var dotProduct: Float = 0.0
        vDSP_dotpr(a, 1, b, 1, &dotProduct, vDSP_Length(a.count))
        
        var aMagnitude: Float = 0.0
        vDSP_rmsqv(a, 1, &aMagnitude, vDSP_Length(a.count))
        aMagnitude *= sqrt(Float(a.count))
        
        var bMagnitude: Float = 0.0
        vDSP_rmsqv(b, 1, &bMagnitude, vDSP_Length(b.count))
        bMagnitude *= sqrt(Float(b.count))
        
        if aMagnitude == 0.0 || bMagnitude == 0.0 {
            return 0.0
        }
        
        return dotProduct / (aMagnitude * bMagnitude)
    }
}

这里的关键是使用了 Accelerate 框架 (vDSP)。直接用 Swift 循环计算点积和模长会非常慢,而 vDSP 利用了底层的 SIMD 指令,性能要高出几个数量级。这是一个在真实项目中必须进行的优化。

执行向量搜索

现在我们可以组合所有部分来执行搜索。这是一个“暴力”或“精确”的 K-最近邻 (KNN) 搜索。它会遍历数据库中的每一个向量,计算相似度,然后排序。

// DatabaseService+Search.swift

extension DatabaseService {
    
    func findSimilarDocuments(for queryVector: [Float], limit: Int) async throws -> [Document] {
        guard queryVector.count == Self.embeddingDimension else {
            // 在生产代码中应该抛出一个更具体的错误
            throw NSError(domain: "SearchError", code: 1, userInfo: [NSLocalizedDescriptionKey: "Invalid query vector dimension."])
        }

        // 1. 读取所有文档和它们的 embedding
        let allDocuments = try await dbQueue.read { db in
            try Document.fetchAll(db)
        }

        // 2. 在内存中计算相似度
        // 这是一个计算密集型操作,应该在后台线程执行
        var documentsWithSimilarity = [Document]()
        for var doc in allDocuments {
            do {
                let dbVector = try VectorUtils.dataToFloatArray(doc.embedding, dimension: Self.embeddingDimension)
                doc.similarity = VectorUtils.cosineSimilarity(a: queryVector, b: dbVector)
                documentsWithSimilarity.append(doc)
            } catch {
                // 记录错误但继续处理其他文档
                print("Error decoding vector for document ID \(String(describing: doc.id)): \(error)")
            }
        }
        
        // 3. 排序并返回 top K
        let sortedDocuments = documentsWithSimilarity.sorted {
            // similarity 可能是 nil,需要处理
            ($0.similarity ?? -1.0) > ($1.similarity ?? -1.0)
        }
        
        return Array(sortedDocuments.prefix(limit))
    }
}

这个实现有一个明显的性能瓶颈:它将整个数据库加载到内存中。对于成千上万个向量来说,这是不可接受的。一个常见的错误是认为移动设备内存足够大。但在真实的多任务环境中,这种内存使用会很快导致应用被系统终止。

一个改进方案是使用 SQLite 的自定义函数,将相似度计算下推到数据库层面。

// DatabaseService+CustomFunction.swift

extension DatabaseService {
    // 注册自定义 SQL 函数
    func registerCosineSimilarityFunction() {
        let cosineSimilarity = DatabaseFunction("cosine_similarity", argumentCount: 2) { dbValues in
            guard let queryVectorBlob = dbValues[0].data,
                  let dbVectorBlob = dbValues[1].data else {
                return nil
            }
            
            do {
                let queryVector = try VectorUtils.dataToFloatArray(queryVectorBlob, dimension: Self.embeddingDimension)
                let dbVector = try VectorUtils.dataToFloatArray(dbVectorBlob, dimension: Self.embeddingDimension)
                return VectorUtils.cosineSimilarity(a: queryVector, b: dbVector)
            } catch {
                // 在 SQL 函数中,我们不能轻易地抛出 Swift 错误,通常返回 nil
                return nil
            }
        }
        dbQueue.add(function: cosineSimilarity)
    }

    // 使用自定义函数进行优化的搜索
    func findSimilarDocumentsOptimized(for queryVector: [Float], limit: Int) async throws -> [Document] {
        let queryVectorData = queryVector.withUnsafeBufferPointer { Data(buffer: $0) }
        
        let documents = try await dbQueue.read { db in
            let sql = """
                SELECT *, cosine_similarity(?, embedding) as similarity
                FROM documents
                ORDER BY similarity DESC
                LIMIT ?
            """
            
            let request = SQLRequest<Document>(sql: sql, arguments: [queryVectorData, limit])
            return try request.fetchAll(db)
        }
        
        return documents
    }
}

通过自定义函数,计算发生在 SQLite 内部,避免了将所有向量数据加载到 Swift 内存中,大大降低了内存占用。虽然这仍然是全表扫描,CPU 消耗没有变,但内存使用得到了质的优化。

方案的局限性与未来迭代路径

当前实现的暴力搜索方案在向量数量超过几万个时,查询延迟会变得无法接受(通常会达到数百毫秒甚至秒级)。这里的核心瓶颈在于 O(N) 的计算复杂度。

局限性:

  1. 性能扩展性差: 随着文档数量增加,查询时间呈线性增长。
  2. 无实时更新: 整个数据构件是静态的。如果需要在设备上添加或更新文档,就需要一个更复杂的流程来生成新的向量并更新索引。
  3. 模型与数据耦合: 当前的 Chef 管道将数据处理和模型绑定。如果需要更换 embedding 模型,必须完全重新生成整个 SQLite 数据库。

未来迭代方向:

  1. 实现近似最近邻 (ANN) 索引: 这是解决性能问题的根本方法。可以在 Swift 中实现一个轻量级的 ANN 算法,如 HNSW (Hierarchical Navigable Small World) 的简化版本。具体做法是,在 Chef 管道中预先构建 HNSW 图的层和连接关系,并将这些图结构数据存储在 SQLite 的新表中。查询时,Swift 代码会加载图数据,并执行贪心搜索,将复杂度从 O(N) 降低到接近 O(log N)。
  2. 向量量化: 为了减少数据库大小和内存占用,可以采用标量量化 (Scalar Quantization) 或乘积量化 (Product Quantization)。将 float32 向量转换为 int8 不仅能将存储空间减少 75%,还能利用 SIMD 指令进行更快的距离计算,尽管会损失一些精度。
  3. 混合搜索: 结合 FTS5 的全文搜索和向量搜索。可以先用 FTS5 快速筛选出一批候选文档,然后只对这个小子集进行向量相似度计算。这种方法在很多场景下都能在精度和性能之间取得很好的平衡。

  目录