本文整理了Java中reactor.core.publisher.WorkQueueProcessor.builder()
方法的一些代码示例,展示了WorkQueueProcessor.builder()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。WorkQueueProcessor.builder()
方法的具体详情如下:
包路径:reactor.core.publisher.WorkQueueProcessor
类名称:WorkQueueProcessor
方法名:builder
[英]Create a new WorkQueueProcessor Builder with default properties.
[中]使用默认属性创建新的WorkQueueProcessor Builder。
代码示例来源:origin: reactor/reactor-core
/**
* Create a new WorkQueueProcessor using {@link Queues#SMALL_BUFFER_SIZE} backlog size,
* blockingWait Strategy and auto-cancel. <p> A new Cached ThreadExecutorPool will be
* implicitly created.
* @param <E> Type of processed signals
* @return a fresh processor
*/
public static <E> WorkQueueProcessor<E> create() {
return WorkQueueProcessor.<E>builder().build();
}
代码示例来源:origin: reactor/reactor-core
@Override
public Processor<Long, Long> createIdentityProcessor(int bufferSize) {
return WorkQueueProcessor.<Long>builder()
.name("workQueueProcessorVerification")
.bufferSize(bufferSize)
.build();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void createOverrideExecutor() {
ExecutorService executor = Executors.newSingleThreadExecutor();
WorkQueueProcessor<Integer> processor = WorkQueueProcessor.<Integer>builder()
.executor(executor)
.build();
assertProcessor(processor, false, null, null, null, null, executor, null);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void createOverrideExecutorAutoCancel() {
ExecutorService executor = Executors.newSingleThreadExecutor();
boolean autoCancel = false;
WorkQueueProcessor<Integer> processor = WorkQueueProcessor.<Integer>builder()
.executor(executor)
.autoCancel(autoCancel)
.build();
assertProcessor(processor, false, null, null, null, autoCancel, executor, null);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void createOverrideExecutorBufferSize() {
ExecutorService executor = Executors.newSingleThreadExecutor();
int bufferSize = 1024;
WorkQueueProcessor<Integer> processor = WorkQueueProcessor.<Integer>builder()
.executor(executor)
.bufferSize(bufferSize)
.build();
assertProcessor(processor, false, null, bufferSize, null, null, executor, null);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void shareOverrideExecutor() {
ExecutorService executor = Executors.newSingleThreadExecutor();
WorkQueueProcessor<Integer> processor = WorkQueueProcessor.<Integer>builder()
.share(true)
.executor(executor)
.build();
assertProcessor(processor, true, null, null, null, null, executor, null);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void createOverrideAutoCancel() {
boolean autoCancel = false;
WorkQueueProcessor<Integer> processor = WorkQueueProcessor.<Integer>builder()
.autoCancel(autoCancel)
.build();
assertProcessor(processor, false, null, null, null, autoCancel, null, null);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void createOverrideName() {
String name = "nameOverride";
WorkQueueProcessor<Integer> processor = WorkQueueProcessor.<Integer>builder()
.name(name)
.build();
assertProcessor(processor, false, name, null, null, null, null, null);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void shareOverrideExecutorAutoCancel() {
ExecutorService executor = Executors.newSingleThreadExecutor();
boolean autoCancel = false;
WorkQueueProcessor<Integer> processor = WorkQueueProcessor.<Integer>builder()
.share(true)
.executor(executor)
.autoCancel(autoCancel)
.build();
assertProcessor(processor, true, null, null, null, autoCancel, executor, null);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void serializedSinkSingleProducer() throws Exception {
WorkQueueProcessor<Integer> queueProcessor = WorkQueueProcessor.<Integer>builder()
.share(false).build();
FluxSink<Integer> sink = queueProcessor.sink();
Assertions.assertThat(sink).isInstanceOf(SerializedSink.class);
sink = sink.next(1);
Assertions.assertThat(sink).isInstanceOf(SerializedSink.class);
sink = sink.onRequest(n -> {});
Assertions.assertThat(sink).isInstanceOf(SerializedSink.class);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void createOverrideExecutorBufferSizeAutoCancel() {
ExecutorService executor = Executors.newSingleThreadExecutor();
int bufferSize = 1024;
boolean autoCancel = false;
WorkQueueProcessor<Integer> processor = WorkQueueProcessor.<Integer>builder()
.executor(executor)
.bufferSize(bufferSize)
.autoCancel(autoCancel)
.build();
assertProcessor(processor, false, null, bufferSize, null, autoCancel, executor, null);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void shareOverrideExecutorBufferSize() {
ExecutorService executor = Executors.newSingleThreadExecutor();
int bufferSize = 1024;
WorkQueueProcessor<Integer> processor = WorkQueueProcessor.<Integer>builder()
.share(true)
.executor(executor)
.bufferSize(bufferSize)
.build();
assertProcessor(processor, true, null, bufferSize, null, null, executor, null);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void createOverrideExecutorBufferSizeWaitStrategy() {
ExecutorService executor = Executors.newSingleThreadExecutor();
int bufferSize = 1024;
WaitStrategy waitStrategy = WaitStrategy.busySpin();
WorkQueueProcessor<Integer> processor = WorkQueueProcessor.<Integer>builder()
.executor(executor)
.bufferSize(bufferSize)
.waitStrategy(waitStrategy)
.build();
assertProcessor(processor, false, null, bufferSize, waitStrategy, null, executor, null);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void shareOverrideAutoCancel() {
boolean autoCancel = false;
WorkQueueProcessor<Integer> processor = WorkQueueProcessor.<Integer>builder()
.share(true)
.autoCancel(autoCancel)
.build();
assertProcessor(processor, true, null, null, null, autoCancel, null, null);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void createOverrideNameBufferSizeAutoCancel() {
String name = "nameOverride";
int bufferSize = 1024;
boolean autoCancel = false;
WorkQueueProcessor<Integer> processor = WorkQueueProcessor.<Integer>builder()
.name(name)
.bufferSize(bufferSize)
.autoCancel(autoCancel)
.build();
assertProcessor(processor, false, name, bufferSize, null, autoCancel, null, null);
}
代码示例来源:origin: reactor/reactor-core
private void setupPipeline() {
processor = TopicProcessor.<String>builder().autoCancel(false).build();
workProcessor = WorkQueueProcessor.<String>builder().autoCancel(false).build();
processor.subscribe(workProcessor);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void testWorkQueueProcessorGetters() {
final int TEST_BUFFER_SIZE = 16;
WorkQueueProcessor<Object> processor = WorkQueueProcessor.builder().name("testProcessor").bufferSize(TEST_BUFFER_SIZE).build();
assertEquals(TEST_BUFFER_SIZE, processor.getAvailableCapacity());
processor.onNext(new Object());
assertEquals(TEST_BUFFER_SIZE - 1, processor.getAvailableCapacity());
processor.awaitAndShutdown();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void createOverrideNameBufferSizeWaitStrategy() {
String name = "nameOverride";
int bufferSize = 1024;
WaitStrategy waitStrategy = WaitStrategy.busySpin();
WorkQueueProcessor<Integer> processor = WorkQueueProcessor.<Integer>builder()
.name(name)
.bufferSize(bufferSize)
.waitStrategy(waitStrategy)
.build();
assertProcessor(processor, false, name, bufferSize, waitStrategy, null, null, null);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void shareOverrideNameBufferSizeWaitStrategy() {
String name = "nameOverride";
int bufferSize = 1024;
WaitStrategy waitStrategy = WaitStrategy.busySpin();
WorkQueueProcessor<Integer> processor = WorkQueueProcessor.<Integer>builder()
.share(true)
.name(name)
.bufferSize(bufferSize)
.waitStrategy(waitStrategy)
.build();
assertProcessor(processor, true, name, bufferSize, waitStrategy, null, null, null);
}
代码示例来源:origin: reactor/reactor-core
@Test(timeout = 15000L)
public void disposeSubscribeNoThreadLeak() throws Exception {
WorkQueueProcessor<String> wq = WorkQueueProcessor.<String>builder().autoCancel(false).build();
Disposable d = wq.subscribe();
d.dispose();
d = wq.subscribe();
d.dispose();
d = wq.subscribe();
d.dispose();
while (wq.downstreamCount() != 0 && Thread.activeCount() > 2) {
}
}
内容来源于网络,如有侵权,请联系作者删除!