io.reactivex.Observable.cacheWithInitialCapacity()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(3.0k)|赞(0)|评价(0)|浏览(105)

本文整理了Java中io.reactivex.Observable.cacheWithInitialCapacity()方法的一些代码示例,展示了Observable.cacheWithInitialCapacity()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.cacheWithInitialCapacity()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:cacheWithInitialCapacity

Observable.cacheWithInitialCapacity介绍

[英]Returns an Observable that subscribes to this ObservableSource lazily, caches all of its events and replays them, in the same order as received, to all the downstream subscribers.

This is useful when you want an ObservableSource to cache responses and you can't control the subscribe/dispose behavior of all the Observers.

The operator subscribes only when the first downstream subscriber subscribes and maintains a single subscription towards this ObservableSource. In contrast, the operator family of #replay()that return a ConnectableObservable require an explicit call to ConnectableObservable#connect().

Note: You sacrifice the ability to dispose the origin when you use the cacheObserver so be careful not to use this Observer on ObservableSources that emit an infinite or very large number of items that will use up memory. A possible workaround is to apply takeUntil with a predicate or another source before (and perhaps after) the application of cache().

AtomicBoolean shouldStop = new AtomicBoolean(); 
source.takeUntil(v -> shouldStop.get()) 
.cache() 
.takeUntil(v -> shouldStop.get()) 
.subscribe(...);

Since the operator doesn't allow clearing the cached values either, the possible workaround is to forget all references to it via #onTerminateDetach() applied along with the previous workaround:

AtomicBoolean shouldStop = new AtomicBoolean(); 
source.takeUntil(v -> shouldStop.get()) 
.onTerminateDetach() 
.cache() 
.takeUntil(v -> shouldStop.get()) 
.onTerminateDetach() 
.subscribe(...);

Scheduler: cacheWithInitialCapacity does not operate by default on a particular Scheduler.

Note: The capacity hint is not an upper bound on cache size. For that, consider #replay(int) in combination with ConnectableObservable#autoConnect() or similar.
[中]返回延迟订阅此ObservableSource的Observable,缓存其所有事件,并按照接收到的相同顺序将其重播给所有下游订阅方。
当您希望ObservateSource缓存响应,并且无法控制所有观察者的订阅/处置行为时,这非常有用。
只有当第一个下游订户订阅并维护对该可观察资源的单一订阅时,运营商才会订阅。相反,返回ConnectableObservable的#replay()运算符族需要显式调用ConnectableObservable#connect()。
*注意:*当您使用cacheObserver时,您牺牲了处理原点的能力,因此请小心不要在发出无限或非常大量项的Observesource上使用此Observator,这将耗尽内存。一种可能的解决方法是在应用cache()之前(可能之后)使用谓词或其他源应用“takeUntil”。

AtomicBoolean shouldStop = new AtomicBoolean(); 
source.takeUntil(v -> shouldStop.get()) 
.cache() 
.takeUntil(v -> shouldStop.get()) 
.subscribe(...);

由于操作员也不允许清除缓存的值,因此可能的解决方法是通过与前面的解决方法一起应用的#onTerminateDetach()忘记对它的所有引用:

AtomicBoolean shouldStop = new AtomicBoolean(); 
source.takeUntil(v -> shouldStop.get()) 
.onTerminateDetach() 
.cache() 
.takeUntil(v -> shouldStop.get()) 
.onTerminateDetach() 
.subscribe(...);

调度程序:cacheWithInitialCapacity默认情况下不会在特定调度程序上运行。
*注意:*容量提示不是缓存大小的上限。为此,考虑与关联的可连接可观察的γ自动重写(或)类似的重放(int)。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> cache() {
  return cacheWithInitialCapacity(16);

代码示例来源:origin: ReactiveX/RxJava

}).cacheWithInitialCapacity(1);

相关文章

Observable类方法