.net 使用超时try-catch从grpc返回IAsyncEnumerable

wmtdaxz3  于 2022-12-01  发布在  .NET
关注(0)|答案(2)|浏览(158)

我有一个gRPC客户端,我想有一个方法来简化它的使用。该方法应该返回IAsyncEnumerable个从gRPC服务器流传输的项目。我有一个指定的流传输超时,不能超过。如果发生超时,我想离开所有的项目,我设法获取到目前为止。
这是我尝试做的:

public async IAsyncEnumerable<Item> Search(
        SearchParameters parameters, 
        CancellationToken cancellationToken, 
        IDictionary<string, string> headers = null)
    {
        try
        {
            await _client.Search(
                    MapInput(parameters),
                    cancellationToken: cancellationToken,
                    deadline: DateTime.UtcNow.Add(_configuration.Timeout),
                    headers: MapHeaders(headers))
                .ResponseStream.ForEachAsync(item =>
                {
                    yield return MapSingleItem(item); // compilation error
                });
        }
        catch (RpcException ex) when (ex.StatusCode == StatusCode.DeadlineExceeded)
        {
            _logger.LogWarning("Steam finished due to timeout, a limited number of items has been returned");
        }
    }

从逻辑上讲,这应该是可行的。但是,yield关键字在lambda中不受支持,所以它不能编译。有没有其他方法可以编写它?

vngu2lb8

vngu2lb81#

您需要一个中间缓冲区来保存这些项,因为IAsyncEnumerable<Item>的使用者可以按照自己的节奏枚举它。Channel<T>类是一个很好的用于此目的的异步缓冲区。
您可能需要考虑的另一件事是,如果使用者过早地放弃IAsyncEnumerable<Item>的枚举(可能是由于break ing或return ing而故意放弃,也可能是由于遇到异常而不情愿放弃),会发生什么情况。最好的方法是取消iteratorfinally块中的linkedCancellationTokenSource
综合考虑:

public async IAsyncEnumerable<Item> Search(
    SearchParameters parameters, 
    [EnumeratorCancellation] CancellationToken cancellationToken = default,
    IDictionary<string, string> headers = null)
{
    Channel<Item> channel = Channel.CreateUnbounded<Item>();
    using var linkedCTS = CancellationTokenSource
        .CreateLinkedTokenSource(cancellationToken);

    Task producer = Task.Run(async () =>
    {
        try
        {
            await _client.Search(
                    MapInput(parameters),
                    cancellationToken: linkedCTS.Token,
                    deadline: DateTime.UtcNow.Add(_configuration.Timeout),
                    headers: MapHeaders(headers))
                .ResponseStream.ForEachAsync(item =>
                {
                    channel.Writer.TryWrite(item);
                }).ConfigureAwait(false);
            channel.Writer.Complete();
        }
        catch (Exception ex) { channel.Writer.Complete(ex); }
    });

    try
    {
        await foreach (var item in channel.Reader.ReadAllAsync()
            .ConfigureAwait(false))
        {
            yield return item;
        }
    }
    finally
    {
        linkedCTS.Cancel();
        await producer.ConfigureAwait(false);
    }
}

如果您希望您的令牌具有停止语义,则应该首先将其重命名为stoppingToken,然后在producer任务中相应地处理OperationCanceledException异常。

kb5ga3dv

kb5ga3dv2#

在www.example.com中Rx.net您可以使用.Debounce运算子和.TakeUntil运算子来执行此作业。

var input = Observable.Create<Item>((observer, cancellationToken) => 
    Task.Factory.StartNew(() =>
    {
        try
        {
           var items = _client.Search(
                MapInput(parameters),
                cancellationToken: cancellationToken,
                deadline: DateTime.UtcNow.Add(_configuration.Timeout),
                headers: MapHeaders(headers));
           foreach(var item in items)
                observer.OnNext(item);
        }
        catch(Exception ex)
        {
            observer.OnError(ex);
        }
     }, TaskCreationOptions.LongRunning)
);

var inputObservable = input
      .Publish()
      .RefCount();

var timeout = inputObs
     .Throttle(TimeSpan.FromSeconds(10));
var outputObs = inputObservable
    .TakeUntil(timeout);
  

return outputObs
     .ToAsyncEnumerable()
     .ToListAsync();

编辑:假设_客户端。搜索返回IEnumerable

相关问题