rx.Observable.replay()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(8.5k)|赞(0)|评价(0)|浏览(163)

本文整理了Java中rx.Observable.replay()方法的一些代码示例,展示了Observable.replay()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.replay()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:replay

Observable.replay介绍

[英]Returns a ConnectableObservable that shares a single subscription to the underlying Observable that will replay all of its items and notifications to any future Observer. A Connectable Observable resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to, but only when its connect method is called.

Backpressure Support: This operator does not support backpressure because multicasting means the stream is "hot" with multiple subscribers. Each child will need to manage backpressure independently using operators such as #onBackpressureDrop and #onBackpressureBuffer. Scheduler: This version of replay does not operate by default on a particular Scheduler.
[中]返回一个ConnectableObservable,它共享对基础Observable的单个订阅,该订阅将向任何未来的观察者重播其所有项目和通知。可连接的可观察对象类似于普通的可观察对象,只是它在订阅时不会开始发射项目,而只有在调用其连接方法时才会发射项目。
背压支持:该操作员不支持背压,因为多播意味着流对于多个订户来说是“热的”。每个孩子都需要使用#onBackpressureDrop和#onBackpressureBuffer等操作符独立管理背压。调度程序:默认情况下,此版本的replay不会在特定调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxNetty

static <X> DisposableContentSource<X> createNew(Observable<X> source) {
  final ArrayList<X> chunks = new ArrayList<>();
  ConnectableObservable<X> replay = source.doOnNext(new Action1<X>() {
    @Override
    public void call(X x) {
      chunks.add(x);
    }
  }).replay();
  return new DisposableContentSource<>(new OnSubscribeImpl<X>(replay, chunks));
}

代码示例来源:origin: THEONE10211024/RxJavaSamples

/**
 * This method will only be called once when the retained Fragment is first created.
 */
@Override
public void onCreate(Bundle savedInstanceState) {
  super.onCreate(savedInstanceState);
  // Retain this fragment across configuration changes.
  setRetainInstance(true);
  if (_storedIntsObservable != null) {
    return;
  }
  Observable<Integer> intsObservable =//
     Observable.interval(1, TimeUnit.SECONDS)//
        .map(new Func1<Long, Integer>() {
          @Override
          public Integer call(Long aLong) {
            return aLong.intValue();
          }
        })//
        .take(20);
  // -----------------------------------------------------------------------------------
  // Making our observable "HOT" for the purpose of the demo.
  //_intsObservable = _intsObservable.share();
  _storedIntsObservable = intsObservable.replay();
  _storedIntsSubscription = _storedIntsObservable.connect();
  // Do not do this in production!
  // `.share` is "warm" not "hot"
  // the below forceful subscription fakes the heat
  //_intsObservable.subscribe();
}

代码示例来源:origin: com.netflix.rxjava/rxjava-core

return replay(bufferSize, time, unit, Schedulers.computation());

代码示例来源:origin: com.netflix.rxjava/rxjava-core

/**
 * Returns a {@link ConnectableObservable} that shares a single subscription to the source Observable and
 * replays all items emitted by that Observable within a specified time window. A Connectable Observable
 * resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to,
 * but only when its {@code connect} method is called. 
 * <p>
 * <img width="640" height="515" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/replay.t.png" alt="">
 * <dl>
 *  <dt><b>Backpressure Support:</b></dt>
 *  <dd>This operator does not support backpressure because multicasting means the stream is "hot" with
 *      multiple subscribers. Each child will need to manage backpressure independently using operators such
 *      as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>This version of {@code replay} operates by default on the {@code computation} {@link Scheduler}.</dd>
 * </dl>
 * 
 * @param time
 *            the duration of the window in which the replayed items must have been emitted
 * @param unit
 *            the time unit of {@code time}
 * @return a {@link ConnectableObservable} that shares a single subscription to the source Observable and
 *         replays the items that were emitted during the window defined by {@code time}
 * @see <a href="https://github.com/Netflix/RxJava/wiki/Connectable-Observable-Operators#observablereplay">RxJava wiki: replay</a>
 * @see <a href="http://msdn.microsoft.com/en-us/library/hh229232.aspx">MSDN: Observable.Replay</a>
 */
public final ConnectableObservable<T> replay(long time, TimeUnit unit) {
  return replay(time, unit, Schedulers.computation());
}

代码示例来源:origin: com.netflix.rxjava/rxjava-core

return replay(selector, bufferSize, time, unit, Schedulers.computation());

代码示例来源:origin: com.netflix.rxjava/rxjava-core

return replay(selector, time, unit, Schedulers.computation());

代码示例来源:origin: techery/janet

private <A> Observable<A> createPipeline(Observable<A> source) {
  return source.mergeWith(clearingStream).replay(1).autoConnect();
}

代码示例来源:origin: com.netflix.eureka/eureka2-write-server

