[英]Returns a Flowable that emits all items emitted by the source Publisher that are distinct based on Object#equals(Object) comparison.
It is recommended that the elements' class T in the flow override the default Object#equals(Object) and Object#hashCode() to provide a meaningful comparison between items, as the default Java implementation only considers reference equivalence.
By default, distinct() uses an internal java.util.HashSet per Subscriber to remember previously seen items and uses java.util.Set#add(Object) returning false as the indicator for duplicates.
Note that this internal HashSet may grow without bound because the operator never removes items from it. Therefore, using a very long or infinite upstream (with many distinct elements) may lead to an OutOfMemoryError.
The retention policy can be customized only by providing a custom java.util.Collection implementation to the #distinct(Function,Callable) overload. Backpressure: The operator doesn't interfere with backpressure; the backpressure behavior is determined by the source Publisher. Scheduler: distinct does not operate by default on a particular Scheduler.
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void distinctSupplierNull() {
just1.distinct(new Function<Integer, Object>() {
public Object apply(Integer v) {
return v;
}, null);
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void distinctFunctionNull() {
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void distinctFunctionReturnsNull() {
just1.distinct(new Function<Integer, Object>() {
public Object apply(Integer v) {
return null;
代码示例来源:origin: ReactiveX/RxJava
public void testDistinctOfNoneWithKeySelector() {
Flowable<String> src = Flowable.empty();
verify(w, never()).onNext(anyString());
verify(w, never()).onError(any(Throwable.class));
verify(w, times(1)).onComplete();
代码示例来源:origin: ReactiveX/RxJava
@Ignore("Null values no longer allowed")
public void testDistinctOfSourceWithExceptionsFromKeySelector() {
Flowable<String> src = Flowable.just("a", "b", null, "c");
InOrder inOrder = inOrder(w);
inOrder.verify(w, times(1)).onNext("a");
inOrder.verify(w, times(1)).onNext("b");
inOrder.verify(w, times(1)).onError(any(NullPointerException.class));
inOrder.verify(w, never()).onNext(anyString());
inOrder.verify(w, never()).onComplete();
代码示例来源:origin: ReactiveX/RxJava
public void testDistinctOfNormalSource() {
Flowable<String> src = Flowable.just("a", "b", "c", "c", "c", "b", "b", "a", "e");
InOrder inOrder = inOrder(w);
inOrder.verify(w, times(1)).onNext("a");
inOrder.verify(w, times(1)).onNext("b");
inOrder.verify(w, times(1)).onNext("c");
inOrder.verify(w, times(1)).onNext("e");
inOrder.verify(w, times(1)).onComplete();
inOrder.verify(w, never()).onNext(anyString());
verify(w, never()).onError(any(Throwable.class));
代码示例来源:origin: ReactiveX/RxJava
public void testDistinctOfNone() {
Flowable<String> src = Flowable.empty();
verify(w, never()).onNext(anyString());
verify(w, never()).onError(any(Throwable.class));
verify(w, times(1)).onComplete();
代码示例来源:origin: ReactiveX/RxJava
@Ignore("Null values no longer allowed")
public void testDistinctOfSourceWithNulls() {
Flowable<String> src = Flowable.just(null, "a", "a", null, null, "b", null);
InOrder inOrder = inOrder(w);
inOrder.verify(w, times(1)).onNext(null);
inOrder.verify(w, times(1)).onNext("a");
inOrder.verify(w, times(1)).onNext("b");
inOrder.verify(w, times(1)).onComplete();
inOrder.verify(w, never()).onNext(anyString());
verify(w, never()).onError(any(Throwable.class));
代码示例来源:origin: ReactiveX/RxJava
public void testDistinctOfNormalSourceWithKeySelector() {
Flowable<String> src = Flowable.just("a", "B", "c", "C", "c", "B", "b", "a", "E");
InOrder inOrder = inOrder(w);
inOrder.verify(w, times(1)).onNext("a");
inOrder.verify(w, times(1)).onNext("B");
inOrder.verify(w, times(1)).onNext("c");
inOrder.verify(w, times(1)).onNext("E");
inOrder.verify(w, times(1)).onComplete();
inOrder.verify(w, never()).onNext(anyString());
verify(w, never()).onError(any(Throwable.class));
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void distinctSupplierReturnsNull() {
just1.distinct(new Function<Integer, Object>() {
public Object apply(Integer v) {
return v;
}, new Callable<Collection<Object>>() {
public Collection<Object> call() {
return null;
代码示例来源:origin: ReactiveX/RxJava
public Publisher<Integer> createPublisher(long elements) {
Flowable.range(0, (int)elements)
.concatWith(Flowable.range(0, (int)elements))
代码示例来源:origin: ReactiveX/RxJava
public void fusedClear() {
Flowable.just(1, 1, 2, 1, 3, 2, 4, 5, 4)
.subscribe(new FlowableSubscriber<Integer>() {
public void onSubscribe(Subscription s) {
QueueSubscription<?> qs = (QueueSubscription<?>)s;
public void onNext(Integer value) {
public void onError(Throwable e) {
public void onComplete() {
代码示例来源:origin: ReactiveX/RxJava
public void error() {
Flowable.error(new TestException())
代码示例来源:origin: ReactiveX/RxJava
public void collectionSupplierIsNull() {
.distinct(Functions.identity(), new Callable<Collection<Object>>() {
public Collection<Object> call() throws Exception {
return null;
.assertErrorMessage("The collectionSupplier returned a null collection. Null values are generally not allowed in 2.x operators and sources.");
代码示例来源:origin: ReactiveX/RxJava
public void fusedSync() {
TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ANY);
Flowable.just(1, 1, 2, 1, 3, 2, 4, 5, 4)
SubscriberFusion.assertFusion(ts, QueueFuseable.SYNC)
.assertResult(1, 2, 3, 4, 5);
代码示例来源:origin: ReactiveX/RxJava
public void collectionSupplierThrows() {
.distinct(Functions.identity(), new Callable<Collection<Object>>() {
public Collection<Object> call() throws Exception {
throw new TestException();
代码示例来源:origin: ReactiveX/RxJava
/** Issue #2587. */
public void testRepeatAndDistinctUnbounded() {
Flowable<Integer> src = Flowable.fromIterable(Arrays.asList(1, 2, 3, 4, 5))
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
ts.assertValues(1, 2, 3);
代码示例来源:origin: ReactiveX/RxJava
public void doubleObserveOnErrorConditional() {
Flowable.error(new TestException())
.awaitDone(5, TimeUnit.SECONDS)
代码示例来源:origin: ReactiveX/RxJava
public void doubleObserveOnConditional() {
.awaitDone(5, TimeUnit.SECONDS)
代码示例来源:origin: ReactiveX/RxJava
public void boundaryFusion() {
Flowable.range(1, 10000)
.map(new Function<Integer, String>() {
public String apply(Integer t) throws Exception {
String name = Thread.currentThread().getName();
if (name.contains("RxSingleScheduler")) {
return "RxSingleScheduler";
return name;
.awaitDone(5, TimeUnit.SECONDS)