本文整理了Java中io.reactivex.Flowable.distinctUntilChanged()
方法的一些代码示例,展示了Flowable.distinctUntilChanged()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.distinctUntilChanged()
方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:distinctUntilChanged
[英]Returns a Flowable that emits all items emitted by the source Publisher that are distinct from their immediate predecessors based on Object#equals(Object) comparison.
It is recommended the elements' class T in the flow overrides the default Object.equals() to provide a meaningful comparison between items as the default Java implementation only considers reference equivalence. Alternatively, use the #distinctUntilChanged(BiPredicate) overload and provide a comparison function in case the class T can't be overridden with custom equals() or the comparison itself should happen on different terms or properties of the class T.
Note that the operator always retains the latest item from upstream regardless of the comparison result and uses it in the next comparison with the next upstream item. Backpressure: The operator doesn't interfere with backpressure which is determined by the source Publisher's backpressure behavior. Scheduler: distinctUntilChanged does not operate by default on a particular Scheduler.
[中]返回一个可流动项,该可流动项基于对象#等于(对象)比较,发出源发布服务器发出的与前一个不同的所有项。
建议流中元素的类T覆盖默认对象。equals()提供项之间有意义的比较,因为默认Java实现只考虑引用等价性。或者,使用#distinctUntilChanged(BiPredicate)重载并提供一个比较函数,以防无法使用自定义equals()重写类T,或者比较本身应该发生在类T的不同项或属性上。
请注意,无论比较结果如何,操作员始终保留来自上游的最新项目,并在与下一个上游项目的下一次比较中使用它。背压:操作员不会干扰由源发布者的背压行为确定的背压。调度程序:默认情况下,distinctUntilChanged不会在特定调度程序上运行。
代码示例来源:origin: ReactiveX/RxJava
@Override
public Publisher<Integer> createPublisher(long elements) {
return
Flowable.range(0, (int)elements)
.distinctUntilChanged()
;
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void distinctUntilChangedFunctionNull() {
just1.distinctUntilChanged((Function<Integer, Integer>)null);
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void distinctUntilChangedBiPredicateNull() {
just1.distinctUntilChanged((BiPredicate<Integer, Integer>)null);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testDistinctUntilChangedOfNone() {
Flowable<String> src = Flowable.empty();
src.distinctUntilChanged().subscribe(w);
verify(w, never()).onNext(anyString());
verify(w, never()).onError(any(Throwable.class));
verify(w, times(1)).onComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
@Ignore("Null values no longer allowed")
public void testDistinctUntilChangedOfSourceWithExceptionsFromKeySelector() {
Flowable<String> src = Flowable.just("a", "b", null, "c");
src.distinctUntilChanged(TO_UPPER_WITH_EXCEPTION).subscribe(w);
InOrder inOrder = inOrder(w);
inOrder.verify(w, times(1)).onNext("a");
inOrder.verify(w, times(1)).onNext("b");
verify(w, times(1)).onError(any(NullPointerException.class));
inOrder.verify(w, never()).onNext(anyString());
inOrder.verify(w, never()).onComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@Override
public Object apply(Flowable<Integer> f) throws Exception {
return f.distinctUntilChanged().filter(Functions.alwaysTrue());
}
}, false, 1, 1, 1);
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testDistinctUntilChangedOfNoneWithKeySelector() {
Flowable<String> src = Flowable.empty();
src.distinctUntilChanged(TO_UPPER_WITH_EXCEPTION).subscribe(w);
verify(w, never()).onNext(anyString());
verify(w, never()).onError(any(Throwable.class));
verify(w, times(1)).onComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testDistinctUntilChangedOfNormalSource() {
Flowable<String> src = Flowable.just("a", "b", "c", "c", "c", "b", "b", "a", "e");
src.distinctUntilChanged().subscribe(w);
InOrder inOrder = inOrder(w);
inOrder.verify(w, times(1)).onNext("a");
inOrder.verify(w, times(1)).onNext("b");
inOrder.verify(w, times(1)).onNext("c");
inOrder.verify(w, times(1)).onNext("b");
inOrder.verify(w, times(1)).onNext("a");
inOrder.verify(w, times(1)).onNext("e");
inOrder.verify(w, times(1)).onComplete();
inOrder.verify(w, never()).onNext(anyString());
verify(w, never()).onError(any(Throwable.class));
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testDistinctUntilChangedOfNormalSourceWithKeySelector() {
Flowable<String> src = Flowable.just("a", "b", "c", "C", "c", "B", "b", "a", "e");
src.distinctUntilChanged(TO_UPPER_WITH_EXCEPTION).subscribe(w);
InOrder inOrder = inOrder(w);
inOrder.verify(w, times(1)).onNext("a");
inOrder.verify(w, times(1)).onNext("b");
inOrder.verify(w, times(1)).onNext("c");
inOrder.verify(w, times(1)).onNext("B");
inOrder.verify(w, times(1)).onNext("a");
inOrder.verify(w, times(1)).onNext("e");
inOrder.verify(w, times(1)).onComplete();
inOrder.verify(w, never()).onNext(anyString());
verify(w, never()).onError(any(Throwable.class));
}
代码示例来源:origin: ReactiveX/RxJava
@Test
@Ignore("Null values no longer allowed")
public void testDistinctUntilChangedOfSourceWithNulls() {
Flowable<String> src = Flowable.just(null, "a", "a", null, null, "b", null, null);
src.distinctUntilChanged().subscribe(w);
InOrder inOrder = inOrder(w);
inOrder.verify(w, times(1)).onNext(null);
inOrder.verify(w, times(1)).onNext("a");
inOrder.verify(w, times(1)).onNext(null);
inOrder.verify(w, times(1)).onNext("b");
inOrder.verify(w, times(1)).onNext(null);
inOrder.verify(w, times(1)).onComplete();
inOrder.verify(w, never()).onNext(anyString());
verify(w, never()).onError(any(Throwable.class));
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testDistinctUntilChangedWhenNonFatalExceptionThrownByKeySelectorIsNotReportedByUpstream() {
Flowable<String> src = Flowable.just("a", "b", "null", "c");
final AtomicBoolean errorOccurred = new AtomicBoolean(false);
src
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable t) {
errorOccurred.set(true);
}
})
.distinctUntilChanged(THROWS_NON_FATAL)
.subscribe(w);
Assert.assertFalse(errorOccurred.get());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void directComparer() {
Flowable.fromArray(1, 2, 2, 3, 2, 4, 1, 1, 2)
.distinctUntilChanged(new BiPredicate<Integer, Integer>() {
@Override
public boolean test(Integer a, Integer b) {
return a.equals(b);
}
})
.test()
.assertResult(1, 2, 3, 2, 4, 1, 2);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void distinctUntilChangedFunctionReturnsNull() {
Flowable.range(1, 2).distinctUntilChanged(new Function<Integer, Object>() {
@Override
public Object apply(Integer v) {
return null;
}
}).test().assertResult(1);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void conditionalNormal() {
Flowable.just(1, 2, 1, 3, 3, 4, 3, 5, 5)
.distinctUntilChanged()
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer v) throws Exception {
return v % 2 == 0;
}
})
.test()
.assertResult(2, 4);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void conditionalNormal2() {
Flowable.just(1, 2, 1, 3, 3, 4, 3, 5, 5).hide()
.distinctUntilChanged()
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer v) throws Exception {
return v % 2 == 0;
}
})
.test()
.assertResult(2, 4);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void customComparatorThrows() {
Flowable<String> source = Flowable.just("a", "b", "B", "A", "a", "C");
TestSubscriber<String> ts = TestSubscriber.create();
source.distinctUntilChanged(new BiPredicate<String, String>() {
@Override
public boolean test(String a, String b) {
throw new TestException();
}
})
.subscribe(ts);
ts.assertValue("a");
ts.assertNotComplete();
ts.assertError(TestException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void conditionalFused() {
TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ANY);
Flowable.just(1, 2, 1, 3, 3, 4, 3, 5, 5)
.distinctUntilChanged()
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer v) throws Exception {
return v % 2 == 0;
}
})
.subscribe(ts);
SubscriberFusion.assertFusion(ts, QueueFuseable.SYNC)
.assertResult(2, 4);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void customComparator() {
Flowable<String> source = Flowable.just("a", "b", "B", "A", "a", "C");
TestSubscriber<String> ts = TestSubscriber.create();
source.distinctUntilChanged(new BiPredicate<String, String>() {
@Override
public boolean test(String a, String b) {
return a.compareToIgnoreCase(b) == 0;
}
})
.subscribe(ts);
ts.assertValues("a", "b", "A", "C");
ts.assertNoErrors();
ts.assertComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void directComparerFused() {
Flowable.fromArray(1, 2, 2, 3, 2, 4, 1, 1, 2)
.distinctUntilChanged(new BiPredicate<Integer, Integer>() {
@Override
public boolean test(Integer a, Integer b) {
return a.equals(b);
}
})
.to(SubscriberFusion.<Integer>test(Long.MAX_VALUE, QueueFuseable.ANY, false))
.assertOf(SubscriberFusion.<Integer>assertFuseable())
.assertOf(SubscriberFusion.<Integer>assertFusionMode(QueueFuseable.SYNC))
.assertResult(1, 2, 3, 2, 4, 1, 2);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void fused() {
TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ANY);
Flowable.just(1, 2, 2, 3, 3, 4, 5)
.distinctUntilChanged(new BiPredicate<Integer, Integer>() {
@Override
public boolean test(Integer a, Integer b) throws Exception {
return a.equals(b);
}
})
.subscribe(ts);
ts.assertOf(SubscriberFusion.<Integer>assertFuseable())
.assertOf(SubscriberFusion.<Integer>assertFusionMode(QueueFuseable.SYNC))
.assertResult(1, 2, 3, 4, 5)
;
}
内容来源于网络,如有侵权,请联系作者删除!