io.reactivex.Flowable.cacheWithInitialCapacity()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(3.2k)|赞(0)|评价(0)|浏览(132)

本文整理了Java中io.reactivex.Flowable.cacheWithInitialCapacity()方法的一些代码示例,展示了Flowable.cacheWithInitialCapacity()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.cacheWithInitialCapacity()方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:cacheWithInitialCapacity

Flowable.cacheWithInitialCapacity介绍

[英]Returns a Flowable that subscribes to this Publisher lazily, caches all of its events and replays them, in the same order as received, to all the downstream subscribers.

This is useful when you want a Publisher to cache responses and you can't control the subscribe/cancel behavior of all the Subscribers.

The operator subscribes only when the first downstream subscriber subscribes and maintains a single subscription towards this Publisher. In contrast, the operator family of #replay()that return a ConnectableFlowable require an explicit call to ConnectableFlowable#connect().

Note: You sacrifice the ability to cancel the origin when you use the cacheSubscriber so be careful not to use this Subscriber on Publishers that emit an infinite or very large number of items that will use up memory. A possible workaround is to apply takeUntil with a predicate or another source before (and perhaps after) the application of cache().

AtomicBoolean shouldStop = new AtomicBoolean(); 
source.takeUntil(v -> shouldStop.get()) 
.cache() 
.takeUntil(v -> shouldStop.get()) 
.subscribe(...);

Since the operator doesn't allow clearing the cached values either, the possible workaround is to forget all references to it via #onTerminateDetach() applied along with the previous workaround:

AtomicBoolean shouldStop = new AtomicBoolean(); 
source.takeUntil(v -> shouldStop.get()) 
.onTerminateDetach() 
.cache() 
.takeUntil(v -> shouldStop.get()) 
.onTerminateDetach() 
.subscribe(...);

Backpressure: The operator consumes this Publisher in an unbounded fashion but respects the backpressure of each downstream Subscriber individually. Scheduler: cacheWithInitialCapacity does not operate by default on a particular Scheduler.

Note: The capacity hint is not an upper bound on cache size. For that, consider #replay(int) in combination with ConnectableFlowable#autoConnect() or similar.
[中]返回一个FlowTable,该FlowTable延迟订阅此发布服务器,缓存其所有事件,并按照接收到的相同顺序将其重播给所有下游订阅服务器。
当您希望发布服务器缓存响应,并且无法控制所有订阅服务器的订阅/取消行为时,这非常有用。
只有当第一个下游订阅者订阅并维护对该发布者的单一订阅时,运营商才会订阅。相反,返回ConnectableFlowable的#replay()运算符族需要显式调用ConnectableFlowable#connect()。
*注意:*当您使用cacheSubscriber时,您会牺牲取消源站的功能,因此请小心不要在发布服务器上使用此订阅服务器,因为这些发布服务器会发出无限或非常多的项目,从而耗尽内存。一种可能的解决方法是在应用cache()之前(或者之后)使用谓词或其他源应用“takeUntil”。

AtomicBoolean shouldStop = new AtomicBoolean(); 
source.takeUntil(v -> shouldStop.get()) 
.cache() 
.takeUntil(v -> shouldStop.get()) 
.subscribe(...);

由于操作员也不允许清除缓存的值,可能的解决方法是通过与前面的解决方法一起应用的#onTerminateDetach()忘记对它的所有引用:

AtomicBoolean shouldStop = new AtomicBoolean(); 
source.takeUntil(v -> shouldStop.get()) 
.onTerminateDetach() 
.cache() 
.takeUntil(v -> shouldStop.get()) 
.onTerminateDetach() 
.subscribe(...);

背压:操作员以无限制的方式使用此发布服务器,但单独尊重每个下游订阅服务器的背压。Scheduler:cacheWithInitialCapacity默认情况下不会在特定计划程序上运行。
*注意:*容量提示不是缓存大小的上限。为此,考虑与关联的连接表(int)和CONTABLE LeavySyAutoCuthTo()或类似的组合。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> cache() {
  return cacheWithInitialCapacity(16);

代码示例来源:origin: redisson/redisson

@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> cache() {
  return cacheWithInitialCapacity(16);

代码示例来源:origin: ReactiveX/RxJava

}).cacheWithInitialCapacity(1);

相关文章

Flowable类方法