浅析 .NET 响应式编程 IObservable与ReactiveX

一、编程范式全景解读

1. 四大编程范式核心对比

范式 控制方式 数据流动 典型代表 思维模式案例
命令式编程 明确的步骤指令 可变状态驱动 C, Java循环结构 烹饪食谱:按步骤操作
声明式编程 描述目标结果 自动推导执行路径 SQL, LINQ, XAML 餐厅点餐:告知需求而非做法
函数式编程 数学函数组合 不可变数据管道 Haskell, F#, LINQ 流水线加工:输入→转换→输出
响应式编程 事件流驱动 异步数据传播 RxJS, ReactiveX 快递追踪:订阅状态变化通知

2. 范式演进趋势

graph LR
    A[命令式] --> B[声明式]
    B --> C[函数式]
    C --> D[响应式]
// 命令式 - 过程式操作
List<int> list = new List<int>();
for (int i = 0; i < 10; i++)
{
    if (i % 2 == 0) list.Add(i * 2);
}

// 声明式 - 描述结果
IEnumerable<int> result = Enumerable.Range(0, 10)
    .Where(x => x % 2 == 0)
    .Select(x => x * 2);

// 函数式 - 无副作用
int Transform(int x) => x * 2;
int final = Transform(Transform(5)); // 组合调用

// 响应式 - 事件流处理
Observable.Interval(TimeSpan.FromSeconds(1))
    .Subscribe(t => Console.WriteLine(t));

二、C#响应式编程核心:IObservable接口

1. 观察者模式双接口

// 生产者接口
public interface IObservable<T> 
{
    IDisposable Subscribe(IObserver<T> observer);
}

// 消费者接口
public interface IObserver<T> 
{
    void OnNext(T value);    // 数据推送
    void OnError(Exception error);  // 错误处理
    void OnCompleted();      // 流终止
}

2. 完整实现案例:温度监控系统

public class TemperatureSensor : IObservable<double>
{
    private List<IObserver<double>> _observers = new();

    public IDisposable Subscribe(IObserver<double> observer)
    {
        _observers.Add(observer);
        return new Unsubscriber(_observers, observer);
    }

    private class Unsubscriber : IDisposable
    {
        // 省略实现细节
    }

    public void RecordData(double temp)
    {
        // 模拟传感器故障
        if (temp > 100)
        {
            _observers.ForEach(o => o.OnError(
                new InvalidOperationException("传感器过热!")));
            return;
        }

        _observers.ForEach(o => o.OnNext(temp));
    }

    public void Shutdown()
    {
        _observers.ForEach(o => o.OnCompleted());
        _observers.Clear();
    }
}

// 观察者实现
public class AlertSystem : IObserver<double>
{
    public void OnNext(double temp)
    {
        Console.WriteLine($"当前温度:{temp}℃");
        if (temp > 35) Console.WriteLine("高温警告!");
    }

    public void OnError(Exception ex)
    {
        Console.WriteLine($"系统故障:{ex.Message}");
    }

    public void OnCompleted()
    {
        Console.WriteLine("监控系统已关闭");
    }
}

三、ReactiveX高级应用

image

1. 安装NuGet包

Install-Package System.Reactive

2. 核心操作符演示

// 创建可观察序列
var mouseMoves = Observable.FromEventPattern<MouseEventArgs>(
    form, "MouseMove");

// 操作符链式处理
var throttledClicks = Observable.FromEventPattern(button, "Click")
    .Throttle(TimeSpan.FromMilliseconds(500))  // 防抖
    .Select(_ => DateTime.Now)                // 转换数据
    .Buffer(3)                                 // 聚合事件
    .Where(clicks => clicks.Count >= 3)       // 过滤条件
    .Subscribe(clicks => {
        Console.WriteLine($"3秒内快速点击{clicks.Count}次!");
    });

3. 实时数据管道

// 模拟股票数据流
var stockTicker = Observable.Interval(TimeSpan.FromSeconds(1))
    .Select(_ => new {
        Symbol = "MSFT",
        Price = Random.Shared.Next(250, 270)
    });

// 构建处理管道
stockTicker
    .Where(quote => quote.Price > 260)
    .Buffer(TimeSpan.FromSeconds(5))
    .Subscribe(quotes => {
        var avg = quotes.Average(q => q.Price);
        Console.WriteLine($"5秒均价:{avg:F2}");
    });

四、参考

响应式编程不是银弹,但在处理事件流、实时数据等场景下,它能将复杂异步逻辑转化为清晰的声明式管道

  • ReactiveX.io
  • [IObservable Interface](https://learn.microsoft.com/en-us/dotnet/api/system.iobservable-1?view=net-6.0)