.net 如何等待多个IAsyncEnumerable

qhhrdooz  于 2023-07-01  发布在  .NET
关注(0)|答案(2)|浏览(106)

我们有这样的代码:

var intList = new List<int>{1,2,3};
var asyncEnumerables = intList.Select(Foo);

private async IAsyncEnumerable<int> Foo(int a)
{
  while (true)
  {
    await Task.Delay(5000);
    yield return a;
  } 
}

我需要为每个asyncEnumerable条目启动await foreach。每个循环迭代都应该互相等待,当每个迭代完成时,我需要收集每个迭代的数据并通过另一种方法处理。
我能通过TPL实现这一点吗?不然的话,你就不能给我出点主意吗?

mnemlml8

mnemlml81#

对我有用的是this repo中的Zip函数(81行)
我是这样用的

var intList = new List<int> { 1, 2, 3 };
var asyncEnumerables = intList.Select(RunAsyncIterations);
var enumerableToIterate = async_enumerable_dotnet.AsyncEnumerable.Zip(s => s, asyncEnumerables.ToArray());

await foreach (int[] enumerablesConcatenation in enumerableToIterate)
{
    Console.WriteLine(enumerablesConcatenation.Sum()); //Sum returns 6
    await Task.Delay(2000);
}

static async IAsyncEnumerable<int> RunAsyncIterations(int i)
{
    while (true)
        yield return i;
}
watbbzwu

watbbzwu2#

下面是一个可以使用的泛型方法Zip,它被实现为iteratorcancellationToken使用EnumeratorCancellation属性进行修饰,因此生成的IAsyncEnumerableWithCancellation友好。

using System.Runtime.CompilerServices;

public static async IAsyncEnumerable<TSource[]> Zip<TSource>(
    IEnumerable<IAsyncEnumerable<TSource>> sources,
    [EnumeratorCancellation]CancellationToken cancellationToken = default)
{
    var enumerators = sources
        .Select(x => x.GetAsyncEnumerator(cancellationToken))
        .ToArray();
    try
    {
        while (true)
        {
            var array = new TSource[enumerators.Length];
            for (int i = 0; i < enumerators.Length; i++)
            {
                if (!await enumerators[i].MoveNextAsync().ConfigureAwait(false))
                    yield break;
                array[i] = enumerators[i].Current;
            }
            yield return array;
        }
    }
    finally
    {
        foreach (var enumerator in enumerators)
        {
            await enumerator.DisposeAsync().ConfigureAwait(false);
        }
    }
}

使用示例:

await foreach (int[] result in Zip(asyncEnumerables))
{
    Console.WriteLine($"Result: {String.Join(", ", result)}");
}

此实现不是并发的。MoveNextAsync操作随后启动,一个在另一个完成之后。实现一个并发的实现(ZipConcurrent)是可能的,而且应该不是特别困难,但是应该谨慎。
这种实现也不是100%可靠的。它假设没有GetAsyncEnumerator调用会失败,也没有DisposeAsync调用会同步或异步失败,否则可能会保留一些枚举器。这些都是合理的假设,但是一个完美的健壮实现不应该依赖于任何这些假设。

相关问题