Usage and code examples of the io.reactivex.Observable.takeLast() method

x33g5p2x, reposted on 2022-01-25 under Other

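This article collects code examples for the Java method io.reactivex.Observable.takeLast() and shows how Observable.takeLast() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Observable.takeLast() method:
Package path: io.reactivex.Observable
Class name: Observable
Method name: takeLast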

Introduction to Observable.takeLast

Returns an Observable that emits at most the last count items emitted by the source ObservableSource. If the source emits fewer than count items, then all of its items are emitted.

Scheduler: This version of takeLast does not operate by default on a particular Scheduler.
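The count-based behaviour described above can be modelled without RxJava. The following is a minimal plain-Java sketch, assuming a finite Iterable stands in for the source; the class name TakeLastSketch and its takeLast helper are hypothetical and are not part of the RxJava API or its implementation. It keeps a buffer of at most count items and returns it once the source is exhausted, which mirrors the fact that the operator cannot know which items are "last" until the source completes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical helper class: a model of the documented behaviour, not RxJava code.
class TakeLastSketch {

    // Returns at most the last `count` items of `source`, in their original order.
    static <T> List<T> takeLast(Iterable<T> source, int count) {
        if (count < 0) {
            // Exception type chosen for this sketch only.
            throw new IllegalArgumentException("count >= 0 required but it was " + count);
        }
        Deque<T> buffer = new ArrayDeque<>();
        for (T item : source) {
            buffer.addLast(item);
            // Evict the oldest item as soon as the buffer exceeds `count`.
            if (buffer.size() > count) {
                buffer.pollFirst();
            }
        }
        // Nothing is handed out until the whole source has been consumed.
        return new ArrayList<>(buffer);
    }

    public static void main(String[] args) {
        System.out.println(takeLast(Arrays.asList(1, 2, 3, 4, 5), 2)); // prints [4, 5]
        System.out.println(takeLast(Arrays.asList("one"), 10));        // prints [one]
        System.out.println(takeLast(Arrays.asList("one"), 0));         // prints []
    }
}
```

The three calls in main correspond to the cases in the description: more items than count, fewer items than count, and a count of zero.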

Code examples

Code example source: ReactiveX/RxJava

  // Excerpt: apply() of an anonymous Function; the enclosing call is cut off in the source.
  @Override
  public ObservableSource<Object> apply(Observable<Object> f) throws Exception {
      return f.takeLast(1);
  }
  });

Code example source: ReactiveX/RxJava

  // Excerpt: apply() of an anonymous Function; the enclosing call is cut off in the source.
  @Override
  public ObservableSource<Object> apply(Observable<Object> o) throws Exception {
      return o.takeLast(5);
  }
  });

Code example source: ReactiveX/RxJava

  @Test
  public void testTakeLastWithZeroCount() {
      Observable<String> w = Observable.just("one");
      // A count of 0 drops every item; only the completion signal is relayed.
      Observable<String> take = w.takeLast(0);
      Observer<String> observer = TestHelper.mockObserver();
      take.subscribe(observer);
      verify(observer, never()).onNext("one");
      verify(observer, never()).onError(any(Throwable.class));
      verify(observer, times(1)).onComplete();
  }

Code example source: ReactiveX/RxJava

  @Test
  public void testTakeLast2() {
      Observable<String> w = Observable.just("one");
      // The source emits fewer than 10 items, so all of them are emitted.
      Observable<String> take = w.takeLast(10);
      Observer<String> observer = TestHelper.mockObserver();
      take.subscribe(observer);
      verify(observer, times(1)).onNext("one");
      verify(observer, never()).onError(any(Throwable.class));
      verify(observer, times(1)).onComplete();
  }

Code example source: ReactiveX/RxJava

  // Excerpt: apply() of an anonymous Function; the enclosing call is cut off in the source.
  @Override
  public Observable<Observable<Object>> apply(Observable<Object> f)
          throws Exception {
      return f.window(Observable.never()).takeLast(1);
  }
  });

Code example source: ReactiveX/RxJava

  @Test
  public void takeLastTime() {
      // Both items are emitted within the last minute before completion, so both are kept.
      Observable.just(1, 2)
              .takeLast(1, TimeUnit.MINUTES)
              .test()
              .assertResult(1, 2);
  }

Code example source: ReactiveX/RxJava

  @Test
  public void takeLastTimeAndSize() {
      // Arguments are (count, time, unit): a 1-minute window capped at the last 1 item.
      Observable.just(1, 2)
              .takeLast(1, 1, TimeUnit.MINUTES)
              .test()
              .assertResult(2);
  }
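The two timed tests above can be read against a small model of the time window. This is a plain-Java sketch under stated assumptions, not RxJava's implementation: the class TimedTakeLastSketch, the Timed holder, and the hand-supplied timestamps are all hypothetical, and the handling of items that fall exactly on the window boundary is a choice made for this sketch. It keeps the items emitted within the window before completion and then caps the result at the last count items.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the timed overloads; timestamps are supplied by hand.
class TimedTakeLastSketch {

    // An item paired with the time (in milliseconds) at which it was emitted.
    static final class Timed<T> {
        final T value;
        final long timeMillis;

        Timed(T value, long timeMillis) {
            this.value = value;
            this.timeMillis = timeMillis;
        }
    }

    // Keeps items emitted no earlier than `windowMillis` before `completedAtMillis`,
    // then returns at most the last `count` of them.
    static <T> List<T> takeLast(List<Timed<T>> emissions, int count,
                                long windowMillis, long completedAtMillis) {
        List<T> inWindow = new ArrayList<>();
        for (Timed<T> e : emissions) {
            if (e.timeMillis >= completedAtMillis - windowMillis) {
                inWindow.add(e.value);
            }
        }
        int from = Math.max(0, inWindow.size() - count);
        return new ArrayList<>(inWindow.subList(from, inWindow.size()));
    }

    public static void main(String[] args) {
        // Mirrors just(1, 2): both items and the completion happen at time 0.
        List<Timed<Integer>> immediate = Arrays.asList(
                new Timed<Integer>(1, 0L), new Timed<Integer>(2, 0L));
        System.out.println(takeLast(immediate, Integer.MAX_VALUE, 60_000L, 0L)); // prints [1, 2]
        System.out.println(takeLast(immediate, 1, 60_000L, 0L));                 // prints [2]

        // A spread-out source: item 1 is older than the 1-second window at completion.
        List<Timed<Integer>> spread = Arrays.asList(
                new Timed<Integer>(1, 0L), new Timed<Integer>(2, 2_000L), new Timed<Integer>(3, 2_500L));
        System.out.println(takeLast(spread, Integer.MAX_VALUE, 1_000L, 2_500L)); // prints [2, 3]
    }
}
```

The first two calls reproduce the expectations of takeLastTime and takeLastTimeAndSize; the third shows an item falling outside the window, a case those tests do not exercise.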

Code example source: ReactiveX/RxJava

  @Test
  public void testLastOfOneReturnsLast() {
      TestObserver<Integer> to = new TestObserver<Integer>();
      Observable.just(1).takeLast(1).subscribe(to);
      to.assertValue(1);
      to.assertNoErrors();
      to.assertTerminated();
      // NO longer assertable
      // s.assertUnsubscribed();
  }

Code example source: ReactiveX/RxJava

  @Test
  public void takeLastTimeDelayErrorCustomScheduler() {
      // delayError = true: the buffered items are emitted before the error is signalled.
      Observable.just(1, 2).concatWith(Observable.<Integer>error(new TestException()))
              .takeLast(1, TimeUnit.MINUTES, Schedulers.io(), true)
              .test()
              .assertFailure(TestException.class, 1, 2);
  }

Code example source: ReactiveX/RxJava

  @Test
  public void testBackpressure1() {
      TestObserver<Integer> to = new TestObserver<Integer>();
      // newSlowProcessor() is a helper of the test class that is not shown in this excerpt.
      Observable.range(1, 100000).takeLast(1)
              .observeOn(Schedulers.newThread())
              .map(newSlowProcessor()).subscribe(to);
      to.awaitTerminalEvent();
      to.assertNoErrors();
      to.assertValue(100000);
  }

Code example source: ReactiveX/RxJava

  @Test
  public void fromArityArgs1() {
      Observable<String> items = Observable.just("one");
      assertEquals((Long)1L, items.count().blockingGet());
      assertEquals("one", items.takeLast(1).blockingSingle());
  }

Code example source: ReactiveX/RxJava

  @Test
  public void takeLastTimeDelayError() {
      // delayError = true: the buffered items are emitted before the error is signalled.
      Observable.just(1, 2).concatWith(Observable.<Integer>error(new TestException()))
              .takeLast(1, TimeUnit.MINUTES, true)
              .test()
              .assertFailure(TestException.class, 1, 2);
  }

Code example source: ReactiveX/RxJava

  @Test
  public void testLastOfManyReturnsLast() {
      TestObserver<Integer> to = new TestObserver<Integer>();
      Observable.range(1, 10).takeLast(1).subscribe(to);
      to.assertValue(10);
      to.assertNoErrors();
      to.assertTerminated();
      // NO longer assertable
      // s.assertUnsubscribed();
  }

Code example source: ReactiveX/RxJava

  @Test
  public void error() {
      // An error from the source is passed through; no items are emitted.
      Observable.error(new TestException())
              .takeLast(5)
              .test()
              .assertFailure(TestException.class);
  }

Code example source: ReactiveX/RxJava

  @Test
  public void fromIterable() {
      ArrayList<String> items = new ArrayList<String>();
      items.add("one");
      items.add("two");
      items.add("three");
      assertEquals((Long)3L, Observable.fromIterable(items).count().blockingGet());
      assertEquals("two", Observable.fromIterable(items).skip(1).take(1).blockingSingle());
      assertEquals("three", Observable.fromIterable(items).takeLast(1).blockingSingle());
  }

Code example source: ReactiveX/RxJava

  @Test
  public void error() {
      // An error from the source is passed through; no items are emitted.
      Observable.error(new TestException())
              .takeLast(1)
              .test()
              .assertFailure(TestException.class);
  }

Code example source: ReactiveX/RxJava

  @Test
  public void testBackpressure2() {
      TestObserver<Integer> to = new TestObserver<Integer>();
      // newSlowProcessor() is a helper of the test class that is not shown in this excerpt.
      Observable.range(1, 100000).takeLast(Flowable.bufferSize() * 4)
              .observeOn(Schedulers.newThread()).map(newSlowProcessor()).subscribe(to);
      to.awaitTerminalEvent();
      to.assertNoErrors();
      assertEquals(Flowable.bufferSize() * 4, to.valueCount());
  }

Code example source: ReactiveX/RxJava

  @Test
  public void takeLastTake() {
      // takeLast(5) yields 6..10; take(2) then keeps the first two of those.
      Observable.range(1, 10)
              .takeLast(5)
              .take(2)
              .test()
              .assertResult(6, 7);
  }

Code example source: ReactiveX/RxJava

  @Test
  public void fromArityArgs3() {
      Observable<String> items = Observable.just("one", "two", "three");
      assertEquals((Long)3L, items.count().blockingGet());
      assertEquals("two", items.skip(1).take(1).blockingSingle());
      assertEquals("three", items.takeLast(1).blockingSingle());
  }

Code example source: ReactiveX/RxJava

  @Test
  public void fromArray() {
      String[] items = new String[] { "one", "two", "three" };
      assertEquals((Long)3L, Observable.fromArray(items).count().blockingGet());
      assertEquals("two", Observable.fromArray(items).skip(1).take(1).blockingSingle());
      assertEquals("three", Observable.fromArray(items).takeLast(1).blockingSingle());
  }
