一个典型的周二下午,告警系统开始尖叫。我们一个核心的 Python ETL 作业失败了,这个作业负责将上游服务的 JSON 日志聚合到数据湖的 Parquet 文件中。问题不在于作业失败,这很常见。问题在于它失败在了中间步骤——它已经覆盖了一部分昨天的分区,然后才因为一个意料之外的 KeyError
崩溃。数据湖的核心事实表现在处于一个不一致的、被部分污染的状态。下游的报表和机器学习模型开始产出无意义的结果。恢复过程是手动的、痛苦的,需要从备份中拷贝数据,并祈祷我们没有丢失任何东西。
这种场景在直接操作 S3 或 HDFS 上的文件的传统数据湖中太常见了。我们缺乏原子性。任何写入操作,要么完全成功,要么完全失败,不能留下中间状态。Git 可以版本化我们的 Python 脚本,但它无法版本化存储在 S3 上的 5TB 数据。我们尝试过 DVC,它在管理中等规模数据集快照方面表现出色,但对于一个每天需要增量更新、分区庞大的事实表来说,每次 dvc add
一个 5TB 的目录都是不现实的,而且它依然无法解决原子写入的问题。
我们需要一个更好的方案,一个能将数据操作的原子性与管道逻辑的可复现性结合起来的架构。经过一番研究和原型验证,我们最终敲定了一个组合:将 Apache Iceberg 作为数据湖的事务性表格式,同时继续使用 DVC 来管理和版本化驱动整个流程的 Python 管道。这不是用一个工具替换另一个,而是让它们在各自最擅长的层面协同工作。DVC 负责“过程”的版本化,Iceberg 负责“结果”的版本化。
架构构想:双层版本控制
我们的核心痛点是数据一致性和流程可复现性。因此,架构设计的关键在于分层解决问题。
graph TD subgraph Git Repository A[main.py: ETL 脚本] B[dvc.yaml: 管道定义] C[params.yaml: 配置参数] D[version.txt: 指向 Iceberg 快照ID的指针文件] end subgraph DVC Cache E[.dvc/cache] end subgraph Execution Engine F[Python Process] end subgraph Data Lakehouse Storage G[Iceberg REST Catalog] H[S3/MinIO Bucket] end subgraph S3 Bucket Structure I[data/] J[metadata/] end A -- 定义执行逻辑 --> F B -- DVC repro 执行 --> F C -- 作为参数输入 --> F F -- 写入/更新 --> G F -- 读写 Parquet 文件 --> H G -- 管理元数据 --> J H -- 存储数据文件 --> I F -- 成功后更新 --> D Git -- 版本化 --> A Git -- 版本化 --> B Git -- 版本化 --> C DVC -- 版本化 --> D style S3 Bucket Structure fill:#f9f,stroke:#333,stroke-width:2px
这个架构分为两个主要层面:
管道版本控制层 (DVC + Git):
-
Git
负责跟踪 ETL 的 Python 脚本 (main.py
)、DVC 管道定义 (dvc.yaml
) 和配置文件 (params.yaml
)。这是我们代码的“真相来源”。 -
DVC
负责编排管道。dvc.yaml
定义了阶段,声明了输入依赖(脚本和配置)和输出。但这里的关键区别是,我们不让 DVC 直接跟踪庞大的数据目录。相反,我们让它跟踪一个轻量级的输出文件,比如version.txt
。这个文件只包含成功执行后 Iceberg 表的最新快照 ID。这样,DVC 就能在代码或配置变更时触发管道重新运行,同时保持 DVC 本身的跟踪成本极低。
-
数据版本控制层 (Apache Iceberg):
-
Iceberg
在 S3 之上提供了一个抽象层。所有的 Parquet 数据文件和记录表状态的元数据文件都由 Iceberg 管理。 - 对表的所有更改(增、删、改)都是通过向元数据层提交一个新的“快照”来实现的。这个操作是原子的。如果作业在中途失败,事务会回滚,旧的快照仍然是表的当前状态,数据永远不会处于损坏状态。
- Iceberg 的时间旅行功能允许我们直接查询历史快照,这为数据回溯和调试提供了强大的能力。
-
环境搭建:一个可复现的本地实验场
在真实项目中,我们会使用 AWS S3 和 Glue Catalog。但为了本地开发和测试,使用 Docker Compose 搭建一个包含 MinIO (S3 兼容存储) 和 Iceberg REST Catalog 的环境是最高效的方式。
docker-compose.yml
version: '3.8'
services:
minio:
image: minio/minio:RELEASE.2023-09-07T02-05-02Z
container_name: minio
ports:
- "9000:9000" # API Port
- "9001:9001" # Console Port
volumes:
- minio_data:/data
environment:
- MINIO_ROOT_USER=minioadmin
- MINIO_ROOT_PASSWORD=minioadmin
- MINIO_DEFAULT_BUCKETS=warehouse
command: server /data --console-address ":9001"
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 30s
timeout: 20s
retries: 3
iceberg-catalog:
image: tabulario/iceberg-rest:0.6.0
container_name: iceberg_catalog
ports:
- "8181:8181"
environment:
- CATALOG_WAREHOUSE=s3a://warehouse/
- CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
- CATALOG_S3_ENDPOINT=http://minio:9000
- CATALOG_S3_ACCESS__KEY__ID=minioadmin
- CATALOG_S3_SECRET__ACCESS__KEY=minioadmin
- CATALOG_S3_PATH__STYLE__ACCESS=true
depends_on:
minio:
condition: service_healthy
volumes:
minio_data:
这个配置启动了两个服务:一个 MinIO 实例用于对象存储,一个官方的 Iceberg REST Catalog 服务用于管理表元数据。
项目依赖则通过 pyproject.toml
管理。
pyproject.toml
[tool.poetry]
name = "dvc-iceberg-pipeline"
version = "0.1.0"
description = ""
authors = ["Your Name <[email protected]>"]
[tool.poetry.dependencies]
python = "^3.10"
dvc = {extras = ["s3"], version = "^3.27.0"}
pyiceberg = "^0.5.0"
pyarrow = "^13.0.0"
pandas = "^2.1.1"
python-dotenv = "^1.0.0"
Faker = "^19.6.2"
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
初始化项目环境:poetry install
dvc init
接下来,配置 DVC 使用本地 MinIO 作为远程存储。这用于存储 DVC 跟踪的那个小小的 version.txt
文件。
# 配置 DVC 远程存储
dvc remote add -d minio s3://dvc-remote/
dvc remote modify minio endpointurl http://localhost:9000
dvc remote modify minio access_key_id minioadmin
dvc remote modify minio secret_access_key minioadmin
核心实现:事务性 ETL 脚本
现在,我们来编写核心的 Python ETL 脚本。这个脚本负责生成模拟数据,并将其以原子方式写入 Iceberg 表。
src/process_data.py
import os
import logging
import uuid
from datetime import datetime, timedelta
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from faker import Faker
from pyiceberg.catalog import load_catalog
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.schema import Schema
from pyiceberg.types import (
NestedField,
IntegerType,
StringType,
TimestampType,
UUIDType,
)
import yaml
# --- 配置日志 ---
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# --- Iceberg 表结构定义 ---
TABLE_SCHEMA = Schema(
NestedField(field_id=1, name="event_id", field_type=UUIDType(), is_required=True),
NestedField(field_id=2, name="user_id", field_type=IntegerType(), is_required=True),
NestedField(field_id=3, name="event_type", field_type=StringType(), is_required=False),
NestedField(field_id=4, name="event_timestamp", field_type=TimestampType(), is_required=True),
NestedField(field_id=5, name="payload", field_type=StringType(), is_required=False)
)
TABLE_NAME = "prod.events"
VERSION_FILE = "version.txt"
class DataProcessor:
"""
一个封装了数据生成和写入 Iceberg 表的事务性处理器。
"""
def __init__(self, catalog, params):
self.catalog = catalog
self.params = params
self.fake = Faker()
def _generate_fake_data(self, num_records: int) -> pa.Table:
"""生成一批模拟的用户事件数据"""
data = {
"event_id": [uuid.uuid4() for _ in range(num_records)],
"user_id": [self.fake.random_int(min=1000, max=5000) for _ in range(num_records)],
"event_type": [self.fake.random_element(elements=("login", "click", "purchase", "logout")) for _ in range(num_records)],
"event_timestamp": [datetime.utcnow() - timedelta(minutes=self.fake.random_int(min=1, max=60)) for _ in range(num_records)],
"payload": [self.fake.json(num_fields=3) for _ in range(num_records)]
}
df = pd.DataFrame(data)
# 确保时间戳是 timezone-naive, 与 Iceberg 的 TimestampType (without timezone) 匹配
df['event_timestamp'] = df['event_timestamp'].dt.tz_localize(None)
return pa.Table.from_pandas(df, schema=TABLE_SCHEMA.to_arrow())
def _ensure_table_exists(self):
"""如果表不存在,则创建它。这是幂等操作。"""
try:
table = self.catalog.load_table(TABLE_NAME)
logger.info(f"Table '{TABLE_NAME}' already exists. Current snapshot ID: {table.current_snapshot().snapshot_id}")
return table
except NoSuchTableError:
logger.info(f"Table '{TABLE_NAME}' not found. Creating it...")
return self.catalog.create_table(
identifier=TABLE_NAME,
schema=TABLE_SCHEMA,
# 在真实项目中,这里会定义分区策略
# partition_spec=PartitionSpec(PartitionField(source_id=4, name="event_day"))
)
def process_and_append(self):
"""
核心处理流程:生成数据,并以原子追加的方式写入 Iceberg 表。
"""
logger.info("Starting data processing and ingestion...")
table = self._ensure_table_exists()
num_records = self.params.get('data', {}).get('num_records', 100)
logger.info(f"Generating {num_records} fake records.")
arrow_table = self._generate_fake_data(num_records)
# 核心:事务性追加
# 如果这个代码块中发生任何异常,整个操作都会被回滚。
# 不会产生部分写入的数据文件。
try:
tx = table.new_transaction()
tx.append_data(arrow_table)
tx.commit_transaction()
# 刷新表状态以获取最新的快照
table.refresh()
new_snapshot_id = table.current_snapshot().snapshot_id
logger.info(f"Successfully committed transaction. New snapshot ID: {new_snapshot_id}")
# 将最新的快照ID写入 DVC 跟踪的文件
with open(VERSION_FILE, "w") as f:
f.write(str(new_snapshot_id))
logger.info(f"Updated '{VERSION_FILE}' with the new snapshot ID.")
except Exception as e:
logger.error(f"Transaction failed: {e}", exc_info=True)
# 失败时不需要手动回滚,事务会自动中止
raise
def main():
"""主执行函数"""
logger.info("Initializing Iceberg catalog...")
# 从环境变量或配置文件加载 S3/Catalog 配置
catalog_props = {
"uri": os.getenv("ICEBERG_CATALOG_URI", "http://localhost:8181"),
"s3.endpoint": os.getenv("AWS_ENDPOINT_URL", "http://localhost:9000"),
"s3.access-key-id": os.getenv("AWS_ACCESS_KEY_ID", "minioadmin"),
"s3.secret-access-key": os.getenv("AWS_SECRET_ACCESS_KEY", "minioadmin"),
}
catalog = load_catalog("default", **catalog_props)
# 加载 DVC 参数
try:
with open("params.yaml", 'r') as f:
params = yaml.safe_load(f)
except FileNotFoundError:
logger.warning("params.yaml not found, using default parameters.")
params = {'data': {'num_records': 100}}
processor = DataProcessor(catalog, params)
processor.process_and_append()
if __name__ == "__main__":
main()
这个脚本有几个关键点:
- 配置驱动: 从
params.yaml
读取参数(比如要生成多少条记录),使得管道可以通过改变配置来调整行为。 - 幂等性:
_ensure_table_exists
确保无论脚本运行多少次,表只会被创建一次。 - 事务性:
table.new_transaction()
和tx.commit_transaction()
将所有文件写入操作包裹在一个原子单元中。这是避免数据损坏的核心机制。 - DVC 集成: 成功提交后,脚本将最新的 Iceberg 快照 ID 写入
version.txt
。这是连接 Iceberg 和 DVC 两个世界的桥梁。
编排管道:dvc.yaml
现在我们用 dvc.yaml
来定义这个 ETL 过程。
params.yaml
data:
num_records: 500
dvc.yaml
stages:
process-events:
cmd: python src/process_data.py
deps:
- src/process_data.py
params:
- data.num_records
outs:
- version.txt
这个 dvc.yaml
文件非常简洁但功能强大:
-
cmd
: 定义了要执行的命令。 -
deps
: 声明了此阶段的依赖项。如果src/process_data.py
的内容发生变化,DVC 会认为此阶段需要重新运行。 -
params
: 声明了对params.yaml
中特定参数的依赖。如果我们修改num_records
的值,DVC 同样会触发重新运行。 -
outs
: 声明了此阶段的输出。DVC 会跟踪version.txt
的变化。注意,我们没有把庞大的数据目录放在这里。
运行与验证
现在,让我们执行整个流程。
第一次运行:
$ dvc repro
Running stage 'process-events':
> python src/process_data.py
...
INFO:__main__:Initializing Iceberg catalog...
INFO:src.process_data:Table 'prod.events' not found. Creating it...
INFO:src.process_data:Generating 500 fake records.
INFO:src.process_data:Successfully committed transaction. New snapshot ID: 8182583861427537335
INFO:src.process_data:Updated 'version.txt' with the new snapshot ID.
...
执行成功后,version.txt
文件被创建并包含新的快照 ID。我们可以将这些变更提交到版本控制。
git add .
git commit -m "feat: initial pipeline run, ingest 500 records"
dvc push
我们可以通过 pyiceberg
或其他查询引擎(如 Spark, Trino, DuckDB)来验证数据。
模拟配置变更:
现在,我们修改 params.yaml
,增加生成记录的数量。
# params.yaml
data:
num_records: 1000 # 从 500 改为 1000
再次运行 dvc repro
:
$ dvc repro
Stage 'process-events' changed. It is safe to reproduce.
Running stage 'process-events':
> python src/process_data.py
...
INFO:__main__:Initializing Iceberg catalog...
INFO:src.process_data:Table 'prod.events' already exists. Current snapshot ID: 8182583861427537335
INFO:src.process_data:Generating 1000 fake records.
INFO:src.process_data:Successfully committed transaction. New snapshot ID: 4569331575088282361
INFO:src.process_data:Updated 'version.txt' with the new snapshot ID.
...
DVC 检测到 params.yaml
的变化,自动重新运行了作业。Iceberg 表被原子地追加了 1000 条新记录,version.txt
也被更新为新的快照 ID。整个过程是自动的、可预测的。
验证 Iceberg 的时间旅行:
Iceberg 的一个强大之处在于我们可以查询历史版本。
# 一个简单的查询脚本 query_history.py
import os
from pyiceberg.catalog import load_catalog
catalog_props = {
"uri": "http://localhost:8181",
"s3.endpoint": "http://localhost:9000",
"s3.access-key-id": "minioadmin",
"s3.secret-access-key": "minioadmin",
}
catalog = load_catalog("default", **catalog_props)
table = catalog.load_table("prod.events")
# 查询当前版本
current_df = table.scan().to_pandas()
print(f"Current version has {len(current_df)} rows. Snapshot ID: {table.current_snapshot().snapshot_id}")
# 查询历史版本
history = table.history()
if len(history) > 1:
previous_snapshot_id = history[-2].snapshot_id
previous_df = table.scan(snapshot_id=previous_snapshot_id).to_pandas()
print(f"Previous version had {len(previous_df)} rows. Snapshot ID: {previous_snapshot_id}")
运行这个脚本,我们会看到输出:Current version has 1500 rows. Snapshot ID: 4569331575088282361
Previous version had 500 rows. Snapshot ID: 8182583861427537335
这证明了 Iceberg 完整地保留了数据的历史状态,为审计、调试和复现旧的分析结果提供了坚实的基础。
局限性与未来展望
这套架构解决了我们最初的数据一致性和管道可复现性问题,但在真实生产环境中,还需要考虑一些额外的因素。
首先,当前的 Python ETL 脚本是单机执行的。当数据转换逻辑变得复杂或数据量巨大时,单机处理能力会成为瓶颈。一个自然的演进方向是将核心的 _generate_fake_data
(在现实中是数据转换)部分下推到分布式计算引擎中,例如使用 pyspark
。在这种模式下,Python 和 DVC 仍然扮演编排者的角色,负责启动 Spark 作业并管理其配置,而 Spark 则负责大规模的数据处理,最终将结果写入 Iceberg 表。
其次,我们使用的 Iceberg REST Catalog 是一个简单的实现。对于需要支持多团队协作、数据分支和合并等更高级工作流的场景,采用像 Project Nessie 这样的事务性 Catalog 会更有优势。Nessie 为数据湖带来了类似 Git 的能力,允许我们在数据上进行“分支”实验,验证后“合并”回主线,这与 DVC 的实验管理功能可以形成更深度的集成。
最后,这个框架目前专注于数据的追加(Append)操作。在实际应用中,处理更新(Update)和删除(Delete),即所谓的 MERGE INTO
操作,同样重要。Iceberg 支持这些操作,但需要在 Python 脚本中实现更复杂的逻辑来处理变更数据捕获 (CDC) 流或批量更新。这会是该框架下一步需要迭代完善的功能。