.net React式框架(RX)和异步处理事件

nkkqxpd9  于 2022-12-24  发布在  .NET
关注(0)|答案(2)|浏览(135)

所以我只是在玩RX和学习它。我开始玩事件,想知道如何订阅事件,并异步地批量处理结果。请允许我用代码解释:
引发事件的简单类:

public class EventRaisingClass
{
   public event EventHandler<SomeEventArgs> EventOccured;

   //some other code that raises event...
}

public class SomeEventArgs : EventArgs
{
    public SomeEventArgs(int data)
    {
        this.SomeArg = data;
    }

    public int SomeArg { get; private set; }
}

然后我的主要:

public static void Main(string[] args)
{
    var eventRaiser = new EventRaisingClass();
    IObservable<IEvent<SomeEventArgs>> observable = 
        Observable.FromEvent<SomeEventArgs>(e => eventRaiser.EventOccured += e, e => eventRaiser.EventOccured -= e);

    IObservable<IList<IEvent<SomeEventArgs>>> bufferedEvents = observable.BufferWithCount(100);

    //how can I subscribte to bufferedEvents so that the subscription code gets called Async?
    bufferedEvents.Subscribe(list => /*do something with list of event args*/); //this happens synchrounously...

}

正如您在我的评论中所看到的,当您像这样调用subscribe时,所有订阅代码都同步发生。有没有一种方法可以使用RX,在有新的一批事件要处理时,在不同的线程上调用Subscribe?

mcdcgff0

mcdcgff01#

bufferedEvents.ObserveOn(Scheduler.TaskPool).Subscribe(...

SubscribeOn用于指定所谓的“订阅副作用”发生的时间表。例如,您的可观察对象可以在每次有人订阅时打开一个文件。
ObserveOn是指定每次有新值时对观察器的调用将发生的调度,实际上它比SubscribeOn使用得更频繁。

4nkexdtk

4nkexdtk2#

我相信您正在寻找SubscribeOnObserveOn,传递一个ISchedulerSystem.Concurrency下内置了几个调度器;其中一些线程使用当前线程,而其他线程使用特定线程。
This video提供了有关调度程序概念的更多信息。
Rx团队最近还发布了一个hands-on labs文档,这是目前最接近教程的东西。

相关问题