Usage and code examples of the rx.Observable.replay() method

x33g5p2x, reposted on 2022-01-25 in Other

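This article collects code examples of the rx.Observable.replay() method in Java and shows how Observable.replay() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Observable.replay() method:
Package path: rx.Observable
Class name: Observable
Method name: replay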

Introduction to Observable.replay

Returns a ConnectableObservable that shares a single subscription to the underlying Observable and replays all of its items and notifications to any future Observer. A ConnectableObservable resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to, but only when its connect method is called.

Backpressure Support: This operator does not support backpressure because multicasting means the stream is "hot" with multiple subscribers. Each child will need to manage backpressure independently using operators such as #onBackpressureDrop and #onBackpressureBuffer. Scheduler: This version of replay does not operate by default on a particular Scheduler.
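The connect-then-replay behaviour described above can be modelled without RxJava. The following is a minimal plain-Java sketch, not RxJava's implementation: `ReplayModel`, its `subscribe` and `connect` methods, and the `demo` helper are hypothetical names introduced only for illustration, and the model ignores errors, completion, unsubscription, and threading.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical plain-Java model of replay() + connect(); not RxJava's implementation. */
final class ReplayModel<T> {
    // The "source" pushes its items into whatever sink it is given.
    private final Consumer<Consumer<T>> source;
    private final List<T> buffer = new ArrayList<>();
    private final List<Consumer<T>> observers = new ArrayList<>();
    private boolean connected;

    ReplayModel(Consumer<Consumer<T>> source) {
        this.source = source;
    }

    /** Replays everything buffered so far, then registers for future items. */
    void subscribe(Consumer<T> observer) {
        buffer.forEach(observer);
        observers.add(observer);
    }

    /** Runs the source exactly once; repeated calls are ignored. */
    void connect() {
        if (connected) {
            return;
        }
        connected = true;
        source.accept(item -> {
            buffer.add(item);
            for (Consumer<T> observer : observers) {
                observer.accept(item);
            }
        });
    }

    /** Returns [silent before connect?, early items, late items, source runs]. */
    static List<Object> demo() {
        int[] sourceRuns = {0};
        ReplayModel<Integer> replay = new ReplayModel<Integer>(sink -> {
            sourceRuns[0]++;
            sink.accept(1);
            sink.accept(2);
            sink.accept(3);
        });
        List<Integer> early = new ArrayList<>();
        List<Integer> late = new ArrayList<>();
        replay.subscribe(early::add);              // subscribing alone emits nothing
        boolean silentBeforeConnect = early.isEmpty();
        replay.connect();                          // the source runs now
        replay.subscribe(late::add);               // the late observer gets the full replay
        return Arrays.asList(silentBeforeConnect, early, late, sourceRuns[0]);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [true, [1, 2, 3], [1, 2, 3], 1]
    }
}
```

In this model the early observer sees nothing until `connect()` is called, the late observer still receives every item, and the source is run only once, which mirrors the single shared subscription in the description.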

Code examples

Code example source: ReactiveX/RxNetty

    static <X> DisposableContentSource<X> createNew(Observable<X> source) {
        final ArrayList<X> chunks = new ArrayList<>();
        // Record every emitted chunk and make the source replayable.
        ConnectableObservable<X> replay = source.doOnNext(new Action1<X>() {
            @Override
            public void call(X x) {
                chunks.add(x);
            }
        }).replay();
        return new DisposableContentSource<>(new OnSubscribeImpl<X>(replay, chunks));
    }

Code example source: THEONE10211024/RxJavaSamples

    /**
     * This method will only be called once when the retained Fragment is first created.
     */
    @Override
    public void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        // Retain this fragment across configuration changes.
        setRetainInstance(true);
        if (_storedIntsObservable != null) {
            return;
        }
        Observable<Integer> intsObservable =//
            Observable.interval(1, TimeUnit.SECONDS)//
                .map(new Func1<Long, Integer>() {
                    @Override
                    public Integer call(Long aLong) {
                        return aLong.intValue();
                    }
                })//
                .take(20);
        // -----------------------------------------------------------------------------------
        // Making our observable "HOT" for the purpose of the demo.
        //_intsObservable = _intsObservable.share();
        _storedIntsObservable = intsObservable.replay();
        _storedIntsSubscription = _storedIntsObservable.connect();
        // Do not do this in production!
        // `.share` is "warm" not "hot"
        // the below forceful subscription fakes the heat
        //_intsObservable.subscribe();
    }

Code example source: com.netflix.rxjava/rxjava-core

    return replay(bufferSize, time, unit, Schedulers.computation());

Code example source: com.netflix.rxjava/rxjava-core

    /**
     * Returns a {@link ConnectableObservable} that shares a single subscription to the source Observable and
     * replays all items emitted by that Observable within a specified time window. A Connectable Observable
     * resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to,
     * but only when its {@code connect} method is called.
     * <p>
     * <img width="640" height="515" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/replay.t.png" alt="">
     * <dl>
     * <dt><b>Backpressure Support:</b></dt>
     * <dd>This operator does not support backpressure because multicasting means the stream is "hot" with
     * multiple subscribers. Each child will need to manage backpressure independently using operators such
     * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.</dd>
     * <dt><b>Scheduler:</b></dt>
     * <dd>This version of {@code replay} operates by default on the {@code computation} {@link Scheduler}.</dd>
     * </dl>
     *
     * @param time
     *            the duration of the window in which the replayed items must have been emitted
     * @param unit
     *            the time unit of {@code time}
     * @return a {@link ConnectableObservable} that shares a single subscription to the source Observable and
     *         replays the items that were emitted during the window defined by {@code time}
     * @see <a href="https://github.com/Netflix/RxJava/wiki/Connectable-Observable-Operators#observablereplay">RxJava wiki: replay</a>
     * @see <a href="http://msdn.microsoft.com/en-us/library/hh229232.aspx">MSDN: Observable.Replay</a>
     */
    public final ConnectableObservable<T> replay(long time, TimeUnit unit) {
        return replay(time, unit, Schedulers.computation());
    }

Code example source: com.netflix.rxjava/rxjava-core

    return replay(selector, bufferSize, time, unit, Schedulers.computation());

Code example source: com.netflix.rxjava/rxjava-core

    return replay(selector, time, unit, Schedulers.computation());

Code example source: techery/janet

    private <A> Observable<A> createPipeline(Observable<A> source) {
        return source.mergeWith(clearingStream).replay(1).autoConnect();
    }
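A buffer size of 1, as in `replay(1)` above, means a late subscriber is handed only the most recent item before it starts receiving live ones. The sketch below models that bounded buffer in plain Java. It is an illustration under stated assumptions, not RxJava code: `BoundedReplayModel`, `onNext`, `subscribe`, and `demo` are hypothetical names, items are pushed in directly instead of coming from an upstream Observable, and the `autoConnect()` step is left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical model of a size-bounded replay buffer; not RxJava's implementation. */
final class BoundedReplayModel<T> {
    private final int bufferSize;
    private final Deque<T> buffer = new ArrayDeque<>();
    private final List<Consumer<T>> observers = new ArrayList<>();

    BoundedReplayModel(int bufferSize) {
        if (bufferSize < 1) {
            throw new IllegalArgumentException("bufferSize must be >= 1");
        }
        this.bufferSize = bufferSize;
    }

    /** Buffers the item, evicting the oldest one when full, then forwards it live. */
    void onNext(T item) {
        if (buffer.size() == bufferSize) {
            buffer.removeFirst();
        }
        buffer.addLast(item);
        observers.forEach(observer -> observer.accept(item));
    }

    /** Replays the buffered items, then registers for live items. */
    void subscribe(Consumer<T> observer) {
        buffer.forEach(observer);
        observers.add(observer);
    }

    /** Returns [items seen by the early observer, items seen by the late observer]. */
    static List<List<Integer>> demo() {
        BoundedReplayModel<Integer> latest = new BoundedReplayModel<Integer>(1);
        List<Integer> early = new ArrayList<>();
        List<Integer> late = new ArrayList<>();
        latest.subscribe(early::add);
        latest.onNext(1);
        latest.onNext(2);
        latest.onNext(3);
        latest.subscribe(late::add); // only the buffered 3 is replayed
        latest.onNext(4);
        return Arrays.asList(early, late);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [[1, 2, 3, 4], [3, 4]]
    }
}
```

The late observer misses 1 and 2 because they were evicted from the one-slot buffer before it subscribed.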

Code example source: com.netflix.eureka/eureka2-write-server

    @Override
    public Observable<Void> connect(final Observable<InstanceInfo> registrant) {
        final AtomicReference<InstanceInfo> instanceInfoRef = new AtomicReference<>();
        return registrant.replay(1).refCount()
                .flatMap(new Func1<InstanceInfo, Observable<Void>>() {
                    @Override
                    public Observable<Void> call(InstanceInfo instanceInfo) {
                        logger.info("registering self InstanceInfo {}", instanceInfo);
                        instanceInfoRef.set(instanceInfo);
                        return registry.register(instanceInfo, selfSource).ignoreElements().cast(Void.class);
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        InstanceInfo info = instanceInfoRef.getAndSet(null);
                        if (info != null) {
                            logger.info("unregistering self InstanceInfo {}", info);
                            registry.unregister(info, selfSource).subscribe();
                        }
                    }
                });
    }

Code example source: yahoo/fili

            .filter(Either::isRight)
            .map(Either::getRight)
            .replay(1);
    jobMetadata
            .replay(1);
    jobRowStoredNotification.connect();
    jobMetadata
            .replay(1);
    preResponseStoredNotification.connect();
    jobMetadata
            .replay(1);
    jobRowUpdatedNotification.connect();

Code example source: yahoo/fili

            .filter(ticket::equals)
            .take(1)
            .replay(1);
    broadcastChannelNotifications.connect();

Code example source: com.netflix.eureka/eureka2-core

    final Observable<OP> opObservable = opStream.replay(1).refCount();
    final Subscriber<OP> opSubscriber = new NoOpSubscriber<>();
    final Observable<CHANNEL> channelObservable = channelObservableWithCleanUp(channelSubject);

Code example source: akarnokd/akarnokd-misc

    @Test
    public void test1x() throws Exception {
        rx.schedulers.TestScheduler scheduler = new rx.schedulers.TestScheduler();
        rx.Observable<Integer> source = rx.Observable.just(1)
                .replay(2, TimeUnit.SECONDS, scheduler)
                .autoConnect();
        source.test().assertResult(1);
        source.test().assertResult(1);
        scheduler.advanceTimeBy(3, TimeUnit.SECONDS);
        source.test().assertResult();
    }
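The test above relies on time-windowed replay: an item is replayed only while it is younger than the window. A rough plain-Java model of that rule, driven by a manually advanced clock in the spirit of `TestScheduler`, might look like the sketch below. `TimedReplayModel` and all of its members are hypothetical names, and the handling of an item whose age exactly equals the window is an assumption of this model, not a statement about RxJava.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Hypothetical model of a time-windowed replay buffer; not RxJava's implementation. */
final class TimedReplayModel<T> {
    /** An item together with the virtual time at which it was emitted. */
    private static final class Timed<V> {
        final long timeMillis;
        final V value;

        Timed(long timeMillis, V value) {
            this.timeMillis = timeMillis;
            this.value = value;
        }
    }

    private final long windowMillis;
    private final Deque<Timed<T>> buffer = new ArrayDeque<>();
    private long nowMillis; // virtual clock, advanced only by advanceTimeBy

    TimedReplayModel(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    void advanceTimeBy(long millis) {
        nowMillis += millis;
    }

    void onNext(T item) {
        buffer.addLast(new Timed<>(nowMillis, item));
    }

    /** Drops items older than the window (assumption: age == window is kept), then copies the rest. */
    List<T> replaySnapshot() {
        while (!buffer.isEmpty() && nowMillis - buffer.peekFirst().timeMillis > windowMillis) {
            buffer.removeFirst();
        }
        List<T> snapshot = new ArrayList<>();
        for (Timed<T> timed : buffer) {
            snapshot.add(timed.value);
        }
        return snapshot;
    }

    /** Returns [what a subscriber sees immediately, what it sees after 3 seconds]. */
    static List<List<Integer>> demo() {
        TimedReplayModel<Integer> replay = new TimedReplayModel<Integer>(2000);
        replay.onNext(1);
        List<Integer> fresh = replay.replaySnapshot();
        replay.advanceTimeBy(3000); // the item is now older than the 2-second window
        List<Integer> stale = replay.replaySnapshot();
        return Arrays.asList(fresh, stale);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [[1], []]
    }
}
```

This matches the shape of the assertions in `test1x`: the value is available to subscribers at first and is gone once the virtual clock has moved three seconds past a two-second window.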

Code example source: akarnokd/akarnokd-misc

    @Test
    public void test() {
        Observable<Integer> obs1 = Observable.create(emitter -> {
            emitter.onNext(1);
            emitter.onCompleted();
        }, Emitter.BackpressureMode.BUFFER);
        Observable<Integer> obs2 = Observable.create(emitter -> {
            emitter.onNext(2);
            emitter.onCompleted();
        }, Emitter.BackpressureMode.BUFFER);
        Observable<Integer> both = Observable
                .concatDelayError(obs1, obs2)
                .replay().autoConnect();
        both.test().assertResult(1, 2);
        both.test().assertResult(1, 2);
    }

Code example source: akarnokd/akarnokd-misc

    @Test
    public void testFlakyReplayBehavior2() {
        final PublishSubject<Integer> subject = PublishSubject.create();
        final TestSubscriber<Integer> subscriber = new TestSubscriber<>();
        final ConnectableObservable<Integer> sums = subject.scan(1, (a, b) -> a + b).replay(1);
        sums.connect();
        subject.onNext(2);
        subject.onNext(3);
        sums.subscribe(subscriber);
        // subscriber.assertValueCount(1);
        subscriber.assertValues(6);
    }

Code example source: akarnokd/akarnokd-misc

    @Test
    public void testFlakyReplayBehavior() {
        final TestScheduler scheduler = new TestScheduler();
        final TestSubject<Integer> subject = TestSubject.create(scheduler);
        final TestSubscriber<Integer> subscriber = new TestSubscriber<>();
        final ConnectableObservable<Integer> sums = subject.scan(1, (a, b) -> a + b).replay(1);
        sums.connect();
        subject.onNext(2);
        subject.onNext(3);
        scheduler.triggerActions();
        sums.subscribe(subscriber);
        // subscriber.assertValueCount(1);
        subscriber.assertValues(6);
    }

Code example source: akarnokd/akarnokd-misc

    @Test
    public void testExpectedReplayBehavior() {
        final TestScheduler scheduler = new TestScheduler();
        final TestSubject<Integer> subject = TestSubject.create(scheduler);
        final TestSubscriber<Integer> subscriber = new TestSubscriber<>();
        final ConnectableObservable<Integer> sums = subject.scan((a, b) -> a + b).replay(1);
        sums.connect();
        subject.onNext(1);
        subject.onNext(2);
        subject.onNext(3);
        scheduler.triggerActions();
        sums.subscribe(subscriber);
        subscriber.assertValueCount(1);
        subscriber.assertValues(6);
    }
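The three scan tests above all expect a late subscriber to see the value 6. The arithmetic behind that expectation can be worked through in plain Java. The sketch below is only a model: `ScanReplayModel`, `scan`, `scanWithSeed`, and `lastOnly` are hypothetical helpers that imitate a seeded scan (which emits the seed first), an unseeded scan, and what a `replay(1)` buffer holds after all items have been emitted. It says nothing about the timing issues that the "flaky" test names hint at.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BinaryOperator;

/** Hypothetical model of scan followed by replay(1); not RxJava code. */
final class ScanReplayModel {
    /** Models an unseeded scan: the first item passes through, then each running result. */
    static List<Integer> scan(List<Integer> items, BinaryOperator<Integer> accumulator) {
        List<Integer> emissions = new ArrayList<>();
        Integer running = null;
        for (Integer item : items) {
            running = (running == null) ? item : accumulator.apply(running, item);
            emissions.add(running);
        }
        return emissions;
    }

    /** Models a seeded scan: the seed is emitted first, then each running result. */
    static List<Integer> scanWithSeed(int seed, List<Integer> items, BinaryOperator<Integer> accumulator) {
        List<Integer> emissions = new ArrayList<>();
        Integer running = seed;
        emissions.add(running);
        for (Integer item : items) {
            running = accumulator.apply(running, item);
            emissions.add(running);
        }
        return emissions;
    }

    /** Models what a one-slot replay buffer holds once every emission has passed through. */
    static List<Integer> lastOnly(List<Integer> emissions) {
        return emissions.isEmpty()
                ? Collections.<Integer>emptyList()
                : Collections.singletonList(emissions.get(emissions.size() - 1));
    }

    /** Returns [seeded emissions, unseeded emissions, seeded replay, unseeded replay]. */
    static List<List<Integer>> demo() {
        List<Integer> seeded = scanWithSeed(1, Arrays.asList(2, 3), Integer::sum);
        List<Integer> unseeded = scan(Arrays.asList(1, 2, 3), Integer::sum);
        return Arrays.asList(seeded, unseeded, lastOnly(seeded), lastOnly(unseeded));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [[1, 3, 6], [1, 3, 6], [6], [6]]
    }
}
```

Both variants produce the running sums 1, 3, 6, so a one-item replay buffer ends up holding 6 in either case.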
