streammessagelistenercontainer只生成一个线程

db2dz4w8  于 2021-07-23  发布在  Java
关注(0)|答案(0)|浏览(624)

我使用springdataredis订阅'task'redis流来处理任务。出于某种原因,redis stream consumer一次只生成一个线程并按顺序处理一条消息,即使我显式地提供了一个threadpool taskeexecutor。
我希望它将线程的创建委托给提供的线程池,并生成一个线程,达到线程池配置的限制。我可以看到它正在使用give taskexecutor,但是它不会产生多个线程。
即使我没有指定自己的taskexecutor,并且它在内部默认为simpleasynctaskexecutor,问题仍然存在。任务一次一个,一个接一个地按顺序处理,即使它们是持久的任务。
我错过了什么?

  1. @Bean
  2. public Subscription
  3. redisTaskStreamListenerContainer(
  4. RedisConnectionFactory connectionFactory,
  5. @Qualifier("task") RedisTemplate<String, Task<TransportEnvelope>> redisTemplate,
  6. @Qualifier("task") StreamListener<String, MapRecord<String, String, String>> listener,
  7. @Qualifier("task") Executor taskListenerExecutor) {
  8. StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>>
  9. containerOptions = StreamMessageListenerContainerOptions.builder()
  10. .pollTimeout(Duration.ofMillis(consumerPollTimeOutInMilli))
  11. .batchSize(consumerReadBatchSize)
  12. .executor(taskListenerExecutor)
  13. .build();
  14. StreamMessageListenerContainer<String, MapRecord<String, String, String>> container =
  15. StreamMessageListenerContainer.create(connectionFactory, containerOptions);
  16. StreamMessageListenerContainer.ConsumerStreamReadRequest<String> readOptions
  17. =
  18. StreamMessageListenerContainer.StreamReadRequest
  19. .builder(StreamOffset.create(streamName, ReadOffset.lastConsumed()))
  20. //turn off auto shutdown of stream consumer if an error occurs.
  21. .cancelOnError((ex) -> false)
  22. .consumer(Consumer.from(groupId, consumerId))
  23. .build();
  24. Subscription subscription = container.register(readOptions, listener);
  25. container.start();
  26. return subscription;
  27. }
  28. @Bean
  29. @Qualifier("task")
  30. public Executor redisListenerThreadPoolTaskExecutor() {
  31. ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
  32. threadPoolTaskExecutor.setCorePoolSize(30);
  33. threadPoolTaskExecutor.setMaxPoolSize(50);
  34. threadPoolTaskExecutor.setQueueCapacity(Integer.MAX_VALUE);
  35. threadPoolTaskExecutor.setThreadNamePrefix("redis-listener-");
  36. threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  37. return threadPoolTaskExecutor;
  38. }

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题