我使用springdataredis订阅'task'redis流来处理任务。出于某种原因,redis stream consumer一次只生成一个线程并按顺序处理一条消息,即使我显式地提供了一个threadpool taskeexecutor。
我希望它将线程的创建委托给提供的线程池,并生成一个线程,达到线程池配置的限制。我可以看到它正在使用give taskexecutor,但是它不会产生多个线程。
即使我没有指定自己的taskexecutor,并且它在内部默认为simpleasynctaskexecutor,问题仍然存在。任务一次一个,一个接一个地按顺序处理,即使它们是持久的任务。
我错过了什么?
@Bean
public Subscription
redisTaskStreamListenerContainer(
RedisConnectionFactory connectionFactory,
@Qualifier("task") RedisTemplate<String, Task<TransportEnvelope>> redisTemplate,
@Qualifier("task") StreamListener<String, MapRecord<String, String, String>> listener,
@Qualifier("task") Executor taskListenerExecutor) {
StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>>
containerOptions = StreamMessageListenerContainerOptions.builder()
.pollTimeout(Duration.ofMillis(consumerPollTimeOutInMilli))
.batchSize(consumerReadBatchSize)
.executor(taskListenerExecutor)
.build();
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container =
StreamMessageListenerContainer.create(connectionFactory, containerOptions);
StreamMessageListenerContainer.ConsumerStreamReadRequest<String> readOptions
=
StreamMessageListenerContainer.StreamReadRequest
.builder(StreamOffset.create(streamName, ReadOffset.lastConsumed()))
//turn off auto shutdown of stream consumer if an error occurs.
.cancelOnError((ex) -> false)
.consumer(Consumer.from(groupId, consumerId))
.build();
Subscription subscription = container.register(readOptions, listener);
container.start();
return subscription;
}
@Bean
@Qualifier("task")
public Executor redisListenerThreadPoolTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(30);
threadPoolTaskExecutor.setMaxPoolSize(50);
threadPoolTaskExecutor.setQueueCapacity(Integer.MAX_VALUE);
threadPoolTaskExecutor.setThreadNamePrefix("redis-listener-");
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return threadPoolTaskExecutor;
}
暂无答案!
目前还没有任何答案,快来回答吧!