本文整理了Java中io.reactivex.Flowable.fromPublisher()
方法的一些代码示例,展示了Flowable.fromPublisher()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.fromPublisher()
方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:fromPublisher
[英]Converts an arbitrary Reactive-Streams Publisher into a Flowable if not already a Flowable.
The Publisher must follow the Reactive-Streams specification. Violating the specification may result in undefined behavior.
If possible, use #create(FlowableOnSubscribe,BackpressureStrategy) to create a source-like Flowable instead.
Note that even though Publisher appears to be a functional interface, it is not recommended to implement it through a lambda as the specification requires state management that is not achievable with a stateless lambda. Backpressure: The operator is a pass-through for backpressure and its behavior is determined by the backpressure behavior of the wrapped publisher. Scheduler: fromPublisher does not operate by default on a particular Scheduler.
[中]将任意反应流发布服务器转换为可流发布服务器(如果尚未是可流发布服务器)。
发布者必须遵循Reactive-Streams specification。违反规范可能导致未定义的行为。
如果可能,使用#create(FlowableOnSubscribe,BackPressureStragy)创建类似Flowable的源。
请注意,尽管Publisher看起来是一个功能接口,但不建议通过lambda实现它,因为规范要求使用无状态lambda无法实现状态管理。背压:操作符是背压的传递,其行为由已包装发布服务器的背压行为决定。调度程序:默认情况下,fromPublisher不会在特定调度程序上运行。
代码示例来源:origin: ReactiveX/RxJava
@Override
public Flowable<T> apply(final Publisher<T> onSubscribe) {
return Flowable.fromPublisher(onSubscribe);
}
}
代码示例来源:origin: ReactiveX/RxJava
@Override
public Publisher<R> apply(Flowable<T> t) throws Exception {
Publisher<R> p = ObjectHelper.requireNonNull(selector.apply(t), "The selector returned a null Publisher");
return Flowable.fromPublisher(p).observeOn(scheduler);
}
}
代码示例来源:origin: ReactiveX/RxJava
@Override
public Iterator<T> iterator() {
LatestSubscriberIterator<T> lio = new LatestSubscriberIterator<T>();
Flowable.<T>fromPublisher(source).materialize().subscribe(lio);
return lio;
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void fromPublisherNull() {
Flowable.fromPublisher(null);
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = TestException.class)
public void firstOnError() {
Flowable<Integer> source = Flowable.fromPublisher(new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> s) {
s.onSubscribe(new BooleanSubscription());
s.onError(new TestException());
}
});
source.blockingFirst();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void firstFgnoredCancelAndOnNext() {
Flowable<Integer> source = Flowable.fromPublisher(new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> s) {
s.onSubscribe(new BooleanSubscription());
s.onNext(1);
s.onNext(2);
}
});
assertEquals(1, source.blockingFirst().intValue());
}
代码示例来源:origin: ReactiveX/RxJava
/**
* Concatenates the Publisher sequence of Publishers into a single sequence by subscribing to each inner Publisher,
* one after the other, one at a time and delays any errors till the all inner and the outer Publishers terminate.
* <p>
* <img width="640" height="360" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.concatDelayError.p.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>{@code concatDelayError} fully supports backpressure.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T> the common element base type
* @param sources the Publisher sequence of Publishers
* @return the new Publisher with the concatenating behavior
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> concatDelayError(Publisher<? extends MaybeSource<? extends T>> sources) {
return Flowable.fromPublisher(sources).concatMapDelayError((Function)MaybeToPublisher.instance());
}
代码示例来源:origin: ReactiveX/RxJava
/**
* Concatenates the Publisher sequence of Publishers into a single sequence by subscribing to each inner Publisher,
* one after the other, one at a time and delays any errors till the all inner and the outer Publishers terminate.
*
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>{@code concatDelayError} fully supports backpressure.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T> the common element base type
* @param sources the Publisher sequence of Publishers
* @param prefetch the number of elements to prefetch from the outer Publisher
* @param tillTheEnd if true exceptions from the outer and all inner Publishers are delayed to the end
* if false, exception from the outer Publisher is delayed till the current Publisher terminates
* @return the new Publisher with the concatenating behavior
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources, int prefetch, boolean tillTheEnd) {
return fromPublisher(sources).concatMapDelayError((Function)Functions.identity(), prefetch, tillTheEnd);
}
代码示例来源:origin: ReactiveX/RxJava
public final <R> CylonDetectorObservable<R> boop(Function<? super T, ? extends R> func) {
return new CylonDetectorObservable<R>(new FlowableMap<T, R>(Flowable.fromPublisher(onSubscribe), func));
}
代码示例来源:origin: ReactiveX/RxJava
public final CylonDetectorObservable<T> beep(Predicate<? super T> predicate) {
return new CylonDetectorObservable<T>(new FlowableFilter<T>(Flowable.fromPublisher(onSubscribe), predicate));
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void wrap() {
Flowable.fromPublisher(new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
subscriber.onSubscribe(new BooleanSubscription());
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onNext(3);
subscriber.onNext(4);
subscriber.onNext(5);
subscriber.onComplete();
}
})
.test()
.assertResult(1, 2, 3, 4, 5);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void onCompleteCrash() {
Flowable.fromPublisher(new Publisher<Object>() {
@Override
public void subscribe(Subscriber<? super Object> s) {
s.onSubscribe(new BooleanSubscription());
s.onComplete();
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
throw new IOException();
}
})
.test()
.assertFailure(IOException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test(timeout = 2000)
public void backpressureWithBufferDropOldest() throws InterruptedException {
int bufferSize = 3;
final AtomicInteger droppedCount = new AtomicInteger(0);
Action incrementOnDrop = new Action() {
@Override
public void run() throws Exception {
droppedCount.incrementAndGet();
}
};
TestSubscriber<Long> ts = createTestSubscriber();
Flowable.fromPublisher(send500ValuesAndComplete.onBackpressureBuffer(bufferSize, incrementOnDrop, DROP_OLDEST))
.subscribe(ts);
// we request 10 but only 3 should come from the buffer
ts.request(10);
ts.awaitTerminalEvent();
assertEquals(bufferSize, ts.values().size());
ts.assertNoErrors();
assertEquals(497, ts.values().get(0).intValue());
assertEquals(498, ts.values().get(1).intValue());
assertEquals(499, ts.values().get(2).intValue());
assertEquals(droppedCount.get(), 500 - bufferSize);
}
代码示例来源:origin: ReactiveX/RxJava
@Test(timeout = 2000)
public void backpressureWithBufferDropLatest() throws InterruptedException {
int bufferSize = 3;
final AtomicInteger droppedCount = new AtomicInteger(0);
Action incrementOnDrop = new Action() {
@Override
public void run() throws Exception {
droppedCount.incrementAndGet();
}
};
TestSubscriber<Long> ts = createTestSubscriber();
Flowable.fromPublisher(send500ValuesAndComplete.onBackpressureBuffer(bufferSize, incrementOnDrop, DROP_LATEST))
.subscribe(ts);
// we request 10 but only 3 should come from the buffer
ts.request(10);
ts.awaitTerminalEvent();
assertEquals(bufferSize, ts.values().size());
ts.assertNoErrors();
assertEquals(0, ts.values().get(0).intValue());
assertEquals(1, ts.values().get(1).intValue());
assertEquals(499, ts.values().get(2).intValue());
assertEquals(droppedCount.get(), 500 - bufferSize);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void firstIgnoredCancelAndOnError() {
List<Throwable> list = TestHelper.trackPluginErrors();
try {
Flowable<Integer> source = Flowable.fromPublisher(new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> s) {
s.onSubscribe(new BooleanSubscription());
s.onNext(1);
s.onError(new TestException());
}
});
assertEquals(1, source.blockingFirst().intValue());
TestHelper.assertUndeliverable(list, 0, TestException.class);
} finally {
RxJavaPlugins.reset();
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void onCompleteCrashConditional() {
Flowable.fromPublisher(new Publisher<Object>() {
@Override
public void subscribe(Subscriber<? super Object> s) {
s.onSubscribe(new BooleanSubscription());
s.onComplete();
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
throw new IOException();
}
})
.filter(Functions.alwaysTrue())
.test()
.assertFailure(IOException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void onErrorAfterCrash() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
Flowable.fromPublisher(new Publisher<Object>() {
@Override
public void subscribe(Subscriber<? super Object> s) {
s.onSubscribe(new BooleanSubscription());
s.onError(new TestException());
}
})
.doAfterTerminate(new Action() {
@Override
public void run() throws Exception {
throw new IOException();
}
})
.test()
.assertFailure(TestException.class);
TestHelper.assertUndeliverable(errors, 0, IOException.class);
} finally {
RxJavaPlugins.reset();
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void onCompleteAfterCrash() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
Flowable.fromPublisher(new Publisher<Object>() {
@Override
public void subscribe(Subscriber<? super Object> s) {
s.onSubscribe(new BooleanSubscription());
s.onComplete();
}
})
.doAfterTerminate(new Action() {
@Override
public void run() throws Exception {
throw new IOException();
}
})
.test()
.assertResult();
TestHelper.assertUndeliverable(errors, 0, IOException.class);
} finally {
RxJavaPlugins.reset();
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void overflowReported() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
Completable.concat(
Flowable.fromPublisher(new Publisher<Completable>() {
@Override
public void subscribe(Subscriber<? super Completable> s) {
s.onSubscribe(new BooleanSubscription());
s.onNext(Completable.never());
s.onNext(Completable.never());
s.onNext(Completable.never());
s.onNext(Completable.never());
s.onComplete();
}
}), 1
)
.test()
.assertFailure(MissingBackpressureException.class);
TestHelper.assertError(errors, 0, MissingBackpressureException.class);
} finally {
RxJavaPlugins.reset();
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void onCompleteAfterCrashConditional() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
Flowable.fromPublisher(new Publisher<Object>() {
@Override
public void subscribe(Subscriber<? super Object> s) {
s.onSubscribe(new BooleanSubscription());
s.onComplete();
}
})
.doAfterTerminate(new Action() {
@Override
public void run() throws Exception {
throw new IOException();
}
})
.filter(Functions.alwaysTrue())
.test()
.assertResult();
TestHelper.assertUndeliverable(errors, 0, IOException.class);
} finally {
RxJavaPlugins.reset();
}
}
内容来源于网络,如有侵权,请联系作者删除!