io.reactivex.Flowable.startWith()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(8.6k)|赞(0)|评价(0)|浏览(192)

本文整理了Java中io.reactivex.Flowable.startWith()方法的一些代码示例,展示了Flowable.startWith()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.startWith()方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:startWith

Flowable.startWith介绍

[英]Returns a Flowable that emits the items in a specified Iterable before it begins to emit items emitted by the source Publisher.

Backpressure: The operator honors backpressure from downstream. The source Publisheris expected to honor backpressure as well. If it violates this rule, it may throw an IllegalStateException when the source Publisher completes. Scheduler: startWith does not operate by default on a particular Scheduler.
[中]返回一个FlowTable,该FlowTable在开始发出源发布服务器发出的项之前发出指定Iterable中的项。
背压:操作员接受来自下游的背压。这位消息人士预计也会接受背压。如果违反此规则,则在源发布服务器完成时,它可能抛出非法状态异常。Scheduler:startWith在默认情况下不在特定计划程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Flowable<Integer> apply(Flowable<Integer> w) {
    return w.startWith(indicator);
  }
}).subscribe(ts);

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Flowable<Integer> apply(Flowable<Integer> w) {
    return w.startWith(indicator)
        .doOnComplete(new Action() {
          @Override
          public void run() {
            System.out.println("inner done: " + wip.incrementAndGet());
          }
        })
        ;
  }
})

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void startWithPublisherNull() {
  just1.startWith((Publisher<Integer>)null);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void startWithSingleNull() {
  just1.startWith((Integer)null);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void startWithIterableNull() {
  just1.startWith((Iterable<Integer>)null);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void startWithFlowableNull() {
  just1.startWith((Flowable<Integer>)null);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void startWithIterableOneNull() {
  just1.startWith(Arrays.asList(1, null)).blockingSubscribe();
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void startWithIterableIteratorNull() {
  just1.startWith(new Iterable<Integer>() {
    @Override
    public Iterator<Integer> iterator() {
      return null;
    }
  }).blockingSubscribe();
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Flowable<Object> apply(Flowable<? extends Throwable> t1) {
    return t1.map(new Function<Throwable, Integer>() {
      @Override
      public Integer apply(Throwable t1) {
        return 0;
      }
    }).startWith(0).cast(Object.class);
  }
}).subscribe(subscriber);

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Publisher<Movie> apply(Flowable<List<Movie>> movieList) {
    return movieList
      .startWith(new ArrayList<Movie>())
      .buffer(2, 1)
      .skip(1)
      .flatMap(calculateDelta);
  }
};

代码示例来源:origin: ReactiveX/RxJava

@Test
public void startWithIterable() {
  List<String> li = new ArrayList<String>();
  li.add("alpha");
  li.add("beta");
  List<String> values = Flowable.just("one", "two").startWith(li).toList().blockingGet();
  assertEquals("alpha", values.get(0));
  assertEquals("beta", values.get(1));
  assertEquals("one", values.get(2));
  assertEquals("two", values.get(3));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testStartWithWithScheduler() {
  TestScheduler scheduler = new TestScheduler();
  Flowable<Integer> flowable = Flowable.just(3, 4).startWith(Arrays.asList(1, 2)).subscribeOn(scheduler);
  Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
  flowable.subscribe(subscriber);
  scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
  InOrder inOrder = inOrder(subscriber);
  inOrder.verify(subscriber, times(1)).onNext(1);
  inOrder.verify(subscriber, times(1)).onNext(2);
  inOrder.verify(subscriber, times(1)).onNext(3);
  inOrder.verify(subscriber, times(1)).onNext(4);
  inOrder.verify(subscriber, times(1)).onComplete();
  inOrder.verifyNoMoreInteractions();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void startWithObservable() {
  List<String> li = new ArrayList<String>();
  li.add("alpha");
  li.add("beta");
  List<String> values = Flowable.just("one", "two")
      .startWith(Flowable.fromIterable(li))
      .toList()
      .blockingGet();
  assertEquals("alpha", values.get(0));
  assertEquals("beta", values.get(1));
  assertEquals("one", values.get(2));
  assertEquals("two", values.get(3));
}

代码示例来源:origin: TeamNewPipe/NewPipe

/**
 * Initializes the play queue message buses.
 *
 * Also starts a self reporter for logging if debug mode is enabled.
 * */
public void init() {
  eventBroadcast = BehaviorSubject.create();
  broadcastReceiver = eventBroadcast.toFlowable(BackpressureStrategy.BUFFER)
      .observeOn(AndroidSchedulers.mainThread())
      .startWith(new InitEvent());
  if (DEBUG) broadcastReceiver.subscribe(getSelfReporter());
}

代码示例来源:origin: ReactiveX/RxJava

/**
 * Returns a Flowable which first delivers the events
 * of the other Publisher then runs this Completable.
 * <p>
 * <img width="640" height="250" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.startWith.p.png" alt="">
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer
 *  and expects the other {@code Publisher} to honor it as well.</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code startWith} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param <T> the value type
 * @param other the other Publisher to run first
 * @return the new Flowable instance
 * @throws NullPointerException if other is null
 */
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public final <T> Flowable<T> startWith(Publisher<T> other) {
  ObjectHelper.requireNonNull(other, "other is null");
  return this.<T>toFlowable().startWith(other);
}

代码示例来源:origin: redisson/redisson

/**
 * Returns a Flowable which first delivers the events
 * of the other Publisher then runs this Completable.
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer
 *  and expects the other {@code Publisher} to honor it as well.</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code startWith} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * @param <T> the value type
 * @param other the other Publisher to run first
 * @return the new Flowable instance
 * @throws NullPointerException if other is null
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public final <T> Flowable<T> startWith(Publisher<T> other) {
  ObjectHelper.requireNonNull(other, "other is null");
  return this.<T>toFlowable().startWith(other);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testErrorInParentFlowable() {
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  Flowable.mergeDelayError(
      Flowable.just(Flowable.just(1), Flowable.just(2))
          .startWith(Flowable.<Integer> error(new RuntimeException()))
      ).subscribe(ts);
  ts.awaitTerminalEvent();
  ts.assertTerminated();
  ts.assertValues(1, 2);
  assertEquals(1, ts.errorCount());
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void errorDelayed2() {
  Flowable.combineLatestDelayError(
      new Function<Object[], Object>() {
        @Override
        public Object apply(Object[] a) throws Exception {
          return a;
        }
      },
      128,
      Flowable.error(new TestException()).startWith(1),
      Flowable.empty()
  )
  .test()
  .assertFailure(TestException.class);
}

代码示例来源:origin: pwittchen/ReactiveNetwork

@Override public Observable<Connectivity> observeNetworkConnectivity(final Context context) {
 final String service = Context.CONNECTIVITY_SERVICE;
 final ConnectivityManager manager = (ConnectivityManager) context.getSystemService(service);
 networkCallback = createNetworkCallback(context);
 registerIdleReceiver(context);
 final NetworkRequest request =
   new NetworkRequest.Builder().addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)
     .addCapability(NetworkCapabilities.NET_CAPABILITY_NOT_RESTRICTED)
     .build();
 manager.registerNetworkCallback(request, networkCallback);
 return connectivitySubject.toFlowable(BackpressureStrategy.LATEST).doOnCancel(new Action() {
  @Override public void run() {
   tryToUnregisterCallback(manager);
   tryToUnregisterReceiver(context);
  }
 }).startWith(Connectivity.create(context)).distinctUntilChanged().toObservable();
}

代码示例来源:origin: akarnokd/akarnokd-misc

static <T> FlowableTransformer<T, Void> composeIfNonEmptyRx(Function<? super Flowable<T>, ? extends Flowable<Void>> f) {
  return g ->
    g.publish(h -> 
      h.limit(1).concatMap(first -> f.apply(h.startWith(first)))
    ).ignoreElements().toFlowable();
}

相关文章

Flowable类方法