.net 将Stream转换为Observable

epfja78i  于 2023-11-20  发布在  .NET
关注(0)|答案(1)|浏览(146)

我已经尝试过了,但它似乎有并发问题。
我不完全明白什么是错的。

public static IObservable<byte> ToObservable(this Stream stream, int bufferSize = 4096)
{
    var buffer = new byte[bufferSize];

    return Observable
        .FromAsync(async ct => (bytesRead: await stream.ReadAsync(buffer, 0, buffer.Length, ct).ConfigureAwait(false), buffer))
        .Repeat()
        .TakeWhile(x => x.bytesRead != 0)
        .Select(x => x.buffer)
        .SelectMany(x => x);
}

字符串

hpcdzsge

hpcdzsge1#

可能的并发问题是,当.SelectMany(x => x)在缓冲区上执行时,对stream.ReadAsync的调用正在缓冲区上执行。
您需要确保在FromAsync中返回缓冲区的副本。
这个版本涵盖了这些问题:

public static IObservable<byte> ToObservable(this Stream stream, int bufferSize = 4096)
{
    var buffer = new byte[bufferSize];
    return
        Observable
            .FromAsync(async ct =>
            {
                var bytes = await stream.ReadAsync(buffer, 0, buffer.Length, ct).ConfigureAwait(false);
                return buffer.Take(bytes).ToArray();
            })
            .Repeat()
            .TakeWhile(x => x.Length != 0)
            .SelectMany(x => x);
}

字符串
我用这个测试了你的原始代码和我的版本:

var bytes1 =
    await
        Observable
            .Using(
                () => File.OpenRead(fileName),
                s => s.ToObservable())
            .ToArray();

var bytes2 = await File.ReadAllBytesAsync(fileName);

Console.WriteLine(bytes1.SequenceEqual(bytes2));


你的每次都失败了,我的却成功了。
对于你的函数,你可能会考虑的一件事是,observable不能并发运行,它不能重复,因为它不管理流的生命周期,你有一个共享缓冲区。
您可以通过将签名更改为以下内容来解决此问题:

IObservable<byte> ToObservable<S>(this Func<S> streamFactory, int bufferSize = 4096) where S : Stream


现在代码更加健壮了。

public static IObservable<byte> ToObservable<S>(
        this Func<S> streamFactory,
        int bufferSize = 4096)
            where S : Stream =>
    Observable
        .Using(
            streamFactory,
            stream =>
            {
                var buffer = new byte[bufferSize];
                return
                    Observable
                        .FromAsync(async ct =>
                        {
                            var bytes =
                                await
                                    stream
                                        .ReadAsync(buffer, 0, buffer.Length, ct)
                                        .ConfigureAwait(false);
                            return buffer.Take(bytes).ToArray();
                        })
                        .Repeat()
                        .TakeWhile(x => x.Length != 0)
                        .SelectMany(x => x);
            });

相关问题