@Override
public Observable<Void> connect(final Observable<InstanceInfo> registrant) {
  final AtomicReference<InstanceInfo> instanceInfoRef = new AtomicReference<>();
  return registrant.replay(1).refCount()
      .flatMap(new Func1<InstanceInfo, Observable<Void>>() {
        @Override
        public Observable<Void> call(InstanceInfo instanceInfo) {
          logger.info("registering self InstanceInfo {}", instanceInfo);
          instanceInfoRef.set(instanceInfo);
          return registry.register(instanceInfo, selfSource).ignoreElements().cast(Void.class);
        }
      })
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          InstanceInfo info = instanceInfoRef.getAndSet(null);
          if (info != null) {
            logger.info("unregistering self InstanceInfo {}", info);
            registry.unregister(info, selfSource).subscribe();
          }
        }
      });
}

代码示例来源:origin: yahoo/fili

.filter(Either::isRight)
    .map(Either::getRight)
    .replay(1);
    jobMetadata
    .replay(1);
jobRowStoredNotification.connect();
    jobMetadata
    .replay(1);
preResponseStoredNotification.connect();
    jobMetadata
    .replay(1);
jobRowUpdatedNotification.connect();

代码示例来源:origin: yahoo/fili

.filter(ticket::equals)
    .take(1)
    .replay(1);
broadcastChannelNotifications.connect();

代码示例来源:origin: com.netflix.eureka/eureka2-core

final Observable<OP> opObservable = opStream.replay(1).refCount();
final Subscriber<OP> opSubscriber = new NoOpSubscriber<>();
final Observable<CHANNEL> channelObservable = channelObservableWithCleanUp(channelSubject);

代码示例来源:origin: akarnokd/akarnokd-misc

@Test
public void test1x() throws Exception {
  rx.schedulers.TestScheduler scheduler = new rx.schedulers.TestScheduler();
  rx.Observable<Integer> source = rx.Observable.just(1)
      .replay(2, TimeUnit.SECONDS, scheduler)
      .autoConnect();
  source.test().assertResult(1);
  source.test().assertResult(1);
  scheduler.advanceTimeBy(3, TimeUnit.SECONDS);
  source.test().assertResult();
}

代码示例来源:origin: akarnokd/akarnokd-misc

@Test
  public void test() {
    Observable<Integer> obs1 = Observable.create(emitter -> {
      emitter.onNext(1);
      emitter.onCompleted();
    }, Emitter.BackpressureMode.BUFFER);

    Observable<Integer> obs2 = Observable.create(emitter -> {
      emitter.onNext(2);
      emitter.onCompleted();
    }, Emitter.BackpressureMode.BUFFER);
    
    Observable<Integer> both = Observable
        .concatDelayError(obs1, obs2)
        .replay().autoConnect();

    both.test().assertResult(1, 2);

    both.test().assertResult(1, 2);
  }
}

代码示例来源:origin: akarnokd/akarnokd-misc

@Test
  public void testFlakyReplayBehavior2() {
    final PublishSubject<Integer> subject = PublishSubject.create();
    final TestSubscriber<Integer> subscriber = new TestSubscriber<>();

    final ConnectableObservable<Integer> sums = subject.scan(1, (a, b) -> a + b).replay(1);
    sums.connect();

    subject.onNext(2);
    subject.onNext(3);

    sums.subscribe(subscriber);

//        subscriber.assertValueCount(1);
    subscriber.assertValues(6);
  }
}

代码示例来源:origin: akarnokd/akarnokd-misc

@Test
  public void testFlakyReplayBehavior() {
    final TestScheduler scheduler = new TestScheduler();
    final TestSubject<Integer> subject = TestSubject.create(scheduler);
    final TestSubscriber<Integer> subscriber = new TestSubscriber<>();

    final ConnectableObservable<Integer> sums = subject.scan(1, (a, b) -> a + b).replay(1);
    sums.connect();

    subject.onNext(2);
    subject.onNext(3);
    scheduler.triggerActions();

    sums.subscribe(subscriber);

//        subscriber.assertValueCount(1);
    subscriber.assertValues(6);
  }

代码示例来源:origin: akarnokd/akarnokd-misc

@Test
public void testExpectedReplayBehavior() {
  final TestScheduler scheduler = new TestScheduler();
  final TestSubject<Integer> subject = TestSubject.create(scheduler);
  final TestSubscriber<Integer> subscriber = new TestSubscriber<>();
  final ConnectableObservable<Integer> sums = subject.scan((a, b) -> a + b).replay(1);
  sums.connect();
  subject.onNext(1);
  subject.onNext(2);
  subject.onNext(3);
  scheduler.triggerActions();
  sums.subscribe(subscriber);
  subscriber.assertValueCount(1);
  subscriber.assertValues(6);
}

相关文章

Observable类方法