reactor.core.publisher.WorkQueueProcessor.onNext()方法的使用及代码示例

x33g5p2x  于2022-02-03 转载在 其他  
字(8.6k)|赞(0)|评价(0)|浏览(148)

本文整理了Java中reactor.core.publisher.WorkQueueProcessor.onNext()方法的一些代码示例,展示了WorkQueueProcessor.onNext()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。WorkQueueProcessor.onNext()方法的具体详情如下:
包路径:reactor.core.publisher.WorkQueueProcessor
类名称:WorkQueueProcessor
方法名:onNext

WorkQueueProcessor.onNext介绍

暂无

代码示例

代码示例来源:origin: reactor/reactor-core

  1. @Test(timeout = 4000)
  2. public void singleThreadWorkQueueSucceedsWithOneSubscriber() {
  3. ExecutorService executorService = Executors.newSingleThreadExecutor();
  4. WorkQueueProcessor<String> bc = WorkQueueProcessor.<String>builder().executor(executorService).bufferSize(2).build();
  5. CountDownLatch latch = new CountDownLatch(1);
  6. TestWorkQueueSubscriber spec1 = new TestWorkQueueSubscriber(latch, "spec1");
  7. bc.subscribe(spec1);
  8. bc.onNext("foo");
  9. bc.onNext("bar");
  10. Executors.newSingleThreadScheduledExecutor()
  11. .schedule(bc::onComplete, 200, TimeUnit.MILLISECONDS);
  12. bc.onNext("baz");
  13. try {
  14. latch.await(800, TimeUnit.MILLISECONDS);
  15. }
  16. catch (InterruptedException e1) {
  17. fail(e1.toString());
  18. }
  19. assertNull(spec1.error);
  20. }

