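This article collects code examples for the Java method io.reactivex.Flowable.defer() and shows how Flowable.defer() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should be a useful reference. Details of the Flowable.defer() method are as follows: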
Package: io.reactivex
Class name: Flowable
Method name: defer
Returns a Flowable that calls a Publisher factory to create a Publisher for each new Subscriber that subscribes. That is, for each subscriber, the actual Publisher that subscriber observes is determined by the factory function.
The defer operator lets you postpone creating, and therefore emitting items from, a Publisher until a Subscriber actually subscribes. This gives each Subscriber an up-to-date or refreshed version of the sequence.
Backpressure: The operator itself doesn't interfere with backpressure, which is determined by the Publisher returned by the supplier.
Scheduler: defer does not operate by default on a particular Scheduler.
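The per-subscriber behavior described above can be illustrated without RxJava. The following is a minimal sketch that models the idea with the JDK's java.util.concurrent.Flow interfaces (Java 9+). The class DeferSketch and the helpers defer, fromList, and collect are hypothetical names introduced only for this illustration; they are not RxJava's API or its implementation, and the list-backed publisher is deliberately simplified.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class DeferSketch {

    // Hypothetical helper: asks the factory for a fresh Publisher on every subscribe call.
    public static <T> Flow.Publisher<T> defer(Supplier<Flow.Publisher<T>> factory) {
        return subscriber -> factory.get().subscribe(subscriber);
    }

    // Hypothetical helper: a simplified, synchronous Publisher over a fixed list.
    public static <T> Flow.Publisher<T> fromList(List<T> items) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int index;
            private boolean done;

            @Override
            public void request(long n) {
                // Emit at most n items, then complete once the list is exhausted.
                while (n-- > 0 && !done && index < items.size()) {
                    subscriber.onNext(items.get(index++));
                }
                if (!done && index == items.size()) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Hypothetical helper: subscribes with unbounded demand and collects the items.
    public static <T> List<T> collect(Flow.Publisher<T> source) {
        List<T> out = new ArrayList<>();
        source.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(T item) { out.add(item); }
            @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
            @Override public void onComplete() { }
        });
        return out;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        Flow.Publisher<Integer> deferred = defer(() -> {
            int n = calls.incrementAndGet();
            return fromList(List.of(n, n * 10));
        });
        System.out.println(calls.get());       // 0: the factory has not run yet
        System.out.println(collect(deferred)); // [1, 10]
        System.out.println(collect(deferred)); // [2, 20]
    }
}
```

Each call to collect triggers a new factory invocation, so the second subscriber sees a different sequence from the first. This mirrors what the testDefer example further down verifies against the real operator.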
Code example source: ReactiveX/RxJava
    // Excerpt: this method sits inside an enclosing anonymous class whose opening is not shown.
    @Override
    public Publisher<String> apply(Long t1) {
        return Flowable.defer(new Callable<Publisher<String>>() {
            @Override
            public Publisher<String> call() {
                return Flowable.<String>error(new TestException("Some exception"));
            }
        });
    }
})
Code example source: ReactiveX/RxJava
@Override
public Publisher<Long> createPublisher(final long elements) {
    return Flowable.defer(new Callable<Publisher<Long>>() {
        @Override
        public Publisher<Long> call() throws Exception {
            return Flowable.fromIterable(iterate(elements));
        }
    });
}
Code example source: ReactiveX/RxJava
    // Excerpt: this method sits inside an enclosing anonymous class whose opening is not shown.
    @Override
    public Flowable<Long> apply(Integer t) {
        return Flowable.defer(new Callable<Flowable<Long>>() {
            @Override
            public Flowable<Long> call() {
                return Flowable.just(0L);
            }
        }).subscribeOn(sched);
    }
});
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void deferFunctionNull() {
    // A null factory is rejected immediately, at assembly time.
    Flowable.defer(null);
}
Code example source: ReactiveX/RxJava
@Test
public void testDeferFunctionThrows() throws Exception {
    Callable<Flowable<String>> factory = mock(Callable.class);
    when(factory.call()).thenThrow(new TestException());
    Flowable<String> result = Flowable.defer(factory);
    Subscriber<String> subscriber = TestHelper.mockSubscriber();
    result.subscribe(subscriber);
    // The factory's exception reaches the subscriber as onError.
    verify(subscriber).onError(any(TestException.class));
    verify(subscriber, never()).onNext(any(String.class));
    verify(subscriber, never()).onComplete();
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void deferFunctionReturnsNull() {
    Flowable.defer(new Callable<Publisher<Object>>() {
        @Override
        public Publisher<Object> call() {
            return null;
        }
    }).blockingLast();
}
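The two tests above cover the failure paths of the factory: it may throw, or it may return null. As a rough model of that behavior, here is a sketch built on java.util.concurrent.Flow rather than RxJava. DeferErrorSketch, defer, and signals are hypothetical names, and the null-check message is made up for the example. The sketch assumes that both failures should be routed to the subscriber's onError, which is what the tests above observe from the outside.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.Flow;

public class DeferErrorSketch {

    // Hypothetical helper: any failure while obtaining the Publisher
    // (an exception or a null result) is delivered through onError.
    public static <T> Flow.Publisher<T> defer(Callable<Flow.Publisher<T>> factory) {
        return subscriber -> {
            Flow.Publisher<T> actual;
            try {
                actual = Objects.requireNonNull(factory.call(), "The factory returned null");
            } catch (Throwable t) {
                // Hand over a no-op Subscription first, then signal the error.
                subscriber.onSubscribe(new Flow.Subscription() {
                    @Override public void request(long n) { }
                    @Override public void cancel() { }
                });
                subscriber.onError(t);
                return;
            }
            actual.subscribe(subscriber);
        };
    }

    // Hypothetical helper: records the signals a subscriber receives as strings.
    public static <T> List<String> signals(Flow.Publisher<T> source) {
        List<String> log = new ArrayList<>();
        source.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(T item) { log.add("onNext(" + item + ")"); }
            @Override public void onError(Throwable t) {
                log.add("onError(" + t.getClass().getSimpleName() + ")");
            }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        Flow.Publisher<String> throwing = defer(() -> {
            throw new IllegalStateException("boom");
        });
        Flow.Publisher<String> returningNull = defer(() -> null);
        System.out.println(signals(throwing));      // [onError(IllegalStateException)]
        System.out.println(signals(returningNull)); // [onError(NullPointerException)]
    }
}
```

Catching Throwable keeps the sketch short; a production operator would typically let fatal errors propagate instead.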
Code example source: ReactiveX/RxJava
@Test
public void noCancelPreviousRetry() {
    final AtomicInteger counter = new AtomicInteger();
    final AtomicInteger times = new AtomicInteger();
    // The factory runs once per subscription: it fails the first four times, then succeeds.
    Flowable<Integer> source = Flowable.defer(new Callable<Flowable<Integer>>() {
        @Override
        public Flowable<Integer> call() throws Exception {
            if (times.getAndIncrement() < 4) {
                return Flowable.error(new TestException());
            }
            return Flowable.just(1);
        }
    })
    .doOnCancel(new Action() {
        @Override
        public void run() throws Exception {
            counter.getAndIncrement();
        }
    });
    source.retry(5)
        .test()
        .assertResult(1);
    // No earlier attempt should have been cancelled.
    assertEquals(0, counter.get());
}
Code example source: ReactiveX/RxJava
@Test
public void noCancelPreviousRetryWhile() {
    final AtomicInteger counter = new AtomicInteger();
    final AtomicInteger times = new AtomicInteger();
    Flowable<Integer> source = Flowable.defer(new Callable<Flowable<Integer>>() {
        @Override
        public Flowable<Integer> call() throws Exception {
            if (times.getAndIncrement() < 4) {
                return Flowable.error(new TestException());
            }
            return Flowable.just(1);
        }
    })
    .doOnCancel(new Action() {
        @Override
        public void run() throws Exception {
            counter.getAndIncrement();
        }
    });
    // Retry up to 5 times, with a predicate that accepts every error.
    source.retry(5, Functions.alwaysTrue())
        .test()
        .assertResult(1);
    assertEquals(0, counter.get());
}
Code example source: ReactiveX/RxJava
@Test
public void noCancelPreviousRetryWhile2() {
    final AtomicInteger counter = new AtomicInteger();
    final AtomicInteger times = new AtomicInteger();
    Flowable<Integer> source = Flowable.defer(new Callable<Flowable<Integer>>() {
        @Override
        public Flowable<Integer> call() throws Exception {
            if (times.getAndIncrement() < 4) {
                return Flowable.error(new TestException());
            }
            return Flowable.just(1);
        }
    })
    .doOnCancel(new Action() {
        @Override
        public void run() throws Exception {
            counter.getAndIncrement();
        }
    });
    // Retry while the attempt count passed to the predicate stays below 5.
    source.retry(new BiPredicate<Integer, Throwable>() {
        @Override
        public boolean test(Integer a, Throwable b) throws Exception {
            return a < 5;
        }
    })
    .test()
    .assertResult(1);
    assertEquals(0, counter.get());
}
Code example source: ReactiveX/RxJava
@Test
public void noCancelPreviousRetryUntil() {
    final AtomicInteger counter = new AtomicInteger();
    final AtomicInteger times = new AtomicInteger();
    Flowable<Integer> source = Flowable.defer(new Callable<Flowable<Integer>>() {
        @Override
        public Flowable<Integer> call() throws Exception {
            if (times.getAndIncrement() < 4) {
                return Flowable.error(new TestException());
            }
            return Flowable.just(1);
        }
    })
    .doOnCancel(new Action() {
        @Override
        public void run() throws Exception {
            counter.getAndIncrement();
        }
    });
    // The stop condition is always false, so retrying continues until the source succeeds.
    source.retryUntil(new BooleanSupplier() {
        @Override
        public boolean getAsBoolean() throws Exception {
            return false;
        }
    })
    .test()
    .assertResult(1);
    assertEquals(0, counter.get());
}
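The four retry tests above rely on one property: every retry is a new subscription, so the deferred factory runs again and can return a different Publisher each time. The sketch below models that interaction with java.util.concurrent.Flow only. DeferRetrySketch and its helpers (defer, just, error, retry, signals) are hypothetical and heavily simplified: they ignore demand and cancellation, and they are not how RxJava implements these operators.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class DeferRetrySketch {

    // Simplification: a Subscription that ignores demand and cancellation.
    static final Flow.Subscription NOOP = new Flow.Subscription() {
        @Override public void request(long n) { }
        @Override public void cancel() { }
    };

    public static <T> Flow.Publisher<T> defer(Supplier<Flow.Publisher<T>> factory) {
        return subscriber -> factory.get().subscribe(subscriber);
    }

    // Emits one value and completes, regardless of demand.
    public static <T> Flow.Publisher<T> just(T value) {
        return s -> { s.onSubscribe(NOOP); s.onNext(value); s.onComplete(); };
    }

    // Fails immediately with the given error.
    public static <T> Flow.Publisher<T> error(Throwable t) {
        return s -> { s.onSubscribe(NOOP); s.onError(t); };
    }

    // Resubscribes to the source on error, at most maxRetries times.
    public static <T> Flow.Publisher<T> retry(Flow.Publisher<T> source, int maxRetries) {
        return downstream -> {
            downstream.onSubscribe(NOOP);
            attempt(source, downstream, maxRetries);
        };
    }

    private static <T> void attempt(Flow.Publisher<T> source,
                                    Flow.Subscriber<? super T> downstream,
                                    int retriesLeft) {
        source.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { }
            @Override public void onNext(T item) { downstream.onNext(item); }
            @Override public void onError(Throwable t) {
                if (retriesLeft > 0) {
                    attempt(source, downstream, retriesLeft - 1); // new subscription, new factory call
                } else {
                    downstream.onError(t);
                }
            }
            @Override public void onComplete() { downstream.onComplete(); }
        });
    }

    // Records the signals a subscriber receives as strings.
    public static <T> List<String> signals(Flow.Publisher<T> source) {
        List<String> log = new ArrayList<>();
        source.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { }
            @Override public void onNext(T item) { log.add("onNext(" + item + ")"); }
            @Override public void onError(Throwable t) { log.add("onError"); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        AtomicInteger factoryCalls = new AtomicInteger();
        Flow.Publisher<Integer> source = defer(() -> {
            if (factoryCalls.getAndIncrement() < 4) {
                return DeferRetrySketch.<Integer>error(new IllegalStateException("not yet"));
            }
            return just(1);
        });
        System.out.println(signals(retry(source, 5))); // [onNext(1), onComplete]
        System.out.println(factoryCalls.get());        // 5
    }
}
```

The factory fails on its first four invocations and succeeds on the fifth, so four of the five allowed retries are used and the subscriber only ever sees the successful value.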
Code example source: ReactiveX/RxJava
@Test
public void testDefer() throws Throwable {
    Callable<Flowable<String>> factory = mock(Callable.class);
    Flowable<String> firstObservable = Flowable.just("one", "two");
    Flowable<String> secondObservable = Flowable.just("three", "four");
    when(factory.call()).thenReturn(firstObservable, secondObservable);
    Flowable<String> deferred = Flowable.defer(factory);
    // Creating the deferred Flowable must not invoke the factory.
    verifyZeroInteractions(factory);
    Subscriber<String> firstSubscriber = TestHelper.mockSubscriber();
    deferred.subscribe(firstSubscriber);
    verify(factory, times(1)).call();
    verify(firstSubscriber, times(1)).onNext("one");
    verify(firstSubscriber, times(1)).onNext("two");
    verify(firstSubscriber, times(0)).onNext("three");
    verify(firstSubscriber, times(0)).onNext("four");
    verify(firstSubscriber, times(1)).onComplete();
    // The second subscription triggers a second factory call and sees the second sequence.
    Subscriber<String> secondSubscriber = TestHelper.mockSubscriber();
    deferred.subscribe(secondSubscriber);
    verify(factory, times(2)).call();
    verify(secondSubscriber, times(0)).onNext("one");
    verify(secondSubscriber, times(0)).onNext("two");
    verify(secondSubscriber, times(1)).onNext("three");
    verify(secondSubscriber, times(1)).onNext("four");
    verify(secondSubscriber, times(1)).onComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void testTimeoutSelectorFirstThrows() {
    Flowable<Integer> source = Flowable.<Integer>never();
    final PublishProcessor<Integer> timeout = PublishProcessor.create();
    Function<Integer, Flowable<Integer>> timeoutFunc = new Function<Integer, Flowable<Integer>>() {
        @Override
        public Flowable<Integer> apply(Integer t1) {
            return timeout;
        }
    };
    // The factory for the first timeout indicator throws when it is invoked.
    Callable<Flowable<Integer>> firstTimeoutFunc = new Callable<Flowable<Integer>>() {
        @Override
        public Flowable<Integer> call() {
            throw new TestException();
        }
    };
    Flowable<Integer> other = Flowable.fromIterable(Arrays.asList(100));
    Subscriber<Object> subscriber = TestHelper.mockSubscriber();
    source.timeout(Flowable.defer(firstTimeoutFunc), timeoutFunc, other).subscribe(subscriber);
    verify(subscriber).onError(any(TestException.class));
    verify(subscriber, never()).onNext(any());
    verify(subscriber, never()).onComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void testDelayWithFlowableSubscriptionFunctionThrows() {
    PublishProcessor<Integer> source = PublishProcessor.create();
    final PublishProcessor<Integer> delay = PublishProcessor.create();
    // The subscription-delay factory throws when it is invoked.
    Callable<Flowable<Integer>> subFunc = new Callable<Flowable<Integer>>() {
        @Override
        public Flowable<Integer> call() {
            throw new TestException();
        }
    };
    Function<Integer, Flowable<Integer>> delayFunc = new Function<Integer, Flowable<Integer>>() {
        @Override
        public Flowable<Integer> apply(Integer t1) {
            return delay;
        }
    };
    Subscriber<Object> subscriber = TestHelper.mockSubscriber();
    InOrder inOrder = inOrder(subscriber);
    source.delay(Flowable.defer(subFunc), delayFunc).subscribe(subscriber);
    source.onNext(1);
    delay.onNext(1);
    source.onNext(2);
    inOrder.verify(subscriber).onError(any(TestException.class));
    inOrder.verifyNoMoreInteractions();
    verify(subscriber, never()).onNext(any());
    verify(subscriber, never()).onComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void testDelayWithFlowableSubscriptionThrows() {
    PublishProcessor<Integer> source = PublishProcessor.create();
    final PublishProcessor<Integer> delay = PublishProcessor.create();
    Callable<Flowable<Integer>> subFunc = new Callable<Flowable<Integer>>() {
        @Override
        public Flowable<Integer> call() {
            return delay;
        }
    };
    Function<Integer, Flowable<Integer>> delayFunc = new Function<Integer, Flowable<Integer>>() {
        @Override
        public Flowable<Integer> apply(Integer t1) {
            return delay;
        }
    };
    Subscriber<Object> subscriber = TestHelper.mockSubscriber();
    InOrder inOrder = inOrder(subscriber);
    source.delay(Flowable.defer(subFunc), delayFunc).subscribe(subscriber);
    source.onNext(1);
    // Here the subscription-delay Publisher itself signals the error.
    delay.onError(new TestException());
    source.onNext(2);
    inOrder.verify(subscriber).onError(any(TestException.class));
    inOrder.verifyNoMoreInteractions();
    verify(subscriber, never()).onNext(any());
    verify(subscriber, never()).onComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void testMergeCovariance4() {
    Flowable<Movie> f1 = Flowable.defer(new Callable<Publisher<Movie>>() {
        @Override
        public Publisher<Movie> call() {
            return Flowable.just(
                new HorrorMovie(),
                new Movie()
            );
        }
    });
    Flowable<Media> f2 = Flowable.just(new Media(), new HorrorMovie());
    List<Media> values = Flowable.merge(f1, f2).toList().blockingGet();
    assertTrue(values.get(0) instanceof HorrorMovie);
    assertTrue(values.get(1) instanceof Movie);
    assertTrue(values.get(2) != null);
    assertTrue(values.get(3) instanceof HorrorMovie);
}
Code example source: ReactiveX/RxJava
@Test
public void testDelayWithFlowableSubscriptionRunCompletion() {
    PublishProcessor<Integer> source = PublishProcessor.create();
    final PublishProcessor<Integer> sdelay = PublishProcessor.create();
    final PublishProcessor<Integer> delay = PublishProcessor.create();
    Callable<Flowable<Integer>> subFunc = new Callable<Flowable<Integer>>() {
        @Override
        public Flowable<Integer> call() {
            return sdelay;
        }
    };
    Function<Integer, Flowable<Integer>> delayFunc = new Function<Integer, Flowable<Integer>>() {
        @Override
        public Flowable<Integer> apply(Integer t1) {
            return delay;
        }
    };
    Subscriber<Object> subscriber = TestHelper.mockSubscriber();
    InOrder inOrder = inOrder(subscriber);
    source.delay(Flowable.defer(subFunc), delayFunc).subscribe(subscriber);
    source.onNext(1);
    // Completing the subscription delay lets later items through; item 1 arrived too early.
    sdelay.onComplete();
    source.onNext(2);
    delay.onNext(2);
    inOrder.verify(subscriber).onNext(2);
    inOrder.verifyNoMoreInteractions();
    verify(subscriber, never()).onError(any(Throwable.class));
    verify(subscriber, never()).onComplete();
}
Code example source: ReactiveX/RxJava
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
Flowable.range(1, Flowable.bufferSize() * 2)
    .delay(Flowable.defer(new Callable<Flowable<Long>>() {
Code example source: ReactiveX/RxJava
@Test
public void testDelaySupplierSimple() {
    final PublishProcessor<Integer> pp = PublishProcessor.create();
    Flowable<Integer> source = Flowable.range(1, 5);
    TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
    source.delaySubscription(Flowable.defer(new Callable<Publisher<Integer>>() {
        @Override
        public Publisher<Integer> call() {
            return pp;
        }
    })).subscribe(ts);
    // Nothing is emitted until the deferred Publisher signals.
    ts.assertNoValues();
    ts.assertNoErrors();
    ts.assertNotComplete();
    pp.onNext(1);
    ts.assertValues(1, 2, 3, 4, 5);
    ts.assertComplete();
    ts.assertNoErrors();
}
Code example source: ReactiveX/RxJava
@Test
public void testDelaySupplierCompletes() {
    final PublishProcessor<Integer> pp = PublishProcessor.create();
    Flowable<Integer> source = Flowable.range(1, 5);
    TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
    source.delaySubscription(Flowable.defer(new Callable<Publisher<Integer>>() {
        @Override
        public Publisher<Integer> call() {
            return pp;
        }
    })).subscribe(ts);
    ts.assertNoValues();
    ts.assertNoErrors();
    ts.assertNotComplete();
    // FIXME should this complete the source instead of consuming it?
    pp.onComplete();
    ts.assertValues(1, 2, 3, 4, 5);
    ts.assertComplete();
    ts.assertNoErrors();
}
Code example source: ReactiveX/RxJava
@Test
public void testDelaySupplierErrors() {
    final PublishProcessor<Integer> pp = PublishProcessor.create();
    Flowable<Integer> source = Flowable.range(1, 5);
    TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
    source.delaySubscription(Flowable.defer(new Callable<Publisher<Integer>>() {
        @Override
        public Publisher<Integer> call() {
            return pp;
        }
    })).subscribe(ts);
    ts.assertNoValues();
    ts.assertNoErrors();
    ts.assertNotComplete();
    // An error from the deferred Publisher is forwarded; the source is never subscribed.
    pp.onError(new TestException());
    ts.assertNoValues();
    ts.assertNotComplete();
    ts.assertError(TestException.class);
}