本文整理了Java中io.reactivex.Flowable.subscribeOn()
方法的一些代码示例,展示了Flowable.subscribeOn()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.subscribeOn()
方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:subscribeOn
[英]Asynchronously subscribes Subscribers to this Publisher on the specified Scheduler.
If there is a #create(FlowableOnSubscribe,BackpressureStrategy) type source up in the chain, it is recommended to use subscribeOn(scheduler, false) instead to avoid same-pool deadlock because requests may pile up behind an eager/blocking emitter.
Backpressure: The operator doesn't interfere with backpressure which is determined by the source Publisher's backpressure behavior. Scheduler: You specify which Scheduler this operator will use.
[中]在指定的计划程序上异步订阅此发布服务器的订阅服务器。
如果链中有一个#create(FlowableOnSubscribe,BackPressureStragy)类型的源,建议使用subscribeOn(scheduler,false)来避免相同的池死锁,因为请求可能堆积在急切/阻塞发射器后面。
背压:操作员不会干扰由源发布者的背压行为确定的背压。计划程序:指定此操作员将使用的计划程序。
代码示例来源:origin: ReactiveX/RxJava
@Override
public Publisher<Integer> apply(Flowable<Integer> f) throws Exception {
return f.subscribeOn(Schedulers.single());
}
});
代码示例来源:origin: ReactiveX/RxJava
@Override
public Publisher<Integer> apply(Integer i) {
return incrementingIntegers(new AtomicInteger())
.take(10)
.subscribeOn(Schedulers.computation());
}
}
代码示例来源:origin: ReactiveX/RxJava
@Override
public Flowable<Integer> apply(Integer t1) {
return composer(Flowable.range(t1 * 10, 2), subscriptionCount, m)
.subscribeOn(Schedulers.computation());
}
}, m);
代码示例来源:origin: ReactiveX/RxJava
@Override
public Flowable<Integer> apply(Integer t1) {
return composer(Flowable.range(t1 * 10, 2), subscriptionCount, m)
.subscribeOn(Schedulers.computation());
}
}, new BiFunction<Integer, Integer, Integer>() {
代码示例来源:origin: ReactiveX/RxJava
@Override
public Flowable<Integer> apply(Integer t) {
return Flowable.just(1).subscribeOn(Schedulers.computation());
}
}).subscribe(ts);
代码示例来源:origin: ReactiveX/RxJava
@Override
public Flowable<Integer> apply(Integer t) {
return Flowable.range(1, 1000).subscribeOn(Schedulers.computation());
}
}).observeOn(Schedulers.single())
代码示例来源:origin: ReactiveX/RxJava
@Override
public Flowable<String> apply(Integer id) {
return Flowable.fromIterable(Arrays.asList("a-" + id, "b-" + id)).subscribeOn(getScheduler())
.map(new Function<String, String>() {
@Override
public String apply(String s) {
return "names=>" + s;
}
});
}
代码示例来源:origin: ReactiveX/RxJava
@Test(/* timeout = 1000, */expected = RuntimeException.class)
public void testHasNextThrows() {
TestScheduler scheduler = new TestScheduler();
Flowable<Long> source = Flowable.<Long> error(new RuntimeException("Forced failure!")).subscribeOn(scheduler);
Iterable<Long> iter = source.blockingLatest();
Iterator<Long> it = iter.iterator();
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
it.hasNext();
}
代码示例来源:origin: ReactiveX/RxJava
@Test(timeout = 1000, expected = RuntimeException.class)
public void testNextThrows() {
TestScheduler scheduler = new TestScheduler();
Flowable<Long> source = Flowable.<Long> error(new RuntimeException("Forced failure!")).subscribeOn(scheduler);
Iterable<Long> iter = source.blockingLatest();
Iterator<Long> it = iter.iterator();
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
it.next();
}
代码示例来源:origin: ReactiveX/RxJava
@Override
public CompletableSource apply(Integer v) throws Exception {
return Flowable.range(1, 100).subscribeOn(Schedulers.computation()).ignoreElements();
}
}, false, 3)
代码示例来源:origin: ReactiveX/RxJava
@Override
public CompletableSource apply(Integer v) throws Exception {
return Flowable.range(1, 100).subscribeOn(Schedulers.computation()).ignoreElements();
}
}).toFlowable()
代码示例来源:origin: ReactiveX/RxJava
@Override
public CompletableSource apply(Integer v) throws Exception {
return Flowable.range(1, 100).subscribeOn(Schedulers.computation()).ignoreElements();
}
}, false, 3).toFlowable()
代码示例来源:origin: ReactiveX/RxJava
@Override
public CompletableSource apply(Integer v) throws Exception {
return Flowable.range(1, 100).subscribeOn(Schedulers.computation()).ignoreElements();
}
})
代码示例来源:origin: ReactiveX/RxJava
@Test
public void blockingFirstDefault() {
assertEquals(1, Flowable.<Integer>empty()
.subscribeOn(Schedulers.computation()).blockingFirst(1).intValue());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void blockingFirst() {
assertEquals(1, Flowable.range(1, 10)
.subscribeOn(Schedulers.computation()).blockingFirst().intValue());
}
代码示例来源:origin: ReactiveX/RxJava
@Test(timeout = 2000)
public void testRepeatTake() {
Flowable<Integer> xs = Flowable.just(1, 2);
Object[] ys = xs.repeat().subscribeOn(Schedulers.newThread()).take(4).toList().blockingGet().toArray();
assertArrayEquals(new Object[] { 1, 2, 1, 2 }, ys);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void inputOutputSubscribeRace2() {
Flowable<Integer> source = Flowable.just(1).subscribeOn(Schedulers.single())
.publish(Functions.<Flowable<Integer>>identity());
for (int i = 0; i < 500; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1);
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void longSequenceEquals() {
Flowable<Integer> source = Flowable.range(1, Flowable.bufferSize() * 4).subscribeOn(Schedulers.computation());
Flowable.sequenceEqual(source, source)
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(true);
}
代码示例来源:origin: ReactiveX/RxJava
@Test(timeout = 1000)
public void testRaceConditions() {
Scheduler comp = Schedulers.computation();
Scheduler limited = comp.when(new Function<Flowable<Flowable<Completable>>, Completable>() {
@Override
public Completable apply(Flowable<Flowable<Completable>> t) {
return Completable.merge(Flowable.merge(t, 10));
}
});
merge(just(just(1).subscribeOn(limited).observeOn(comp)).repeat(1000)).blockingSubscribe();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void repeatScheduled() {
TestSubscriber<Integer> ts = TestSubscriber.create();
Flowable.just(1).subscribeOn(Schedulers.computation()).repeat(5).subscribe(ts);
ts.awaitTerminalEvent(5, TimeUnit.SECONDS);
ts.assertValues(1, 1, 1, 1, 1);
ts.assertNoErrors();
ts.assertComplete();
}
内容来源于网络,如有侵权,请联系作者删除!