本文整理了 Java 中 reactor.core.publisher.WorkQueueProcessor.subscribe() 方法的一些代码示例,展示了 WorkQueueProcessor.subscribe() 的具体用法。这些代码示例主要来源于 GitHub、Stack Overflow、Maven 等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。WorkQueueProcessor.subscribe() 方法的具体详情如下:
包路径:reactor.core.publisher.WorkQueueProcessor
类名称:WorkQueueProcessor
方法名:subscribe
方法说明:暂无
代码示例来源:origin: reactor/reactor-core
/**
 * Subscribes to, and immediately disposes of, a processor built with
 * {@code autoCancel(false)} three times in a row, then spins until either the
 * processor reports no downstream or at most two threads are active.
 * The spin is bounded only by the 15 second test timeout.
 */
@Test(timeout = 15000L)
public void disposeSubscribeNoThreadLeak() throws Exception {
    WorkQueueProcessor<String> processor = WorkQueueProcessor.<String>builder()
            .autoCancel(false)
            .build();

    // Three back-to-back subscribe/dispose cycles.
    for (int round = 0; round < 3; round++) {
        Disposable subscription = processor.subscribe();
        subscription.dispose();
    }

    // Busy-wait: leave once downstreamCount() is 0 or Thread.activeCount() is 2 or less.
    for (;;) {
        if (processor.downstreamCount() == 0 || Thread.activeCount() <= 2) {
            break;
        }
    }
}
代码示例来源:origin: reactor/reactor-core
/**
 * Attaches three subscribers to a processor backed by a fixed pool of two threads and
 * checks that the first two carry no error while the third carries one whose message
 * starts with the "could not accommodate another subscriber" text.
 */
@Test
public void fixedThreadPoolWorkQueueRejectsSubscribers() {
    ExecutorService twoThreadPool = Executors.newFixedThreadPool(2);
    WorkQueueProcessor<String> processor = WorkQueueProcessor.<String>builder()
            .executor(twoThreadPool)
            .bufferSize(16)
            .build();

    CountDownLatch done = new CountDownLatch(3);
    TestWorkQueueSubscriber first = new TestWorkQueueSubscriber(done, "spec1");
    TestWorkQueueSubscriber second = new TestWorkQueueSubscriber(done, "spec2");
    TestWorkQueueSubscriber rejected = new TestWorkQueueSubscriber(done, "spec3");

    processor.subscribe(first);
    processor.subscribe(second);
    processor.subscribe(rejected);

    processor.onNext("foo");
    processor.onComplete();

    assertThat(first.error, is(nullValue()));
    assertThat(second.error, is(nullValue()));
    assertThat(rejected.error, is(notNullValue()));

    String expectedPrefix =
            "The executor service could not accommodate another subscriber, detected limit 2";
    assertThat(rejected.error.getMessage(), startsWith(expectedPrefix));

    // Wait up to one second on the latch; the boolean outcome of await is not checked.
    try {
        done.await(1, TimeUnit.SECONDS);
    }
    catch (InterruptedException interrupted) {
        fail(interrupted.toString());
    }
}
代码示例来源:origin: reactor/reactor-core
/**
 * Same scenario as the fixed-pool variant, but the processor runs on a work-stealing
 * pool created with parallelism 2. The third subscriber is expected to carry an error
 * whose message equals the "could not accommodate another subscriber" text exactly.
 */
@Test
public void forkJoinPoolWorkQueueRejectsSubscribers() {
    ExecutorService stealingPool = Executors.newWorkStealingPool(2);
    WorkQueueProcessor<String> processor = WorkQueueProcessor.<String>builder()
            .executor(stealingPool)
            .bufferSize(16)
            .build();

    CountDownLatch done = new CountDownLatch(2);
    TestWorkQueueSubscriber accepted1 = new TestWorkQueueSubscriber(done, "spec1");
    TestWorkQueueSubscriber accepted2 = new TestWorkQueueSubscriber(done, "spec2");
    TestWorkQueueSubscriber overflow = new TestWorkQueueSubscriber(done, "spec3");

    processor.subscribe(accepted1);
    processor.subscribe(accepted2);
    processor.subscribe(overflow);

    processor.onNext("foo");
    processor.onComplete();

    assertThat(accepted1.error, is(nullValue()));
    assertThat(accepted2.error, is(nullValue()));
    assertThat(overflow.error, is(notNullValue()));

    String rejectionMessage = overflow.error.getMessage();
    assertThat(rejectionMessage,
            is("The executor service could not accommodate another subscriber, detected limit 2"));

    // Wait up to one second on the latch; the boolean outcome of await is not checked.
    try {
        done.await(1, TimeUnit.SECONDS);
    }
    catch (InterruptedException interrupted) {
        fail(interrupted.toString());
    }
}
代码示例来源:origin: reactor/reactor-core
/**
 * Pipes 18 mapped elements from a Flux into a processor with buffer size 16 and waits
 * up to five seconds for a latch, sized to the element count, to reach zero.
 * The executor is shut down whether or not the assertion passes.
 */
