io.reactivex.Observable.startWith()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(7.2k)|赞(0)|评价(0)|浏览(120)

本文整理了Java中io.reactivex.Observable.startWith()方法的一些代码示例,展示了Observable.startWith()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.startWith()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:startWith

Observable.startWith介绍

[英]Returns an Observable that emits the items in a specified ObservableSource before it begins to emit items emitted by the source ObservableSource.

Scheduler: startWith does not operate by default on a particular Scheduler.
[中]返回一个可观察对象,该对象在开始发射源可观察资源发射的项之前发射指定可观察资源中的项。
调度程序:startWith默认情况下不会在特定调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Observable<Integer> apply(Observable<Integer> w) {
  3. return w.startWith(indicator);
  4. }
  5. }).subscribe(to);

代码示例来源:origin: Polidea/RxAndroidBle

  1. public static <T> Observable<T> justOnNext(T onNext) {
  2. return Observable.<T>never().startWith(onNext);
  3. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Observable<Integer> apply(Observable<Integer> w) {
  3. return w.startWith(indicator)
  4. .doOnComplete(new Action() {
  5. @Override
  6. public void run() {
  7. System.out.println("inner done: " + wip.incrementAndGet());
  8. }
  9. })
  10. ;
  11. }
  12. })

代码示例来源:origin: Polidea/RxAndroidBle

  1. private Observable<RxBleConnection> emitConnectionWithoutCompleting() {
  2. return Observable.<RxBleConnection>never().startWith(rxBleConnection);
  3. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Observable<Object> apply(Observable<? extends Throwable> t1) {
  3. return t1.map(new Function<Throwable, Integer>() {
  4. @Override
  5. public Integer apply(Throwable t1) {
  6. return 0;
  7. }
  8. }).startWith(0).cast(Object.class);
  9. }
  10. }).subscribe(observer);

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void startWithSingleNull() {
  3. just1.startWith((Integer)null);
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void startWithObservableNull() {
  3. just1.startWith((Observable<Integer>)null);
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void startWithIterableNull() {
  3. just1.startWith((Iterable<Integer>)null);
  4. }

代码示例来源:origin: sockeqwe/mosby

  1. public Observable<String> intent1() {
  2. return Observable.just("Intent 1").startWith("Before Intent 1");
  3. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void startWithIterableOneNull() {
  3. just1.startWith(Arrays.asList(1, null)).blockingSubscribe();
  4. }

代码示例来源:origin: square/sqlbrite

  1. @CheckResult @NonNull
  2. private QueryObservable createQuery(DatabaseQuery query) {
  3. if (transactions.get() != null) {
  4. throw new IllegalStateException("Cannot create observable query in transaction. "
  5. + "Use query() for a query inside a transaction.");
  6. }
  7. return triggers //
  8. .filter(query) // DatabaseQuery filters triggers to on tables we care about.
  9. .map(query) // DatabaseQuery maps to itself to save an allocation.
  10. .startWith(query) //
  11. .observeOn(scheduler) //
  12. .compose(queryTransformer) // Apply the user's query transformer.
  13. .doOnSubscribe(ensureNotInTransaction)
  14. .to(QUERY_OBSERVABLE);
  15. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void startWithIterableIteratorNull() {
  3. just1.startWith(new Iterable<Integer>() {
  4. @Override
  5. public Iterator<Integer> iterator() {
  6. return null;
  7. }
  8. }).blockingSubscribe();
  9. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void startWithIterable() {
  3. List<String> li = new ArrayList<String>();
  4. li.add("alpha");
  5. li.add("beta");
  6. List<String> values = Observable.just("one", "two").startWith(li).toList().blockingGet();
  7. assertEquals("alpha", values.get(0));
  8. assertEquals("beta", values.get(1));
  9. assertEquals("one", values.get(2));
  10. assertEquals("two", values.get(3));
  11. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Observable<Movie> apply(Observable<List<Movie>> movieList) {
  3. return movieList
  4. .startWith(new ArrayList<Movie>())
  5. .buffer(2, 1)
  6. .skip(1)
  7. .flatMap(calculateDelta);
  8. }
  9. };

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testStartWithWithScheduler() {
  3. TestScheduler scheduler = new TestScheduler();
  4. Observable<Integer> o = Observable.just(3, 4).startWith(Arrays.asList(1, 2)).subscribeOn(scheduler);
  5. Observer<Integer> observer = TestHelper.mockObserver();
  6. o.subscribe(observer);
  7. scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
  8. InOrder inOrder = inOrder(observer);
  9. inOrder.verify(observer, times(1)).onNext(1);
  10. inOrder.verify(observer, times(1)).onNext(2);
  11. inOrder.verify(observer, times(1)).onNext(3);
  12. inOrder.verify(observer, times(1)).onNext(4);
  13. inOrder.verify(observer, times(1)).onComplete();
  14. inOrder.verifyNoMoreInteractions();
  15. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void startWithObservable() {
  3. List<String> li = new ArrayList<String>();
  4. li.add("alpha");
  5. li.add("beta");
  6. List<String> values = Observable.just("one", "two")
  7. .startWith(Observable.fromIterable(li))
  8. .toList()
  9. .blockingGet();
  10. assertEquals("alpha", values.get(0));
  11. assertEquals("beta", values.get(1));
  12. assertEquals("one", values.get(2));
  13. assertEquals("two", values.get(3));
  14. }

代码示例来源:origin: pwittchen/ReactiveNetwork

  1. @Override public Observable<Connectivity> observeNetworkConnectivity(final Context context) {
  2. final String service = Context.CONNECTIVITY_SERVICE;
  3. final ConnectivityManager manager = (ConnectivityManager) context.getSystemService(service);
  4. return Observable.create(new ObservableOnSubscribe<Connectivity>() {
  5. @Override public void subscribe(ObservableEmitter<Connectivity> subscriber) throws Exception {
  6. networkCallback = createNetworkCallback(subscriber, context);
  7. final NetworkRequest networkRequest = new NetworkRequest.Builder().build();
  8. manager.registerNetworkCallback(networkRequest, networkCallback);
  9. }
  10. }).doOnDispose(new Action() {
  11. @Override public void run() {
  12. tryToUnregisterCallback(manager);
  13. }
  14. }).startWith(Connectivity.create(context)).distinctUntilChanged();
  15. }

代码示例来源:origin: requery/requery

  1. static <T> Observable<ReactiveResult<T>> toObservableResult(final ReactiveResult<T> result) {
  2. final QueryElement<?> element = result.unwrapQuery();
  3. // ensure the transaction listener is added in the target data store
  4. result.addTransactionListener(typeChanges);
  5. return typeChanges.commitSubject()
  6. .filter(new Predicate<Set<Type<?>>>() {
  7. @Override
  8. public boolean test(Set<Type<?>> types) {
  9. return !Collections.disjoint(element.entityTypes(), types) ||
  10. Types.referencesType(element.entityTypes(), types);
  11. }
  12. }).map(new Function<Set<Type<?>>, ReactiveResult<T>>() {
  13. @Override
  14. public ReactiveResult<T> apply(Set<Type<?>> types) {
  15. return result;
  16. }
  17. }).startWith(result);
  18. }
  19. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testErrorInParentObservable() {
  3. TestObserver<Integer> to = new TestObserver<Integer>();
  4. Observable.mergeDelayError(
  5. Observable.just(Observable.just(1), Observable.just(2))
  6. .startWith(Observable.<Integer> error(new RuntimeException()))
  7. ).subscribe(to);
  8. to.awaitTerminalEvent();
  9. to.assertTerminated();
  10. to.assertValues(1, 2);
  11. assertEquals(1, to.errorCount());
  12. }

代码示例来源:origin: ReactiveX/RxJava

  1. @SuppressWarnings("unchecked")
  2. @Test
  3. public void errorDelayed2() {
  4. Observable.combineLatestDelayError(
  5. new Function<Object[], Object>() {
  6. @Override
  7. public Object apply(Object[] a) throws Exception {
  8. return a;
  9. }
  10. },
  11. 128,
  12. Observable.error(new TestException()).startWith(1),
  13. Observable.empty()
  14. )
  15. .test()
  16. .assertFailure(TestException.class);
  17. }

相关文章

Observable类方法