在我的spring boot/kafka项目中,我有以下使用者配置:
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(String.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory(kafkaProperties));
factory.setConcurrency(10);
return factory;
}
@Bean
public ConsumerFactory<String, Post> postConsumerFactory(KafkaProperties kafkaProperties) {
return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(Post.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(postConsumerFactory(kafkaProperties));
return factory;
}
}
这是我的 PostConsumer
:
@Component
public class PostConsumer {
@Autowired
private PostService postService;
@KafkaListener(topics = "${kafka.topic.post.send}", containerFactory = "postKafkaListenerContainerFactory")
public void sendPost(ConsumerRecord<String, Post> consumerRecord) {
postService.sendPost(consumerRecord.value());
}
}
以及application.properties:
spring.kafka.bootstrap-servers=${kafka.host}:${kafka.port}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=groupname
spring.kafka.consumer.enable-auto-commit=false
kafka.topic.post.send=post.send
kafka.topic.post.sent=post.sent
kafka.topic.post.error=post.error
如您所见,我添加了factory.setconcurrency(10);但它不起作用。所有的 PostConsumer.sendPost
在名为的同一线程中执行 org.springframework.kafka.KafkaListenerEndpointContainer#1-8-C-1
我希望能够控制并发的 PostConsumer.sendPost
为了让听众并行工作。请告诉我如何可以实现与Spring Boot 和 Spring Kafka。
2条答案
按热度按时间xdnvmnnf1#
要创建和管理分区主题,
要将消息发送到不同的分区,请使用partitioner接口
使用单个使用者使用来自多个分区的消息(来自不同分区的每条消息将生成新线程,并并行调用使用者方法)
在这里,使用者(如果启动了多个示例)属于同一个组,因此每个消息将调用其中一个使用者。
pjngdqdw2#
问题在于我们在SpringKafka中使用ApacheKafka消费者追求的一致性。在提供的主题中,这种并发性分布在分区之间。如果只有一个主题和一个分区,那么就不会有任何并发性。关键是使用同一线程中一个分区的所有记录。
文件中有一些关于此事的信息:https://docs.spring.io/spring-kafka/docs/2.1.7.release/reference/html/_reference.html#_concurrentmessagelistenercontainer
假设提供了6个主题分区,并发性为3;每个容器将得到2个分区。对于5个主题分区,2个容器将获得2个分区,第3个容器将获得1个分区。如果并发性大于topicpartitions的数量,那么并发性将被向下调整,这样每个容器将获得一个分区。
还有javadocs: