Building a Real-Time Distributed Computation Monitoring Dashboard with C# SignalR, Python Dask, and MobX


We face a concrete engineering challenge: a frontend user needs to launch a compute-intensive data analysis task from a web interface. The backend logic for that task is implemented with Dask from the Python ecosystem, because it makes it easy to parallelize the computation across multiple cores or even multiple machines. Our core business backend, however, is built on C# and ASP.NET Core and owns all authentication, authorization, and business process control. A plain HTTP request-response model simply does not work here: the computation may run for minutes or longer, and we cannot leave an HTTP connection hanging that long. Users need a dynamic interface that reports computation progress in real time, shows intermediate results, and ultimately delivers the full report.

Our first instinct was to avoid introducing a full message queue or gRPC bidirectional streaming into the existing C# stack, since either would add operational burden and technology-stack complexity. Instead we settled on a more direct, loosely coupled, yet sufficiently robust approach: the C# backend launches and monitors a standalone Python Dask computation script through process management. The two sides communicate over the standard input/output streams. C# serializes the task parameters to JSON and writes them to the Python process's stdin, while the Python script streams progress, logs, and results to stdout in JSON Lines format. The C# backend captures that output and pushes structured events to the frontend over SignalR. The frontend uses MobX to manage these asynchronous, streaming state updates and drives the UI declaratively.
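
Before looking at each layer, it is worth pinning down the wire contract both sides rely on. The sketch below is illustrative rather than part of the project code: it mirrors the {"type": ..., "payload": ...} envelope that the Python worker prints as JSON Lines and that the C# side later forwards verbatim to SignalR clients. The WorkerMessage record and the demo class are assumptions made for this example.

using System;
using System.Text.Json;

// Hypothetical mirror of the JSON Lines envelope emitted by the Python worker.
// "Type" is one of: log, progress, result, error, status; "Payload" is kept as raw JSON
// because its shape differs per event type.
public sealed record WorkerMessage(string Type, JsonElement Payload);

public static class WireContractDemo
{
    public static void Main()
    {
        // One progress line exactly as the worker would print it to stdout
        const string line =
            "{\"type\": \"progress\", \"payload\": {\"completed_batches\": 3, \"total_batches\": 100, \"progress_percentage\": 3.0}}";

        var options = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };
        var message = JsonSerializer.Deserialize<WorkerMessage>(line, options);

        // Prints: progress -> {"completed_batches": 3, "total_batches": 100, "progress_percentage": 3.0}
        Console.WriteLine($"{message!.Type} -> {message.Payload.GetRawText()}");
    }
}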

The heart of this architecture is a clearly defined communication boundary and division of responsibilities.

graph TD
    subgraph Frontend [Browser]
        A[React + MobX UI]
    end

    subgraph Backend [ASP.NET Core Server]
        B[SignalR Hub]
        C[Dask Orchestration Service]
        D[Python Process Wrapper]
    end

    subgraph Computation [Python Environment]
        E[Dask Computation Script]
        F[Dask Cluster]
    end

    A -- "StartComputation(params)" --> B
    B -- "invokes" --> C
    C -- "creates and starts process" --> D
    D -- "stdin: write task JSON" --> E
    E -- "starts" --> F
    F -- "runs parallel computation" --> F
    E -- "stdout: stream JSON Lines" --> D
    D -- "reads stdout asynchronously" --> C
    C -- "parses events and broadcasts" --> B
    B -- "PushProgress(update)" --> A

    style Frontend fill:#dff,stroke:#333,stroke-width:2px
    style Backend fill:#fdf,stroke:#333,stroke-width:2px
    style Computation fill:#dfd,stroke:#333,stroke-width:2px

Part 1: The Python Dask Computation Script

This script is the execution engine of the whole pipeline. It must be designed to read a one-time configuration from stdin and then keep writing structured information to stdout until the task finishes. The key is not to emit free-form print output: every line must be a single, complete JSON object terminated by a newline (JSON Lines format). This greatly simplifies the parsing logic on the C# side.

We simulate a multi-step Monte Carlo estimation of π: a random point in the unit square falls inside the quarter circle with probability π/4, so π ≈ 4 × (hits / total samples). Each step is submitted as its own Dask task, which lets us report progress precisely.

dask_worker.py:

import sys
import json
import time
import random
import dask
from dask.distributed import Client, LocalCluster

# --- Helper for structured log/status output ---
def write_output(data_type: str, payload: dict):
    """Wrap the output in the standard JSON Lines envelope and write it to stdout."""
    # Flush immediately so the C# side can read each line in real time
    print(json.dumps({"type": data_type, "payload": payload}), flush=True)

def calculate_pi_step(seed):
    """单个计算步骤:在单位正方形内生成一个点并判断是否在圆内。"""
    random.seed(seed)
    x, y = random.random(), random.random()
    return 1 if x**2 + y**2 <= 1 else 0

def main():
    try:
        # --- 1. Read the task configuration from stdin ---
        config_line = sys.stdin.readline()
        if not config_line:
            write_output("error", {"message": "Configuration not provided via stdin."})
            return

        config = json.loads(config_line)
        total_samples = config.get("total_samples", 1_000_000)
        batch_size = config.get("batch_size", 10_000)
        
        write_output("log", {"level": "info", "message": f"Worker started with {total_samples} samples in batches of {batch_size}."})

        # --- 2. Initialize the Dask cluster ---
        # In a real project this would connect to an existing remote Dask cluster;
        # to keep the example simple, we spin up a local cluster instead
        cluster = LocalCluster(n_workers=4, threads_per_worker=1)
        client = Client(cluster)

        write_output("log", {"level": "info", "message": f"Dask client connected. Dashboard at: {client.dashboard_link}"})

        # --- 3. Submit tasks and monitor progress ---
        num_batches = total_samples // batch_size
        total_in_circle = 0
        completed_batches = 0

        # Submit the work in batches
        for i in range(num_batches):
            batch_futures = [client.submit(calculate_pi_step, seed=time.time_ns() + j) for j in range(batch_size)]
            
            # Wait for this batch to complete; as_completed yields futures as they
            # resolve, which gives better real-time behavior
            for future in dask.distributed.as_completed(batch_futures):
                total_in_circle += future.result()

            completed_batches += 1
            progress = completed_batches / num_batches
            
            # Emit a progress event after every completed batch
            write_output("progress", {
                "completed_batches": completed_batches,
                "total_batches": num_batches,
                "progress_percentage": round(progress * 100, 2)
            })
            # Simulate I/O latency or a more expensive computation step
            time.sleep(0.1)

        # --- 4. Compute and emit the final result ---
        pi_estimate = 4 * total_in_circle / total_samples
        write_output("result", {"pi_estimate": pi_estimate, "total_samples": total_samples})

        client.close()
        cluster.close()

    except json.JSONDecodeError as e:
        write_output("error", {"message": f"Failed to decode configuration JSON: {str(e)}"})
    except Exception as e:
        write_output("error", {"message": f"An unexpected error occurred: {str(e)}"})
    finally:
        write_output("status", {"state": "finished"})


if __name__ == "__main__":
    main()

This script is fully self-contained. Its robustness rests on four points:

  1. Structured output: the write_output function keeps the communication protocol consistent.
  2. Error handling: the try...except blocks catch virtually any error during execution and report it to the parent process in the same structured format.
  3. Resource management: the finally block and the close() calls on the Dask client and cluster ensure resources are released properly.
  4. Real-time feedback: progress is reported inside the loop instead of once after all tasks finish, which gives us streaming updates.

Part 2: The C# ASP.NET Core Backend Orchestration Service

The backend is the nerve center of the whole system. It accepts requests from the frontend, manages the lifecycle of the Python child process, parses its output stream, and forwards the information over SignalR.

First, we define the SignalR hub. It is deliberately minimal and exposes only a single method for starting a computation. Beyond that, updates flow in one direction: the server pushes them to the client.

ComputationHub.cs:

using Microsoft.AspNetCore.SignalR;
using RealtimeDask.Services;
using System.Threading.Tasks;

namespace RealtimeDask.Hubs
{
    // Contract for server-to-client pushes; enables strongly typed hub calls
    public interface IComputationClient
    {
        Task ReceiveUpdate(string type, string payload);
        Task ComputationStarted(string computationId);
        Task ComputationFinished(string computationId);
    }

    public class ComputationHub : Hub<IComputationClient>
    {
        private readonly DaskOrchestratorService _orchestrator;

        public ComputationHub(DaskOrchestratorService orchestrator)
        {
            _orchestrator = orchestrator;
        }

        public async Task StartComputation(long totalSamples, int batchSize)
        {
            // Use the ConnectionId as the unique identifier for the computation
            // A real project would likely need more sophisticated session management
            var computationId = Context.ConnectionId;
            
            // Notify the client that the task has started
            await Clients.Caller.ComputationStarted(computationId);

            var parameters = new { total_samples = totalSamples, batch_size = batchSize };
            
            // Hand execution off to the orchestration service; the hub itself should not run long-lived work
            _ = _orchestrator.RunComputation(computationId, parameters);
        }

        public override Task OnDisconnectedAsync(System.Exception? exception)
        {
            // When a client disconnects, terminate the computation associated with it
            _orchestrator.CancelComputation(Context.ConnectionId);
            return base.OnDisconnectedAsync(exception);
        }
    }
}

The real core is DaskOrchestratorService. It uses System.Diagnostics.Process to manage the Python process. The key is reading stdout and stderr asynchronously, without blocking.

DaskOrchestratorService.cs:

using Microsoft.AspNetCore.SignalR;
using RealtimeDask.Hubs;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Text.Json;

namespace RealtimeDask.Services
{
    public class DaskOrchestratorService
    {
        private readonly IHubContext<ComputationHub, IComputationClient> _hubContext;
        private readonly ILogger<DaskOrchestratorService> _logger;
        // Tracks running computations, keyed by computationId, with the Process object as value
        private readonly ConcurrentDictionary<string, Process> _runningComputations = new();

        public DaskOrchestratorService(IHubContext<ComputationHub, IComputationClient> hubContext, ILogger<DaskOrchestratorService> logger)
        {
            _hubContext = hubContext;
            _logger = logger;
        }

        public async Task RunComputation(string computationId, object parameters)
        {
            var process = new Process
            {
                StartInfo = new ProcessStartInfo
                {
                    FileName = "python", // 或者python3, 取决于环境
                    Arguments = "dask_worker.py",
                    RedirectStandardInput = true,
                    RedirectStandardOutput = true,
                    RedirectStandardError = true,
                    UseShellExecute = false,
                    CreateNoWindow = true,
                    // Make sure the working directory is correct so the script can be found
                    WorkingDirectory = Path.Combine(Directory.GetCurrentDirectory(), "PythonWorker")
                },
                EnableRaisingEvents = true
            };

            // Register the process-exited event handler
            process.Exited += (sender, args) => ProcessExited(computationId, sender as Process);

            if (!_runningComputations.TryAdd(computationId, process))
            {
                _logger.LogWarning("Computation with ID {ComputationId} already exists.", computationId);
                return;
            }

            try
            {
                process.Start();

                // Process stdout and stderr asynchronously
                var outputTask = ProcessStream(computationId, process.StandardOutput);
                var errorTask = ProcessStream(computationId, process.StandardError, isError: true);

                // Write the parameters to the Python script's stdin
                await using (var sw = process.StandardInput)
                {
                    if (sw.BaseStream.CanWrite)
                    {
                        var jsonParams = JsonSerializer.Serialize(parameters);
                        await sw.WriteLineAsync(jsonParams);
                    }
                }
                
                // Wait for both stream readers to finish
                await Task.WhenAll(outputTask, errorTask);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to start or monitor process for computation {ComputationId}", computationId);
                await SendUpdateToClient(computationId, "error", new { message = $"Orchestrator Error: {ex.Message}" });
            }
            finally
            {
                // Whether the task finished normally or failed, notify the client
                await _hubContext.Clients.Client(computationId).ComputationFinished(computationId);
                CleanupComputation(computationId);
            }
        }
        
        private async Task ProcessStream(string computationId, StreamReader reader, bool isError = false)
        {
            // ReadLineAsync returns null at end of stream; this avoids reader.EndOfStream,
            // which would block a thread-pool thread synchronously
            string? line;
            while ((line = await reader.ReadLineAsync()) != null)
            {
                if (string.IsNullOrEmpty(line)) continue;

                if (isError)
                {
                    _logger.LogError("Python script stderr for {ComputationId}: {ErrorLine}", computationId, line);
                    await SendUpdateToClient(computationId, "error", new { message = $"Script Error: {line}" });
                    continue;
                }

                try
                {
                    // Parse the JSON Line coming from the Python script
                    using var jsonDoc = JsonDocument.Parse(line);
                    var root = jsonDoc.RootElement;
                    var type = root.GetProperty("type").GetString() ?? "unknown";
                    var payload = root.GetProperty("payload").GetRawText();
                    
                    await _hubContext.Clients.Client(computationId).ReceiveUpdate(type, payload);
                }
                catch (JsonException ex)
                {
                    _logger.LogWarning(ex, "Failed to parse JSON from python script for {ComputationId}. Line: {Line}", computationId, line);
                    // Even if parsing fails, forward the raw line to the frontend to aid debugging
                    await SendUpdateToClient(computationId, "log", new { level = "warn", message = $"[RAW] {line}" });
                }
            }
        }
        
        private async Task SendUpdateToClient(string computationId, string type, object payload)
        {
            var payloadJson = JsonSerializer.Serialize(payload);
            await _hubContext.Clients.Client(computationId).ReceiveUpdate(type, payloadJson);
        }

        public void CancelComputation(string computationId)
        {
            if (_runningComputations.TryGetValue(computationId, out var process))
            {
                try
                {
                    if (!process.HasExited)
                    {
                        _logger.LogInformation("Killing process for computation {ComputationId}", computationId);
                        process.Kill(entireProcessTree: true); // make sure Dask worker child processes are cleaned up too
                    }
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Failed to kill process for computation {ComputationId}", computationId);
                }
                finally
                {
                    CleanupComputation(computationId);
                }
            }
        }

        private void ProcessExited(string computationId, Process? process)
        {
            _logger.LogInformation("Process for computation {ComputationId} exited with code {ExitCode}", computationId, process?.ExitCode ?? -1);
            CleanupComputation(computationId);
        }

        private void CleanupComputation(string computationId)
        {
            if (_runningComputations.TryRemove(computationId, out var process))
            {
                process.Dispose();
            }
        }
    }
}

The design of this service takes several production concerns into account:

  1. Concurrency safety: a ConcurrentDictionary tracks multiple concurrent computations.
  2. Resource cleanup: the OnDisconnectedAsync callback and the CancelComputation method ensure that when a user closes the page or actively cancels, the corresponding Python process is terminated, preventing zombie processes. process.Kill(entireProcessTree: true) is essential for killing the Dask worker child processes spawned by the main script.
  3. Logging: detailed logs make it possible to trace problems in production.
  4. Decoupling: the SignalR hub's role is minimized to the client-facing protocol, while all business logic and process management are encapsulated in DaskOrchestratorService, in line with the single-responsibility principle (the startup wiring that ties the two together is sketched right after this list).
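
For completeness, these pieces still have to be wired up at application startup: SignalR must be registered, the hub mapped, and DaskOrchestratorService registered as a singleton so that one shared instance tracks all running processes. A minimal Program.cs sketch, assuming a .NET 6+ minimal-hosting project and the /computationHub path the frontend connects to:

using RealtimeDask.Hubs;
using RealtimeDask.Services;

var builder = WebApplication.CreateBuilder(args);

// SignalR for server-to-client pushes
builder.Services.AddSignalR();

// A single shared orchestrator tracks every running Python process,
// so it is registered as a singleton; the hub resolves it from DI
builder.Services.AddSingleton<DaskOrchestratorService>();

var app = builder.Build();

// Must match the frontend's HubConnectionBuilder().withUrl("/computationHub")
app.MapHub<ComputationHub>("/computationHub");

app.Run();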

Part 3: Frontend React and MobX State Management

The core of the frontend is a MobX store that encapsulates all SignalR interaction and state management. UI components simply render the store's data reactively.

ComputationStore.ts:

import { makeAutoObservable, runInAction } from "mobx";
import * as signalR from "@microsoft/signalr";

type ComputationStatus = "idle" | "starting" | "running" | "finished" | "error";

export interface LogEntry {
    level: "info" | "warn" | "error";
    message: string;
    timestamp: Date;
}

export interface ProgressUpdate {
    completed_batches: number;
    total_batches: number;
    progress_percentage: number;
}

class ComputationStore {
    // --- Observables: changes automatically trigger UI updates ---
    status: ComputationStatus = "idle";
    progress: ProgressUpdate | null = null;
    result: { pi_estimate: number } | null = null;
    logs: LogEntry[] = [];
    error: string | null = null;
    computationId: string | null = null;
    
    private connection: signalR.HubConnection | null = null;

    constructor() {
        makeAutoObservable(this, {}, { autoBind: true });
        this.setupConnection();
    }

    // --- Actions: the only entry points for state changes ---
    private setupConnection() {
        this.connection = new signalR.HubConnectionBuilder()
            .withUrl("/computationHub")
            .withAutomaticReconnect()
            .build();

        this.connection.on("ComputationStarted", (id: string) => {
            runInAction(() => {
                this.status = "running";
                this.computationId = id;
                this.logs.push({ level: "info", message: `Computation started with ID: ${id}`, timestamp: new Date() });
            });
        });

        this.connection.on("ComputationFinished", (id: string) => {
            runInAction(() => {
                if (this.status !== "error") {
                    this.status = "finished";
                }
                this.logs.push({ level: "info", message: `Computation finished for ID: ${id}`, timestamp: new Date() });
            });
        });

        this.connection.on("ReceiveUpdate", (type: string, payload: string) => {
            const data = JSON.parse(payload);
            runInAction(() => {
                switch (type) {
                    case "progress":
                        this.progress = data as ProgressUpdate;
                        break;
                    case "result":
                        this.result = data;
                        break;
                    case "log":
                        this.logs.push({ ...data, timestamp: new Date() });
                        break;
                    case "error":
                        this.status = "error";
                        this.error = data.message;
                        this.logs.push({ level: "error", message: data.message, timestamp: new Date() });
                        break;
                    case "status":
                        if (data.state === "finished" && this.status !== "error") {
                            this.status = "finished";
                        }
                        break;
                }
            });
        });
        
        this.connection.start().catch(err => {
            console.error("SignalR Connection Error: ", err);
            runInAction(() => this.error = "Failed to connect to the server.");
        });
    }

    async startComputation(totalSamples: number, batchSize: number) {
        if (this.status === "running" || this.status === "starting") return;

        // Reset state for a new run
        this.reset();
        this.status = "starting";

        try {
            await this.connection?.invoke("StartComputation", totalSamples, batchSize);
        } catch (err) {
            console.error("Failed to start computation: ", err);
            runInAction(() => {
                this.status = "error";
                this.error = (err as Error).toString();
            });
        }
    }
    
    reset() {
        this.status = "idle";
        this.progress = null;
        this.result = null;
        this.logs = [];
        this.error = null;
        this.computationId = null;
    }
}

export const computationStore = new ComputationStore();

The design of this store captures the essence of MobX:

  1. Encapsulated complexity: all SignalR connection handling, event subscriptions, and message parsing live inside the store. UI components never have to deal with those details.
  2. Declarative state: status, progress, result, and the rest are observables. When SignalR callbacks update them through runInAction, every UI component observing that state re-renders automatically and efficiently.
  3. Clear boundaries: actions (startComputation, reset) are the only way the outside world interacts with the store's state, which keeps state changes predictable.

Limitations and Directions for Future Iteration

While this approach works well for prototypes and small-to-medium projects, its core weakness is the tight coupling between the C# backend and the Python process. Process.Start means the Python environment must be deployed on the same machine as the C# application server, which prevents the compute resources and the application server from scaling independently.

A more elastic, production-grade architecture would decouple the two with a real message queue such as RabbitMQ or Redis Streams. The C# backend would publish computation tasks as messages to the queue. One or more independent Python worker services (deployed on any number of machines, or even in a Kubernetes cluster) would subscribe to that queue, consume tasks, and report progress and results over a separate channel, for example by writing directly to a shared database/cache or through another queue. The C# backend would then read state from that channel and push it to the frontend. This adds architectural complexity, but it buys horizontal scalability, better fault tolerance, and cleaner service boundaries.

In addition, the current error handling can detect a crashed process but has no retry mechanism. Long-running, business-critical tasks need persistent task-state management and an automatic retry policy. On the security side, executing an external script directly demands strict input validation and permission control to guard against potential command injection.
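
On the input-validation point, one low-cost first step is to reject or clamp the numeric parameters in the hub before any process is ever spawned. The sketch below is only an illustration; the class name and limits are made-up defaults, not values from the project:

// Hypothetical guard for the parameters accepted by StartComputation;
// the limits are illustrative only and should come from configuration in practice.
public static class ComputationRequestValidator
{
    public const long MaxSamples = 100_000_000;
    public const int MaxBatchSize = 1_000_000;

    public static bool IsValid(long totalSamples, int batchSize) =>
        totalSamples > 0 && totalSamples <= MaxSamples &&
        batchSize > 0 && batchSize <= MaxBatchSize &&
        batchSize <= totalSamples;
}

ComputationHub.StartComputation could call this check first and return an error event to the caller instead of launching the worker when it fails; anything that eventually ends up on a command line (script paths, interpreter names) should additionally be hard-coded or whitelisted rather than built from user input.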

