[英]Returns a Flowable that emits all items emitted by the source Publisher that are distinct from their immediate predecessors based on Object#equals(Object) comparison.
It is recommended the elements' class T in the flow overrides the default Object.equals() to provide a meaningful comparison between items as the default Java implementation only considers reference equivalence. Alternatively, use the #distinctUntilChanged(BiPredicate) overload and provide a comparison function in case the class T can't be overridden with custom equals() or the comparison itself should happen on different terms or properties of the class T.
Note that the operator always retains the latest item from upstream regardless of the comparison result and uses it in the next comparison with the next upstream item. Backpressure: The operator doesn't interfere with backpressure which is determined by the source Publisher's backpressure behavior. Scheduler: distinctUntilChanged does not operate by default on a particular Scheduler.
public Publisher<Integer> createPublisher(long elements) {
Flowable.range(0, (int)elements)
@Test(expected = NullPointerException.class)
public void distinctUntilChangedFunctionNull() {
just1.distinctUntilChanged((Function<Integer, Integer>)null);
@Test(expected = NullPointerException.class)
public void distinctUntilChangedBiPredicateNull() {
just1.distinctUntilChanged((BiPredicate<Integer, Integer>)null);
public void testDistinctUntilChangedOfNone() {
Flowable<String> src = Flowable.empty();
verify(w, never()).onNext(anyString());
verify(w, never()).onError(any(Throwable.class));
verify(w, times(1)).onComplete();
@Ignore("Null values no longer allowed")
public void testDistinctUntilChangedOfSourceWithExceptionsFromKeySelector() {
Flowable<String> src = Flowable.just("a", "b", null, "c");
InOrder inOrder = inOrder(w);
inOrder.verify(w, times(1)).onNext("a");
inOrder.verify(w, times(1)).onNext("b");
verify(w, times(1)).onError(any(NullPointerException.class));
inOrder.verify(w, never()).onNext(anyString());
inOrder.verify(w, never()).onComplete();
public Object apply(Flowable<Integer> f) throws Exception {
return f.distinctUntilChanged().filter(Functions.alwaysTrue());
}, false, 1, 1, 1);
public void testDistinctUntilChangedOfNoneWithKeySelector() {
Flowable<String> src = Flowable.empty();
verify(w, never()).onNext(anyString());
verify(w, never()).onError(any(Throwable.class));
verify(w, times(1)).onComplete();
public void testDistinctUntilChangedOfNormalSource() {
Flowable<String> src = Flowable.just("a", "b", "c", "c", "c", "b", "b", "a", "e");
InOrder inOrder = inOrder(w);
inOrder.verify(w, times(1)).onNext("a");
inOrder.verify(w, times(1)).onNext("b");
inOrder.verify(w, times(1)).onNext("c");
inOrder.verify(w, times(1)).onNext("b");
inOrder.verify(w, times(1)).onNext("a");
inOrder.verify(w, times(1)).onNext("e");
inOrder.verify(w, times(1)).onComplete();
inOrder.verify(w, never()).onNext(anyString());
verify(w, never()).onError(any(Throwable.class));
public void testDistinctUntilChangedOfNormalSourceWithKeySelector() {
Flowable<String> src = Flowable.just("a", "b", "c", "C", "c", "B", "b", "a", "e");
InOrder inOrder = inOrder(w);
inOrder.verify(w, times(1)).onNext("a");
inOrder.verify(w, times(1)).onNext("b");
inOrder.verify(w, times(1)).onNext("c");
inOrder.verify(w, times(1)).onNext("B");
inOrder.verify(w, times(1)).onNext("a");
inOrder.verify(w, times(1)).onNext("e");
inOrder.verify(w, times(1)).onComplete();
inOrder.verify(w, never()).onNext(anyString());
verify(w, never()).onError(any(Throwable.class));
@Ignore("Null values no longer allowed")
public void testDistinctUntilChangedOfSourceWithNulls() {
Flowable<String> src = Flowable.just(null, "a", "a", null, null, "b", null, null);
InOrder inOrder = inOrder(w);
inOrder.verify(w, times(1)).onNext(null);
inOrder.verify(w, times(1)).onNext("a");
inOrder.verify(w, times(1)).onNext(null);
inOrder.verify(w, times(1)).onNext("b");
inOrder.verify(w, times(1)).onNext(null);
inOrder.verify(w, times(1)).onComplete();
inOrder.verify(w, never()).onNext(anyString());
verify(w, never()).onError(any(Throwable.class));
public void testDistinctUntilChangedWhenNonFatalExceptionThrownByKeySelectorIsNotReportedByUpstream() {
Flowable<String> src = Flowable.just("a", "b", "null", "c");
final AtomicBoolean errorOccurred = new AtomicBoolean(false);
.doOnError(new Consumer<Throwable>() {
public void accept(Throwable t) {
public void directComparer() {
Flowable.fromArray(1, 2, 2, 3, 2, 4, 1, 1, 2)
.distinctUntilChanged(new BiPredicate<Integer, Integer>() {
public boolean test(Integer a, Integer b) {
return a.equals(b);
.assertResult(1, 2, 3, 2, 4, 1, 2);
public void distinctUntilChangedFunctionReturnsNull() {
Flowable.range(1, 2).distinctUntilChanged(new Function<Integer, Object>() {
public Object apply(Integer v) {
return null;
public void conditionalNormal() {
Flowable.just(1, 2, 1, 3, 3, 4, 3, 5, 5)
.filter(new Predicate<Integer>() {
public boolean test(Integer v) throws Exception {
return v % 2 == 0;
.assertResult(2, 4);
public void conditionalNormal2() {
Flowable.just(1, 2, 1, 3, 3, 4, 3, 5, 5).hide()
.filter(new Predicate<Integer>() {
public boolean test(Integer v) throws Exception {
return v % 2 == 0;
.assertResult(2, 4);
public void customComparatorThrows() {
Flowable<String> source = Flowable.just("a", "b", "B", "A", "a", "C");
TestSubscriber<String> ts = TestSubscriber.create();
source.distinctUntilChanged(new BiPredicate<String, String>() {
public boolean test(String a, String b) {
throw new TestException();
public void conditionalFused() {
TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ANY);
Flowable.just(1, 2, 1, 3, 3, 4, 3, 5, 5)
.filter(new Predicate<Integer>() {
public boolean test(Integer v) throws Exception {
return v % 2 == 0;
SubscriberFusion.assertFusion(ts, QueueFuseable.SYNC)
.assertResult(2, 4);
public void customComparator() {
Flowable<String> source = Flowable.just("a", "b", "B", "A", "a", "C");
TestSubscriber<String> ts = TestSubscriber.create();
source.distinctUntilChanged(new BiPredicate<String, String>() {
public boolean test(String a, String b) {
return a.compareToIgnoreCase(b) == 0;
ts.assertValues("a", "b", "A", "C");
public void directComparerFused() {
Flowable.fromArray(1, 2, 2, 3, 2, 4, 1, 1, 2)
.distinctUntilChanged(new BiPredicate<Integer, Integer>() {
public boolean test(Integer a, Integer b) {
return a.equals(b);
.to(SubscriberFusion.<Integer>test(Long.MAX_VALUE, QueueFuseable.ANY, false))
.assertResult(1, 2, 3, 2, 4, 1, 2);
public void fused() {
TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ANY);
Flowable.just(1, 2, 2, 3, 3, 4, 5)
.distinctUntilChanged(new BiPredicate<Integer, Integer>() {
public boolean test(Integer a, Integer b) throws Exception {
return a.equals(b);
.assertResult(1, 2, 3, 4, 5)