io.reactivex.Observable.unsafeCreate()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(8.8k)|赞(0)|评价(0)|浏览(106)

本文整理了Java中io.reactivex.Observable.unsafeCreate()方法的一些代码示例,展示了Observable.unsafeCreate()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.unsafeCreate()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:unsafeCreate

Observable.unsafeCreate介绍

[英]Create an Observable by wrapping an ObservableSource which has to be implemented according to the Reactive-Streams-based Observable specification by handling cancellation correctly; no safeguards are provided by the Observable itself. Scheduler: unsafeCreate by default doesn't operate on any particular Scheduler.
[中]通过包装ObservableSource创建一个Observable,该ObservableSource必须根据基于反应流的Observable规范通过正确处理取消来实现;可观察物体本身不提供任何保护措施*。调度程序:默认情况下,未完成处理不会在任何特定调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Observable<Integer> apply(Disposable subscription) {
    return Observable.unsafeCreate(new ObservableSource<Integer>() {
      @Override
      public void subscribe(Observer<? super Integer> t1) {
        throw new TestException();
      }
    });
  }
};

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Observable<Object> apply(Object opening) {
    return Observable.unsafeCreate(new ObservableSource<Object>() {
      @Override
      public void subscribe(Observer<? super Object> observer) {
        observer.onSubscribe(Disposables.empty());
        push(observer, new Object(), 100);
        complete(observer, 101);
      }
    });
  }
};

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Observable<Object> apply(Object opening) {
    return Observable.unsafeCreate(new ObservableSource<Object>() {
      @Override
      public void subscribe(Observer<? super Object> innerObserver) {
        innerObserver.onSubscribe(Disposables.empty());
        push(innerObserver, new Object(), 100);
        complete(innerObserver, 101);
      }
    });
  }
};

代码示例来源:origin: ReactiveX/RxJava

