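This article collects code examples for the Java method io.reactivex.Flowable.debounce() and shows how Flowable.debounce() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, were extracted from selected projects, and should serve as a useful reference. Details of Flowable.debounce() are as follows:
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: debounce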
Returns a Flowable that mirrors the source Publisher, except that it drops items emitted by the source Publisher that are followed by newer items before a timeout value expires. The timer resets on each emission.
Note: If items keep being emitted by the source Publisher faster than the timeout then no items will be emitted by the resulting Publisher.
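The timer-reset behavior described above can be sketched without RxJava. The following is a minimal plain-Java illustration, not RxJava's implementation: the class DebouncerSketch and its runDemo method are hypothetical names introduced here, and a ScheduledExecutorService stands in for the scheduler. Each new item cancels the pending timer and starts a fresh one, so only an item followed by a quiet period reaches the consumer.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical illustration of debounce semantics; not part of RxJava.
public class DebouncerSketch<T> {
    private final ScheduledExecutorService timer;
    private final long timeoutMillis;
    private final Consumer<T> downstream;
    private ScheduledFuture<?> pending; // guarded by "this"

    public DebouncerSketch(ScheduledExecutorService timer, long timeoutMillis, Consumer<T> downstream) {
        this.timer = timer;
        this.timeoutMillis = timeoutMillis;
        this.downstream = downstream;
    }

    // A newer item cancels the timer of the previous one, which drops that item.
    public synchronized void onNext(T item) {
        if (pending != null) {
            pending.cancel(false);
        }
        pending = timer.schedule(() -> downstream.accept(item), timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Sends three items back to back and returns what the consumer received.
    public static List<String> runDemo() {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        DebouncerSketch<String> debouncer = new DebouncerSketch<>(timer, 200, v -> {
            received.add(v);
            done.countDown();
        });
        debouncer.onNext("a");
        debouncer.onNext("b");
        debouncer.onNext("c");
        try {
            done.await(2, TimeUnit.SECONDS); // wait for the quiet period to elapse
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            timer.shutdownNow();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

In the demo, a, b and c arrive back to back, so only c should be delivered once 200 ms pass without a newer item. Unlike Flowable.debounce, this sketch ignores completion, errors and backpressure.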
Information on debounce vs throttle:
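Code example source: ReactiveX/RxJava
    // Fragment: the enclosing call that receives this function is not shown in the source.
    @Override
    public Object apply(Flowable<Integer> f) throws Exception {
        // Selector overload: each item's debounce window is closed by a 1-second timer.
        return f.debounce(new Function<Integer, Flowable<Long>>() {
            @Override
            public Flowable<Long> apply(Integer v) throws Exception {
                return Flowable.timer(1, TimeUnit.SECONDS);
            }
        });
    }
}, false, 1, 1, 1);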
Code example source: ReactiveX/RxJava
    @Override
    public Publisher<Object> apply(Flowable<Object> f)
            throws Exception {
        // Timed overload: a fixed 1-second quiet period.
        return f.debounce(1, TimeUnit.SECONDS);
    }
});
Code example source: ReactiveX/RxJava
    @Override
    public Object apply(final Flowable<Integer> f) throws Exception {
        // Here the Flowable under test (f) serves as the debounce signal for the single item.
        return Flowable.just(1).debounce(new Function<Integer, Flowable<Integer>>() {
            @Override
            public Flowable<Integer> apply(Integer v) throws Exception {
                return f;
            }
        });
    }
}, false, 1, 1, 1);
Code example source: ReactiveX/RxJava
// A null TimeUnit is rejected.
@Test(expected = NullPointerException.class)
public void debounceTimedUnitNull() {
    just1.debounce(1, null);
}
Code example source: ReactiveX/RxJava
// A null Scheduler is rejected.
@Test(expected = NullPointerException.class)
public void debounceTimedSchedulerNull() {
    just1.debounce(1, TimeUnit.SECONDS, null);
}
Code example source: ReactiveX/RxJava
// A null selector function is rejected.
@Test(expected = NullPointerException.class)
public void debounceFunctionNull() {
    just1.debounce(null);
}
Code example source: ReactiveX/RxJava
// A selector that returns null fails once the flow is subscribed.
@Test(expected = NullPointerException.class)
public void debounceFunctionReturnsNull() {
    just1.debounce(new Function<Integer, Publisher<Object>>() {
        @Override
        public Publisher<Object> apply(Integer v) {
            return null;
        }
    }).blockingSubscribe();
}
Code example source: ReactiveX/RxJava
    @Override
    public Flowable<Object> apply(Flowable<Object> f) throws Exception {
        // The debounce signal for every item is a Flowable that never fires.
        return f.debounce(Functions.justFunction(Flowable.never()));
    }
});
Code example source: ReactiveX/RxJava
@Test
public void timedBadRequest() {
    TestHelper.assertBadRequestReported(Flowable.never().debounce(1, TimeUnit.SECONDS));
}
Code example source: ReactiveX/RxJava
// Note: scheduler (used as a TestScheduler), the mock named Subscriber, and the helpers
// publishNext/publishCompleted belong to the surrounding test class and are not shown here.
@Test
public void testDebounceWithCompleted() {
    Flowable<String> source = Flowable.unsafeCreate(new Publisher<String>() {
        @Override
        public void subscribe(Subscriber<? super String> subscriber) {
            subscriber.onSubscribe(new BooleanSubscription());
            publishNext(subscriber, 100, "one");    // Should be skipped since "two" will arrive before the timeout expires.
            publishNext(subscriber, 400, "two");    // Should be published since "three" will arrive after the timeout expires.
            publishNext(subscriber, 900, "three");  // Still pending when onComplete arrives at 1000; its timeout would expire at 1300.
            publishCompleted(subscriber, 1000);     // Completion is signalled at 1000.
        }
    });
    Flowable<String> sampled = source.debounce(400, TimeUnit.MILLISECONDS, scheduler);
    sampled.subscribe(Subscriber);
    scheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS);
    InOrder inOrder = inOrder(Subscriber);
    // Must advance to 800: "two" is sent at 400 and needs 400 ms of quiet time.
    scheduler.advanceTimeTo(800, TimeUnit.MILLISECONDS);
    inOrder.verify(Subscriber, times(1)).onNext("two");
    scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
    inOrder.verify(Subscriber, times(1)).onComplete();
    inOrder.verifyNoMoreInteractions();
}
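The timeline in this test can be traced by hand. The sketch below is a plain-Java calculation, not RxJava code: the class DebounceTimeline and its trace method are hypothetical names, and two behaviors are assumptions. First, an item still pending at completion is flushed right before the completion signal, which is inferred from the debounceDefault example on this page, where Flowable.just(1) yields 1. Second, a timer that expires at the same instant a newer item arrives is treated as firing first.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that traces debounce output over virtual time.
public class DebounceTimeline {

    // times[i] is the arrival time of values[i]; times must be ascending
    // and completeAt must not be earlier than the last arrival.
    public static List<String> trace(String[] values, long[] times, long completeAt, long timeout) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            boolean last = (i + 1 == values.length);
            long deadline = times[i] + timeout;              // when this item's timer would fire
            long nextEvent = last ? completeAt : times[i + 1];
            if (deadline <= nextEvent) {
                out.add(values[i] + "@" + deadline);         // quiet period elapsed: emit
            } else if (last) {
                out.add(values[i] + "@" + completeAt);       // assumed flush on completion
            }
            // otherwise a newer item arrived first and this one is dropped
        }
        out.add("complete@" + completeAt);
        return out;
    }

    public static void main(String[] args) {
        String[] values = {"one", "two", "three"};
        long[] times = {100, 400, 900};
        // prints [two@800, three@1000, complete@1000]
        System.out.println(trace(values, times, 1000, 400));
    }
}
```

Under these assumptions "one" is dropped, "two" is emitted at 800, and "three" goes out together with completion at 1000. The test's in-order verification checks only onNext("two") followed by onComplete, which this trace satisfies.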
Code example source: ReactiveX/RxJava
@Test
public void emitLate() {
    final AtomicReference<Subscriber<? super Integer>> ref = new AtomicReference<Subscriber<? super Integer>>();
    TestSubscriber<Integer> ts = Flowable.range(1, 2)
    .debounce(new Function<Integer, Flowable<Integer>>() {
        @Override
        public Flowable<Integer> apply(Integer o) throws Exception {
            if (o != 1) {
                return Flowable.never();
            }
            // For item 1, capture the debounce subscriber so the signal can be fired manually later.
            return new Flowable<Integer>() {
                @Override
                protected void subscribeActual(Subscriber<? super Integer> subscriber) {
                    subscriber.onSubscribe(new BooleanSubscription());
                    ref.set(subscriber);
                }
            };
        }
    })
    .test();
    // Late signal for item 1, sent after item 2 has already superseded it.
    ref.get().onNext(1);
    ts
    .assertResult(2);
}
Code example source: ReactiveX/RxJava
@Test
public void backpressureNoRequestTimed() {
    Flowable.just(1)
    .debounce(1, TimeUnit.MILLISECONDS)
    .test(0L) // request nothing, so the debounced item has no demand to satisfy
    .awaitDone(5, TimeUnit.SECONDS)
    .assertFailure(MissingBackpressureException.class);
}
Code example source: ReactiveX/RxJava
@Test
public void timedError() {
    // An upstream error is passed through.
    Flowable.error(new TestException())
    .debounce(1, TimeUnit.SECONDS)
    .test()
    .assertFailure(TestException.class);
}
Code example source: ReactiveX/RxJava
@Test
public void debounceDefault() throws Exception {
    // The single item is still delivered even though the source completes right away.
    Flowable.just(1).debounce(1, TimeUnit.SECONDS)
    .test()
    .awaitDone(5, TimeUnit.SECONDS)
    .assertResult(1);
}
Code example source: ReactiveX/RxJava
@Test
public void badRequestReported() {
    TestHelper.assertBadRequestReported(Flowable.never().debounce(Functions.justFunction(Flowable.never())));
}
Code example source: ReactiveX/RxJava
@Test
public void debounceWithEmpty() {
    // The debounce signal is an empty Flowable.
    Flowable.just(1).debounce(Functions.justFunction(Flowable.empty()))
    .test()
    .assertResult(1);
}
Code example source: ReactiveX/RxJava
@Test
public void backpressureNoRequest() {
    Flowable.just(1)
    .debounce(Functions.justFunction(Flowable.timer(1, TimeUnit.MILLISECONDS)))
    .test(0L) // request nothing, so the debounced item has no demand to satisfy
    .awaitDone(5, TimeUnit.SECONDS)
    .assertFailure(MissingBackpressureException.class);
}
Code example source: ReactiveX/RxJava
@Test
public void debounceDefaultScheduler() throws Exception {
    TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
    // All 1000 items arrive in a burst, so only the last one is expected.
    Flowable.range(1, 1000).debounce(1, TimeUnit.SECONDS).subscribe(ts);
    ts.awaitTerminalEvent(5, TimeUnit.SECONDS);
    ts.assertValue(1000);
    ts.assertNoErrors();
    ts.assertComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void debounceWithTimeBackpressure() throws InterruptedException {
    TestScheduler scheduler = new TestScheduler();
    TestSubscriber<Integer> subscriber = new TestSubscriber<Integer>();
    // 1 arrives at 0 ms and 2 at 10 ms, inside the 20 ms window of 1, so only 2 is expected.
    Flowable.merge(
        Flowable.just(1),
        Flowable.just(2).delay(10, TimeUnit.MILLISECONDS, scheduler)
    ).debounce(20, TimeUnit.MILLISECONDS, scheduler).take(1).subscribe(subscriber);
    scheduler.advanceTimeBy(30, TimeUnit.MILLISECONDS);
    subscriber.assertValue(2);
    subscriber.assertTerminated();
    subscriber.assertNoErrors();
}