reactor.core.publisher.WorkQueueProcessor.subscribe()方法的使用及代码示例

x33g5p2x  于2022-02-03 转载在 其他  
字(10.4k)|赞(0)|评价(0)|浏览(200)

本文整理了Java中reactor.core.publisher.WorkQueueProcessor.subscribe()方法的一些代码示例,展示了WorkQueueProcessor.subscribe()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。WorkQueueProcessor.subscribe()方法的具体详情如下:
包路径:reactor.core.publisher.WorkQueueProcessor
类名称:WorkQueueProcessor
方法名:subscribe

WorkQueueProcessor.subscribe介绍

暂无

代码示例

代码示例来源:origin: reactor/reactor-core

  1. @Test(timeout = 15000L)
  2. public void disposeSubscribeNoThreadLeak() throws Exception {
  3. WorkQueueProcessor<String> wq = WorkQueueProcessor.<String>builder().autoCancel(false).build();
  4. Disposable d = wq.subscribe();
  5. d.dispose();
  6. d = wq.subscribe();
  7. d.dispose();
  8. d = wq.subscribe();
  9. d.dispose();
  10. while (wq.downstreamCount() != 0 && Thread.activeCount() > 2) {
  11. }
  12. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void fixedThreadPoolWorkQueueRejectsSubscribers() {
  3. ExecutorService executorService = Executors.newFixedThreadPool(2);
  4. WorkQueueProcessor<String> bc = WorkQueueProcessor.<String>builder().executor(executorService).bufferSize(16).build();
  5. CountDownLatch latch = new CountDownLatch(3);
  6. TestWorkQueueSubscriber spec1 = new TestWorkQueueSubscriber(latch, "spec1");
  7. TestWorkQueueSubscriber spec2 = new TestWorkQueueSubscriber(latch, "spec2");
  8. TestWorkQueueSubscriber spec3 = new TestWorkQueueSubscriber(latch, "spec3");
  9. bc.subscribe(spec1);
  10. bc.subscribe(spec2);
  11. bc.subscribe(spec3);
  12. bc.onNext("foo");
  13. bc.onComplete();
  14. assertThat(spec1.error, is(nullValue()));
  15. assertThat(spec2.error, is(nullValue()));
  16. assertThat(spec3.error, is(notNullValue()));
  17. assertThat(spec3.error.getMessage(),
  18. startsWith(
  19. "The executor service could not accommodate another subscriber, detected limit 2"));
  20. try {
  21. latch.await(1, TimeUnit.SECONDS);
  22. }
  23. catch (InterruptedException e1) {
  24. fail(e1.toString());
  25. }
  26. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void forkJoinPoolWorkQueueRejectsSubscribers() {
  3. ExecutorService executorService = Executors.newWorkStealingPool(2);
  4. WorkQueueProcessor<String> bc = WorkQueueProcessor.<String>builder().executor(executorService).bufferSize(16).build();
  5. CountDownLatch latch = new CountDownLatch(2);
  6. TestWorkQueueSubscriber spec1 = new TestWorkQueueSubscriber(latch, "spec1");
  7. TestWorkQueueSubscriber spec2 = new TestWorkQueueSubscriber(latch, "spec2");
  8. TestWorkQueueSubscriber spec3 = new TestWorkQueueSubscriber(latch, "spec3");
  9. bc.subscribe(spec1);
  10. bc.subscribe(spec2);
  11. bc.subscribe(spec3);
  12. bc.onNext("foo");
  13. bc.onComplete();
  14. assertThat(spec1.error, is(nullValue()));
  15. assertThat(spec2.error, is(nullValue()));
  16. assertThat(spec3.error, is(notNullValue()));
  17. assertThat(spec3.error.getMessage(),
  18. is("The executor service could not accommodate another subscriber, detected limit 2"));
  19. try {
  20. latch.await(1, TimeUnit.SECONDS);
  21. }
  22. catch (InterruptedException e1) {
  23. fail(e1.toString());
  24. }
  25. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void chainedWorkQueueProcessor() throws Exception{
  3. ExecutorService es = Executors.newFixedThreadPool(2);
  4. try {
  5. WorkQueueProcessor<String> bc = WorkQueueProcessor.<String>builder().executor(es).bufferSize(16).build();
  6. int elems = 18;
  7. CountDownLatch latch = new CountDownLatch(elems);
  8. bc.subscribe(TopicProcessorTest.sub("spec1", latch));
  9. Flux.range(0, elems)
  10. .map(s -> "hello " + s)
  11. .subscribe(bc);
  12. assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
  13. }
  14. finally {
  15. es.shutdown();
  16. }
  17. }

代码示例来源:origin: reactor/reactor-core

  1. @Test(timeout = 4000)
  2. public void singleThreadWorkQueueSucceedsWithOneSubscriber() {
  3. ExecutorService executorService = Executors.newSingleThreadExecutor();
  4. WorkQueueProcessor<String> bc = WorkQueueProcessor.<String>builder().executor(executorService).bufferSize(2).build();
  5. CountDownLatch latch = new CountDownLatch(1);
  6. TestWorkQueueSubscriber spec1 = new TestWorkQueueSubscriber(latch, "spec1");
  7. bc.subscribe(spec1);
  8. bc.onNext("foo");
  9. bc.onNext("bar");
  10. Executors.newSingleThreadScheduledExecutor()
  11. .schedule(bc::onComplete, 200, TimeUnit.MILLISECONDS);
  12. bc.onNext("baz");
  13. try {
  14. latch.await(800, TimeUnit.MILLISECONDS);
  15. }
  16. catch (InterruptedException e1) {
  17. fail(e1.toString());
  18. }
  19. assertNull(spec1.error);
  20. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void singleThreadWorkQueueDoesntRejectsSubscribers() {
  3. ExecutorService executorService = Executors.newSingleThreadExecutor();
  4. WorkQueueProcessor<String> bc = WorkQueueProcessor.<String>builder().executor(executorService).bufferSize(2).build();
  5. CountDownLatch latch = new CountDownLatch(1);
  6. TestWorkQueueSubscriber spec1 = new TestWorkQueueSubscriber(latch, "spec1");
  7. TestWorkQueueSubscriber spec2 = new TestWorkQueueSubscriber(latch, "spec2");
  8. bc.subscribe(spec1);
  9. bc.subscribe(spec2);
  10. bc.onNext("foo");
  11. bc.onNext("bar");
  12. Executors.newSingleThreadScheduledExecutor()
  13. .schedule(bc::onComplete, 200, TimeUnit.MILLISECONDS);
  14. try {
  15. bc.onNext("baz");
  16. fail("expected 3rd next to time out as newSingleThreadExecutor cannot be introspected");
  17. }
  18. catch (Throwable e) {
  19. assertTrue("expected AlertException, got " + e, WaitStrategy.isAlert(e));
  20. }
  21. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void testCustomRequestTaskThreadNameCreate() {
  3. String expectedName = "workQueueProcessorRequestTaskCreate";
  4. //NOTE: the below single executor should not be used usually as requestTask assumes it immediately gets executed
  5. ExecutorService customTaskExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, expectedName));
  6. WorkQueueProcessor<Object> processor = WorkQueueProcessor.builder()
  7. .executor(Executors.newCachedThreadPool())
  8. .requestTaskExecutor(customTaskExecutor)
  9. .bufferSize(8)
  10. .waitStrategy(WaitStrategy.liteBlocking())
  11. .autoCancel(true)
  12. .build();
  13. processor.requestTask(Operators.cancelledSubscription());
  14. processor.subscribe();
  15. Thread[] threads = new Thread[Thread.activeCount()];
  16. Thread.enumerate(threads);
  17. //cleanup to avoid visibility in other tests
  18. customTaskExecutor.shutdownNow();
  19. processor.forceShutdown();
  20. Condition<Thread> customRequestTaskThread = new Condition<>(
  21. thread -> thread != null && expectedName.equals(thread.getName()),
  22. "a thread named \"%s\"", expectedName);
  23. Assertions.assertThat(threads)
  24. .haveExactly(1, customRequestTaskThread);
  25. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void serializedSinkMultiProducerWithOnRequest() throws Exception {
  3. int count = 1000;
  4. WorkQueueProcessor<Integer> queueProcessor = WorkQueueProcessor.<Integer>builder()
  5. .share(true)
  6. .build();
  7. TestSubscriber subscriber = new TestSubscriber(count);
  8. queueProcessor.subscribe(subscriber);
  9. FluxSink<Integer> sink = queueProcessor.sink();
  10. AtomicInteger next = new AtomicInteger();
  11. FluxSink<Integer> serializedSink = sink.onRequest(n -> {
  12. for (int i = 0; i < n; i++) {
  13. synchronized (s) { // to ensure that elements are in order for testing
  14. FluxSink<Integer> retSink = sink.next(next.getAndIncrement());
  15. Assertions.assertThat(retSink).isInstanceOf(SerializedSink.class);
  16. }
  17. }
  18. });
  19. Assertions.assertThat(serializedSink).isInstanceOf(SerializedSink.class);
  20. subscriber.await(Duration.ofSeconds(5));
  21. sink.complete();
  22. assertNull("Unexpected exception in subscriber", subscriber.failure);
  23. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void testCustomRequestTaskThreadNameShare() {
  3. String expectedName = "workQueueProcessorRequestTaskShare";
  4. //NOTE: the below single executor should not be used usually as requestTask assumes it immediately gets executed
  5. ExecutorService customTaskExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, expectedName));
  6. WorkQueueProcessor<Object> processor = WorkQueueProcessor.builder()
  7. .executor(Executors.newCachedThreadPool())
  8. .requestTaskExecutor(customTaskExecutor)
  9. .bufferSize(8)
  10. .waitStrategy(WaitStrategy.liteBlocking())
  11. .autoCancel(true)
  12. .build();
  13. processor.requestTask(Operators.cancelledSubscription());
  14. processor.subscribe();
  15. Thread[] threads = new Thread[Thread.activeCount()];
  16. Thread.enumerate(threads);
  17. //cleanup to avoid visibility in other tests
  18. customTaskExecutor.shutdownNow();
  19. processor.forceShutdown();
  20. Condition<Thread> customRequestTaskThread = new Condition<>(
  21. thread -> thread != null && expectedName.equals(thread.getName()),
  22. "a thread named \"%s\"", expectedName);
  23. Assertions.assertThat(threads)
  24. .haveExactly(1, customRequestTaskThread);
  25. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void nonSerializedSinkMultiProducer() throws Exception {
  3. int count = 1000;
  4. WorkQueueProcessor<Integer> queueProcessor = WorkQueueProcessor.<Integer>builder()
  5. .share(true)
  6. .build();
  7. TestSubscriber subscriber = new TestSubscriber(count);
  8. queueProcessor.subscribe(subscriber);
  9. FluxSink<Integer> sink = queueProcessor.sink();
  10. Assertions.assertThat(sink).isNotInstanceOf(SerializedSink.class);
  11. for (int i = 0; i < count; i++) {
  12. sink = sink.next(i);
  13. Assertions.assertThat(sink).isNotInstanceOf(SerializedSink.class);
  14. }
  15. subscriber.await(Duration.ofSeconds(5));
  16. assertNull("Unexpected exception in subscriber", subscriber.failure);
  17. }

代码示例来源:origin: reactor/reactor-core

  1. @Test(timeout = 15000L)
  2. public void completeDoesNotHang() throws Exception {
  3. WorkQueueProcessor<String> wq = WorkQueueProcessor.create();
  4. wq.subscribe();
  5. Assert.assertTrue(wq.downstreamCount() == 1);
  6. wq.onComplete();
  7. while (wq.downstreamCount() != 0 && Thread.activeCount() > 2) {
  8. }
  9. }

代码示例来源:origin: reactor/reactor-core

  1. @Test(timeout = 15000L)
  2. public void cancelDoesNotHang() throws Exception {
  3. WorkQueueProcessor<String> wq = WorkQueueProcessor.create();
  4. Disposable d = wq.subscribe();
  5. Assert.assertTrue(wq.downstreamCount() == 1);
  6. d.dispose();
  7. while (wq.downstreamCount() != 0 && Thread.activeCount() > 2) {
  8. }
  9. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void testForceShutdownAfterShutdown() throws InterruptedException {
  3. WorkQueueProcessor<String> processor = WorkQueueProcessor.<String>builder()
  4. .name("processor").bufferSize(4)
  5. .waitStrategy(WaitStrategy.phasedOffLiteLock(200, 100, TimeUnit.MILLISECONDS)) //eliminate the waitstrategy diff
  6. .build();
  7. Publisher<String> publisher = Flux.fromArray(new String[] { "1", "2", "3", "4", "5" });
  8. publisher.subscribe(processor);
  9. AssertSubscriber<String> subscriber = AssertSubscriber.create(0);
  10. processor.subscribe(subscriber);
  11. subscriber.request(1);
  12. Thread.sleep(250);
  13. processor.shutdown();
  14. assertFalse(processor.awaitAndShutdown(Duration.ofMillis(400)));
  15. processor.forceShutdown();
  16. assertTrue(processor.awaitAndShutdown(Duration.ofMillis(400)));
  17. }

代码示例来源:origin: io.projectreactor.addons/reactor-logback

  1. @Override
  2. public void start() {
  3. startDelegateAppender();
  4. processor = WorkQueueProcessor.<ILoggingEvent>builder().name("logger")
  5. .bufferSize(backlog)
  6. .autoCancel(false)
  7. .build();
  8. processor.subscribe(this);
  9. }

相关文章