本文整理了Java中io.reactivex.Flowable.distinct()
方法的一些代码示例,展示了Flowable.distinct()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.distinct()
方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:distinct
[英]Returns a Flowable that emits all items emitted by the source Publisher that are distinct based on Object#equals(Object) comparison.
It is recommended the elements' class T in the flow overrides the default Object.equals() and Object#hashCode() to provide a meaningful comparison between items as the default Java implementation only considers reference equivalence.
By default, distinct() uses an internal java.util.HashSet per Subscriber to remember previously seen items and uses java.util.Set#add(Object) returning false as the indicator for duplicates.
Note that this internal HashSet may grow unbounded as items won't be removed from it by the operator. Therefore, using very long or infinite upstream (with very distinct elements) may lead to OutOfMemoryError.
Customizing the retention policy can happen only by providing a custom java.util.Collection implementation to the #distinct(Function,Callable) overload. Backpressure: The operator doesn't interfere with backpressure which is determined by the source Publisher's backpressure behavior. Scheduler: distinct does not operate by default on a particular Scheduler.
[中]
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void distinctSupplierNull() {
just1.distinct(new Function<Integer, Object>() {
@Override
public Object apply(Integer v) {
return v;
}
}, null);
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void distinctFunctionNull() {
just1.distinct(null);
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void distinctFunctionReturnsNull() {
just1.distinct(new Function<Integer, Object>() {
@Override
public Object apply(Integer v) {
return null;
}
}).blockingSubscribe();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testDistinctOfNoneWithKeySelector() {
Flowable<String> src = Flowable.empty();
src.distinct(TO_UPPER_WITH_EXCEPTION).subscribe(w);
verify(w, never()).onNext(anyString());
verify(w, never()).onError(any(Throwable.class));
verify(w, times(1)).onComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
@Ignore("Null values no longer allowed")
public void testDistinctOfSourceWithExceptionsFromKeySelector() {
Flowable<String> src = Flowable.just("a", "b", null, "c");
src.distinct(TO_UPPER_WITH_EXCEPTION).subscribe(w);
InOrder inOrder = inOrder(w);
inOrder.verify(w, times(1)).onNext("a");
inOrder.verify(w, times(1)).onNext("b");
inOrder.verify(w, times(1)).onError(any(NullPointerException.class));
inOrder.verify(w, never()).onNext(anyString());
inOrder.verify(w, never()).onComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testDistinctOfNormalSource() {
Flowable<String> src = Flowable.just("a", "b", "c", "c", "c", "b", "b", "a", "e");
src.distinct().subscribe(w);
InOrder inOrder = inOrder(w);
inOrder.verify(w, times(1)).onNext("a");
inOrder.verify(w, times(1)).onNext("b");
inOrder.verify(w, times(1)).onNext("c");
inOrder.verify(w, times(1)).onNext("e");
inOrder.verify(w, times(1)).onComplete();
inOrder.verify(w, never()).onNext(anyString());
verify(w, never()).onError(any(Throwable.class));
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testDistinctOfNone() {
Flowable<String> src = Flowable.empty();
src.distinct().subscribe(w);
verify(w, never()).onNext(anyString());
verify(w, never()).onError(any(Throwable.class));
verify(w, times(1)).onComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
@Ignore("Null values no longer allowed")
public void testDistinctOfSourceWithNulls() {
Flowable<String> src = Flowable.just(null, "a", "a", null, null, "b", null);
src.distinct().subscribe(w);
InOrder inOrder = inOrder(w);
inOrder.verify(w, times(1)).onNext(null);
inOrder.verify(w, times(1)).onNext("a");
inOrder.verify(w, times(1)).onNext("b");
inOrder.verify(w, times(1)).onComplete();
inOrder.verify(w, never()).onNext(anyString());
verify(w, never()).onError(any(Throwable.class));
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testDistinctOfNormalSourceWithKeySelector() {
Flowable<String> src = Flowable.just("a", "B", "c", "C", "c", "B", "b", "a", "E");
src.distinct(TO_UPPER_WITH_EXCEPTION).subscribe(w);
InOrder inOrder = inOrder(w);
inOrder.verify(w, times(1)).onNext("a");
inOrder.verify(w, times(1)).onNext("B");
inOrder.verify(w, times(1)).onNext("c");
inOrder.verify(w, times(1)).onNext("E");
inOrder.verify(w, times(1)).onComplete();
inOrder.verify(w, never()).onNext(anyString());
verify(w, never()).onError(any(Throwable.class));
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void distinctSupplierReturnsNull() {
just1.distinct(new Function<Integer, Object>() {
@Override
public Object apply(Integer v) {
return v;
}
}, new Callable<Collection<Object>>() {
@Override
public Collection<Object> call() {
return null;
}
}).blockingSubscribe();
}
代码示例来源:origin: ReactiveX/RxJava
@Override
public Publisher<Integer> createPublisher(long elements) {
return
Flowable.range(0, (int)elements)
.concatWith(Flowable.range(0, (int)elements))
.distinct()
;
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void fusedClear() {
Flowable.just(1, 1, 2, 1, 3, 2, 4, 5, 4)
.distinct()
.subscribe(new FlowableSubscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
QueueSubscription<?> qs = (QueueSubscription<?>)s;
assertFalse(qs.isEmpty());
qs.clear();
assertTrue(qs.isEmpty());
}
@Override
public void onNext(Integer value) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void error() {
Flowable.error(new TestException())
.distinct()
.test()
.assertFailure(TestException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void collectionSupplierIsNull() {
Flowable.just(1)
.distinct(Functions.identity(), new Callable<Collection<Object>>() {
@Override
public Collection<Object> call() throws Exception {
return null;
}
})
.test()
.assertFailure(NullPointerException.class)
.assertErrorMessage("The collectionSupplier returned a null collection. Null values are generally not allowed in 2.x operators and sources.");
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void fusedSync() {
TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ANY);
Flowable.just(1, 1, 2, 1, 3, 2, 4, 5, 4)
.distinct()
.subscribe(ts);
SubscriberFusion.assertFusion(ts, QueueFuseable.SYNC)
.assertResult(1, 2, 3, 4, 5);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void collectionSupplierThrows() {
Flowable.just(1)
.distinct(Functions.identity(), new Callable<Collection<Object>>() {
@Override
public Collection<Object> call() throws Exception {
throw new TestException();
}
})
.test()
.assertFailure(TestException.class);
}
代码示例来源:origin: ReactiveX/RxJava
/** Issue #2587. */
@Test
public void testRepeatAndDistinctUnbounded() {
Flowable<Integer> src = Flowable.fromIterable(Arrays.asList(1, 2, 3, 4, 5))
.take(3)
.repeat(3)
.distinct();
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
src.subscribe(ts);
ts.assertNoErrors();
ts.assertTerminated();
ts.assertValues(1, 2, 3);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void doubleObserveOnErrorConditional() {
Flowable.error(new TestException())
.observeOn(Schedulers.computation())
.distinct()
.observeOn(Schedulers.single())
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertFailure(TestException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void doubleObserveOnConditional() {
Flowable.just(1).hide()
.observeOn(Schedulers.computation())
.distinct()
.observeOn(Schedulers.single())
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void boundaryFusion() {
Flowable.range(1, 10000)
.observeOn(Schedulers.single())
.map(new Function<Integer, String>() {
@Override
public String apply(Integer t) throws Exception {
String name = Thread.currentThread().getName();
if (name.contains("RxSingleScheduler")) {
return "RxSingleScheduler";
}
return name;
}
})
.share()
.observeOn(Schedulers.computation())
.distinct()
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult("RxSingleScheduler");
}
内容来源于网络,如有侵权,请联系作者删除!