@Test
public void chainedWorkQueueProcessor() throws Exception{
    final int elementCount = 18;
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
        WorkQueueProcessor<String> processor = WorkQueueProcessor.<String>builder()
                .executor(pool)
                .bufferSize(16)
                .build();

        CountDownLatch received = new CountDownLatch(elementCount);
        // NOTE(review): TopicProcessorTest.sub is defined elsewhere; presumably it counts the latch down per element.
        processor.subscribe(TopicProcessorTest.sub("spec1", received));

        Flux<String> greetings = Flux.range(0, elementCount)
                .map(index -> "hello " + index);
        greetings.subscribe(processor);

        boolean allReceived = received.await(5000, TimeUnit.MILLISECONDS);
        assertTrue(allReceived);
    }
    finally {
        pool.shutdown();
    }
}
代码示例来源:origin: reactor/reactor-core
/**
 * Runs a processor with buffer size 2 on a single-thread executor with one subscriber,
 * publishes three elements, schedules {@code onComplete} 200 ms later, waits up to
 * 800 ms on the latch and finally asserts that the subscriber recorded no error.
 */
@Test(timeout = 4000)
public void singleThreadWorkQueueSucceedsWithOneSubscriber() {
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    WorkQueueProcessor<String> bc = WorkQueueProcessor.<String>builder().executor(executorService).bufferSize(2).build();
    CountDownLatch latch = new CountDownLatch(1);
    TestWorkQueueSubscriber spec1 = new TestWorkQueueSubscriber(latch, "spec1");
    bc.subscribe(spec1);
    bc.onNext("foo");
    bc.onNext("bar");

    // Keep a handle on the scheduler so its worker thread can be released; the
    // original created it inline and never shut it down.
    java.util.concurrent.ScheduledExecutorService completionScheduler =
            Executors.newSingleThreadScheduledExecutor();
    completionScheduler.schedule(bc::onComplete, 200, TimeUnit.MILLISECONDS);
    // shutdown() does not cancel the delayed task: by default a scheduled executor still
    // runs already-submitted delayed tasks after shutdown, then lets its thread exit.
    completionScheduler.shutdown();

    bc.onNext("baz");
    try {
        // The boolean outcome of await is not checked here.
        latch.await(800, TimeUnit.MILLISECONDS);
    }
    catch (InterruptedException e1) {
        fail(e1.toString());
    }
    assertNull(spec1.error);
}
代码示例来源:origin: reactor/reactor-core
/**
 * Subscribes two subscribers to a processor with buffer size 2 running on a
 * single-thread executor, publishes two elements, schedules {@code onComplete} 200 ms
 * later and expects the third {@code onNext} to throw something that
 * {@code WaitStrategy.isAlert} recognizes.
 */
@Test
public void singleThreadWorkQueueDoesntRejectsSubscribers() {
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    WorkQueueProcessor<String> bc = WorkQueueProcessor.<String>builder().executor(executorService).bufferSize(2).build();
    CountDownLatch latch = new CountDownLatch(1);
    TestWorkQueueSubscriber spec1 = new TestWorkQueueSubscriber(latch, "spec1");
    TestWorkQueueSubscriber spec2 = new TestWorkQueueSubscriber(latch, "spec2");
    bc.subscribe(spec1);
    bc.subscribe(spec2);
    bc.onNext("foo");
    bc.onNext("bar");

    // Keep a handle on the scheduler so its worker thread can be released; the
    // original created it inline and never shut it down.
    java.util.concurrent.ScheduledExecutorService completionScheduler =
            Executors.newSingleThreadScheduledExecutor();
    completionScheduler.schedule(bc::onComplete, 200, TimeUnit.MILLISECONDS);
    // shutdown() does not cancel the delayed task: by default a scheduled executor still
    // runs already-submitted delayed tasks after shutdown, then lets its thread exit.
    completionScheduler.shutdown();

    try {
        bc.onNext("baz");
        fail("expected 3rd next to time out as newSingleThreadExecutor cannot be introspected");
    }
    catch (Throwable e) {
        // Catching Throwable means a failure raised by fail(...) above also lands here
        // and is then reported through the isAlert assertion message.
        assertTrue("expected AlertException, got " + e, WaitStrategy.isAlert(e));
    }
}
代码示例来源:origin: reactor/reactor-core
/**
 * Builds a processor whose request-task executor creates threads with a known name,
 * triggers a request task, snapshots the active threads and asserts that exactly one
 * of them carries that name.
 */
