Usage and code examples for the io.reactivex.Observable.buffer() method

x33g5p2x, reposted on 2022-01-25 under "Other"

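This article collects code examples for the Java method io.reactivex.Observable.buffer() and shows how Observable.buffer() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven and were extracted from selected projects, so they should serve as a useful reference. Details of the Observable.buffer() method:
Package path: io.reactivex.Observable
Class name: Observable
Method name: buffer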

About Observable.buffer

Returns an Observable that emits buffers of items it collects from the source ObservableSource. The resulting ObservableSource emits connected, non-overlapping buffers, each containing count items. When the source ObservableSource completes, the resulting ObservableSource emits the current buffer and propagates the notification from the source ObservableSource. Note that if the source ObservableSource issues an onError notification the event is passed on immediately without first emitting the buffer it is in the process of assembling.

Scheduler: This version of buffer does not operate by default on a particular Scheduler.
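
The count-based behavior described above can be modeled without RxJava. The following is a minimal plain-Java sketch, not the library's implementation: the class BufferCountSketch and the helper bufferByCount are hypothetical names introduced here, and the sketch assumes a finite source that completes normally (the onError case, where the partial buffer is dropped, is not modeled).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative model only; this is NOT RxJava code.
class BufferCountSketch {

    // Hypothetical helper: groups items into consecutive, non-overlapping
    // lists of `count` elements, the way buffer(count) is documented to do
    // for a source that completes without error.
    static <T> List<List<T>> bufferByCount(List<T> items, int count) {
        if (count <= 0) {
            throw new IllegalArgumentException("count must be positive: " + count);
        }
        List<List<T>> emitted = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T item : items) {
            current.add(item);
            if (current.size() == count) {
                // Buffer is full: "emit" it and start a fresh one.
                emitted.add(current);
                current = new ArrayList<>();
            }
        }
        // Completion: the partially filled buffer is emitted if it holds anything.
        if (!current.isEmpty()) {
            emitted.add(current);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // prints [[1, 2], [3, 4], [5]]
        System.out.println(bufferByCount(Arrays.asList(1, 2, 3, 4, 5), 2));
    }
}
```

With five items and count = 2, the last list holds a single element because completion flushes whatever has been collected so far.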

Code examples

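Code example source: ReactiveX/RxJava

// Excerpt completed into a standalone declaration; the name bufferOfOne is illustrative, not from the source.
// Types: io.reactivex.Observable, io.reactivex.ObservableSource, io.reactivex.functions.Function.
Function<Observable<Object>, ObservableSource<List<Object>>> bufferOfOne =
    new Function<Observable<Object>, ObservableSource<List<Object>>>() {
  @Override
  public ObservableSource<List<Object>> apply(Observable<Object> o)
      throws Exception {
    // count = 1: every source item is emitted in its own single-element list
    return o.buffer(1);
  }
};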

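Code example source: ReactiveX/RxJava

// Excerpt completed into a standalone declaration; the name timedOrSizedBuffer is illustrative, not from the source.
Function<Observable<Object>, Observable<List<Object>>> timedOrSizedBuffer =
    new Function<Observable<Object>, Observable<List<Object>>>() {
  @Override
  public Observable<List<Object>> apply(Observable<Object> f)
      throws Exception {
    // close each buffer after 2 seconds or after 10 items, whichever comes first
    return f.buffer(2, TimeUnit.SECONDS, 10);
  }
};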

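Code example source: ReactiveX/RxJava

// Excerpt completed into a standalone declaration; the name everyOtherItem is illustrative, not from the source.
Function<Observable<Object>, ObservableSource<List<Object>>> everyOtherItem =
    new Function<Observable<Object>, ObservableSource<List<Object>>>() {
  @Override
  public ObservableSource<List<Object>> apply(Observable<Object> o)
      throws Exception {
    // count = 1, skip = 2: a one-item buffer starts at every second item
    return o.buffer(1, 2);
  }
};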

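Code example source: ReactiveX/RxJava

// Excerpt completed into a standalone declaration; the name overlappingTimedBuffer is illustrative, not from the source.
Function<Observable<Object>, Observable<List<Object>>> overlappingTimedBuffer =
    new Function<Observable<Object>, Observable<List<Object>>>() {
  @Override
  public Observable<List<Object>> apply(Observable<Object> f)
      throws Exception {
    // timespan = 2 s, timeskip = 1 s: a new 2-second buffer opens every second, so buffers overlap
    return f.buffer(2, 1, TimeUnit.SECONDS);
  }
};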

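Code example source: ReactiveX/RxJava

// Excerpt completed into a standalone declaration; the name boundaryBuffer is illustrative, not from the source.
Function<Observable<Object>, ObservableSource<List<Object>>> boundaryBuffer =
    new Function<Observable<Object>, ObservableSource<List<Object>>>() {
  @Override
  public ObservableSource<List<Object>> apply(Observable<Object> f)
      throws Exception {
    // the boundary Observable.never() emits nothing, so no boundary ever closes a buffer
    return f.buffer(Observable.never());
  }
};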

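Code example source: ReactiveX/RxJava

// just1 is a field of the test class (not shown in this excerpt).
// A boundary supplier that returns null is expected to fail with NullPointerException.
@Test(expected = NullPointerException.class)
public void bufferBoundarySupplier2ReturnsNull() {
  just1.buffer(new Callable<Observable<Object>>() {
    @Override
    public Observable<Object> call() {
      return null;
    }
  }).blockingSubscribe();
}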

Code example source: ReactiveX/RxJava

// observer and list(...) are members of the test class (not shown in this excerpt).
@Test
public void testSkipAndCountBuffersWithGaps() {
  Observable<String> source = Observable.just("one", "two", "three", "four", "five");
  // count = 2, skip = 3: a buffer of 2 starts at every 3rd item, so "three" falls into the gap
  Observable<List<String>> buffered = source.buffer(2, 3);
  buffered.subscribe(observer);
  InOrder inOrder = Mockito.inOrder(observer);
  inOrder.verify(observer, Mockito.times(1)).onNext(list("one", "two"));
  inOrder.verify(observer, Mockito.times(1)).onNext(list("four", "five"));
  inOrder.verify(observer, Mockito.never()).onNext(Mockito.<String>anyList());
  inOrder.verify(observer, Mockito.never()).onError(Mockito.any(Throwable.class));
  inOrder.verify(observer, Mockito.times(1)).onComplete();
}

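Code example source: ReactiveX/RxJava

// Opening/closing overload: just1 opens a buffer; the closing-indicator function
// returns null, which is expected to fail with NullPointerException.
@Test(expected = NullPointerException.class)
public void bufferOpenCloseCloseReturnsNull() {
  just1.buffer(just1, new Function<Integer, Observable<Object>>() {
    @Override
    public Observable<Object> apply(Integer v) {
      return null;
    }
  }).blockingSubscribe();
}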

Code example source: ReactiveX/RxJava

// An empty source produces no buffers at all, only onComplete.
@Test
public void testComplete() {
  Observable<String> source = Observable.empty();
  Observable<List<String>> buffered = source.buffer(3, 3);
  buffered.subscribe(observer);
  Mockito.verify(observer, Mockito.never()).onNext(Mockito.<String>anyList());
  Mockito.verify(observer, Mockito.never()).onError(Mockito.any(Throwable.class));
  Mockito.verify(observer, Mockito.times(1)).onComplete();
}

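Code example source: ReactiveX/RxJava

// The third argument supplies the collection used as the buffer;
// a supplier that returns null is expected to fail with NullPointerException.
@Test(expected = NullPointerException.class)
public void bufferSupplierReturnsNull() {
  just1.buffer(1, 1, new Callable<Collection<Integer>>() {
    @Override
    public Collection<Integer> call() {
      return null;
    }
  }).blockingSubscribe();
}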

Code example source: ReactiveX/RxJava

@Test
public void testSkipAndCountGaplessBuffers() {
  Observable<String> source = Observable.just("one", "two", "three", "four", "five");
  // count == skip: buffers are adjacent with no gaps and no overlap
  Observable<List<String>> buffered = source.buffer(3, 3);
  buffered.subscribe(observer);
  InOrder inOrder = Mockito.inOrder(observer);
  inOrder.verify(observer, Mockito.times(1)).onNext(list("one", "two", "three"));
  // the final buffer is only partially filled; it is emitted when the source completes
  inOrder.verify(observer, Mockito.times(1)).onNext(list("four", "five"));
  inOrder.verify(observer, Mockito.never()).onNext(Mockito.<String>anyList());
  inOrder.verify(observer, Mockito.never()).onError(Mockito.any(Throwable.class));
  inOrder.verify(observer, Mockito.times(1)).onComplete();
}
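
The two count/skip tests above (gaps when skip > count, adjacent buffers when skip == count) can be reproduced with a small plain-Java model. This is a sketch under stated assumptions rather than RxJava's own code: BufferSkipSketch and bufferWithSkip are hypothetical names introduced here, the source is assumed finite and error-free, and the bookkeeping (open a buffer at every skip-th item, add each item to all open buffers, emit a buffer once it holds count items) is one straightforward way to obtain the results the tests assert.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Illustrative model only; this is NOT RxJava code.
class BufferSkipSketch {

    // Hypothetical helper modeling buffer(count, skip) on a finite source.
    static <T> List<List<T>> bufferWithSkip(List<T> items, int count, int skip) {
        if (count <= 0 || skip <= 0) {
            throw new IllegalArgumentException("count and skip must be positive");
        }
        List<List<T>> emitted = new ArrayList<>();
        Deque<List<T>> open = new ArrayDeque<>(); // buffers still collecting, oldest first
        long index = 0;
        for (T item : items) {
            // Every skip-th item (starting with the first) opens a new buffer.
            if (index++ % skip == 0) {
                open.addLast(new ArrayList<>());
            }
            // The item goes into every open buffer; with no open buffer it is dropped.
            for (List<T> buffer : open) {
                buffer.add(item);
            }
            // The oldest buffer is always the fullest, so only it can reach count.
            if (!open.isEmpty() && open.peekFirst().size() == count) {
                emitted.add(open.removeFirst());
            }
        }
        // Completion: remaining partially filled buffers are emitted in order.
        emitted.addAll(open);
        return emitted;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("one", "two", "three", "four", "five");
        System.out.println(bufferWithSkip(words, 2, 3)); // prints [[one, two], [four, five]]
        System.out.println(bufferWithSkip(words, 3, 3)); // prints [[one, two, three], [four, five]]
        System.out.println(bufferWithSkip(Arrays.asList(1, 2, 3), 2, 1)); // prints [[1, 2], [2, 3], [3]]
    }
}
```

The third call uses skip < count, which makes consecutive buffers overlap; the first two calls match the values verified in testSkipAndCountBuffersWithGaps and testSkipAndCountGaplessBuffers.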

Code example source: ReactiveX/RxJava

@Test(timeout = 2000)
public void bufferWithSizeTake1() {
  // infinite source: repeats the value 1 forever
  Observable<Integer> source = Observable.just(1).repeat();
  // take(1) completes the sequence after the first full buffer, so the test terminates
  Observable<List<Integer>> result = source.buffer(2).take(1);
  Observer<Object> o = TestHelper.mockObserver();
  result.subscribe(o);
  verify(o).onNext(Arrays.asList(1, 1));
  verify(o).onComplete();
  verify(o, never()).onError(any(Throwable.class));
}

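Code example source: ReactiveX/RxJava

// Excerpt: Movie and calculateDelta are defined elsewhere in the source project (not shown).
@Override
  public Observable<Movie> apply(Observable<List<Movie>> movieList) {
    return movieList
      .startWith(new ArrayList<Movie>())  // seed the stream with an empty list
      .buffer(2, 1)                       // count = 2, skip = 1: overlapping buffers of consecutive lists
      .skip(1)                            // drop the first buffer
      .flatMap(calculateDelta);
  }
};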

Code example source: ReactiveX/RxJava

// scheduler is a field of the test class (not shown); it is driven with virtual time below.
@Test(timeout = 2000)
public void bufferWithTimeTake1() {
  // emits 0, 1, 2, ... every 40 ms, starting at 40 ms
  Observable<Long> source = Observable.interval(40, 40, TimeUnit.MILLISECONDS, scheduler);
  // the first 100 ms window holds 0 (at 40 ms) and 1 (at 80 ms)
  Observable<List<Long>> result = source.buffer(100, TimeUnit.MILLISECONDS, scheduler).take(1);
  Observer<Object> o = TestHelper.mockObserver();
  result.subscribe(o);
  scheduler.advanceTimeBy(5, TimeUnit.SECONDS);
  verify(o).onNext(Arrays.asList(0L, 1L));
  verify(o).onComplete();
  verify(o, never()).onError(any(Throwable.class));
}

Code example source: ReactiveX/RxJava

// Timed overload with a custom buffer supplier; a supplier that returns null
// is expected to fail with NullPointerException.
@Test(expected = NullPointerException.class)
public void bufferTimedSupplierReturnsNull() {
  just1.buffer(1L, 1L, TimeUnit.SECONDS, Schedulers.single(), new Callable<Collection<Integer>>() {
    @Override
    public Collection<Integer> call() {
      return null;
    }
  }).blockingSubscribe();
}

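Code example source: ReactiveX/RxJava

// timespan = 2 days, timeskip = 1 day; the test expects a single empty list
// to be emitted when the empty source completes.
@SuppressWarnings("unchecked")
@Test
public void bufferTimedOverlapEmpty() {
  Observable.empty()
  .buffer(2, 1, TimeUnit.DAYS)
  .test()
  .assertResult(Collections.emptyList());
}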

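Code example source: ReactiveX/RxJava

// timespan == timeskip (1 minute); all five items arrive at once, so the test
// expects them in a single buffer.
@SuppressWarnings("unchecked")
@Test
public void bufferTimeSkipDefault() {
  Observable.range(1, 5).buffer(1, 1, TimeUnit.MINUTES)
  .test()
  .assertResult(Arrays.asList(1, 2, 3, 4, 5));
}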

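Code example source: ReactiveX/RxJava

// Arguments: timespan, unit, scheduler, count = 2, buffer supplier, restartTimerOnMaxSize = true.
// The count limit closes each buffer long before the 1-day timespan elapses.
@SuppressWarnings("unchecked")
@Test
public void restartTimer() {
  Observable.range(1, 5)
  .buffer(1, TimeUnit.DAYS, Schedulers.single(), 2, Functions.<Integer>createArrayList(16), true)
  .test()
  .assertResult(Arrays.asList(1, 2), Arrays.asList(3, 4), Arrays.asList(5));
}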

Code example source: ReactiveX/RxJava

// An error from the source is passed straight through; no buffer is emitted first.
@SuppressWarnings("unchecked")
@Test
public void bufferTimedExactError() {
  Observable.error(new TestException())
  .buffer(1, TimeUnit.DAYS)
  .test()
  .assertFailure(TestException.class);
}

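Code example source: ReactiveX/RxJava

@Test
public void timedCancelledUpfront() {
  TestScheduler sch = new TestScheduler();
  // test(true) creates the TestObserver already disposed
  TestObserver<List<Object>> to = Observable.never()
  .buffer(1, TimeUnit.MILLISECONDS, sch)
  .test(true);
  // even after the 1 ms timespan elapses, nothing may reach the disposed observer
  sch.advanceTimeBy(1, TimeUnit.MILLISECONDS);
  to.assertEmpty();
}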
