我已经尝试过了,但它似乎有并发问题。
我不完全明白什么是错的。
public static IObservable<byte> ToObservable(this Stream stream, int bufferSize = 4096)
{
var buffer = new byte[bufferSize];
return Observable
.FromAsync(async ct => (bytesRead: await stream.ReadAsync(buffer, 0, buffer.Length, ct).ConfigureAwait(false), buffer))
.Repeat()
.TakeWhile(x => x.bytesRead != 0)
.Select(x => x.buffer)
.SelectMany(x => x);
}
字符串
1条答案
按热度按时间hpcdzsge1#
可能的并发问题是,当
.SelectMany(x => x)
在缓冲区上执行时,对stream.ReadAsync
的调用正在缓冲区上执行。您需要确保在
FromAsync
中返回缓冲区的副本。这个版本涵盖了这些问题:
字符串
我用这个测试了你的原始代码和我的版本:
型
你的每次都失败了,我的却成功了。
对于你的函数,你可能会考虑的一件事是,observable不能并发运行,它不能重复,因为它不管理流的生命周期,你有一个共享缓冲区。
您可以通过将签名更改为以下内容来解决此问题:
型
现在代码更加健壮了。
型