@Test
public void testCustomRequestTaskThreadNameCreate() {
    final String expectedName = "workQueueProcessorRequestTaskCreate";
    Condition<Thread> namedAsExpected = new Condition<>(
            candidate -> candidate != null && expectedName.equals(candidate.getName()),
            "a thread named \"%s\"", expectedName);

    //NOTE: the below single executor should not be used usually as requestTask assumes it immediately gets executed
    ExecutorService requestTaskPool = Executors.newSingleThreadExecutor(task -> new Thread(task, expectedName));
    ExecutorService workerPool = Executors.newCachedThreadPool();
    WorkQueueProcessor<Object> processor = WorkQueueProcessor.builder()
            .executor(workerPool)
            .requestTaskExecutor(requestTaskPool)
            .bufferSize(8)
            .waitStrategy(WaitStrategy.liteBlocking())
            .autoCancel(true)
            .build();

    processor.requestTask(Operators.cancelledSubscription());
    processor.subscribe();

    // Snapshot the active threads before tearing anything down.
    Thread[] liveThreads = new Thread[Thread.activeCount()];
    Thread.enumerate(liveThreads);

    //cleanup to avoid visibility in other tests
    requestTaskPool.shutdownNow();
    processor.forceShutdown();

    Assertions.assertThat(liveThreads)
            .haveExactly(1, namedAsExpected);
}
代码示例来源:origin: reactor/reactor-core
/**
 * Uses a shared processor's sink with an {@code onRequest} callback that emits one
 * incrementing integer per requested element, and checks that both the sink returned
 * by {@code onRequest} and the one returned by each {@code next} call are
 * {@code SerializedSink} instances. Finally asserts the subscriber recorded no failure.
 */
@Test
public void serializedSinkMultiProducerWithOnRequest() throws Exception {
    final int count = 1000;
    WorkQueueProcessor<Integer> processor = WorkQueueProcessor.<Integer>builder()
            .share(true)
            .build();
    TestSubscriber consumer = new TestSubscriber(count);
    processor.subscribe(consumer);

    FluxSink<Integer> rawSink = processor.sink();
    AtomicInteger counter = new AtomicInteger();

    FluxSink<Integer> serializedSink = rawSink.onRequest(requested -> {
        int emitted = 0;
        while (emitted < requested) {
            // `s` is a monitor object declared outside this method.
            synchronized (s) { // to ensure that elements are in order for testing
                FluxSink<Integer> returned = rawSink.next(counter.getAndIncrement());
                Assertions.assertThat(returned).isInstanceOf(SerializedSink.class);
            }
            emitted++;
        }
    });
    Assertions.assertThat(serializedSink).isInstanceOf(SerializedSink.class);

    consumer.await(Duration.ofSeconds(5));
    rawSink.complete();
    assertNull("Unexpected exception in subscriber", consumer.failure);
}
代码示例来源:origin: reactor/reactor-core
/**
 * Share-mode counterpart of the custom request-task thread name test: builds a
 * <em>shared</em> processor whose request-task executor creates threads with a known
 * name, triggers a request task, snapshots the active threads and asserts that exactly
 * one of them carries that name.
 */
@Test
public void testCustomRequestTaskThreadNameShare() {
    String expectedName = "workQueueProcessorRequestTaskShare";
    //NOTE: the below single executor should not be used usually as requestTask assumes it immediately gets executed
    ExecutorService customTaskExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, expectedName));
    WorkQueueProcessor<Object> processor = WorkQueueProcessor.builder()
            // Enable share mode so this test covers what its name says; without it the
            // builder configuration is identical to the non-shared "Create" test.
            .share(true)
            .executor(Executors.newCachedThreadPool())
            .requestTaskExecutor(customTaskExecutor)
            .bufferSize(8)
            .waitStrategy(WaitStrategy.liteBlocking())
            .autoCancel(true)
            .build();
    processor.requestTask(Operators.cancelledSubscription());
    processor.subscribe();
    // Snapshot the active threads before tearing anything down.
    Thread[] threads = new Thread[Thread.activeCount()];
    Thread.enumerate(threads);
    //cleanup to avoid visibility in other tests
    customTaskExecutor.shutdownNow();
    processor.forceShutdown();
    Condition<Thread> customRequestTaskThread = new Condition<>(
            thread -> thread != null && expectedName.equals(thread.getName()),
            "a thread named \"%s\"", expectedName);
    Assertions.assertThat(threads)
            .haveExactly(1, customRequestTaskThread);
}
代码示例来源:origin: reactor/reactor-core
/**
 * Pushes 1000 integers through a shared processor's sink without registering an
 * {@code onRequest} callback, and checks that neither the initial sink nor any sink
 * returned by {@code next} is a {@code SerializedSink}. Finally asserts the subscriber
 * recorded no failure.
 */
