io.reactivex.Flowable.distinct()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(8.2k)|赞(0)|评价(0)|浏览(179)

本文整理了Java中io.reactivex.Flowable.distinct()方法的一些代码示例,展示了Flowable.distinct()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.distinct()方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:distinct

Flowable.distinct介绍

[英]Returns a Flowable that emits all items emitted by the source Publisher that are distinct based on Object#equals(Object) comparison.

It is recommended the elements' class T in the flow overrides the default Object.equals() and Object#hashCode() to provide a meaningful comparison between items as the default Java implementation only considers reference equivalence.

By default, distinct() uses an internal java.util.HashSet per Subscriber to remember previously seen items and uses java.util.Set#add(Object) returning false as the indicator for duplicates.

Note that this internal HashSet may grow unbounded as items won't be removed from it by the operator. Therefore, using very long or infinite upstream (with very distinct elements) may lead to OutOfMemoryError.

Customizing the retention policy can happen only by providing a custom java.util.Collection implementation to the #distinct(Function,Callable) overload. Backpressure: The operator doesn't interfere with backpressure which is determined by the source Publisher's backpressure behavior. Scheduler: distinct does not operate by default on a particular Scheduler.
[中]

代码示例

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void distinctSupplierNull() {
  3. just1.distinct(new Function<Integer, Object>() {
  4. @Override
  5. public Object apply(Integer v) {
  6. return v;
  7. }
  8. }, null);
  9. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void distinctFunctionNull() {
  3. just1.distinct(null);
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void distinctFunctionReturnsNull() {
  3. just1.distinct(new Function<Integer, Object>() {
  4. @Override
  5. public Object apply(Integer v) {
  6. return null;
  7. }
  8. }).blockingSubscribe();
  9. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testDistinctOfNoneWithKeySelector() {
  3. Flowable<String> src = Flowable.empty();
  4. src.distinct(TO_UPPER_WITH_EXCEPTION).subscribe(w);
  5. verify(w, never()).onNext(anyString());
  6. verify(w, never()).onError(any(Throwable.class));
  7. verify(w, times(1)).onComplete();
  8. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. @Ignore("Null values no longer allowed")
  3. public void testDistinctOfSourceWithExceptionsFromKeySelector() {
  4. Flowable<String> src = Flowable.just("a", "b", null, "c");
  5. src.distinct(TO_UPPER_WITH_EXCEPTION).subscribe(w);
  6. InOrder inOrder = inOrder(w);
  7. inOrder.verify(w, times(1)).onNext("a");
  8. inOrder.verify(w, times(1)).onNext("b");
  9. inOrder.verify(w, times(1)).onError(any(NullPointerException.class));
  10. inOrder.verify(w, never()).onNext(anyString());
  11. inOrder.verify(w, never()).onComplete();
  12. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testDistinctOfNormalSource() {
  3. Flowable<String> src = Flowable.just("a", "b", "c", "c", "c", "b", "b", "a", "e");
  4. src.distinct().subscribe(w);
  5. InOrder inOrder = inOrder(w);
  6. inOrder.verify(w, times(1)).onNext("a");
  7. inOrder.verify(w, times(1)).onNext("b");
  8. inOrder.verify(w, times(1)).onNext("c");
  9. inOrder.verify(w, times(1)).onNext("e");
  10. inOrder.verify(w, times(1)).onComplete();
  11. inOrder.verify(w, never()).onNext(anyString());
  12. verify(w, never()).onError(any(Throwable.class));
  13. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testDistinctOfNone() {
  3. Flowable<String> src = Flowable.empty();
  4. src.distinct().subscribe(w);
  5. verify(w, never()).onNext(anyString());
  6. verify(w, never()).onError(any(Throwable.class));
  7. verify(w, times(1)).onComplete();
  8. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. @Ignore("Null values no longer allowed")
  3. public void testDistinctOfSourceWithNulls() {
  4. Flowable<String> src = Flowable.just(null, "a", "a", null, null, "b", null);
  5. src.distinct().subscribe(w);
  6. InOrder inOrder = inOrder(w);
  7. inOrder.verify(w, times(1)).onNext(null);
  8. inOrder.verify(w, times(1)).onNext("a");
  9. inOrder.verify(w, times(1)).onNext("b");
  10. inOrder.verify(w, times(1)).onComplete();
  11. inOrder.verify(w, never()).onNext(anyString());
  12. verify(w, never()).onError(any(Throwable.class));
  13. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testDistinctOfNormalSourceWithKeySelector() {
  3. Flowable<String> src = Flowable.just("a", "B", "c", "C", "c", "B", "b", "a", "E");
  4. src.distinct(TO_UPPER_WITH_EXCEPTION).subscribe(w);
  5. InOrder inOrder = inOrder(w);
  6. inOrder.verify(w, times(1)).onNext("a");
  7. inOrder.verify(w, times(1)).onNext("B");
  8. inOrder.verify(w, times(1)).onNext("c");
  9. inOrder.verify(w, times(1)).onNext("E");
  10. inOrder.verify(w, times(1)).onComplete();
  11. inOrder.verify(w, never()).onNext(anyString());
  12. verify(w, never()).onError(any(Throwable.class));
  13. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void distinctSupplierReturnsNull() {
  3. just1.distinct(new Function<Integer, Object>() {
  4. @Override
  5. public Object apply(Integer v) {
  6. return v;
  7. }
  8. }, new Callable<Collection<Object>>() {
  9. @Override
  10. public Collection<Object> call() {
  11. return null;
  12. }
  13. }).blockingSubscribe();
  14. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public Publisher<Integer> createPublisher(long elements) {
  3. return
  4. Flowable.range(0, (int)elements)
  5. .concatWith(Flowable.range(0, (int)elements))
  6. .distinct()
  7. ;
  8. }
  9. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void fusedClear() {
  3. Flowable.just(1, 1, 2, 1, 3, 2, 4, 5, 4)
  4. .distinct()
  5. .subscribe(new FlowableSubscriber<Integer>() {
  6. @Override
  7. public void onSubscribe(Subscription s) {
  8. QueueSubscription<?> qs = (QueueSubscription<?>)s;
  9. assertFalse(qs.isEmpty());
  10. qs.clear();
  11. assertTrue(qs.isEmpty());
  12. }
  13. @Override
  14. public void onNext(Integer value) {
  15. }
  16. @Override
  17. public void onError(Throwable e) {
  18. }
  19. @Override
  20. public void onComplete() {
  21. }
  22. });
  23. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void error() {
  3. Flowable.error(new TestException())
  4. .distinct()
  5. .test()
  6. .assertFailure(TestException.class);
  7. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void collectionSupplierIsNull() {
  3. Flowable.just(1)
  4. .distinct(Functions.identity(), new Callable<Collection<Object>>() {
  5. @Override
  6. public Collection<Object> call() throws Exception {
  7. return null;
  8. }
  9. })
  10. .test()
  11. .assertFailure(NullPointerException.class)
  12. .assertErrorMessage("The collectionSupplier returned a null collection. Null values are generally not allowed in 2.x operators and sources.");
  13. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void fusedSync() {
  3. TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ANY);
  4. Flowable.just(1, 1, 2, 1, 3, 2, 4, 5, 4)
  5. .distinct()
  6. .subscribe(ts);
  7. SubscriberFusion.assertFusion(ts, QueueFuseable.SYNC)
  8. .assertResult(1, 2, 3, 4, 5);
  9. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void collectionSupplierThrows() {
  3. Flowable.just(1)
  4. .distinct(Functions.identity(), new Callable<Collection<Object>>() {
  5. @Override
  6. public Collection<Object> call() throws Exception {
  7. throw new TestException();
  8. }
  9. })
  10. .test()
  11. .assertFailure(TestException.class);
  12. }

代码示例来源:origin: ReactiveX/RxJava

  1. /** Issue #2587. */
  2. @Test
  3. public void testRepeatAndDistinctUnbounded() {
  4. Flowable<Integer> src = Flowable.fromIterable(Arrays.asList(1, 2, 3, 4, 5))
  5. .take(3)
  6. .repeat(3)
  7. .distinct();
  8. TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  9. src.subscribe(ts);
  10. ts.assertNoErrors();
  11. ts.assertTerminated();
  12. ts.assertValues(1, 2, 3);
  13. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void doubleObserveOnErrorConditional() {
  3. Flowable.error(new TestException())
  4. .observeOn(Schedulers.computation())
  5. .distinct()
  6. .observeOn(Schedulers.single())
  7. .test()
  8. .awaitDone(5, TimeUnit.SECONDS)
  9. .assertFailure(TestException.class);
  10. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void doubleObserveOnConditional() {
  3. Flowable.just(1).hide()
  4. .observeOn(Schedulers.computation())
  5. .distinct()
  6. .observeOn(Schedulers.single())
  7. .test()
  8. .awaitDone(5, TimeUnit.SECONDS)
  9. .assertResult(1);
  10. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void boundaryFusion() {
  3. Flowable.range(1, 10000)
  4. .observeOn(Schedulers.single())
  5. .map(new Function<Integer, String>() {
  6. @Override
  7. public String apply(Integer t) throws Exception {
  8. String name = Thread.currentThread().getName();
  9. if (name.contains("RxSingleScheduler")) {
  10. return "RxSingleScheduler";
  11. }
  12. return name;
  13. }
  14. })
  15. .share()
  16. .observeOn(Schedulers.computation())
  17. .distinct()
  18. .test()
  19. .awaitDone(5, TimeUnit.SECONDS)
  20. .assertResult("RxSingleScheduler");
  21. }

相关文章

Flowable类方法