本文整理了Java中io.reactivex.Flowable.doAfterNext()
方法的一些代码示例,展示了Flowable.doAfterNext()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.doAfterNext()
方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:doAfterNext
[英]Calls the specified consumer with the current item after this item has been emitted to the downstream.
Note that the onAfterNext action is shared between subscriptions and as such should be thread-safe. Backpressure: The operator doesn't interfere with backpressure which is determined by the source Publisher's backpressure behavior. Scheduler: doAfterNext does not operate by default on a particular Scheduler. Operator-fusion: This operator supports normal and conditional Subscribers as well as boundary-limited synchronous or asynchronous queue-fusion.
History: 2.0.1 - experimental
[中]将当前项发送到下游后,使用当前项调用指定的使用者。
请注意,onAfterNext操作在订阅之间共享,因此应该是线程安全的。背压:操作员不会干扰由源发布者的背压行为确定的背压。Scheduler:doAfterNext默认情况下不会在特定的计划程序上运行。运算符融合:此运算符支持普通和条件订阅服务器以及边界限制的同步或异步队列融合。
历史:2.0.1-实验性
代码示例来源:origin: ReactiveX/RxJava
@Override
public Publisher<Integer> createPublisher(long elements) {
return
Flowable.range(0, (int)elements).doAfterNext(Functions.emptyConsumer())
;
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void consumerNull() {
Flowable.just(1).doAfterNext(null);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void just() {
Flowable.just(1)
.doAfterNext(afterNext)
.subscribeWith(ts)
.assertResult(1);
assertEquals(Arrays.asList(1, -1), values);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void range() {
Flowable.range(1, 5)
.doAfterNext(afterNext)
.subscribeWith(ts)
.assertResult(1, 2, 3, 4, 5);
assertEquals(Arrays.asList(1, -1, 2, -2, 3, -3, 4, -4, 5, -5), values);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void empty() {
Flowable.<Integer>empty()
.doAfterNext(afterNext)
.subscribeWith(ts)
.assertResult();
assertTrue(values.isEmpty());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void consumerThrows() {
Flowable.just(1, 2)
.doAfterNext(new Consumer<Integer>() {
@Override
public void accept(Integer e) throws Exception {
throw new TestException();
}
})
.test()
.assertFailure(TestException.class, 1);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testIfFunctionThrowsThatNoMoreEventsAreProcessed() {
final AtomicInteger count = new AtomicInteger();
final RuntimeException e = new RuntimeException();
Burst.items(1, 2).create()
.doAfterNext(new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Exception {
count.incrementAndGet();
throw e;
}})
.test()
.assertError(e)
.assertValue(1);
assertEquals(1, count.get());
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void error() {
Flowable.<Integer>error(new TestException())
.doAfterNext(afterNext)
.subscribeWith(ts)
.assertFailure(TestException.class);
assertTrue(values.isEmpty());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void emptyConditional() {
Flowable.<Integer>empty()
.doAfterNext(afterNext)
.filter(Functions.alwaysTrue())
.subscribeWith(ts)
.assertResult();
assertTrue(values.isEmpty());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void asyncFusedRejected() {
TestSubscriber<Integer> ts0 = SubscriberFusion.newTest(QueueFuseable.ASYNC);
Flowable.range(1, 5)
.doAfterNext(afterNext)
.subscribe(ts0);
SubscriberFusion.assertFusion(ts0, QueueFuseable.NONE)
.assertResult(1, 2, 3, 4, 5);
assertEquals(Arrays.asList(-1, -2, -3, -4, -5), values);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void justConditional() {
Flowable.just(1)
.doAfterNext(afterNext)
.filter(Functions.alwaysTrue())
.subscribeWith(ts)
.assertResult(1);
assertEquals(Arrays.asList(1, -1), values);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void syncFused() {
TestSubscriber<Integer> ts0 = SubscriberFusion.newTest(QueueFuseable.SYNC);
Flowable.range(1, 5)
.doAfterNext(afterNext)
.subscribe(ts0);
SubscriberFusion.assertFusion(ts0, QueueFuseable.SYNC)
.assertResult(1, 2, 3, 4, 5);
assertEquals(Arrays.asList(-1, -2, -3, -4, -5), values);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void rangeConditional() {
Flowable.range(1, 5)
.doAfterNext(afterNext)
.filter(Functions.alwaysTrue())
.subscribeWith(ts)
.assertResult(1, 2, 3, 4, 5);
assertEquals(Arrays.asList(1, -1, 2, -2, 3, -3, 4, -4, 5, -5), values);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void errorConditional() {
Flowable.<Integer>error(new TestException())
.doAfterNext(afterNext)
.filter(Functions.alwaysTrue())
.subscribeWith(ts)
.assertFailure(TestException.class);
assertTrue(values.isEmpty());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void syncFusedConditional() {
TestSubscriber<Integer> ts0 = SubscriberFusion.newTest(QueueFuseable.SYNC);
Flowable.range(1, 5)
.doAfterNext(afterNext)
.filter(Functions.alwaysTrue())
.subscribe(ts0);
SubscriberFusion.assertFusion(ts0, QueueFuseable.SYNC)
.assertResult(1, 2, 3, 4, 5);
assertEquals(Arrays.asList(-1, -2, -3, -4, -5), values);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void consumerThrowsConditional() {
Flowable.just(1, 2)
.doAfterNext(new Consumer<Integer>() {
@Override
public void accept(Integer e) throws Exception {
throw new TestException();
}
})
.filter(Functions.alwaysTrue())
.test()
.assertFailure(TestException.class, 1);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void asyncFusedRejectedConditional() {
TestSubscriber<Integer> ts0 = SubscriberFusion.newTest(QueueFuseable.ASYNC);
Flowable.range(1, 5)
.doAfterNext(afterNext)
.filter(Functions.alwaysTrue())
.subscribe(ts0);
SubscriberFusion.assertFusion(ts0, QueueFuseable.NONE)
.assertResult(1, 2, 3, 4, 5);
assertEquals(Arrays.asList(-1, -2, -3, -4, -5), values);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void consumerThrowsConditional2() {
Flowable.just(1, 2).hide()
.doAfterNext(new Consumer<Integer>() {
@Override
public void accept(Integer e) throws Exception {
throw new TestException();
}
})
.filter(Functions.alwaysTrue())
.test()
.assertFailure(TestException.class, 1);
}
}
代码示例来源:origin: org.apache.camel/camel-rxjava2
synchronized void attach(ReactiveStreamsProducer producer) {
Objects.requireNonNull(producer, "producer cannot be null, use the detach method");
if (this.camelProducer != null) {
throw new IllegalStateException("A producer is already attached to the stream '" + name + "'");
}
if (this.camelProducer != producer) {
detach();
ReactiveStreamsBackpressureStrategy strategy = producer.getEndpoint().getBackpressureStrategy();
Flowable<Exchange> flow = Flowable.create(camelEmitter::set, BackpressureStrategy.MISSING);
if (ObjectHelper.equal(strategy, ReactiveStreamsBackpressureStrategy.OLDEST)) {
flow.onBackpressureDrop(this::onBackPressure)
.doAfterNext(this::onItemEmitted)
.subscribe(this.publisher);
} else if (ObjectHelper.equal(strategy, ReactiveStreamsBackpressureStrategy.LATEST)) {
flow.doAfterNext(this::onItemEmitted)
.onBackpressureLatest()
.subscribe(this.publisher);
} else {
flow.doAfterNext(this::onItemEmitted)
.onBackpressureBuffer()
.subscribe(this.publisher);
}
camelProducer = producer;
}
}
代码示例来源:origin: org.infinispan/infinispan-persistence-soft-index
handleFilePublisher(filePublisher.doAfterNext(compactor::completeFile), false, false,
(file, offset, size, serializedKey, serializedMetadata, serializedValue, seqId, expiration) -> {
long prevSeqId;
内容来源于网络,如有侵权,请联系作者删除!