本文整理了Java中reactor.core.publisher.WorkQueueProcessor.onNext()方法的一些代码示例,展示了WorkQueueProcessor.onNext()的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。
WorkQueueProcessor.onNext()方法的具体详情如下:
包路径:reactor.core.publisher.WorkQueueProcessor
类名称:WorkQueueProcessor
方法名:onNext
暂无
代码示例来源:origin: reactor/reactor-core
/**
 * Emits three values into a {@link WorkQueueProcessor} built on a single-thread executor
 * with a buffer size of 2 and one subscriber, then checks that the subscriber recorded
 * no error.
 *
 * <p>{@code onComplete} is scheduled to run 200 ms after the first two values have been
 * emitted; the third value is emitted right after that scheduling call.
 */
@Test(timeout = 4000)
public void singleThreadWorkQueueSucceedsWithOneSubscriber() {
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    // Keep a handle on the scheduler so that its worker thread can be released afterwards.
    // Fully qualified so the snippet needs no additional import.
    java.util.concurrent.ScheduledExecutorService completionScheduler =
            Executors.newSingleThreadScheduledExecutor();
    try {
        WorkQueueProcessor<String> bc = WorkQueueProcessor.<String>builder().executor(executorService).bufferSize(2).build();
        CountDownLatch latch = new CountDownLatch(1);
        TestWorkQueueSubscriber spec1 = new TestWorkQueueSubscriber(latch, "spec1");
        bc.subscribe(spec1);
        bc.onNext("foo");
        bc.onNext("bar");
        completionScheduler.schedule(bc::onComplete, 200, TimeUnit.MILLISECONDS);
        bc.onNext("baz");
        try {
            // The boolean result is deliberately not asserted, as in the original test:
            // the assertion below is the only pass/fail criterion.
            latch.await(800, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e1) {
            // Restore the interrupt flag before failing so callers can still observe it.
            Thread.currentThread().interrupt();
            fail(e1.toString());
        }
        assertNull(spec1.error);
    }
    finally {
        // shutdown() is orderly: it does not interrupt running tasks, and a delayed task
        // that is already scheduled still runs under the default scheduler policy.
        completionScheduler.shutdown();
        executorService.shutdown();
    }
}
代码示例来源:origin: reactor/reactor-core
/**
 * Subscribes a pipeline whose handler signals an error for every element and retries up
 * to 10 times, emits three values, completes the processor, and then waits until the
 * processor reports no downstream subscriber or at most one thread is active.
 *
 * <p>The wait is unbounded: if neither condition is ever met the test does not return.
 */
@Test()
public void retryNoThreadLeak() throws Exception {
    WorkQueueProcessor<Integer> wq = WorkQueueProcessor.<Integer>builder().autoCancel(false).build();
    wq.handle((integer, sink) -> sink.error(new RuntimeException()))
      .retry(10)
      .subscribe();
    wq.onNext(1);
    wq.onNext(2);
    wq.onNext(3);
    wq.onComplete();
    while (wq.downstreamCount() != 0 && Thread.activeCount() > 1) {
        // Give other threads a chance to run instead of spinning with an empty body.
        Thread.yield();
    }
}
代码示例来源:origin: reactor/reactor-core
/**
 * Attaches two subscribers to a {@link WorkQueueProcessor} built on a single-thread
 * executor with a buffer size of 2, emits two values, schedules {@code onComplete} after
 * 200 ms and expects the third {@code onNext} to throw something that
 * {@code WaitStrategy.isAlert} recognises.
 */
@Test
public void singleThreadWorkQueueDoesntRejectsSubscribers() {
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    // Keep a handle on the scheduler so that its worker thread can be released afterwards.
    // Fully qualified so the snippet needs no additional import.
    java.util.concurrent.ScheduledExecutorService completionScheduler =
            Executors.newSingleThreadScheduledExecutor();
    try {
        WorkQueueProcessor<String> bc = WorkQueueProcessor.<String>builder().executor(executorService).bufferSize(2).build();
        CountDownLatch latch = new CountDownLatch(1);
        TestWorkQueueSubscriber spec1 = new TestWorkQueueSubscriber(latch, "spec1");
        TestWorkQueueSubscriber spec2 = new TestWorkQueueSubscriber(latch, "spec2");
        bc.subscribe(spec1);
        bc.subscribe(spec2);
        bc.onNext("foo");
        bc.onNext("bar");
        completionScheduler.schedule(bc::onComplete, 200, TimeUnit.MILLISECONDS);

        Throwable thrown = null;
        try {
            bc.onNext("baz");
        }
        catch (Throwable e) {
            thrown = e;
        }
        // fail() is called outside the try block so its AssertionError is not swallowed by
        // the catch-all above and reported with the misleading "expected AlertException" text.
        if (thrown == null) {
            fail("expected 3rd next to time out as newSingleThreadExecutor cannot be introspected");
        }
        assertTrue("expected AlertException, got " + thrown, WaitStrategy.isAlert(thrown));
    }
    finally {
        // shutdown() is orderly: it does not interrupt running tasks, and a delayed task
        // that is already scheduled still runs under the default scheduler policy.
        completionScheduler.shutdown();
        executorService.shutdown();
    }
}
代码示例来源:origin: reactor/reactor-core
/**
 * Emits 1, 2, 3 and completes the processor before anything subscribes, then verifies a
 * retrying pipeline over it.
 *
 * <p>The handler signals an error on its first two invocations (while the {@code errors}
 * counter, starting at 3, is still positive after decrementing) and passes values through
 * afterwards. The test expects 1, 2, 3 followed by completion, and exactly five
 * {@code doOnNext} signals in total.
 */
@Test
public void retryErrorPropagatedFromWorkQueueSubscriberCold() throws Exception {
    AtomicInteger errors = new AtomicInteger(3);
    WorkQueueProcessor<Integer> wq = WorkQueueProcessor.<Integer>builder().autoCancel(false).build();
    AtomicInteger onNextSignals = new AtomicInteger();
    wq.onNext(1);
    wq.onNext(2);
    wq.onNext(3);
    wq.onComplete();
    StepVerifier.create(wq.log("wq", Level.FINE)
                          .doOnNext(e -> onNextSignals.incrementAndGet()).<Integer>handle(
                                  (s1, sink) -> {
                                      if (errors.decrementAndGet() > 0) {
                                          sink.error(new RuntimeException());
                                      }
                                      else {
                                          sink.next(s1);
                                      }
                                  }).retry())
                .expectNext(1, 2, 3)
                .verifyComplete();
    assertThat(onNextSignals.get(), equalTo(5));
    // Wait until no downstream subscriber remains or at most one thread is active.
    while (wq.downstreamCount() != 0 && Thread.activeCount() > 1) {
        // Give other threads a chance to run instead of spinning with an empty body.
        Thread.yield();
    }
}
代码示例来源:origin: reactor/reactor-core
/**
 * Verifies a retrying pipeline over a processor that receives 1, 2, 3 only after the
 * verifier has subscribed.
 *
 * <p>The handler signals an error on its first two invocations (while the {@code errors}
 * counter, starting at 3, is still positive after decrementing) and passes values through
 * afterwards. The test expects 3 as the first delivered value, cancels, and then checks
 * that exactly three {@code doOnNext} signals were seen.
 */
@Test
public void retryErrorPropagatedFromWorkQueueSubscriberHot() throws Exception {
    AtomicInteger errors = new AtomicInteger(3);
    WorkQueueProcessor<Integer> wq = WorkQueueProcessor.<Integer>builder().autoCancel(false).build();
    AtomicInteger onNextSignals = new AtomicInteger();
    StepVerifier.create(wq.doOnNext(e -> onNextSignals.incrementAndGet()).<Integer>handle(
            (s1, sink) -> {
                if (errors.decrementAndGet() > 0) {
                    sink.error(new RuntimeException("expected"));
                }
                else {
                    sink.next(s1);
                }
            }).retry())
                .then(() -> {
                    wq.onNext(1);
                    wq.onNext(2);
                    wq.onNext(3);
                })
                .expectNext(3)
                .thenCancel()
                .verify();
    assertThat(onNextSignals.get(), equalTo(3));
    // Wait until no downstream subscriber remains or at most one thread is active.
    while (wq.downstreamCount() != 0 && Thread.activeCount() > 1) {
        // Give other threads a chance to run instead of spinning with an empty body.
        Thread.yield();
    }
}
代码示例来源:origin: reactor/reactor-core
/**
 * Subscribes three subscribers to a {@link WorkQueueProcessor} backed by a fixed thread
 * pool of size 2 and checks that the first two record no error while the third records
 * one whose message starts with the "could not accommodate another subscriber" text.
 */
@Test
public void fixedThreadPoolWorkQueueRejectsSubscribers() {
    ExecutorService executorService = Executors.newFixedThreadPool(2);
    try {
        WorkQueueProcessor<String> bc = WorkQueueProcessor.<String>builder().executor(executorService).bufferSize(16).build();
        CountDownLatch latch = new CountDownLatch(3);
        TestWorkQueueSubscriber spec1 = new TestWorkQueueSubscriber(latch, "spec1");
        TestWorkQueueSubscriber spec2 = new TestWorkQueueSubscriber(latch, "spec2");
        TestWorkQueueSubscriber spec3 = new TestWorkQueueSubscriber(latch, "spec3");
        bc.subscribe(spec1);
        bc.subscribe(spec2);
        bc.subscribe(spec3);
        bc.onNext("foo");
        bc.onComplete();
        assertThat(spec1.error, is(nullValue()));
        assertThat(spec2.error, is(nullValue()));
        assertThat(spec3.error, is(notNullValue()));
        assertThat(spec3.error.getMessage(),
                startsWith(
                        "The executor service could not accommodate another subscriber, detected limit 2"));
        try {
            // The boolean result is deliberately not asserted, as in the original test.
            latch.await(1, TimeUnit.SECONDS);
        }
        catch (InterruptedException e1) {
            // Restore the interrupt flag before failing so callers can still observe it.
            Thread.currentThread().interrupt();
            fail(e1.toString());
        }
    }
    finally {
        // Release the pool on every path; shutdown() has no additional effect if the
        // executor has already been shut down.
        executorService.shutdown();
    }
}
代码示例来源:origin: reactor/reactor-core
/**
 * Subscribes three subscribers to a {@link WorkQueueProcessor} backed by a work-stealing
 * pool created with parallelism 2 and checks that the first two record no error while
 * the third records one with exactly the "could not accommodate another subscriber"
 * message.
 */
@Test
public void forkJoinPoolWorkQueueRejectsSubscribers() {
    ExecutorService executorService = Executors.newWorkStealingPool(2);
    try {
        WorkQueueProcessor<String> bc = WorkQueueProcessor.<String>builder().executor(executorService).bufferSize(16).build();
        CountDownLatch latch = new CountDownLatch(2);
        TestWorkQueueSubscriber spec1 = new TestWorkQueueSubscriber(latch, "spec1");
        TestWorkQueueSubscriber spec2 = new TestWorkQueueSubscriber(latch, "spec2");
        TestWorkQueueSubscriber spec3 = new TestWorkQueueSubscriber(latch, "spec3");
        bc.subscribe(spec1);
        bc.subscribe(spec2);
        bc.subscribe(spec3);
        bc.onNext("foo");
        bc.onComplete();
        assertThat(spec1.error, is(nullValue()));
        assertThat(spec2.error, is(nullValue()));
        assertThat(spec3.error, is(notNullValue()));
        assertThat(spec3.error.getMessage(),
                is("The executor service could not accommodate another subscriber, detected limit 2"));
        try {
            // The boolean result is deliberately not asserted, as in the original test.
            latch.await(1, TimeUnit.SECONDS);
        }
        catch (InterruptedException e1) {
            // Restore the interrupt flag before failing so callers can still observe it.
            Thread.currentThread().interrupt();
            fail(e1.toString());
        }
    }
    finally {
        // Release the pool on every path; shutdown() has no additional effect if the
        // executor has already been shut down.
        executorService.shutdown();
    }
}
代码示例来源:origin: reactor/reactor-core
/**
 * Verifies a retrying pipeline whose handler signals an error for the value 1 and passes
 * every other value through.
 *
 * <p>1, 2, 3 are emitted after the verifier has subscribed; the test expects 2 and 3
 * downstream, cancels, and then checks that exactly three {@code doOnNext} signals were
 * seen.
 */
@Test
public void retryErrorPropagatedFromWorkQueueSubscriberHotPoisonSignal()
        throws Exception {
    WorkQueueProcessor<Integer> wq = WorkQueueProcessor.<Integer>builder().autoCancel(false).build();
    AtomicInteger onNextSignals = new AtomicInteger();
    StepVerifier.create(wq.doOnNext(e -> onNextSignals.incrementAndGet()).<Integer>handle(
            (s1, sink) -> {
                if (s1 == 1) {
                    sink.error(new RuntimeException());
                }
                else {
                    sink.next(s1);
                }
            }).retry())
                .then(() -> {
                    wq.onNext(1);
                    wq.onNext(2);
                    wq.onNext(3);
                })
                .expectNext(2, 3)
                .thenCancel()
                .verify();
    assertThat(onNextSignals.get(), equalTo(3));
    // Wait until no downstream subscriber remains or at most one thread is active.
    while (wq.downstreamCount() != 0 && Thread.activeCount() > 1) {
        // Give other threads a chance to run instead of spinning with an empty body.
        Thread.yield();
    }
}
代码示例来源:origin: reactor/reactor-core
/**
 * Verifies a retrying, logged pipeline whose handler signals an error for the value 2 and
 * passes every other value through.
 *
 * <p>1, 2, 3 are emitted after the verifier has subscribed; the test expects 1 and 3
 * downstream, cancels, and then checks that exactly three {@code doOnNext} signals were
 * seen.
 */
@Test
public void retryErrorPropagatedFromWorkQueueSubscriberHotPoisonSignal2()
        throws Exception {
    WorkQueueProcessor<Integer> wq = WorkQueueProcessor.<Integer>builder().autoCancel(false).build();
    AtomicInteger onNextSignals = new AtomicInteger();
    StepVerifier.create(wq.log()
                          .doOnNext(e -> onNextSignals.incrementAndGet()).<Integer>handle(
                                  (s1, sink) -> {
                                      if (s1 == 2) {
                                          sink.error(new RuntimeException());
                                      }
                                      else {
                                          sink.next(s1);
                                      }
                                  }).retry())
                .then(() -> {
                    wq.onNext(1);
                    wq.onNext(2);
                    wq.onNext(3);
                })
                .expectNext(1, 3)
                .thenCancel()
                .verify();
    assertThat(onNextSignals.get(), equalTo(3));
    // Wait until no downstream subscriber remains or at most one thread is active.
    while (wq.downstreamCount() != 0 && Thread.activeCount() > 1) {
        // Give other threads a chance to run instead of spinning with an empty body.
        Thread.yield();
    }
}
代码示例来源:origin: reactor/reactor-core
.retry())
.then(() -> {
wq.onNext(1);
wq.onNext(2);
wq.onNext(3);
})
.expectNext(3)
代码示例来源:origin: reactor/reactor-core
.retry())
.then(() -> {
wq.onNext(1);
wq.onNext(2);
wq.onNext(3);
})
.expectNext(3)
代码示例来源:origin: reactor/reactor-core
/**
 * Checks that {@code getAvailableCapacity()} equals the configured buffer size on a fresh
 * processor and drops by one after a single {@code onNext}.
 */