@Test
public void nonSerializedSinkMultiProducer() throws Exception {
    final int count = 1000;
    WorkQueueProcessor<Integer> processor = WorkQueueProcessor.<Integer>builder()
            .share(true)
            .build();
    TestSubscriber consumer = new TestSubscriber(count);
    processor.subscribe(consumer);

    FluxSink<Integer> current = processor.sink();
    Assertions.assertThat(current).isNotInstanceOf(SerializedSink.class);

    int value = 0;
    while (value < count) {
        // Each next(...) result replaces the sink used for the following emission.
        current = current.next(value);
        Assertions.assertThat(current).isNotInstanceOf(SerializedSink.class);
        value++;
    }

    consumer.await(Duration.ofSeconds(5));
    assertNull("Unexpected exception in subscriber", consumer.failure);
}
代码示例来源:origin: reactor/reactor-core
/**
 * Subscribes once to a default processor, checks that one downstream is registered,
 * completes the processor and then spins until either no downstream remains or at most
 * two threads are active. The spin is bounded only by the 15 second test timeout.
 */
@Test(timeout = 15000L)
public void completeDoesNotHang() throws Exception {
    WorkQueueProcessor<String> wq = WorkQueueProcessor.create();
    wq.subscribe();
    // assertEquals reports the actual count on failure, unlike assertTrue(x == 1).
    Assert.assertEquals(1L, wq.downstreamCount());
    wq.onComplete();
    // Busy-wait until downstreamCount() is 0 or Thread.activeCount() is 2 or less.
    while (wq.downstreamCount() != 0 && Thread.activeCount() > 2) {
    }
}
代码示例来源:origin: reactor/reactor-core
/**
 * Subscribes once to a default processor, checks that one downstream is registered,
 * disposes the subscription and then spins until either no downstream remains or at
 * most two threads are active. The spin is bounded only by the 15 second test timeout.
 */
@Test(timeout = 15000L)
public void cancelDoesNotHang() throws Exception {
    WorkQueueProcessor<String> wq = WorkQueueProcessor.create();
    Disposable d = wq.subscribe();
    // assertEquals reports the actual count on failure, unlike assertTrue(x == 1).
    Assert.assertEquals(1L, wq.downstreamCount());
    d.dispose();
    // Busy-wait until downstreamCount() is 0 or Thread.activeCount() is 2 or less.
    while (wq.downstreamCount() != 0 && Thread.activeCount() > 2) {
    }
}
代码示例来源:origin: reactor/reactor-core
/**
 * Checks the outcome of awaitAndShutdown before and after forceShutdown(): following a
 * plain shutdown() the 400 ms await is expected to return false, and following
 * forceShutdown() it is expected to return true.
 * The test is timing-sensitive (a fixed 250 ms sleep plus two 400 ms await windows).
 */
@Test
public void testForceShutdownAfterShutdown() throws InterruptedException {
WorkQueueProcessor<String> processor = WorkQueueProcessor.<String>builder()
.name("processor").bufferSize(4)
.waitStrategy(WaitStrategy.phasedOffLiteLock(200, 100, TimeUnit.MILLISECONDS)) //eliminate the waitstrategy diff
.build();
// Five elements are offered to a processor whose buffer size is 4.
Publisher<String> publisher = Flux.fromArray(new String[] { "1", "2", "3", "4", "5" });
publisher.subscribe(processor);
// NOTE(review): create(0) presumably means "no initial demand" - confirm against AssertSubscriber.
AssertSubscriber<String> subscriber = AssertSubscriber.create(0);
processor.subscribe(subscriber);
// Only a single element is requested by the subscriber.
subscriber.request(1);
Thread.sleep(250);
processor.shutdown();
// After shutdown() alone, the 400 ms await is expected to report false.
assertFalse(processor.awaitAndShutdown(Duration.ofMillis(400)));
processor.forceShutdown();
// After forceShutdown(), the same await is expected to report true.
assertTrue(processor.awaitAndShutdown(Duration.ofMillis(400)));
}
代码示例来源:origin: io.projectreactor.addons/reactor-logback
/**
 * Starts the delegate appender, then creates the "logger" processor (buffer size taken
 * from {@code backlog}, auto-cancel disabled), stores it in {@code processor} and
 * subscribes this object to it.
 */
@Override
public void start() {
    startDelegateAppender();

    WorkQueueProcessor<ILoggingEvent> loggerProcessor =
            WorkQueueProcessor.<ILoggingEvent>builder()
                    .name("logger")
                    .bufferSize(backlog)
                    .autoCancel(false)
                    .build();

    // Publish the processor to the field first, then subscribe through the field.
    processor = loggerProcessor;
    processor.subscribe(this);
}
内容来源于网络,如有侵权,请联系作者删除!