代码示例来源:origin: reactor/reactor-core

  1. @Test()
  2. public void retryNoThreadLeak() throws Exception {
  3. WorkQueueProcessor<Integer> wq = WorkQueueProcessor.<Integer>builder().autoCancel(false).build();
  4. wq.handle((integer, sink) -> sink.error(new RuntimeException()))
  5. .retry(10)
  6. .subscribe();
  7. wq.onNext(1);
  8. wq.onNext(2);
  9. wq.onNext(3);
  10. wq.onComplete();
  11. while (wq.downstreamCount() != 0 && Thread.activeCount() > 1) {
  12. }
  13. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void singleThreadWorkQueueDoesntRejectsSubscribers() {
  3. ExecutorService executorService = Executors.newSingleThreadExecutor();
  4. WorkQueueProcessor<String> bc = WorkQueueProcessor.<String>builder().executor(executorService).bufferSize(2).build();
  5. CountDownLatch latch = new CountDownLatch(1);
  6. TestWorkQueueSubscriber spec1 = new TestWorkQueueSubscriber(latch, "spec1");
  7. TestWorkQueueSubscriber spec2 = new TestWorkQueueSubscriber(latch, "spec2");
  8. bc.subscribe(spec1);
  9. bc.subscribe(spec2);
  10. bc.onNext("foo");
  11. bc.onNext("bar");
  12. Executors.newSingleThreadScheduledExecutor()
  13. .schedule(bc::onComplete, 200, TimeUnit.MILLISECONDS);
  14. try {
  15. bc.onNext("baz");
  16. fail("expected 3rd next to time out as newSingleThreadExecutor cannot be introspected");
  17. }
  18. catch (Throwable e) {
  19. assertTrue("expected AlertException, got " + e, WaitStrategy.isAlert(e));
  20. }
  21. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void retryErrorPropagatedFromWorkQueueSubscriberCold() throws Exception {
  3. AtomicInteger errors = new AtomicInteger(3);
  4. WorkQueueProcessor<Integer> wq = WorkQueueProcessor.<Integer>builder().autoCancel(false).build();
  5. AtomicInteger onNextSignals = new AtomicInteger();
  6. wq.onNext(1);
  7. wq.onNext(2);
  8. wq.onNext(3);
  9. wq.onComplete();
  10. StepVerifier.create(wq.log("wq", Level.FINE)
  11. .doOnNext(e -> onNextSignals.incrementAndGet()).<Integer>handle(
  12. (s1, sink) -> {
  13. if (errors.decrementAndGet() > 0) {
  14. sink.error(new RuntimeException());
  15. }
  16. else {
  17. sink.next(s1);
  18. }
  19. }).retry())
  20. .expectNext(1, 2, 3)
  21. .verifyComplete();
  22. assertThat(onNextSignals.get(), equalTo(5));
  23. while (wq.downstreamCount() != 0 && Thread.activeCount() > 1) {
  24. }
  25. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void retryErrorPropagatedFromWorkQueueSubscriberHot() throws Exception {
  3. AtomicInteger errors = new AtomicInteger(3);
  4. WorkQueueProcessor<Integer> wq = WorkQueueProcessor.<Integer>builder().autoCancel(false).build();
  5. AtomicInteger onNextSignals = new AtomicInteger();
  6. StepVerifier.create(wq.doOnNext(e -> onNextSignals.incrementAndGet()).<Integer>handle(
  7. (s1, sink) -> {
  8. if (errors.decrementAndGet() > 0) {
  9. sink.error(new RuntimeException("expected"));
  10. }
  11. else {
  12. sink.next(s1);
  13. }
  14. }).retry())
  15. .then(() -> {
  16. wq.onNext(1);
  17. wq.onNext(2);
  18. wq.onNext(3);
  19. })
  20. .expectNext(3)
  21. .thenCancel()
  22. .verify();
  23. assertThat(onNextSignals.get(), equalTo(3));
  24. while (wq.downstreamCount() != 0 && Thread.activeCount() > 1) {
  25. }
  26. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void fixedThreadPoolWorkQueueRejectsSubscribers() {
  3. ExecutorService executorService = Executors.newFixedThreadPool(2);
  4. WorkQueueProcessor<String> bc = WorkQueueProcessor.<String>builder().executor(executorService).bufferSize(16).build();
  5. CountDownLatch latch = new CountDownLatch(3);
  6. TestWorkQueueSubscriber spec1 = new TestWorkQueueSubscriber(latch, "spec1");
  7. TestWorkQueueSubscriber spec2 = new TestWorkQueueSubscriber(latch, "spec2");
  8. TestWorkQueueSubscriber spec3 = new TestWorkQueueSubscriber(latch, "spec3");
  9. bc.subscribe(spec1);
  10. bc.subscribe(spec2);
  11. bc.subscribe(spec3);
  12. bc.onNext("foo");
  13. bc.onComplete();
  14. assertThat(spec1.error, is(nullValue()));
  15. assertThat(spec2.error, is(nullValue()));
  16. assertThat(spec3.error, is(notNullValue()));
  17. assertThat(spec3.error.getMessage(),
  18. startsWith(
  19. "The executor service could not accommodate another subscriber, detected limit 2"));
  20. try {
  21. latch.await(1, TimeUnit.SECONDS);
  22. }
  23. catch (InterruptedException e1) {
  24. fail(e1.toString());
  25. }
  26. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void forkJoinPoolWorkQueueRejectsSubscribers() {
  3. ExecutorService executorService = Executors.newWorkStealingPool(2);
  4. WorkQueueProcessor<String> bc = WorkQueueProcessor.<String>builder().executor(executorService).bufferSize(16).build();
  5. CountDownLatch latch = new CountDownLatch(2);
  6. TestWorkQueueSubscriber spec1 = new TestWorkQueueSubscriber(latch, "spec1");
  7. TestWorkQueueSubscriber spec2 = new TestWorkQueueSubscriber(latch, "spec2");
  8. TestWorkQueueSubscriber spec3 = new TestWorkQueueSubscriber(latch, "spec3");
  9. bc.subscribe(spec1);
  10. bc.subscribe(spec2);
  11. bc.subscribe(spec3);
  12. bc.onNext("foo");
  13. bc.onComplete();
  14. assertThat(spec1.error, is(nullValue()));
  15. assertThat(spec2.error, is(nullValue()));
  16. assertThat(spec3.error, is(notNullValue()));
  17. assertThat(spec3.error.getMessage(),
  18. is("The executor service could not accommodate another subscriber, detected limit 2"));
  19. try {
  20. latch.await(1, TimeUnit.SECONDS);
  21. }
  22. catch (InterruptedException e1) {
  23. fail(e1.toString());
  24. }
  25. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void retryErrorPropagatedFromWorkQueueSubscriberHotPoisonSignal()
  3. throws Exception {
  4. WorkQueueProcessor<Integer> wq = WorkQueueProcessor.<Integer>builder().autoCancel(false).build();
  5. AtomicInteger onNextSignals = new AtomicInteger();
  6. StepVerifier.create(wq.doOnNext(e -> onNextSignals.incrementAndGet()).<Integer>handle(
  7. (s1, sink) -> {
  8. if (s1 == 1) {
  9. sink.error(new RuntimeException());
  10. }
  11. else {
  12. sink.next(s1);
  13. }
  14. }).retry())
  15. .then(() -> {
  16. wq.onNext(1);
  17. wq.onNext(2);
  18. wq.onNext(3);
  19. })
  20. .expectNext(2, 3)
  21. .thenCancel()
  22. .verify();
  23. assertThat(onNextSignals.get(), equalTo(3));
  24. while (wq.downstreamCount() != 0 && Thread.activeCount() > 1) {
  25. }
  26. }

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void retryErrorPropagatedFromWorkQueueSubscriberHotPoisonSignal2()
  3. throws Exception {
  4. WorkQueueProcessor<Integer> wq = WorkQueueProcessor.<Integer>builder().autoCancel(false).build();
  5. AtomicInteger onNextSignals = new AtomicInteger();
  6. StepVerifier.create(wq.log()
  7. .doOnNext(e -> onNextSignals.incrementAndGet()).<Integer>handle(
  8. (s1, sink) -> {
  9. if (s1 == 2) {
  10. sink.error(new RuntimeException());
  11. }
  12. else {
  13. sink.next(s1);
  14. }
  15. }).retry())
  16. .then(() -> {
  17. wq.onNext(1);
  18. wq.onNext(2);
  19. wq.onNext(3);
  20. })
  21. .expectNext(1, 3)
  22. .thenCancel()
  23. .verify();
  24. assertThat(onNextSignals.get(), equalTo(3));
  25. while (wq.downstreamCount() != 0 && Thread.activeCount() > 1) {
  26. }
  27. }

代码示例来源:origin: reactor/reactor-core

  1. .retry())
  2. .then(() -> {
  3. wq.onNext(1);
  4. wq.onNext(2);
  5. wq.onNext(3);
  6. })
  7. .expectNext(3)

代码示例来源:origin: reactor/reactor-core

  1. .retry())
  2. .then(() -> {
  3. wq.onNext(1);
  4. wq.onNext(2);
  5. wq.onNext(3);
  6. })
  7. .expectNext(3)

代码示例来源:origin: reactor/reactor-core

  1. @Test
  2. public void testWorkQueueProcessorGetters() {
  3. final int TEST_BUFFER_SIZE = 16;
  4. WorkQueueProcessor<Object> processor = WorkQueueProcessor.builder().name("testProcessor").bufferSize(TEST_BUFFER_SIZE).build();
  5. assertEquals(TEST_BUFFER_SIZE, processor.getAvailableCapacity());
  6. processor.onNext(new Object());
  7. assertEquals(TEST_BUFFER_SIZE - 1, processor.getAvailableCapacity());
  8. processor.awaitAndShutdown();
  9. }

代码示例来源:origin: reactor/reactor-core

  1. wq.onNext(1);
  2. wq.onNext(2);
  3. wq.onNext(3);
  4. })
  5. .expectNext(2, 3)

代码示例来源:origin: reactor/reactor-core

  1. .retry())
  2. .then(() -> {
  3. wq.onNext(1);
  4. wq.onNext(2);
  5. wq.onNext(3);
  6. })
  7. .expectNextMatches(d -> d == 2 || d == 3)

代码示例来源:origin: reactor/reactor-core

  1. wq.onNext(1);
  2. wq.onNext(2);
  3. wq.onNext(3);
  4. })
  5. .expectNext(2, 3)

代码示例来源:origin: reactor/reactor-core

  1. .retry())
  2. .then(() -> {
  3. wq.onNext(1);
  4. wq.onNext(2);
  5. wq.onNext(3);
  6. })
  7. .expectNextMatches(d -> d == 2 || d == 3)

代码示例来源:origin: reactor/reactor-core

  1. .retry())
  2. .then(() -> {
  3. wq.onNext(1);
  4. wq.onNext(2);
  5. wq.onNext(3);
  6. })
  7. .expectNextMatches(d -> d == 2 || d == 3)

代码示例来源:origin: io.projectreactor.addons/reactor-logback

  1. protected void queueLoggingEvent(ILoggingEvent evt) {
  2. if (null != delegate.get()) {
  3. processor.onNext(evt);
  4. }
  5. }

相关文章