io.reactivex.Flowable.take()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(6.1k)|赞(0)|评价(0)|浏览(418)

本文整理了Java中io.reactivex.Flowable.take()方法的一些代码示例,展示了Flowable.take()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.take()方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:take

Flowable.take介绍

[英]Returns a Flowable that emits only the first count items emitted by the source Publisher. If the source emits fewer than count items then all of its items are emitted.

This method returns a Publisher that will invoke a subscribing Subscriber's Subscriber#onNext function a maximum of count times before invoking Subscriber#onComplete. Backpressure: The operator doesn't interfere with backpressure which is determined by the source Publisher's backpressure behavior in case the first request is smaller than the count. Otherwise, the source Publisheris consumed in an unbounded manner (i.e., without applying backpressure to it). Scheduler: This version of take does not operate by default on a particular Scheduler.
[中]返回仅发出源发布服务器发出的第一个计数项的可流动项。如果源发射的项目少于计数,则发射其所有项目。
此方法返回一个发布服务器,在调用Subscriber#onComplete之前,该发布服务器将调用订阅订阅服务器的Subscriber#onNext函数的最大计数次数。背压:如果第一个请求小于计数,操作员不会干扰由源发布者的背压行为确定的背压。否则,源将以无限制的方式消耗(即,不向其施加背压)。调度程序:默认情况下,此版本的take不会在特定调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Flowable<Integer> apply(Flowable<Integer> f) {
    return f.take(1);
  }
}).subscribe(ts);

代码示例来源:origin: ReactiveX/RxJava

@Test
public void fromArray() {
  String[] items = new String[] { "one", "two", "three" };
  assertEquals((Long)3L, Flowable.fromArray(items).count().blockingGet());
  assertEquals("two", Flowable.fromArray(items).skip(1).take(1).blockingSingle());
  assertEquals("three", Flowable.fromArray(items).takeLast(1).blockingSingle());
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Flowable<Integer> apply(Flowable<Integer> f) {
    return f.take(1);
  }
}).subscribe(ts);

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Flowable<Object> apply(Flowable<Object> handler) throws Exception {
    return handler.take(2);
  }
})

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Flowable<Integer> apply(
      Flowable<Integer> w) throws Exception {
    return w.take(1);
  }
})

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 2000)
public void testUnsubscribeFromSynchronousInfiniteFlowable() {
  final AtomicLong count = new AtomicLong();
  INFINITE_OBSERVABLE.take(10).subscribe(new Consumer<Long>() {
    @Override
    public void accept(Long l) {
      count.set(l);
    }
  });
  assertEquals(10, count.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testTake1() {
  Flowable<String> w = Flowable.fromIterable(Arrays.asList("one", "two", "three"));
  Flowable<String> take = w.take(2);
  Subscriber<String> subscriber = TestHelper.mockSubscriber();
  take.subscribe(subscriber);
  verify(subscriber, times(1)).onNext("one");
  verify(subscriber, times(1)).onNext("two");
  verify(subscriber, never()).onNext("three");
  verify(subscriber, never()).onError(any(Throwable.class));
  verify(subscriber, times(1)).onComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testTakeFirstWithPredicateOfSome() {
  Flowable<Integer> flowable = Flowable.just(1, 3, 5, 4, 6, 3);
  flowable.filter(IS_EVEN).take(1).subscribe(w);
  verify(w, times(1)).onNext(anyInt());
  verify(w).onNext(4);
  verify(w, times(1)).onComplete();
  verify(w, never()).onError(any(Throwable.class));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testWrappingMockWhenUnsubscribeInvolved() {
  Flowable<Integer> oi = Flowable.fromIterable(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9)).take(2);
  Subscriber<Integer> mockSubscriber = TestHelper.mockSubscriber();
  oi.subscribe(new TestSubscriber<Integer>(mockSubscriber));
  InOrder inOrder = inOrder(mockSubscriber);
  inOrder.verify(mockSubscriber, times(1)).onNext(1);
  inOrder.verify(mockSubscriber, times(1)).onNext(2);
  inOrder.verify(mockSubscriber, times(1)).onComplete();
  inOrder.verifyNoMoreInteractions();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void take() {
  Flowable.range(1, 3)
  .flatMapIterable(Functions.justFunction(Arrays.asList(1)), 1)
  .take(1)
  .test()
  .assertResult(1);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void take() {
  Flowable.intervalRange(1, 2, 1, 1, TimeUnit.MILLISECONDS)
  .take(1)
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult(1L);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void take() {
  Maybe.just(1).flattenAsFlowable(new Function<Integer, Iterable<Integer>>() {
    @Override
    public Iterable<Integer> apply(Integer v) throws Exception {
      return Arrays.asList(v, v + 1);
    }
  })
  .take(1)
  .test()
  .assertResult(1);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void take() {
  Flowable.range(1, 5)
  .mergeWith(Single.just(100))
  .take(3)
  .test()
  .assertResult(1, 2, 3);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void conditionalSlowPathTakeExact() {
  Flowable.range(1, 5)
  .filter(Functions.alwaysTrue())
  .take(5)
  .test()
  .assertResult(1, 2, 3, 4, 5);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void take() {
  Flowable.range(1, 5)
  .mergeWith(Maybe.just(100))
  .take(3)
  .test()
  .assertResult(1, 2, 3);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void slowPathTakeExact() {
  Flowable.rangeLong(1L, 5L)
  .filter(Functions.alwaysTrue())
  .take(5)
  .test()
  .assertResult(1L, 2L, 3L, 4L, 5L);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void takeOne() {
  Flowable.create(source, BackpressureStrategy.BUFFER).take(1).subscribe(ts);
  ts.request(2);
  source.onNext(1);
  source.onNext(2);
  source.onComplete();
  ts.assertValues(1);
  ts.assertNoErrors();
  ts.assertComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void takeHalf() {
  int elements = 1024;
  Flowable.range(0, elements * 2).unsubscribeOn(Schedulers.single())
  .take(elements)
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertValueCount(elements)
  .assertComplete()
  .assertNoErrors()
  .assertSubscribed();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void normalConditionalLong() {
  Flowable.fromIterable(new CrashingIterable(100, 10 * 1000 * 1000, 10 * 1000 * 1000))
  .filter(Functions.alwaysTrue())
  .take(1000 * 1000)
  .test()
  .assertSubscribed()
  .assertValueCount(1000 * 1000)
  .assertNoErrors()
  .assertComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void selectorFallbackTake() {
  PublishProcessor<Integer> pp = PublishProcessor.create();
  TestSubscriber<Integer> ts = pp
  .timeout(Functions.justFunction(Flowable.never()), Flowable.just(2))
  .take(1)
  .test();
  assertTrue(pp.hasSubscribers());
  pp.onNext(1);
  assertFalse(pp.hasSubscribers());
  ts.assertResult(1);
}

相关文章

Flowable类方法