Usage and code examples for the reactor.core.publisher.WorkQueueProcessor.builder() method

x33g5p2x, reposted on 2022-02-03 under "Other"

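This article collects code examples for the Java method reactor.core.publisher.WorkQueueProcessor.builder() and shows how it is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they serve as useful references. Details of the WorkQueueProcessor.builder() method:
Package path: reactor.core.publisher.WorkQueueProcessor
Class name: WorkQueueProcessor
Method name: builder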

Introduction to WorkQueueProcessor.builder

Create a new WorkQueueProcessor Builder with default properties.
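
To make "a builder with default properties" concrete, here is a minimal, standard-library-only sketch of the idiom. It is not Reactor's implementation: the class names `BuilderDefaultsSketch` and `ProcessorConfig`, the default values, and the power-of-two check are all assumptions chosen for illustration. The method names simply mirror the options that appear in the examples on this page.

```java
import java.util.Objects;

// Hypothetical stand-in for illustration; this is NOT Reactor's WorkQueueProcessor.
public class BuilderDefaultsSketch {

    /** Immutable settings captured when build() is called. */
    static final class ProcessorConfig {
        final String name;
        final int bufferSize;
        final boolean share;
        final boolean autoCancel;

        private ProcessorConfig(Builder b) {
            this.name = b.name;
            this.bufferSize = b.bufferSize;
            this.share = b.share;
            this.autoCancel = b.autoCancel;
        }
    }

    /** Fluent builder: every field starts at a default and can be overridden. */
    static final class Builder {
        // Default values below are assumptions made for this sketch.
        private String name = "sketch";
        private int bufferSize = 256;
        private boolean share = false;
        private boolean autoCancel = true;

        Builder name(String name) {
            this.name = Objects.requireNonNull(name, "name");
            return this;
        }

        Builder bufferSize(int bufferSize) {
            // Assumed constraint: ring-buffer style queues often need a power-of-two size.
            if (bufferSize < 1 || Integer.bitCount(bufferSize) != 1) {
                throw new IllegalArgumentException("bufferSize must be a power of 2: " + bufferSize);
            }
            this.bufferSize = bufferSize;
            return this;
        }

        Builder share(boolean share) {
            this.share = share;
            return this;
        }

        Builder autoCancel(boolean autoCancel) {
            this.autoCancel = autoCancel;
            return this;
        }

        ProcessorConfig build() {
            return new ProcessorConfig(this);
        }
    }

    /** Entry point that mirrors the shape of a static builder() factory. */
    static Builder builder() {
        return new Builder();
    }

    public static void main(String[] args) {
        // No overrides: every setting keeps its default.
        ProcessorConfig defaults = builder().build();
        // Override only what differs; the rest stays at the default.
        ProcessorConfig custom = builder().name("custom").bufferSize(1024).autoCancel(false).build();

        System.out.println(defaults.name + " " + defaults.bufferSize + " " + defaults.autoCancel);
        System.out.println(custom.name + " " + custom.bufferSize + " " + custom.autoCancel);
    }
}
```

The point of the design is that callers name only the settings they want to change, which is the pattern the examples below follow when they chain calls such as name(...), bufferSize(...), and autoCancel(...) before build().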

Code examples

Code example source: origin: reactor/reactor-core

    /**
     * Create a new WorkQueueProcessor using {@link Queues#SMALL_BUFFER_SIZE} backlog size,
     * blockingWait Strategy and auto-cancel. <p> A new Cached ThreadExecutorPool will be
     * implicitly created.
     * @param <E> Type of processed signals
     * @return a fresh processor
     */
    public static <E> WorkQueueProcessor<E> create() {
        return WorkQueueProcessor.<E>builder().build();
    }

Code example source: origin: reactor/reactor-core

    @Override
    public Processor<Long, Long> createIdentityProcessor(int bufferSize) {
        return WorkQueueProcessor.<Long>builder()
                .name("workQueueProcessorVerification")
                .bufferSize(bufferSize)
                .build();
    }

Code example source: origin: reactor/reactor-core

    @Test
    public void createOverrideExecutor() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        WorkQueueProcessor<Integer> processor = WorkQueueProcessor.<Integer>builder()
                .executor(executor)
                .build();
        assertProcessor(processor, false, null, null, null, null, executor, null);
    }

Code example source: origin: reactor/reactor-core

    @Test
    public void createOverrideExecutorAutoCancel() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        boolean autoCancel = false;
        WorkQueueProcessor<Integer> processor = WorkQueueProcessor.<Integer>builder()
                .executor(executor)
                .autoCancel(autoCancel)
                .build();
        assertProcessor(processor, false, null, null, null, autoCancel, executor, null);
    }

Code example source: origin: reactor/reactor-core

    @Test
    public void createOverrideExecutorBufferSize() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        int bufferSize = 1024;
        WorkQueueProcessor<Integer> processor = WorkQueueProcessor.<Integer>builder()
                .executor(executor)
                .bufferSize(bufferSize)
                .build();
        assertProcessor(processor, false, null, bufferSize, null, null, executor, null);
    }

Code example source: origin: reactor/reactor-core

    @Test
    public void shareOverrideExecutor() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        WorkQueueProcessor<Integer> processor = WorkQueueProcessor.<Integer>builder()
                .share(true)
                .executor(executor)
                .build();
        assertProcessor(processor, true, null, null, null, null, executor, null);
    }

Code example source: origin: reactor/reactor-core

    @Test
    public void createOverrideAutoCancel() {
        boolean autoCancel = false;
        WorkQueueProcessor<Integer> processor = WorkQueueProcessor.<Integer>builder()
                .autoCancel(autoCancel)
                .build();
        assertProcessor(processor, false, null, null, null, autoCancel, null, null);
    }

Code example source: origin: reactor/reactor-core

    @Test
    public void createOverrideName() {
        String name = "nameOverride";
        WorkQueueProcessor<Integer> processor = WorkQueueProcessor.<Integer>builder()
                .name(name)
                .build();
        assertProcessor(processor, false, name, null, null, null, null, null);
    }

Code example source: origin: reactor/reactor-core

    @Test
    public void shareOverrideExecutorAutoCancel() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        boolean autoCancel = false;
        WorkQueueProcessor<Integer> processor = WorkQueueProcessor.<Integer>builder()
                .share(true)
                .executor(executor)
                .autoCancel(autoCancel)
                .build();
        assertProcessor(processor, true, null, null, null, autoCancel, executor, null);
    }

Code example source: origin: reactor/reactor-core

    @Test
    public void serializedSinkSingleProducer() throws Exception {
        WorkQueueProcessor<Integer> queueProcessor = WorkQueueProcessor.<Integer>builder()
                .share(false)
                .build();
        FluxSink<Integer> sink = queueProcessor.sink();
        Assertions.assertThat(sink).isInstanceOf(SerializedSink.class);
        sink = sink.next(1);
        Assertions.assertThat(sink).isInstanceOf(SerializedSink.class);
        sink = sink.onRequest(n -> {});
        Assertions.assertThat(sink).isInstanceOf(SerializedSink.class);
    }

Code example source: origin: reactor/reactor-core

    @Test
    public void createOverrideExecutorBufferSizeAutoCancel() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        int bufferSize = 1024;
        boolean autoCancel = false;
        WorkQueueProcessor<Integer> processor = WorkQueueProcessor.<Integer>builder()
                .executor(executor)
                .bufferSize(bufferSize)
                .autoCancel(autoCancel)
                .build();
        assertProcessor(processor, false, null, bufferSize, null, autoCancel, executor, null);
    }

Code example source: origin: reactor/reactor-core

    @Test
    public void shareOverrideExecutorBufferSize() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        int bufferSize = 1024;
        WorkQueueProcessor<Integer> processor = WorkQueueProcessor.<Integer>builder()
                .share(true)
                .executor(executor)
                .bufferSize(bufferSize)
                .build();
        assertProcessor(processor, true, null, bufferSize, null, null, executor, null);
    }

Code example source: origin: reactor/reactor-core

    @Test
    public void createOverrideExecutorBufferSizeWaitStrategy() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        int bufferSize = 1024;
        WaitStrategy waitStrategy = WaitStrategy.busySpin();
        WorkQueueProcessor<Integer> processor = WorkQueueProcessor.<Integer>builder()
                .executor(executor)
                .bufferSize(bufferSize)
                .waitStrategy(waitStrategy)
                .build();
        assertProcessor(processor, false, null, bufferSize, waitStrategy, null, executor, null);
    }

Code example source: origin: reactor/reactor-core

    @Test
    public void shareOverrideAutoCancel() {
        boolean autoCancel = false;
        WorkQueueProcessor<Integer> processor = WorkQueueProcessor.<Integer>builder()
                .share(true)
                .autoCancel(autoCancel)
                .build();
        assertProcessor(processor, true, null, null, null, autoCancel, null, null);
    }

Code example source: origin: reactor/reactor-core

    @Test
    public void createOverrideNameBufferSizeAutoCancel() {
        String name = "nameOverride";
        int bufferSize = 1024;
        boolean autoCancel = false;
        WorkQueueProcessor<Integer> processor = WorkQueueProcessor.<Integer>builder()
                .name(name)
                .bufferSize(bufferSize)
                .autoCancel(autoCancel)
                .build();
        assertProcessor(processor, false, name, bufferSize, null, autoCancel, null, null);
    }

Code example source: origin: reactor/reactor-core

    // processor and workProcessor are assigned here but declared elsewhere (not shown in this excerpt).
    private void setupPipeline() {
        processor = TopicProcessor.<String>builder().autoCancel(false).build();
        workProcessor = WorkQueueProcessor.<String>builder().autoCancel(false).build();
        processor.subscribe(workProcessor);
    }

Code example source: origin: reactor/reactor-core

    @Test
    public void testWorkQueueProcessorGetters() {
        final int TEST_BUFFER_SIZE = 16;
        WorkQueueProcessor<Object> processor = WorkQueueProcessor.builder()
                .name("testProcessor")
                .bufferSize(TEST_BUFFER_SIZE)
                .build();
        assertEquals(TEST_BUFFER_SIZE, processor.getAvailableCapacity());
        processor.onNext(new Object());
        assertEquals(TEST_BUFFER_SIZE - 1, processor.getAvailableCapacity());
        processor.awaitAndShutdown();
    }

Code example source: origin: reactor/reactor-core

    @Test
    public void createOverrideNameBufferSizeWaitStrategy() {
        String name = "nameOverride";
        int bufferSize = 1024;
        WaitStrategy waitStrategy = WaitStrategy.busySpin();
        WorkQueueProcessor<Integer> processor = WorkQueueProcessor.<Integer>builder()
                .name(name)
                .bufferSize(bufferSize)
                .waitStrategy(waitStrategy)
                .build();
        assertProcessor(processor, false, name, bufferSize, waitStrategy, null, null, null);
    }

Code example source: origin: reactor/reactor-core

    @Test
    public void shareOverrideNameBufferSizeWaitStrategy() {
        String name = "nameOverride";
        int bufferSize = 1024;
        WaitStrategy waitStrategy = WaitStrategy.busySpin();
        WorkQueueProcessor<Integer> processor = WorkQueueProcessor.<Integer>builder()
                .share(true)
                .name(name)
                .bufferSize(bufferSize)
                .waitStrategy(waitStrategy)
                .build();
        assertProcessor(processor, true, name, bufferSize, waitStrategy, null, null, null);
    }

Code example source: origin: reactor/reactor-core

    @Test(timeout = 15000L)
    public void disposeSubscribeNoThreadLeak() throws Exception {
        WorkQueueProcessor<String> wq = WorkQueueProcessor.<String>builder().autoCancel(false).build();
        Disposable d = wq.subscribe();
        d.dispose();
        d = wq.subscribe();
        d.dispose();
        d = wq.subscribe();
        d.dispose();
        // Spin until no downstream subscribers remain or the active thread count drops to 2 or fewer;
        // the test timeout fails the test if neither happens.
        while (wq.downstreamCount() != 0 && Thread.activeCount() > 2) {
        }
    }