@Override
  public void subscribe(Observer<? super Integer> observer1) {
    Observable.unsafeCreate(new ObservableSource<Integer>() {
      @Override
      public void subscribe(Observer<? super Integer> observer2) {
        throw new IllegalArgumentException("original exception");
      }
    }).subscribe(observer1);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public void subscribe(Observer<? super Integer> observer1) {
    Observable.unsafeCreate(new ObservableSource<Integer>() {
      @Override
      public void subscribe(Observer<? super Integer> observer2) {
        throw new IllegalArgumentException("original exception");
      }
    }).subscribe(observer1);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = IllegalArgumentException.class)
public void unsafeWithObservable() {
  Observable.unsafeCreate(Observable.just(1));
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void createNull() {
  Observable.unsafeCreate(null);
}

代码示例来源:origin: ReactiveX/RxJava

private static Observable<String> infinite(final AtomicInteger produced) {
  return Observable.unsafeCreate(new ObservableSource<String>() {
    @Override
    public void subscribe(Observer<? super String> observer) {
      Disposable bs = Disposables.empty();
      observer.onSubscribe(bs);
      while (!bs.isDisposed()) {
        observer.onNext("onNext");
        produced.incrementAndGet();
      }
    }
  }).subscribeOn(Schedulers.newThread());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testMergeList() {
  final Observable<String> o1 = Observable.unsafeCreate(new TestSynchronousObservable());
  final Observable<String> o2 = Observable.unsafeCreate(new TestSynchronousObservable());
  List<Observable<String>> listOfObservables = new ArrayList<Observable<String>>();
  listOfObservables.add(o1);
  listOfObservables.add(o2);
  Observable<String> m = Observable.merge(listOfObservables);
  m.subscribe(stringObserver);
  verify(stringObserver, never()).onError(any(Throwable.class));
  verify(stringObserver, times(1)).onComplete();
  verify(stringObserver, times(2)).onNext("hello");
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testMergeArray() {
  final Observable<String> o1 = Observable.unsafeCreate(new TestSynchronousObservable());
  final Observable<String> o2 = Observable.unsafeCreate(new TestSynchronousObservable());
  Observable<String> m = Observable.merge(o1, o2);
  m.subscribe(stringObserver);
  verify(stringObserver, never()).onError(any(Throwable.class));
  verify(stringObserver, times(2)).onNext("hello");
  verify(stringObserver, times(1)).onComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testMergeArray() {
  final Observable<String> o1 = Observable.unsafeCreate(new TestSynchronousObservable());
  final Observable<String> o2 = Observable.unsafeCreate(new TestSynchronousObservable());
  Observable<String> m = Observable.mergeDelayError(o1, o2);
  m.subscribe(stringObserver);
  verify(stringObserver, never()).onError(any(Throwable.class));
  verify(stringObserver, times(2)).onNext("hello");
  verify(stringObserver, times(1)).onComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testMergeList() {
  final Observable<String> o1 = Observable.unsafeCreate(new TestSynchronousObservable());
  final Observable<String> o2 = Observable.unsafeCreate(new TestSynchronousObservable());
  List<Observable<String>> listOfObservables = new ArrayList<Observable<String>>();
  listOfObservables.add(o1);
  listOfObservables.add(o2);
  Observable<String> m = Observable.mergeDelayError(Observable.fromIterable(listOfObservables));
  m.subscribe(stringObserver);
  verify(stringObserver, never()).onError(any(Throwable.class));
  verify(stringObserver, times(1)).onComplete();
  verify(stringObserver, times(2)).onNext("hello");
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testOriginFails() {
  Observer<String> observer = TestHelper.mockObserver();
  Observable<String> origin = Observable.unsafeCreate(new FuncWithErrors(1));
  origin.subscribe(observer);
  InOrder inOrder = inOrder(observer);
  inOrder.verify(observer, times(1)).onNext("beginningEveryTime");
  inOrder.verify(observer, times(1)).onError(any(RuntimeException.class));
  inOrder.verify(observer, never()).onNext("onSuccessOnly");
  inOrder.verify(observer, never()).onComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public void subscribe(Observer<? super Observable<String>> op) {
    op.onSubscribe(Disposables.empty());
    op.onNext(Observable.unsafeCreate(o1));
    op.onNext(Observable.unsafeCreate(o2));
    op.onError(new NullPointerException("throwing exception in parent"));
  }
});

代码示例来源:origin: ReactiveX/RxJava

@Override
public void subscribe(Observer<? super Observable<String>> observer) {
  observer.onSubscribe(Disposables.empty());
  // simulate what would happen in an Observable
  observer.onNext(Observable.unsafeCreate(w1));
  observer.onNext(Observable.unsafeCreate(w2));
  observer.onComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testMergeArrayWithThreading() {
  final TestASynchronousObservable o1 = new TestASynchronousObservable();
  final TestASynchronousObservable o2 = new TestASynchronousObservable();
  Observable<String> m = Observable.merge(Observable.unsafeCreate(o1), Observable.unsafeCreate(o2));
  TestObserver<String> to = new TestObserver<String>(stringObserver);
  m.subscribe(to);
  to.awaitTerminalEvent();
  to.assertNoErrors();
  verify(stringObserver, never()).onError(any(Throwable.class));
  verify(stringObserver, times(2)).onNext("hello");
  verify(stringObserver, times(1)).onComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testOnError() {
  TestObserver<String> to = new TestObserver<String>();
  Observable.unsafeCreate(new ObservableSource<String>() {
    @Override
    public void subscribe(Observer<? super String> observer) {
      observer.onSubscribe(Disposables.empty());
      observer.onError(new RuntimeException("fail"));
    }
  }).subscribeOn(Schedulers.computation()).subscribe(to);
  to.awaitTerminalEvent(1000, TimeUnit.MILLISECONDS);
  to.assertTerminated();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testMultipleSubscribes() throws InterruptedException, ExecutionException {
  final TestAsyncErrorObservable o = new TestAsyncErrorObservable("one", "two", null, "three");
  Observable<Notification<String>> m = Observable.unsafeCreate(o).materialize();
  assertEquals(3, m.toList().toFuture().get().size());
  assertEquals(3, m.toList().toFuture().get().size());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
@Ignore("ObservableSource.subscribe can't throw")
public void testThrownErrorHandling() {
  TestObserver<String> to = new TestObserver<String>();
  Observable.unsafeCreate(new ObservableSource<String>() {
    @Override
    public void subscribe(Observer<? super String> observer) {
      throw new RuntimeException("fail");
    }
  }).subscribeOn(Schedulers.computation()).subscribe(to);
  to.awaitTerminalEvent(1000, TimeUnit.MILLISECONDS);
  to.assertTerminated();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
  public void testSwitchShouldTriggerUnsubscribe() {
    final Disposable d = Disposables.empty();

    Observable.unsafeCreate(new ObservableSource<Long>() {
      @Override
      public void subscribe(final Observer<? super Long> observer) {
        observer.onSubscribe(d);
        observer.onComplete();
      }
    }).switchIfEmpty(Observable.<Long>never()).subscribe();
    assertTrue(d.isDisposed());
  }
}

相关文章

Observable类方法