Usage and code examples of the io.reactivex.Flowable.distinct() method

Reposted by x33g5p2x on 2022-01-19 in Other

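This article collects code examples for the Java method io.reactivex.Flowable.distinct() and shows how Flowable.distinct() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Flowable.distinct() method are as follows:
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: distinct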

Flowable.distinct overview

Returns a Flowable that emits all items emitted by the source Publisher that are distinct based on Object#equals(Object) comparison.

It is recommended the elements' class T in the flow overrides the default Object.equals() and Object#hashCode() to provide a meaningful comparison between items as the default Java implementation only considers reference equivalence.
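The effect of this recommendation can be seen without RxJava at all, since the default bookkeeping is a java.util.HashSet. The following is a minimal sketch using two hypothetical classes, RawPoint and ValuePoint (neither comes from RxJava or from the examples below): one keeps the default reference equality, the other overrides equals() and hashCode().

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical demo class; the names here are illustrative, not RxJava API.
final class EqualsDemo {

    // No equals()/hashCode() override: instances are equal only by reference.
    static final class RawPoint {
        final int x;
        final int y;
        RawPoint(int x, int y) { this.x = x; this.y = y; }
    }

    // Value-based equality: same coordinates means equal.
    static final class ValuePoint {
        final int x;
        final int y;
        ValuePoint(int x, int y) { this.x = x; this.y = y; }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof ValuePoint)) return false;
            ValuePoint p = (ValuePoint) o;
            return x == p.x && y == p.y;
        }

        @Override
        public int hashCode() { return Objects.hash(x, y); }
    }

    // Two RawPoints with the same coordinates are both kept by the set.
    static int countDistinctRaw() {
        Set<Object> seen = new HashSet<>();
        seen.add(new RawPoint(1, 2));
        seen.add(new RawPoint(1, 2));
        return seen.size();
    }

    // Two ValuePoints with the same coordinates collapse into one entry.
    static int countDistinctValue() {
        Set<Object> seen = new HashSet<>();
        seen.add(new ValuePoint(1, 2));
        seen.add(new ValuePoint(1, 2));
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println(countDistinctRaw());   // prints 2
        System.out.println(countDistinctValue()); // prints 1
    }
}
```

With a class like RawPoint in the flow, distinct() would treat every new instance as a new item, which is rarely what is intended.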

By default, distinct() uses an internal java.util.HashSet per Subscriber to remember previously seen items and uses java.util.Set#add(Object) returning false as the indicator for duplicates.

Note that this internal HashSet may grow unbounded as items won't be removed from it by the operator. Therefore, using very long or infinite upstream (with very distinct elements) may lead to OutOfMemoryError.
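The duplicate check described above can be sketched in plain Java. This is not the RxJava implementation, only an illustration of the same idea on an Iterable: a HashSet remembers what has been seen, and Set#add returning false marks a duplicate. DistinctSketch and its method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of the bookkeeping; not RxJava code.
final class DistinctSketch {

    // Keeps the first occurrence of each item, in encounter order.
    static <T> List<T> distinct(Iterable<T> source) {
        Set<T> seen = new HashSet<>();   // grows by one entry per distinct item
        List<T> out = new ArrayList<>();
        for (T item : source) {
            if (seen.add(item)) {        // add() returns false for a duplicate
                out.add(item);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> src = Arrays.asList("a", "b", "c", "c", "c", "b", "b", "a", "e");
        System.out.println(distinct(src)); // prints [a, b, c, e]
    }
}
```

Nothing is ever removed from the seen set in this sketch, which mirrors why a very long stream of mostly distinct items can exhaust memory.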

Customizing the retention policy can happen only by providing a custom java.util.Collection implementation to the #distinct(Function,Callable) overload.

Backpressure: The operator doesn't interfere with backpressure which is determined by the source Publisher's backpressure behavior.

Scheduler: distinct does not operate by default on a particular Scheduler.
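As one possible retention policy, the collection handed to the distinct(Function, Callable) overload could forget old keys once it reaches a size limit. The sketch below is plain Java with hypothetical names (RetentionSketch, BoundedSeenSet); it assumes the operator only needs add() to return false for a key that is still remembered, and it shows the collection on its own rather than wired into a Flowable.

```java
import java.util.AbstractSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of a bounded "seen keys" collection; not part of RxJava.
final class RetentionSketch {

    // Remembers at most maxSize keys; the oldest inserted key is dropped first.
    static final class BoundedSeenSet<K> extends AbstractSet<K> {
        private final Map<K, Boolean> map;

        BoundedSeenSet(final int maxSize) {
            // Insertion-ordered map that evicts its eldest entry when over capacity.
            this.map = new LinkedHashMap<K, Boolean>(16, 0.75f, false) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<K, Boolean> eldest) {
                    return size() > maxSize;
                }
            };
        }

        // Returns true if the key was not remembered, false if it is a duplicate.
        @Override
        public boolean add(K key) {
            return map.put(key, Boolean.TRUE) == null;
        }

        @Override
        public Iterator<K> iterator() { return map.keySet().iterator(); }

        @Override
        public int size() { return map.size(); }

        @Override
        public void clear() { map.clear(); }
    }

    public static void main(String[] args) {
        BoundedSeenSet<Integer> seen = new BoundedSeenSet<>(2);
        System.out.println(seen.add(1)); // prints true
        System.out.println(seen.add(2)); // prints true
        System.out.println(seen.add(1)); // prints false (still remembered)
        System.out.println(seen.add(3)); // prints true (evicts 1)
        System.out.println(seen.add(1)); // prints true (1 was forgotten)
    }
}
```

The trade-off is that a key evicted from the collection counts as new again, so memory stays bounded at the cost of occasionally letting a repeated item through.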

Code examples

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void distinctSupplierNull() {
  just1.distinct(new Function<Integer, Object>() {
    @Override
    public Object apply(Integer v) {
      return v;
    }
  }, null);
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void distinctFunctionNull() {
  just1.distinct(null);
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void distinctFunctionReturnsNull() {
  just1.distinct(new Function<Integer, Object>() {
    @Override
    public Object apply(Integer v) {
      return null;
    }
  }).blockingSubscribe();
}

Code example source: ReactiveX/RxJava

@Test
public void testDistinctOfNoneWithKeySelector() {
  Flowable<String> src = Flowable.empty();
  src.distinct(TO_UPPER_WITH_EXCEPTION).subscribe(w);
  verify(w, never()).onNext(anyString());
  verify(w, never()).onError(any(Throwable.class));
  verify(w, times(1)).onComplete();
}

Code example source: ReactiveX/RxJava

@Test
@Ignore("Null values no longer allowed")
public void testDistinctOfSourceWithExceptionsFromKeySelector() {
  Flowable<String> src = Flowable.just("a", "b", null, "c");
  src.distinct(TO_UPPER_WITH_EXCEPTION).subscribe(w);
  InOrder inOrder = inOrder(w);
  inOrder.verify(w, times(1)).onNext("a");
  inOrder.verify(w, times(1)).onNext("b");
  inOrder.verify(w, times(1)).onError(any(NullPointerException.class));
  inOrder.verify(w, never()).onNext(anyString());
  inOrder.verify(w, never()).onComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void testDistinctOfNormalSource() {
  Flowable<String> src = Flowable.just("a", "b", "c", "c", "c", "b", "b", "a", "e");
  src.distinct().subscribe(w);
  InOrder inOrder = inOrder(w);
  inOrder.verify(w, times(1)).onNext("a");
  inOrder.verify(w, times(1)).onNext("b");
  inOrder.verify(w, times(1)).onNext("c");
  inOrder.verify(w, times(1)).onNext("e");
  inOrder.verify(w, times(1)).onComplete();
  inOrder.verify(w, never()).onNext(anyString());
  verify(w, never()).onError(any(Throwable.class));
}

Code example source: ReactiveX/RxJava

@Test
public void testDistinctOfNone() {
  Flowable<String> src = Flowable.empty();
  src.distinct().subscribe(w);
  verify(w, never()).onNext(anyString());
  verify(w, never()).onError(any(Throwable.class));
  verify(w, times(1)).onComplete();
}

Code example source: ReactiveX/RxJava

@Test
@Ignore("Null values no longer allowed")
public void testDistinctOfSourceWithNulls() {
  Flowable<String> src = Flowable.just(null, "a", "a", null, null, "b", null);
  src.distinct().subscribe(w);
  InOrder inOrder = inOrder(w);
  inOrder.verify(w, times(1)).onNext(null);
  inOrder.verify(w, times(1)).onNext("a");
  inOrder.verify(w, times(1)).onNext("b");
  inOrder.verify(w, times(1)).onComplete();
  inOrder.verify(w, never()).onNext(anyString());
  verify(w, never()).onError(any(Throwable.class));
}

Code example source: ReactiveX/RxJava

@Test
public void testDistinctOfNormalSourceWithKeySelector() {
  Flowable<String> src = Flowable.just("a", "B", "c", "C", "c", "B", "b", "a", "E");
  src.distinct(TO_UPPER_WITH_EXCEPTION).subscribe(w);
  InOrder inOrder = inOrder(w);
  inOrder.verify(w, times(1)).onNext("a");
  inOrder.verify(w, times(1)).onNext("B");
  inOrder.verify(w, times(1)).onNext("c");
  inOrder.verify(w, times(1)).onNext("E");
  inOrder.verify(w, times(1)).onComplete();
  inOrder.verify(w, never()).onNext(anyString());
  verify(w, never()).onError(any(Throwable.class));
}
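The test above relies on a key selector, TO_UPPER_WITH_EXCEPTION, whose definition is not shown here. Assuming it upper-cases each string, the keyed variant can be sketched in plain Java as follows: duplicates are detected on the key, while the original item is what gets emitted. KeyedDistinctSketch and distinctBy are hypothetical names, and this is an illustration of the semantics, not RxJava code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.function.Function;

// Hypothetical illustration of distinct-by-key; not RxJava code.
final class KeyedDistinctSketch {

    // Keeps the first item seen for each key produced by keySelector.
    static <T, K> List<T> distinctBy(Iterable<T> source, Function<T, K> keySelector) {
        Set<K> seen = new HashSet<>();
        List<T> out = new ArrayList<>();
        for (T item : source) {
            K key = keySelector.apply(item);
            if (seen.add(key)) {   // the key decides uniqueness
                out.add(item);     // the original item is kept
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> src = Arrays.asList("a", "B", "c", "C", "c", "B", "b", "a", "E");
        // Assumed key selector: upper-case the string.
        System.out.println(distinctBy(src, s -> s.toUpperCase(Locale.ROOT)));
        // prints [a, B, c, E]
    }
}
```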

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void distinctSupplierReturnsNull() {
  just1.distinct(new Function<Integer, Object>() {
    @Override
    public Object apply(Integer v) {
      return v;
    }
  }, new Callable<Collection<Object>>() {
    @Override
    public Collection<Object> call() {
      return null;
    }
  }).blockingSubscribe();
}

Code example source: ReactiveX/RxJava

@Override
public Publisher<Integer> createPublisher(long elements) {
  // Emits 0..elements-1 twice; distinct() drops the second pass.
  return Flowable.range(0, (int)elements)
      .concatWith(Flowable.range(0, (int)elements))
      .distinct();
}

Code example source: ReactiveX/RxJava

@Test
public void fusedClear() {
  Flowable.just(1, 1, 2, 1, 3, 2, 4, 5, 4)
  .distinct()
  .subscribe(new FlowableSubscriber<Integer>() {
    @Override
    public void onSubscribe(Subscription s) {
      QueueSubscription<?> qs = (QueueSubscription<?>)s;
      assertFalse(qs.isEmpty());
      qs.clear();
      assertTrue(qs.isEmpty());
    }
    @Override
    public void onNext(Integer value) {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onComplete() {
    }
  });
}

Code example source: ReactiveX/RxJava

@Test
public void error() {
  Flowable.error(new TestException())
  .distinct()
  .test()
  .assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void collectionSupplierIsNull() {
  Flowable.just(1)
  .distinct(Functions.identity(), new Callable<Collection<Object>>() {
    @Override
    public Collection<Object> call() throws Exception {
      return null;
    }
  })
  .test()
  .assertFailure(NullPointerException.class)
  .assertErrorMessage("The collectionSupplier returned a null collection. Null values are generally not allowed in 2.x operators and sources.");
}

Code example source: ReactiveX/RxJava

@Test
public void fusedSync() {
  TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ANY);
  Flowable.just(1, 1, 2, 1, 3, 2, 4, 5, 4)
  .distinct()
  .subscribe(ts);
  SubscriberFusion.assertFusion(ts, QueueFuseable.SYNC)
  .assertResult(1, 2, 3, 4, 5);
}

Code example source: ReactiveX/RxJava

@Test
public void collectionSupplierThrows() {
  Flowable.just(1)
  .distinct(Functions.identity(), new Callable<Collection<Object>>() {
    @Override
    public Collection<Object> call() throws Exception {
      throw new TestException();
    }
  })
  .test()
  .assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

/** Issue #2587. */
@Test
public void testRepeatAndDistinctUnbounded() {
  Flowable<Integer> src = Flowable.fromIterable(Arrays.asList(1, 2, 3, 4, 5))
      .take(3)
      .repeat(3)
      .distinct();
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  src.subscribe(ts);
  ts.assertNoErrors();
  ts.assertTerminated();
  ts.assertValues(1, 2, 3);
}

Code example source: ReactiveX/RxJava

@Test
public void doubleObserveOnErrorConditional() {
  Flowable.error(new TestException())
  .observeOn(Schedulers.computation())
  .distinct()
  .observeOn(Schedulers.single())
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void doubleObserveOnConditional() {
  Flowable.just(1).hide()
  .observeOn(Schedulers.computation())
  .distinct()
  .observeOn(Schedulers.single())
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult(1);
}

Code example source: ReactiveX/RxJava

@Test
public void boundaryFusion() {
  Flowable.range(1, 10000)
  .observeOn(Schedulers.single())
  .map(new Function<Integer, String>() {
    @Override
    public String apply(Integer t) throws Exception {
      String name = Thread.currentThread().getName();
      if (name.contains("RxSingleScheduler")) {
        return "RxSingleScheduler";
      }
      return name;
    }
  })
  .share()
  .observeOn(Schedulers.computation())
  .distinct()
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult("RxSingleScheduler");
}
