io.reactivex.Flowable.buffer(): usage and code examples

Reposted by x33g5p2x on 2022-01-19 under "Other"

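This article collects code examples for the Java method io.reactivex.Flowable.buffer() and shows how Flowable.buffer() is used in practice. The examples were extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, and are intended as a practical reference. Details of the Flowable.buffer() method:
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: buffer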

About Flowable.buffer

Returns a Flowable that emits buffers of items it collects from the source Publisher. The resulting Publisher emits connected, non-overlapping buffers, each containing count items. When the source Publisher completes, the resulting Publisher emits the current buffer and propagates the notification from the source Publisher. Note that if the source Publisher issues an onError notification the event is passed on immediately without first emitting the buffer it is in the process of assembling.

Backpressure: The operator honors backpressure from downstream and expects the source Publisher to honor it as well, although not enforced; violation may lead to MissingBackpressureException somewhere downstream.
Scheduler: This version of buffer does not operate by default on a particular Scheduler.
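
The difference between completion and error described above can be illustrated with a small plain-Java model. This is only a sketch of the documented semantics, not RxJava code: the class name BufferTerminalSketch, the method signals, and the string encoding of the signals are all hypothetical and chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of how buffer(count) treats terminal events; not RxJava code. */
final class BufferTerminalSketch {

    /**
     * Replays items through a count-based buffer and returns the signals a
     * downstream subscriber would observe, encoded as strings (hypothetical format).
     */
    static List<String> signals(List<Integer> items, int count, boolean endsWithError) {
        List<String> out = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        for (Integer item : items) {
            current.add(item);
            if (current.size() == count) {
                // A full buffer is emitted and a fresh one is started.
                out.add("onNext(" + current + ")");
                current = new ArrayList<>();
            }
        }
        if (endsWithError) {
            // The error is passed on immediately; the partial buffer is dropped.
            out.add("onError");
        } else {
            // On completion the partial buffer, if any, is emitted first.
            if (!current.isEmpty()) {
                out.add("onNext(" + current + ")");
            }
            out.add("onComplete");
        }
        return out;
    }

    public static void main(String[] args) {
        // prints [onNext([1, 2]), onNext([3]), onComplete]
        System.out.println(signals(Arrays.asList(1, 2, 3), 2, false));
        // prints [onNext([1, 2]), onError]
        System.out.println(signals(Arrays.asList(1, 2, 3), 2, true));
    }
}
```

In the second call the item 3 never reaches the subscriber, which matches the note that an onError is forwarded without flushing the buffer being assembled.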

Code examples

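Code example source: ReactiveX/RxJava

// Excerpt: tail of an anonymous Function passed to a test helper; the opening lines are not shown in the source.
// buffer(1, 2): count = 1, skip = 2, so every second item is dropped.
  @Override
  public Object apply(Flowable<Integer> f) throws Exception {
    return f.buffer(1, 2);
  }
}, false, 1, 1, Arrays.asList(1));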

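Code example source: ReactiveX/RxJava

// Excerpt: tail of an anonymous Function; the opening lines are not shown in the source.
// Same operator as above, here with a typed Publisher<List<Object>> return value.
  @Override
  public Publisher<List<Object>> apply(Flowable<Object> f) throws Exception {
    return f.buffer(1, 2);
  }
});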

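Code example source: ReactiveX/RxJava

// Excerpt: tail of an anonymous Function passed to a test helper; the opening lines are not shown in the source.
// buffer(2, 1): count = 2, skip = 1, so consecutive buffers overlap by one item.
  @Override
  public Object apply(Flowable<Integer> f) throws Exception {
    return f.buffer(2, 1);
  }
}, false, 1, 1, Arrays.asList(1));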

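Code example source: ReactiveX/RxJava

// Excerpt: tail of an anonymous Function; the opening lines are not shown in the source.
// buffer(1, TimeUnit.SECONDS): time-based buffering with a timespan of one second.
  @Override
  public Publisher<List<Object>> apply(Flowable<Object> f)
      throws Exception {
    return f.buffer(1, TimeUnit.SECONDS);
  }
});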

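Code example source: ReactiveX/RxJava

// Passing null as the opening-indicator Publisher must fail with a NullPointerException.
// just1 is a field of the enclosing test class and is not shown in this excerpt.
@Test(expected = NullPointerException.class)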
public void bufferOpenCloseOpenNull() {
  just1.buffer(null, new Function<Object, Publisher<Integer>>() {
    @Override
    public Publisher<Integer> apply(Object o) {
      return just1;
    }
  });
}

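Code example source: ReactiveX/RxJava

// Excerpt: tail of an anonymous Function; the opening lines are not shown in the source.
// The boundary Publisher never emits, so no buffer is closed by a boundary signal.
  @Override
  public Flowable<List<Object>> apply(Flowable<Object> f)
      throws Exception {
    return f.buffer(Flowable.never());
  }
}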

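Code example source: ReactiveX/RxJava

// Passing null as the buffer supplier (second argument) must fail with a NullPointerException.
@Test(expected = NullPointerException.class)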
public void bufferBoundarySupplier2SupplierNull() {
  just1.buffer(new Callable<Flowable<Integer>>() {
    @Override
    public Flowable<Integer> call() {
      return just1;
    }
  }, null);
}

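Code example source: ReactiveX/RxJava

// A buffer supplier that returns null makes the flow fail with a NullPointerException once it is subscribed.
@Test(expected = NullPointerException.class)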
public void bufferBoundarySupplierReturnsNull() {
  just1.buffer(just1, new Callable<Collection<Integer>>() {
    @Override
    public Collection<Integer> call() {
      return null;
    }
  }).blockingSubscribe();
}

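Code example source: ReactiveX/RxJava

// A closing-indicator function that returns null makes the flow fail with a NullPointerException once it is subscribed.
@Test(expected = NullPointerException.class)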
public void bufferOpenCloseCloseReturnsNull() {
  just1.buffer(just1, new Function<Integer, Publisher<Object>>() {
    @Override
    public Publisher<Object> apply(Integer v) {
      return null;
    }
  }).blockingSubscribe();
}

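Code example source: ReactiveX/RxJava

// Excerpt: tail of an anonymous Function; Movie and calculateDelta are defined elsewhere in the test class.
// startWith plus buffer(2, 1) pairs each list with its predecessor; skip(1) drops the first pair, which begins with the empty seed list.
  @Override
  public Publisher<Movie> apply(Flowable<List<Movie>> movieList) {
    return movieList
      .startWith(new ArrayList<Movie>())
      .buffer(2, 1)
      .skip(1)
      .flatMap(calculateDelta);
  }
};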

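Code example source: ReactiveX/RxJava

// Excerpt from a test class; subscriber (used as a Mockito mock) and list(...) are defined elsewhere in that class.
// count = 2, skip = 3: the third item of every group of three is dropped.
@Test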
public void testSkipAndCountBuffersWithGaps() {
  Flowable<String> source = Flowable.just("one", "two", "three", "four", "five");
  Flowable<List<String>> buffered = source.buffer(2, 3);
  buffered.subscribe(subscriber);
  InOrder inOrder = Mockito.inOrder(subscriber);
  inOrder.verify(subscriber, Mockito.times(1)).onNext(list("one", "two"));
  inOrder.verify(subscriber, Mockito.times(1)).onNext(list("four", "five"));
  inOrder.verify(subscriber, Mockito.never()).onNext(Mockito.<String>anyList());
  inOrder.verify(subscriber, Mockito.never()).onError(Mockito.any(Throwable.class));
  inOrder.verify(subscriber, Mockito.times(1)).onComplete();
}
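
The count and skip combinations used in the excerpts (gaps when skip > count, overlap when skip < count, exact chunks when they are equal) can be reproduced with a plain-Java model operating on a finite list. This is a sketch of the observable results only, not RxJava's implementation; the class name CountSkipBufferSketch and its buffer method are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of buffer(count, skip) on a finite, error-free source; not RxJava code. */
final class CountSkipBufferSketch {

    /**
     * Starts a new buffer every skip items and fills it with up to count items.
     * Buffers that are still partial when the source ends are emitted as they are.
     */
    static <T> List<List<T>> buffer(List<T> source, int count, int skip) {
        if (count <= 0 || skip <= 0) {
            throw new IllegalArgumentException("count and skip must be positive");
        }
        List<List<T>> buffers = new ArrayList<>();
        for (int start = 0; start < source.size(); start += skip) {
            int end = Math.min(start + count, source.size());
            buffers.add(new ArrayList<>(source.subList(start, end)));
        }
        return buffers;
    }

    public static void main(String[] args) {
        List<Integer> oneToFive = Arrays.asList(1, 2, 3, 4, 5);
        // skip > count leaves gaps: prints [[one, two], [four, five]]
        System.out.println(buffer(Arrays.asList("one", "two", "three", "four", "five"), 2, 3));
        // skip < count overlaps: prints [[1, 2], [2, 3], [3, 4], [4, 5], [5]]
        System.out.println(buffer(oneToFive, 2, 1));
        // skip == count gives connected, non-overlapping chunks: prints [[1, 2], [3, 4], [5]]
        System.out.println(buffer(oneToFive, 2, 2));
    }
}
```

The first call mirrors the expectation verified in testSkipAndCountBuffersWithGaps: "three" falls into the gap between the two buffers.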

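Code example source: ReactiveX/RxJava

// Checks disposal for the timed, the count-based, and the combined time-and-count overloads.
// TestHelper and Functions are utilities from the RxJava code base, not shown here.
@Test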
public void dispose() {
  TestHelper.checkDisposed(Flowable.range(1, 5).buffer(1, TimeUnit.DAYS, Schedulers.single()));
  TestHelper.checkDisposed(Flowable.range(1, 5).buffer(2, 1, TimeUnit.DAYS, Schedulers.single()));
  TestHelper.checkDisposed(Flowable.range(1, 5).buffer(1, 2, TimeUnit.DAYS, Schedulers.single()));
  TestHelper.checkDisposed(Flowable.range(1, 5)
      .buffer(1, TimeUnit.DAYS, Schedulers.single(), 2, Functions.<Integer>createArrayList(16), true));
  TestHelper.checkDisposed(Flowable.range(1, 5).buffer(1));
  TestHelper.checkDisposed(Flowable.range(1, 5).buffer(2, 1));
  TestHelper.checkDisposed(Flowable.range(1, 5).buffer(1, 2));
}

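Code example source: ReactiveX/RxJava

// LongTimeAction is a helper class of the test and is not shown in this excerpt.
// buffer(10, TimeUnit.MILLISECONDS, 10) closes a buffer after 10 ms or after 10 items, whichever happens first.
@Test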
public void testLongTimeAction() throws InterruptedException {
  final CountDownLatch latch = new CountDownLatch(1);
  LongTimeAction action = new LongTimeAction(latch);
  Flowable.just(1).buffer(10, TimeUnit.MILLISECONDS, 10)
      .subscribe(action);
  latch.await();
  assertFalse(action.fail);
}

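Code example source: ReactiveX/RxJava

// An endless source buffered in pairs; take(1) completes the flow after the first buffer [1, 1].
@Test(timeout = 2000)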
public void bufferWithSizeTake1() {
  Flowable<Integer> source = Flowable.just(1).repeat();
  Flowable<List<Integer>> result = source.buffer(2).take(1);
  Subscriber<Object> subscriber = TestHelper.mockSubscriber();
  result.subscribe(subscriber);
  verify(subscriber).onNext(Arrays.asList(1, 1));
  verify(subscriber).onComplete();
  verify(subscriber, never()).onError(any(Throwable.class));
}

Code example source: ReactiveX/RxJava

// The source completes immediately, so all five items end up in a single buffer.
@SuppressWarnings("unchecked")
@Test
public void bufferTimeSkipDefault() {
  Flowable.range(1, 5).buffer(1, 1, TimeUnit.MINUTES)
  .test()
  .assertResult(Arrays.asList(1, 2, 3, 4, 5));
}

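Code example source: ReactiveX/RxJava

// Empty source with timespan 1 day and timeskip 2 days: one empty buffer is emitted on completion.
@SuppressWarnings("unchecked")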
@Test
public void bufferTimedSkipEmpty() {
  Flowable.empty()
  .buffer(1, 2, TimeUnit.DAYS)
  .test()
  .assertResult(Collections.emptyList());
}

Code example source: ReactiveX/RxJava

// Empty source with a timespan of 1 day: one empty buffer is emitted on completion.
@SuppressWarnings("unchecked")
@Test
public void bufferTimedExactEmpty() {
  Flowable.empty()
  .buffer(1, TimeUnit.DAYS)
  .test()
  .assertResult(Collections.emptyList());
}
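
The timed results asserted above (one buffer holding everything when the source finishes before the first tick, and a single empty buffer for an empty source) follow from a simple rule: a buffer is closed on every timer tick and once more on completion. The plain-Java model below sketches that rule under stated assumptions; it is not RxJava code. TimedBufferSketch, its parameters, and the use of plain long timestamps instead of a Scheduler are hypothetical, and the model assumes that a window without items still yields an empty buffer and that an item arriving exactly on a tick belongs to the next window.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of an exact timed buffer on a finite, error-free source; not RxJava code. */
final class TimedBufferSketch {

    /**
     * arrivalTimes must be sorted ascending, and completedAt must not be earlier
     * than the last arrival. All times share one unit and start at 0.
     */
    static List<List<Integer>> buffer(long[] arrivalTimes, int[] values,
                                      long timespan, long completedAt) {
        if (arrivalTimes.length != values.length || timespan <= 0) {
            throw new IllegalArgumentException("mismatched input or non-positive timespan");
        }
        List<List<Integer>> emitted = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        long nextTick = timespan;
        for (int i = 0; i < values.length; i++) {
            // Close one buffer per tick that elapsed before this item arrived.
            while (arrivalTimes[i] >= nextTick) {
                emitted.add(current);
                current = new ArrayList<>();
                nextTick += timespan;
            }
            current.add(values[i]);
        }
        // Ticks that fire strictly before completion still close a buffer.
        while (nextTick < completedAt) {
            emitted.add(current);
            current = new ArrayList<>();
            nextTick += timespan;
        }
        // Completion emits whatever is currently buffered, even if it is empty.
        emitted.add(current);
        return emitted;
    }

    public static void main(String[] args) {
        // Five items and completion at time 0, timespan 60: prints [[1, 2, 3, 4, 5]]
        System.out.println(buffer(new long[] {0, 0, 0, 0, 0}, new int[] {1, 2, 3, 4, 5}, 60, 0));
        // Empty source completing at time 0: prints [[]]
        System.out.println(buffer(new long[0], new int[0], 86_400, 0));
        // A quiet window between items: prints [[1, 2], [], [3]]
        System.out.println(buffer(new long[] {10, 20, 250}, new int[] {1, 2, 3}, 100, 260));
    }
}
```

The third call shows why downstream code that uses timed buffers often filters out empty lists before processing them.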

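Code example source: ReactiveX/RxJava

// Empty source with timespan 2 days and timeskip 1 day (overlapping windows): one empty buffer is emitted on completion.
@SuppressWarnings("unchecked")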
@Test
public void bufferTimedOverlapEmpty() {
  Flowable.empty()
  .buffer(2, 1, TimeUnit.DAYS)
  .test()
  .assertResult(Collections.emptyList());
}

Code example source: ReactiveX/RxJava

// An error from the source is forwarded immediately; no buffer is emitted.
@SuppressWarnings("unchecked")
@Test
public void bufferTimedSkipError() {
  Flowable.error(new TestException())
  .buffer(1, 2, TimeUnit.DAYS)
  .test()
  .assertFailure(TestException.class);
}

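Code example source: ReactiveX/RxJava

// An error from the main source is forwarded even though neither the opening nor the closing Publisher ever signals.
@Test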
@SuppressWarnings("unchecked")
public void openClosemainError() {
  Flowable.error(new TestException())
  .buffer(Flowable.never(), Functions.justFunction(Flowable.never()))
  .test()
  .assertFailure(TestException.class);
}
