io.reactivex.Observable.observeOn()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(7.3k)|赞(0)|评价(0)|浏览(289)

本文整理了Java中io.reactivex.Observable.observeOn()方法的一些代码示例,展示了Observable.observeOn()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.observeOn()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:observeOn

Observable.observeOn介绍

[英]Modifies an ObservableSource to perform its emissions and notifications on a specified Scheduler, asynchronously with an unbounded buffer with Flowable#bufferSize() "island size".

Note that onError notifications will cut ahead of onNext notifications on the emission thread if Scheduler is truly asynchronous. If strict event ordering is required, consider using the #observeOn(Scheduler,boolean) overload.

Scheduler: You specify which Scheduler this operator will use.

"Island size" indicates how large chunks the unbounded buffer allocates to store the excess elements waiting to be consumed on the other side of the asynchronous boundary.
[中]修改ObservableSource以在指定的计划程序上执行其发射和通知,并使用具有可流动#bufferSize()“island size”的无限缓冲区异步执行。
请注意,如果调度程序是真正异步的,OneError通知将在发出线程的onNext通知之前切断。如果需要严格的事件排序,可以考虑使用超额观察(调度器,布尔)过载。
调度器:指定该操作员将使用的调度器。
“孤岛大小”表示无限缓冲区分配了多大的块来存储异步边界另一侧等待消耗的多余元素。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Override
  public ObservableSource<Object> apply(Observable<Object> o) throws Exception {
    return o.observeOn(new TestScheduler());
  }
});

代码示例来源:origin: ReactiveX/RxJava

@Override
  public ObservableSource<R> apply(Observable<T> t) throws Exception {
    ObservableSource<R> apply = ObjectHelper.requireNonNull(selector.apply(t), "The selector returned a null ObservableSource");
    return Observable.wrap(apply).observeOn(scheduler);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public ObservableSource<? extends Object> apply(Integer v)
      throws Exception {
    return Observable.just(2).hide()
    .observeOn(Schedulers.single())
    .map(new Function<Integer, Object>() {
      @Override
      public Object apply(Integer w) throws Exception {
        return Thread.currentThread().getName();
      }
    });
  }
})

代码示例来源:origin: ReactiveX/RxJava

/**
 * This is testing a no-op path since it uses Schedulers.immediate() which will not do scheduling.
 */
@Test
public void testObserveOn() {
  Observer<Integer> observer = TestHelper.mockObserver();
  Observable.just(1, 2, 3).observeOn(ImmediateThinScheduler.INSTANCE).subscribe(observer);
  verify(observer, times(1)).onNext(1);
  verify(observer, times(1)).onNext(2);
  verify(observer, times(1)).onNext(3);
  verify(observer, times(1)).onComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = IllegalArgumentException.class)
