本文整理了Java中io.reactivex.Flowable.fromCallable()
方法的一些代码示例,展示了Flowable.fromCallable()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.fromCallable()
方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:fromCallable
[英]Returns a Flowable that, when a Subscriber subscribes to it, invokes a function you specify and then emits the value returned from that function.
This allows you to defer the execution of the function you specify until a Subscriber subscribes to the Publisher. That is to say, it makes the function "lazy." Backpressure: The operator honors backpressure from downstream. Scheduler: fromCallable does not operate by default on a particular Scheduler.
[中]返回一个可流动函数,当订阅服务器订阅该函数时,该函数调用您指定的函数,然后发出从该函数返回的值。
这允许您推迟执行指定的函数,直到订阅服务器订阅发布服务器。也就是说,它使函数“懒惰”背压:操作员接受来自下游的背压。调度程序:默认情况下,fromCallable不会在特定调度程序上运行。
代码示例来源:origin: ReactiveX/RxJava
@Override
public Publisher<Long> createPublisher(final long elements) {
return
Flowable.fromCallable(new Callable<Long>() {
@Override
public Long call() throws Exception {
return 1L;
}
}
)
;
}
代码示例来源:origin: ReactiveX/RxJava
@Override
public Publisher<? extends Object> apply(Integer v)
throws Exception {
return Flowable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
return ++calls[0];
}
});
}
})
代码示例来源:origin: ReactiveX/RxJava
@Override
public Publisher<? extends Object> apply(Integer v)
throws Exception {
return Flowable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
return ++calls[0];
}
});
}
})
代码示例来源:origin: ReactiveX/RxJava
@Override
public Publisher<? extends Object> apply(Integer v)
throws Exception {
return Flowable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
return null;
}
});
}
})
代码示例来源:origin: ReactiveX/RxJava
@Override
public Publisher<? extends Object> apply(Integer v)
throws Exception {
return Flowable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
return null;
}
});
}
})
代码示例来源:origin: ReactiveX/RxJava
@Override
public Flowable<Integer> apply(Integer v) throws Exception {
return Flowable.fromCallable(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
throw new TestException();
}
});
}
})
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void fromCallableNull() {
Flowable.fromCallable(null);
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void shouldNotInvokeFuncUntilSubscription() throws Exception {
Callable<Object> func = mock(Callable.class);
when(func.call()).thenReturn(new Object());
Flowable<Object> fromCallableFlowable = Flowable.fromCallable(func);
verifyZeroInteractions(func);
fromCallableFlowable.subscribe();
verify(func).call();
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void shouldCallOnError() throws Exception {
Callable<Object> func = mock(Callable.class);
Throwable throwable = new IllegalStateException("Test exception");
when(func.call()).thenThrow(throwable);
Flowable<Object> fromCallableFlowable = Flowable.fromCallable(func);
Subscriber<Object> subscriber = TestHelper.mockSubscriber();
fromCallableFlowable.subscribe(subscriber);
verify(subscriber, never()).onNext(any());
verify(subscriber, never()).onComplete();
verify(subscriber).onError(throwable);
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void shouldCallOnNextAndOnCompleted() throws Exception {
Callable<String> func = mock(Callable.class);
when(func.call()).thenReturn("test_value");
Flowable<String> fromCallableFlowable = Flowable.fromCallable(func);
Subscriber<String> subscriber = TestHelper.mockSubscriber();
fromCallableFlowable.subscribe(subscriber);
verify(subscriber).onNext("test_value");
verify(subscriber).onComplete();
verify(subscriber, never()).onError(any(Throwable.class));
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void fromCallableReturnsNull() {
Flowable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
return null;
}
}).blockingLast();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void cancelledUpFrontConnectAnyway() {
final AtomicInteger call = new AtomicInteger();
Flowable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
return call.incrementAndGet();
}
})
.cache()
.test(1L, true)
.assertNoValues();
assertEquals(1, call.get());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void scalarXMap() {
Flowable.fromCallable(Functions.justCallable(1))
.flatMap(Functions.justFunction(Flowable.fromCallable(Functions.justCallable(2))))
.test()
.assertResult(2);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void callableThrows() {
Flowable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
throw new TestException();
}
})
.flatMapIterable(Functions.justFunction(Arrays.asList(1, 2, 3)))
.test()
.assertFailure(TestException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void callable() {
Maybe.concat(Flowable.fromCallable(new Callable<Maybe<Integer>>() {
@Override
public Maybe<Integer> call() throws Exception {
return Maybe.just(1);
}
}))
.test()
.assertResult(1);
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void callable() {
Single.concat(Flowable.fromCallable(new Callable<Single<Integer>>() {
@Override
public Single<Integer> call() throws Exception {
return Single.just(1);
}
}))
.test()
.assertResult(1);
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void mergeScalarError() {
Flowable.merge(Flowable.just(Flowable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
throw new TestException();
}
})).hide())
.test()
.assertFailure(TestException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void callableCrash() {
Flowable.just(1).hide()
.concatMap(Functions.justFunction(Flowable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
throw new TestException();
}
})))
.test()
.assertFailure(TestException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void scalarXMap() {
Flowable.fromCallable(Functions.justCallable(1))
.switchMap(Functions.justFunction(Flowable.just(1)))
.test()
.assertResult(1);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void callableCrashDelayError() {
Flowable.just(1).hide()
.concatMapDelayError(Functions.justFunction(Flowable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
throw new TestException();
}
})))
.test()
.assertFailure(TestException.class);
}
内容来源于网络,如有侵权,请联系作者删除!