@Test
public void testWorkQueueProcessorGetters() {
    final int TEST_BUFFER_SIZE = 16;
    WorkQueueProcessor<Object> processor = WorkQueueProcessor.builder().name("testProcessor").bufferSize(TEST_BUFFER_SIZE).build();
    try {
        assertEquals(TEST_BUFFER_SIZE, processor.getAvailableCapacity());
        processor.onNext(new Object());
        assertEquals(TEST_BUFFER_SIZE - 1, processor.getAvailableCapacity());
    }
    finally {
        // Shut the processor down even when one of the assertions above fails.
        processor.awaitAndShutdown();
    }
}
代码示例来源:origin: reactor/reactor-core
wq.onNext(1);
wq.onNext(2);
wq.onNext(3);
})
.expectNext(2, 3)
代码示例来源:origin: reactor/reactor-core
.retry())
.then(() -> {
wq.onNext(1);
wq.onNext(2);
wq.onNext(3);
})
.expectNextMatches(d -> d == 2 || d == 3)
代码示例来源:origin: reactor/reactor-core
wq.onNext(1);
wq.onNext(2);
wq.onNext(3);
})
.expectNext(2, 3)
代码示例来源:origin: reactor/reactor-core
.retry())
.then(() -> {
wq.onNext(1);
wq.onNext(2);
wq.onNext(3);
})
.expectNextMatches(d -> d == 2 || d == 3)
代码示例来源:origin: reactor/reactor-core
.retry())
.then(() -> {
wq.onNext(1);
wq.onNext(2);
wq.onNext(3);
})
.expectNextMatches(d -> d == 2 || d == 3)
代码示例来源:origin: io.projectreactor.addons/reactor-logback
/**
 * Hands the logging event to the processor, but only when {@code delegate.get()}
 * currently returns a non-null value; otherwise the event is dropped.
 *
 * @param evt the logging event to forward
 */
protected void queueLoggingEvent(ILoggingEvent evt) {
    if (delegate.get() == null) {
        return;
    }
    processor.onNext(evt);
}
内容来源于网络,如有侵权,请联系作者删除!