streammessagelistenercontainer只生成一个线程

db2dz4w8  于 2021-07-23  发布在  Java
关注(0)|答案(0)|浏览(547)

我使用springdataredis订阅'task'redis流来处理任务。出于某种原因,redis stream consumer一次只生成一个线程并按顺序处理一条消息,即使我显式地提供了一个threadpool taskeexecutor。
我希望它将线程的创建委托给提供的线程池,并生成一个线程,达到线程池配置的限制。我可以看到它正在使用give taskexecutor,但是它不会产生多个线程。
即使我没有指定自己的taskexecutor,并且它在内部默认为simpleasynctaskexecutor,问题仍然存在。任务一次一个,一个接一个地按顺序处理,即使它们是持久的任务。
我错过了什么?

@Bean
public Subscription
      redisTaskStreamListenerContainer(
      RedisConnectionFactory connectionFactory,
      @Qualifier("task") RedisTemplate<String, Task<TransportEnvelope>> redisTemplate,
      @Qualifier("task") StreamListener<String, MapRecord<String, String, String>> listener,
      @Qualifier("task") Executor taskListenerExecutor) {

    StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>>
        containerOptions = StreamMessageListenerContainerOptions.builder()
        .pollTimeout(Duration.ofMillis(consumerPollTimeOutInMilli))
          .batchSize(consumerReadBatchSize)
          .executor(taskListenerExecutor)
          .build();

    StreamMessageListenerContainer<String, MapRecord<String, String, String>> container =
        StreamMessageListenerContainer.create(connectionFactory, containerOptions);

    StreamMessageListenerContainer.ConsumerStreamReadRequest<String> readOptions
        =
        StreamMessageListenerContainer.StreamReadRequest
            .builder(StreamOffset.create(streamName, ReadOffset.lastConsumed()))
            //turn off auto shutdown of stream consumer if an error occurs.
            .cancelOnError((ex) -> false)
            .consumer(Consumer.from(groupId, consumerId))
            .build();

    Subscription subscription = container.register(readOptions, listener);
    container.start();
    return subscription;
  }

  @Bean
  @Qualifier("task")
  public Executor redisListenerThreadPoolTaskExecutor() {
    ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
    threadPoolTaskExecutor.setCorePoolSize(30);
    threadPoolTaskExecutor.setMaxPoolSize(50);
    threadPoolTaskExecutor.setQueueCapacity(Integer.MAX_VALUE);
    threadPoolTaskExecutor.setThreadNamePrefix("redis-listener-");
    threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    return threadPoolTaskExecutor;
  }

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题