We face a concrete engineering challenge: users need to launch a compute-intensive data-analysis task from a web UI. The backend logic for that task is implemented with Dask from the Python ecosystem, because it makes it easy to parallelize the computation across multiple cores or even multiple machines. Our core business backend, however, is built on C# and ASP.NET Core and owns all authentication, authorization, and business-process control. A plain HTTP request-response model is unusable here: the computation may run for minutes or longer, and we cannot leave an HTTP connection hanging. Users need a dynamic interface that reports progress in real time, shows intermediate results, and finally delivers the complete report.
Our initial goal was to avoid introducing a heavyweight queueing system or bidirectional gRPC streaming into the existing C# stack, which would add operational burden and technology-stack complexity. Instead we chose a more direct, loosely coupled, yet sufficiently robust approach: the C# backend launches and supervises a standalone Python Dask script via process management. The two sides communicate over the standard I/O streams: C# serializes the task parameters as JSON and writes them to the Python process's stdin, while the Python script streams progress, logs, and results to stdout in JSON Lines format. The C# backend captures that output and pushes structured events to the frontend over SignalR. The frontend uses MobX to manage these asynchronous, streaming state updates and drives the UI declaratively.
The heart of this architecture is a clearly defined communication boundary and division of responsibilities.
```mermaid
graph TD
    subgraph Frontend [Browser]
        A[React + MobX UI]
    end
    subgraph Backend [ASP.NET Core Server]
        B[SignalR Hub]
        C[Dask Orchestration Service]
        D[Python Process Wrapper]
    end
    subgraph Computation [Python Environment]
        E[Dask compute script]
        F[Dask Cluster]
    end

    A -- "StartComputation(params)" --> B
    B -- "invokes" --> C
    C -- "creates and starts process" --> D
    D -- "stdin: write task JSON" --> E
    E -- "starts" --> F
    F -- "runs parallel computation" --> F
    E -- "stdout: stream JSON Lines" --> D
    D -- "reads stdout asynchronously" --> C
    C -- "parses events and broadcasts" --> B
    B -- "PushProgress(update)" --> A

    style Frontend fill:#dff,stroke:#333,stroke-width:2px
    style Backend fill:#fdf,stroke:#333,stroke-width:2px
    style Computation fill:#dfd,stroke:#333,stroke-width:2px
```
Part 1: The Python Dask Computation Script
This script is the execution engine of the whole pipeline. It must be designed to read its configuration from stdin exactly once, then continuously emit structured information on stdout until the task finishes. The key constraint is that we cannot use free-form print calls: every output must be a single, complete JSON object terminated by a newline (the JSON Lines format). That discipline drastically simplifies the parsing logic on the C# side.
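Concretely, the worker's stdout ends up looking like the following (the values are illustrative); each line is one self-contained event envelope with a `type` discriminator and a `payload`:

```
{"type": "log", "payload": {"level": "info", "message": "Worker started with 1000000 samples in batches of 10000."}}
{"type": "progress", "payload": {"completed_batches": 3, "total_batches": 100, "progress_percentage": 3.0}}
{"type": "result", "payload": {"pi_estimate": 3.14198, "total_samples": 1000000}}
{"type": "status", "payload": {"state": "finished"}}
```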
We simulate a multi-step Monte Carlo estimation of π. Each sample is submitted as its own Dask task, which lets us report progress precisely.
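The estimator behind this is one line of algebra: a uniform point in the unit square falls inside the quarter circle with probability equal to the area ratio, so

$$\frac{\text{hits}}{N} \approx \frac{\pi}{4} \quad\Longrightarrow\quad \hat{\pi} = 4\cdot\frac{\text{hits}}{N},$$

which is exactly what step 4 of the script computes.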
`dask_worker.py`:
```python
import sys
import json
import time
import random

from dask.distributed import Client, LocalCluster, as_completed


# --- Helper for structured log/status output ---
def write_output(data_type: str, payload: dict):
    """Wrap output in the standard JSON Line envelope and write it to stdout."""
    # Flush immediately so the C# side can read events in real time.
    print(json.dumps({"type": data_type, "payload": payload}), flush=True)


def calculate_pi_step(seed):
    """One computation step: draw a point in the unit square and test whether it lies inside the circle."""
    random.seed(seed)
    x, y = random.random(), random.random()
    return 1 if x**2 + y**2 <= 1 else 0


def main():
    try:
        # --- 1. Read the task configuration from stdin ---
        config_line = sys.stdin.readline()
        if not config_line:
            write_output("error", {"message": "Configuration not provided via stdin."})
            return

        config = json.loads(config_line)
        total_samples = config.get("total_samples", 1_000_000)
        batch_size = config.get("batch_size", 10_000)
        write_output("log", {"level": "info", "message": f"Worker started with {total_samples} samples in batches of {batch_size}."})

        # --- 2. Initialize the Dask cluster ---
        # In a real project this would connect to an existing remote Dask cluster;
        # to keep the example simple we spin up a local one.
        cluster = LocalCluster(n_workers=4, threads_per_worker=1)
        client = Client(cluster)
        write_output("log", {"level": "info", "message": f"Dask client connected. Dashboard at: {client.dashboard_link}"})

        # --- 3. Submit tasks and monitor progress ---
        num_batches = total_samples // batch_size
        total_in_circle = 0
        completed_batches = 0

        # Submit the work batch by batch.
        for i in range(num_batches):
            batch_futures = [client.submit(calculate_pi_step, seed=time.time_ns() + j) for j in range(batch_size)]

            # Wait for this batch asynchronously; as_completed gives better real-time behavior.
            for future in as_completed(batch_futures):
                total_in_circle += future.result()

            completed_batches += 1
            progress = completed_batches / num_batches

            # Emit one progress event per completed batch.
            write_output("progress", {
                "completed_batches": completed_batches,
                "total_batches": num_batches,
                "progress_percentage": round(progress * 100, 2)
            })
            # Simulate I/O latency or a more involved computation.
            time.sleep(0.1)

        # --- 4. Compute and emit the final result ---
        pi_estimate = 4 * total_in_circle / total_samples
        write_output("result", {"pi_estimate": pi_estimate, "total_samples": total_samples})

        client.close()
        cluster.close()
    except json.JSONDecodeError as e:
        write_output("error", {"message": f"Failed to decode configuration JSON: {str(e)}"})
    except Exception as e:
        write_output("error", {"message": f"An unexpected error occurred: {str(e)}"})
    finally:
        write_output("status", {"state": "finished"})


if __name__ == "__main__":
    main()
```
This script is completely self-contained. Its robustness rests on:

- Structured output: the `write_output` function keeps the communication protocol consistent.
- Error handling: the `try...except` blocks catch virtually every error raised during execution and report it to the parent process in the same structured format.
- Resource management: the `finally` block and the Dask client's `close()` calls ensure resources are released properly.
- Real-time feedback: by reporting progress inside the loop instead of once after all tasks finish, we get genuinely streaming updates.
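Before wiring up the C# side, it is worth exercising this contract in isolation. The throwaway harness below plays the orchestrator's role: it writes one configuration line to stdin, then streams the JSON Line events back. This is a minimal sketch; the file name `test_dask_worker.py` and the parameter values are mine, not part of the project:

```python
# test_dask_worker.py -- throwaway harness that mimics the C# orchestrator:
# write one JSON config line to stdin, then stream JSON Line events from stdout.
import json
import subprocess
import sys

proc = subprocess.Popen(
    [sys.executable, "dask_worker.py"],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    text=True,
)

# One-shot configuration, mirroring what the C# service serializes.
proc.stdin.write(json.dumps({"total_samples": 100_000, "batch_size": 10_000}) + "\n")
proc.stdin.flush()
proc.stdin.close()

# Each stdout line is one self-contained event envelope.
for line in proc.stdout:
    event = json.loads(line)
    print(event["type"], event["payload"])

proc.wait()
```

Running it should print a `log` event, a stream of `progress` events, a `result`, and a final `status` — exactly the sequence the C# service will parse.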
Part 2: The C# ASP.NET Core Orchestration Service
The backend is the hub of the whole system. It receives frontend requests, manages the lifecycle of the Python child process, parses its output stream, and relays the information over SignalR.

First, we define the SignalR Hub. It is deliberately minimal, exposing a single method to start a computation. Communication with the client is one-way: the server pushes updates.
`ComputationHub.cs`:
```csharp
using Microsoft.AspNetCore.SignalR;
using RealtimeDask.Services;
using System.Threading.Tasks;

namespace RealtimeDask.Hubs
{
    // Strongly typed client interface for server-to-client pushes.
    public interface IComputationClient
    {
        Task ReceiveUpdate(string type, string payload);
        Task ComputationStarted(string computationId);
        Task ComputationFinished(string computationId);
    }

    public class ComputationHub : Hub<IComputationClient>
    {
        private readonly DaskOrchestratorService _orchestrator;

        public ComputationHub(DaskOrchestratorService orchestrator)
        {
            _orchestrator = orchestrator;
        }

        public async Task StartComputation(long totalSamples, int batchSize)
        {
            // Use the ConnectionId as the unique identifier for the computation.
            // A real project would likely need more sophisticated session management.
            var computationId = Context.ConnectionId;

            // Tell the client the task has started.
            await Clients.Caller.ComputationStarted(computationId);

            var parameters = new { total_samples = totalSamples, batch_size = batchSize };

            // Hand off to the orchestrator; the Hub itself must not run long tasks.
            _ = _orchestrator.RunComputation(computationId, parameters);
        }

        public override Task OnDisconnectedAsync(System.Exception? exception)
        {
            // When a client disconnects, terminate the computation associated with it.
            _orchestrator.CancelComputation(Context.ConnectionId);
            return base.OnDisconnectedAsync(exception);
        }
    }
}
```
The real core is `DaskOrchestratorService`. It uses `System.Diagnostics.Process` to manage the Python process. The crucial part is reading `stdout` and `stderr` asynchronously, without blocking.
`DaskOrchestratorService.cs`:
```csharp
using Microsoft.AspNetCore.SignalR;
using RealtimeDask.Hubs;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Text.Json;

namespace RealtimeDask.Services
{
    public class DaskOrchestratorService
    {
        private readonly IHubContext<ComputationHub, IComputationClient> _hubContext;
        private readonly ILogger<DaskOrchestratorService> _logger;

        // Running computations, keyed by computationId with the Process as the value.
        private readonly ConcurrentDictionary<string, Process> _runningComputations = new();

        public DaskOrchestratorService(IHubContext<ComputationHub, IComputationClient> hubContext, ILogger<DaskOrchestratorService> logger)
        {
            _hubContext = hubContext;
            _logger = logger;
        }

        public async Task RunComputation(string computationId, object parameters)
        {
            var process = new Process
            {
                StartInfo = new ProcessStartInfo
                {
                    FileName = "python", // or python3, depending on the environment
                    Arguments = "dask_worker.py",
                    RedirectStandardInput = true,
                    RedirectStandardOutput = true,
                    RedirectStandardError = true,
                    UseShellExecute = false,
                    CreateNoWindow = true,
                    // Make sure the working directory is right so the script can be found.
                    WorkingDirectory = Path.Combine(Directory.GetCurrentDirectory(), "PythonWorker")
                },
                EnableRaisingEvents = true
            };

            // Register the process exit handler.
            process.Exited += (sender, args) => ProcessExited(computationId, sender as Process);

            if (!_runningComputations.TryAdd(computationId, process))
            {
                _logger.LogWarning("Computation with ID {ComputationId} already exists.", computationId);
                return;
            }

            try
            {
                process.Start();

                // Consume stdout and stderr asynchronously.
                var outputTask = ProcessStream(computationId, process.StandardOutput);
                var errorTask = ProcessStream(computationId, process.StandardError, isError: true);

                // Write the parameters to the Python script's stdin.
                await using (var sw = process.StandardInput)
                {
                    if (sw.BaseStream.CanWrite)
                    {
                        var jsonParams = JsonSerializer.Serialize(parameters);
                        await sw.WriteLineAsync(jsonParams);
                    }
                }

                // Wait for both streams to drain.
                await Task.WhenAll(outputTask, errorTask);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to start or monitor process for computation {ComputationId}", computationId);
                await SendUpdateToClient(computationId, "error", new { message = $"Orchestrator Error: {ex.Message}" });
            }
            finally
            {
                // Whether the task ended normally or failed, notify the client.
                await _hubContext.Clients.Client(computationId).ComputationFinished(computationId);
                CleanupComputation(computationId);
            }
        }

        private async Task ProcessStream(string computationId, StreamReader reader, bool isError = false)
        {
            // ReadLineAsync returns null at end of stream; we avoid EndOfStream,
            // which can block the calling thread synchronously.
            string? line;
            while ((line = await reader.ReadLineAsync()) != null)
            {
                if (string.IsNullOrEmpty(line)) continue;

                if (isError)
                {
                    _logger.LogError("Python script stderr for {ComputationId}: {ErrorLine}", computationId, line);
                    await SendUpdateToClient(computationId, "error", new { message = $"Script Error: {line}" });
                    continue;
                }

                try
                {
                    // Parse one JSON Line emitted by the Python script.
                    using var jsonDoc = JsonDocument.Parse(line);
                    var root = jsonDoc.RootElement;
                    var type = root.GetProperty("type").GetString() ?? "unknown";
                    var payload = root.GetProperty("payload").GetRawText();
                    await _hubContext.Clients.Client(computationId).ReceiveUpdate(type, payload);
                }
                catch (JsonException ex)
                {
                    _logger.LogWarning(ex, "Failed to parse JSON from python script for {ComputationId}. Line: {Line}", computationId, line);
                    // Even unparseable lines are forwarded as raw logs to aid debugging.
                    await SendUpdateToClient(computationId, "log", new { level = "warn", message = $"[RAW] {line}" });
                }
            }
        }

        private async Task SendUpdateToClient(string computationId, string type, object payload)
        {
            var payloadJson = JsonSerializer.Serialize(payload);
            await _hubContext.Clients.Client(computationId).ReceiveUpdate(type, payloadJson);
        }

        public void CancelComputation(string computationId)
        {
            if (_runningComputations.TryGetValue(computationId, out var process))
            {
                try
                {
                    if (!process.HasExited)
                    {
                        _logger.LogInformation("Killing process for computation {ComputationId}", computationId);
                        process.Kill(entireProcessTree: true); // make sure Dask's own child processes are cleaned up too
                    }
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Failed to kill process for computation {ComputationId}", computationId);
                }
                finally
                {
                    CleanupComputation(computationId);
                }
            }
        }

        private void ProcessExited(string computationId, Process? process)
        {
            _logger.LogInformation("Process for computation {ComputationId} exited with code {ExitCode}", computationId, process?.ExitCode ?? -1);
            CleanupComputation(computationId);
        }

        private void CleanupComputation(string computationId)
        {
            if (_runningComputations.TryRemove(computationId, out var process))
            {
                process.Dispose();
            }
        }
    }
}
```
This service's design takes several production concerns into account:

- Concurrency safety: a `ConcurrentDictionary` manages multiple concurrent computations.
- Resource cleanup: the `OnDisconnectedAsync` callback and the `CancelComputation` method make sure that when a user closes the page or cancels explicitly, the corresponding Python process is terminated, preventing zombie processes. `process.Kill(entireProcessTree: true)` is essential for killing the Dask worker children spawned by the main script (a cooperative alternative is sketched right after this list).
- Logging: thorough logging helps trace problems in production.
- Decoupling: the SignalR Hub's role is minimized to the client-facing protocol, while all business logic and process management live in `DaskOrchestratorService`, in line with the single-responsibility principle.
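As referenced in the resource-cleanup point above, one possible refinement is to let the worker shut down cooperatively instead of relying solely on a hard kill. The sketch below assumes a POSIX host (a signal handler will not fire when Windows' `Process.Kill` terminates the process) and introduces a hypothetical helper, `install_shutdown_handler`, which is not part of the worker shown earlier; `main()` would call it right after creating the Dask client:

```python
import signal
import sys

def install_shutdown_handler(client, cluster):
    """Close Dask resources and exit cleanly when the parent sends SIGTERM."""
    def handle_sigterm(signum, frame):
        # Best-effort cleanup; the orchestrator can still force-kill on a timeout.
        client.close()
        cluster.close()
        sys.exit(0)

    signal.signal(signal.SIGTERM, handle_sigterm)
```

The C# side would then send SIGTERM first and fall back to `Kill(entireProcessTree: true)` only if the process fails to exit within a grace period.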
Part 3: The React Frontend with MobX State Management
The heart of the frontend is a MobX store that encapsulates all SignalR interaction and state management. UI components simply render the store's data reactively.
`ComputationStore.ts`:
```typescript
import { makeAutoObservable, runInAction } from "mobx";
import * as signalR from "@microsoft/signalr";

type ComputationStatus = "idle" | "starting" | "running" | "finished" | "error";

export interface LogEntry {
    level: "info" | "warn" | "error";
    message: string;
    timestamp: Date;
}

export interface ProgressUpdate {
    completed_batches: number;
    total_batches: number;
    progress_percentage: number;
}

class ComputationStore {
    // --- Observables: changes automatically trigger UI updates ---
    status: ComputationStatus = "idle";
    progress: ProgressUpdate | null = null;
    result: { pi_estimate: number } | null = null;
    logs: LogEntry[] = [];
    error: string | null = null;
    computationId: string | null = null;

    private connection: signalR.HubConnection | null = null;

    constructor() {
        makeAutoObservable(this, {}, { autoBind: true });
        this.setupConnection();
    }

    // --- Actions: the only entry points for mutating state ---
    private setupConnection() {
        this.connection = new signalR.HubConnectionBuilder()
            .withUrl("/computationHub")
            .withAutomaticReconnect()
            .build();

        this.connection.on("ComputationStarted", (id: string) => {
            runInAction(() => {
                this.status = "running";
                this.computationId = id;
                this.logs.push({ level: "info", message: `Computation started with ID: ${id}`, timestamp: new Date() });
            });
        });

        this.connection.on("ComputationFinished", (id: string) => {
            runInAction(() => {
                if (this.status !== "error") {
                    this.status = "finished";
                }
                this.logs.push({ level: "info", message: `Computation finished for ID: ${id}`, timestamp: new Date() });
            });
        });

        this.connection.on("ReceiveUpdate", (type: string, payload: string) => {
            const data = JSON.parse(payload);
            runInAction(() => {
                switch (type) {
                    case "progress":
                        this.progress = data as ProgressUpdate;
                        break;
                    case "result":
                        this.result = data;
                        break;
                    case "log":
                        this.logs.push({ ...data, timestamp: new Date() });
                        break;
                    case "error":
                        this.status = "error";
                        this.error = data.message;
                        this.logs.push({ level: "error", message: data.message, timestamp: new Date() });
                        break;
                    case "status":
                        if (data.state === "finished" && this.status !== "error") {
                            this.status = "finished";
                        }
                        break;
                }
            });
        });

        this.connection.start().catch(err => {
            console.error("SignalR Connection Error: ", err);
            runInAction(() => this.error = "Failed to connect to the server.");
        });
    }

    async startComputation(totalSamples: number, batchSize: number) {
        if (this.status === "running" || this.status === "starting") return;

        // Reset state for a new run.
        this.reset();
        this.status = "starting";

        try {
            await this.connection?.invoke("StartComputation", totalSamples, batchSize);
        } catch (err) {
            console.error("Failed to start computation: ", err);
            runInAction(() => {
                this.status = "error";
                this.error = (err as Error).toString();
            });
        }
    }

    reset() {
        this.status = "idle";
        this.progress = null;
        this.result = null;
        this.logs = [];
        this.error = null;
        this.computationId = null;
    }
}

export const computationStore = new ComputationStore();
```
This store's design captures the essence of MobX:

- Encapsulated complexity: all SignalR connection handling, event subscriptions, and message parsing live inside the store. UI components never need to see those details.
- Declarative state: `status`, `progress`, `result`, and the rest are all `observable`. When the SignalR callbacks update them via `runInAction`, any UI component observing that state re-renders automatically and efficiently.
- Clear boundaries: actions (`startComputation`, `reset`) are the only way the outside world interacts with the store's state, keeping state changes predictable.
Limitations and Directions for Future Iteration
Although this approach works well for prototypes and small-to-medium projects, its core weakness is the tight coupling between the C# backend and the Python process. `Process.Start` means the Python environment must be deployed on the same machine as the C# application server, which prevents compute resources and the application server from scaling independently.
A more elastic, production-grade architecture would decouple the two with a real message queue (RabbitMQ or Redis Streams, for example). The C# backend publishes computation tasks as messages to the queue. One or more independent Python worker services (deployable on any number of machines, or inside a Kubernetes cluster) subscribe to the queue, consume tasks, and report progress and results over a separate channel — for example, by writing directly to a shared database/cache, or through another queue. The C# backend reads that channel and pushes the state to the frontend. The extra architectural complexity buys horizontal scalability, better fault tolerance, and cleaner service boundaries.
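To make that trade-off concrete, here is a rough sketch of the Python consumer side in the Redis Streams variant. It assumes the redis-py package and a reachable Redis instance; the stream names (`compute:tasks`, `compute:progress`), group name, and field layout are illustrative, not a fixed spec:

```python
import json
import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

def ensure_group():
    # Consumer groups give at-least-once delivery and let workers scale out.
    try:
        r.xgroup_create("compute:tasks", "workers", id="0", mkstream=True)
    except redis.ResponseError:
        pass  # group already exists

def worker_loop(consumer_name: str):
    ensure_group()
    while True:
        # Block for up to 5 seconds waiting for one new task.
        entries = r.xreadgroup("workers", consumer_name, {"compute:tasks": ">"}, count=1, block=5000)
        for _stream, messages in entries or []:
            for msg_id, fields in messages:
                task = json.loads(fields["payload"])  # same JSON shape the hub serializes
                # ... run the Dask computation here, publishing progress events
                # to compute:progress as batches complete ...
                r.xadd("compute:progress", {
                    "task_id": fields["task_id"],
                    "payload": json.dumps({"progress_percentage": 100}),
                })
                # Acknowledge only after the outcome has been recorded.
                r.xack("compute:tasks", "workers", msg_id)

if __name__ == "__main__":
    worker_loop("worker-1")
```

The C# backend would `XADD` tasks, and a separate reader (or a SignalR backplane) would forward `compute:progress` entries to clients, replacing the stdout parsing entirely.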
Finally, while the current error handling catches process crashes, there is no retry mechanism. Long-running, business-critical tasks need persistent task-state management and an automatic retry policy. On the security side, executing an external script directly demands strict input validation and permission control to prevent potential command-injection attacks.
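On the input-validation point, even in the current design the worker can cheaply reject hostile or absurd configurations before any Dask work is scheduled. A minimal sketch, with bounds that are illustrative rather than prescriptive:

```python
def validate_config(config: dict) -> dict:
    """Coerce and bound the stdin configuration before scheduling any work."""
    total = int(config.get("total_samples", 1_000_000))
    batch = int(config.get("batch_size", 10_000))
    if not (1 <= batch <= total <= 100_000_000):
        raise ValueError(f"rejected config: total_samples={total}, batch_size={batch}")
    return {"total_samples": total, "batch_size": batch}
```

`main()` would call this right after `json.loads` and report the `ValueError` through the existing `error` event path; the C# side should apply equivalent checks before ever launching the process.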