io.reactivex.Observable.serialize()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(5.4k)|赞(0)|评价(0)|浏览(123)

本文整理了Java中io.reactivex.Observable.serialize()方法的一些代码示例,展示了Observable.serialize()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.serialize()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:serialize

Observable.serialize介绍

[英]Forces an ObservableSource's emissions and notifications to be serialized and for it to obey the ObservableSource contract in other ways.

It is possible for an ObservableSource to invoke its Observers' methods asynchronously, perhaps from different threads. This could make such an ObservableSource poorly-behaved, in that it might try to invoke onComplete or onError before one of its onNext invocations, or it might call onNext from two different threads concurrently. You can force such an ObservableSource to be well-behaved and sequential by applying the serialize method to it.

Scheduler: serialize does not operate by default on a particular Scheduler.
[中]强制可观察资源的排放和通知序列化,并以其他方式服从the ObservableSource contract
ObservateSource可以异步调用其观察者的方法,可能来自不同的线程。这可能会使这样一个ObservateSource表现不佳,因为它可能会尝试在其onNext调用之一之前调用onComplete或onError,或者可能会同时从两个不同的线程调用onNext。您可以通过将serialize方法应用于这样一个ObservateSource,来强制它保持良好的行为和顺序。
调度程序:serialize默认情况下不会在特定调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testSingleThreadedBasic() {
  TestSingleThreadedObservable onSubscribe = new TestSingleThreadedObservable("one", "two", "three");
  Observable<String> w = Observable.unsafeCreate(onSubscribe);
  w.serialize().subscribe(observer);
  onSubscribe.waitToFinish();
  verify(observer, times(1)).onNext("one");
  verify(observer, times(1)).onNext("two");
  verify(observer, times(1)).onNext("three");
  verify(observer, never()).onError(any(Throwable.class));
  verify(observer, times(1)).onComplete();
  // non-deterministic because unsubscribe happens after 'waitToFinish' releases
  // so commenting out for now as this is not a critical thing to test here
  //            verify(s, times(1)).unsubscribe();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testMultiThreadedWithNPEinMiddle() {
  boolean lessThan9 = false;
  for (int i = 0; i < 3; i++) {
    TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable("one", "two", "three", null, "four", "five", "six", "seven", "eight", "nine");
    Observable<String> w = Observable.unsafeCreate(onSubscribe);
    BusyObserver busyobserver = new BusyObserver();
    w.serialize().subscribe(busyobserver);
    onSubscribe.waitToFinish();
    System.out.println("maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get());
    // this should not always be the full number of items since the error should (very often)
    // stop it before it completes all 9
    System.out.println("onNext count: " + busyobserver.onNextCount.get());
    if (busyobserver.onNextCount.get() < 9) {
      lessThan9 = true;
    }
    assertTrue(busyobserver.onError);
    // no onComplete because onError was invoked
    assertFalse(busyobserver.onComplete);
    // non-deterministic because unsubscribe happens after 'waitToFinish' releases
    // so commenting out for now as this is not a critical thing to test here
    // verify(s, times(1)).unsubscribe();
    // we can have concurrency ...
    int n = onSubscribe.maxConcurrentThreads.get();
    assertTrue("" + n, n > 1);
    // ... but the onNext execution should be single threaded
    assertEquals(1, busyobserver.maxConcurrentThreads.get());
  }
  assertTrue(lessThan9);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testMultiThreadedWithNPE() {
  TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable("one", "two", "three", null);
  Observable<String> w = Observable.unsafeCreate(onSubscribe);
  BusyObserver busyobserver = new BusyObserver();
  w.serialize().subscribe(busyobserver);
  onSubscribe.waitToFinish();
  System.out.println("maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get());
  // we can't know how many onNext calls will occur since they each run on a separate thread
  // that depends on thread scheduling so 0, 1, 2 and 3 are all valid options
  // assertEquals(3, busyobserver.onNextCount.get());
  assertTrue(busyobserver.onNextCount.get() < 4);
  assertTrue(busyobserver.onError);
  // no onComplete because onError was invoked
  assertFalse(busyobserver.onComplete);
  // non-deterministic because unsubscribe happens after 'waitToFinish' releases
  // so commenting out for now as this is not a critical thing to test here
  //verify(s, times(1)).unsubscribe();
  // we can have concurrency ...
  assertTrue(onSubscribe.maxConcurrentThreads.get() > 1);
  // ... but the onNext execution should be single threaded
  assertEquals(1, busyobserver.maxConcurrentThreads.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testMultiThreadedBasic() {
  TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable("one", "two", "three");
  Observable<String> w = Observable.unsafeCreate(onSubscribe);
  BusyObserver busyobserver = new BusyObserver();
  w.serialize().subscribe(busyobserver);
  onSubscribe.waitToFinish();
  assertEquals(3, busyobserver.onNextCount.get());
  assertFalse(busyobserver.onError);
  assertTrue(busyobserver.onComplete);
  // non-deterministic because unsubscribe happens after 'waitToFinish' releases
  // so commenting out for now as this is not a critical thing to test here
  //            verify(s, times(1)).unsubscribe();
  // we can have concurrency ...
  assertTrue(onSubscribe.maxConcurrentThreads.get() > 1);
  // ... but the onNext execution should be single threaded
  assertEquals(1, busyobserver.maxConcurrentThreads.get());
}

代码示例来源:origin: imZeJun/RxSample

mRefreshing.set(false);
}).subscribeOn(Schedulers.io()).serialize();
mPublishProcessor = PublishProcessor.create();
mTokenFlow = Flowable.create(new FlowableOnSubscribe<String>() {

相关文章

Observable类方法