我有一个gRPC客户端,我想有一个方法来简化它的使用。该方法应该返回IAsyncEnumerable
个从gRPC服务器流传输的项目。我有一个指定的流传输超时,不能超过。如果发生超时,我想离开所有的项目,我设法获取到目前为止。
这是我尝试做的:
public async IAsyncEnumerable<Item> Search(
SearchParameters parameters,
CancellationToken cancellationToken,
IDictionary<string, string> headers = null)
{
try
{
await _client.Search(
MapInput(parameters),
cancellationToken: cancellationToken,
deadline: DateTime.UtcNow.Add(_configuration.Timeout),
headers: MapHeaders(headers))
.ResponseStream.ForEachAsync(item =>
{
yield return MapSingleItem(item); // compilation error
});
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.DeadlineExceeded)
{
_logger.LogWarning("Steam finished due to timeout, a limited number of items has been returned");
}
}
从逻辑上讲,这应该是可行的。但是,yield
关键字在lambda中不受支持,所以它不能编译。有没有其他方法可以编写它?
2条答案
按热度按时间vngu2lb81#
您需要一个中间缓冲区来保存这些项,因为
IAsyncEnumerable<Item>
的使用者可以按照自己的节奏枚举它。Channel<T>
类是一个很好的用于此目的的异步缓冲区。您可能需要考虑的另一件事是,如果使用者过早地放弃
IAsyncEnumerable<Item>
的枚举(可能是由于break
ing或return
ing而故意放弃,也可能是由于遇到异常而不情愿放弃),会发生什么情况。最好的方法是取消iterator的finally块中的linkedCancellationTokenSource
。综合考虑:
如果您希望您的令牌具有停止语义,则应该首先将其重命名为
stoppingToken
,然后在producer
任务中相应地处理OperationCanceledException
异常。kb5ga3dv2#
在www.example.com中Rx.net您可以使用.Debounce运算子和.TakeUntil运算子来执行此作业。
编辑:假设_客户端。搜索返回
IEnumerable
。