在AKKA.NET执行元中处理任务中运行的服务的事件

atmip9wb  于 2022-11-05  发布在  .NET
关注(0)|答案(3)|浏览(153)

我正在为我的WPF应用程序使用Prism框架。我有一个生产者服务,它在任务中运行,并在找到文件时引发CompositePresentationEvent。我有一个Akka参与者,它订阅了该事件。参与者的处理程序看起来很简单,如下所示:

private void HandleFileReceive(FileEventArgs args)
{
    Self.Tell(new FileReceived(args.File));
}

当事件命中上面的处理程序时,我收到一个System.NotSupportedException,其中包含以下消息:没有活动的ActorContext,这很可能是由于在此参与者中使用了异步操作。
我认为这是因为服务运行在与参与者的处理程序不同的线程中。在Akka .NET中有没有方法处理这种类型的事情?
我并不完全反对编写新的Actors来完成我的情况所需要的服务的工作。问题是,根据文件中的一些设置,服务将是不同的。目前,我使用MEF处理这个问题,并从IoC容器中获取给定接口的正确实现器。我希望继续保持生产者的具体实现从核心代码(参与者所在的地方)中抽象出来。
对于解决这个(感知到的)线程问题和/或动态生成实现给定接口的ProducerActor,有什么建议吗?
谢谢

ryoqjall

ryoqjall1#

我也遇到过类似的问题,我想运行一个进程,从一个演员那里运行一个命令行应用程序。问题是我想得到输出,这是通过处理process.OutputDataReceived来完成的。
最后,我使用一个自定义堆栈将来自处理程序process.OutputDataReceived += (sender, e) => Output.Push(e.Data);的消息
而自定义堆栈看起来像

class OutputStack<T> : Stack<T>
{
   public event EventHandler OnAdd;
   public void Push(T item)
   {
       base.Push(item);
       if (null != OnAdd) OnAdd(this, null);
   }
}

然后在构造函数中处理自定义堆栈的OnAdd

OutputStack<string> Output = new OutputStack<string>();
Output.OnAdd += (sender, e) =>
{
    if (Output.Count > 0)
    {
        var message = Output.Pop();
        actor.Tell(new LogActor.LogMessage(message));
    }
};

这是一个有点hacky,但它的工作,我得到发送的信息,只要它发生(并得到处理)。希望我会得到修复它在未来...

t8e9dugd

t8e9dugd2#

最后我创建了一个actor来处理响应检索,并更改了生成器接口,这样具体的生成器只需要实现一个简单的方法。旧的生成器负责在检查响应之前等待一段配置驱动的时间,但新代码利用AKKA的调度器在相同的配置驱动间隔内告诉检索器,我在下面提供了一个示例,但是我的代码在错误处理等方面稍微复杂一些。
在下面的代码中,您可以看到IPerProducer的接口和具体实现:LocalProducer和EmailProducer。在真实的代码中,这些属性告诉容器一些其他信息,这些信息用于从容器获取正确的实现。ConsumerActor是此方案的父参与者,它处理Consume消息。它在其构造函数中创建ResponseRetrieverActor,并为ResponseRetrieverActor安排重复的告知,以便检索器以给定的时间间隔检查响应。
在ResponseRetrieverActor的RetrieveResponses处理程序中,IoC的魔力发生了。(这里没有显示--它实际上是在一个配置文件中设置的)传输类型用于使用前面提到的具体Producer上的属性从IoC容器中检索正确的Producer。最后,Producer用于获取响应并告诉父级,即使用者,了解列表中的每个文件。

public interface IProducer
{
   List<string> GetResponses();
}

[Export(typeof(IProducer))] // Other attributes needed for MEF Export to differentiate the multiple IProducer implementations
public LocalProducer : IProducer
{
   public List<string> GetResponse()
   {
       // get files from a directory
   }
}

[Export(typeof(IProducer))]
public EmailProducer : IProducer
{
   public List<string> GetResponse()
   {
       // get files from email account
   }
}

public class ConsumerActor
{
    public class Consume
    {
       public Consume(string file) { this.File = file; }

       public string File { get; set; }
    }

    public ConsumerActor() 
    {
       _retriever = Context.ActorOf(Props.Create<ResponseRetrieverActor>(), "retriever");

       var interval = 10000;
       Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(0, interval, _retriever, new ResponseRetrieverActor.RetrieveResponses(), Self);
       Start(); 
    }

    private void Start()
    {
        Receive<Consume>(msg => 
        {
           // do something with the msg.File 
        });
    }

    private IActorRef _retriever;
}
public class ResponseRetrieverActor
{
  public class RetrieveResponses { }

  public ConsumerActor() { Start(); }

  private void Start()
  {
    Receive<RetrieveResponses>(msg => HandleRetrieveResponses());  
  }

  private void HandleRetrieveResponses()
  {
     var transportType = TransportFactory.GetTransportType(); // Gets transport protocol for the producer we need to use (Email, File, ect.)
     var producer = ServiceLocator.Current.GetInstance<IProducer>(transportType); // Gets a producer from the IoC container for the specified transportType

     var responses = producer.GetResponses();

     foreach(var response in responseFiles)
     {
         Context.Parent.Tell(new ConsumerActor.Consume(response));
     }
  }
}
wn9m85ua

wn9m85ua3#

不确定这是否解决了OP问题,但当我搜索“没有活动的ActorContext,这很可能是由于使用了异步”时,我找到了这里,所以我将把我的问题的简单解决方案放在这里:
如果您等待某个处理程序中的某个异步调用,则该处理程序必须是异步的。如果异步处理程序不是与ReceiveAsync〈〉一起使用,而是与Receive〈〉一起使用,则会发生上述异常。ReceiveAsync〈〉会在异步处理程序未完成时挂起参与者的邮箱。

相关问题