public void testMapWithIssue417() {
  Observable.just(1).observeOn(Schedulers.computation())
      .map(new Function<Integer, Integer>() {
        @Override
        public Integer apply(Integer arg0) {
          throw new IllegalArgumentException("any error");
        }
      }).blockingSingle();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testDelayedErrorDeliveryWhenSafeSubscriberUnsubscribes() {
  TestScheduler testScheduler = new TestScheduler();
  Observable<Integer> source = Observable.concat(Observable.<Integer> error(new TestException()), Observable.just(1));
  Observer<Integer> o = TestHelper.mockObserver();
  InOrder inOrder = inOrder(o);
  source.observeOn(testScheduler).subscribe(o);
  inOrder.verify(o, never()).onError(any(TestException.class));
  testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
  inOrder.verify(o).onError(any(TestException.class));
  inOrder.verify(o, never()).onNext(anyInt());
  inOrder.verify(o, never()).onComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void workerNotDisposedPrematurelyNormalInAsyncOut() {
  DisposeTrackingScheduler s = new DisposeTrackingScheduler();
  TestObserver<Integer> to = new TestObserverFusedCanceling();
  Observable.just(1).hide().observeOn(s).subscribe(to);
  assertEquals(1, s.disposedCount.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testAsyncChild() {
  TestObserver<Integer> to = new TestObserver<Integer>();
  Observable.range(0, 100000).observeOn(Schedulers.newThread()).observeOn(Schedulers.newThread()).subscribe(to);
  to.awaitTerminalEvent();
  to.assertNoErrors();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void trampolineScheduler() {
  Observable.just(1)
  .observeOn(Schedulers.trampoline())
  .test()
  .assertResult(1);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void workerNotDisposedPrematurelySyncInNormalOut() {
  DisposeTrackingScheduler s = new DisposeTrackingScheduler();
  Observable.concat(
      Observable.just(1).observeOn(s),
      Observable.just(2)
  )
  .test()
  .assertResult(1, 2);
  assertEquals(1, s.disposedCount.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void workerNotDisposedPrematurelyNormalInNormalOut() {
  DisposeTrackingScheduler s = new DisposeTrackingScheduler();
  Observable.concat(
      Observable.just(1).hide().observeOn(s),
      Observable.just(2)
  )
  .test()
  .assertResult(1, 2);
  assertEquals(1, s.disposedCount.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testBackpressure1() {
  TestObserver<Integer> to = new TestObserver<Integer>();
  Observable.range(1, 100000).takeLast(1)
  .observeOn(Schedulers.newThread())
  .map(newSlowProcessor()).subscribe(to);
  to.awaitTerminalEvent();
  to.assertNoErrors();
  to.assertValue(100000);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testAsynchronousRun() {
  Observable.range(1, 2).concatMapEager(new Function<Integer, Observable<Integer>>() {
    @Override
    public Observable<Integer> apply(Integer t) {
      return Observable.range(1, 1000).subscribeOn(Schedulers.computation());
    }
  }).observeOn(Schedulers.newThread()).subscribe(to);
  to.awaitTerminalEvent(5, TimeUnit.SECONDS);
  to.assertNoErrors();
  to.assertValueCount(2000);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void asyncFused() {
  Observable.just(1).hide()
  .switchMap(Functions.justFunction(
      Observable.range(1, 5)
      .observeOn(ImmediateThinScheduler.INSTANCE)
  ))
  .test()
  .assertResult(1, 2, 3, 4, 5);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void outputFused() {
  TestObserver<Integer> to = ObserverFusion.newTest(QueueFuseable.ANY);
  Observable.range(1, 5).hide()
  .observeOn(Schedulers.single())
  .subscribe(to);
  ObserverFusion.assertFusion(to, QueueFuseable.ASYNC)
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult(1, 2, 3, 4, 5);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void outputFusedReject() {
  TestObserver<Integer> to = ObserverFusion.newTest(QueueFuseable.SYNC);
  Observable.range(1, 5).hide()
  .observeOn(Schedulers.single())
  .subscribe(to);
  ObserverFusion.assertFusion(to, QueueFuseable.NONE)
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult(1, 2, 3, 4, 5);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void inputSyncFused() {
  Observable.range(1, 5)
  .observeOn(Schedulers.single())
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult(1, 2, 3, 4, 5);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testBackpressure2() {
  TestObserver<Integer> to = new TestObserver<Integer>();
  Observable.range(1, 100000).takeLast(Flowable.bufferSize() * 4)
  .observeOn(Schedulers.newThread()).map(newSlowProcessor()).subscribe(to);
  to.awaitTerminalEvent();
  to.assertNoErrors();
  assertEquals(Flowable.bufferSize() * 4, to.valueCount());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testSkipLastWithBackpressure() {
  Observable<Integer> o = Observable.range(0, Flowable.bufferSize() * 2).skipLast(Flowable.bufferSize() + 10);
  TestObserver<Integer> to = new TestObserver<Integer>();
  o.observeOn(Schedulers.computation()).subscribe(to);
  to.awaitTerminalEvent();
  to.assertNoErrors();
  assertEquals((Flowable.bufferSize()) - 10, to.valueCount());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void observeOn() {
  Observable.range(1, 1000)
  .takeLast(1, TimeUnit.DAYS)
  .take(500)
  .observeOn(Schedulers.single(), true, 1)
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertSubscribed()
  .assertValueCount(500)
  .assertNoErrors()
  .assertComplete();
}

相关文章

Observable类方法