Spring Kafka与主题消费者数量

esyap4oy  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(511)

在我的spring boot/kafka项目中,我有以下使用者配置:

  1. @Configuration
  2. public class KafkaConsumerConfig {
  3. @Bean
  4. public ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
  5. return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(String.class));
  6. }
  7. @Bean
  8. public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
  9. ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  10. factory.setConsumerFactory(consumerFactory(kafkaProperties));
  11. factory.setConcurrency(10);
  12. return factory;
  13. }
  14. @Bean
  15. public ConsumerFactory<String, Post> postConsumerFactory(KafkaProperties kafkaProperties) {
  16. return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(Post.class));
  17. }
  18. @Bean
  19. public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
  20. ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
  21. factory.setConsumerFactory(postConsumerFactory(kafkaProperties));
  22. return factory;
  23. }
  24. }

这是我的 PostConsumer :

  1. @Component
  2. public class PostConsumer {
  3. @Autowired
  4. private PostService postService;
  5. @KafkaListener(topics = "${kafka.topic.post.send}", containerFactory = "postKafkaListenerContainerFactory")
  6. public void sendPost(ConsumerRecord<String, Post> consumerRecord) {
  7. postService.sendPost(consumerRecord.value());
  8. }
  9. }

以及application.properties:

  1. spring.kafka.bootstrap-servers=${kafka.host}:${kafka.port}
  2. spring.kafka.consumer.auto-offset-reset=earliest
  3. spring.kafka.consumer.group-id=groupname
  4. spring.kafka.consumer.enable-auto-commit=false
  5. kafka.topic.post.send=post.send
  6. kafka.topic.post.sent=post.sent
  7. kafka.topic.post.error=post.error

如您所见,我添加了factory.setconcurrency(10);但它不起作用。所有的 PostConsumer.sendPost 在名为的同一线程中执行 org.springframework.kafka.KafkaListenerEndpointContainer#1-8-C-1 我希望能够控制并发的 PostConsumer.sendPost 为了让听众并行工作。请告诉我如何可以实现与Spring Boot 和 Spring Kafka。

xdnvmnnf

xdnvmnnf1#

要创建和管理分区主题,

  1. @Bean
  2. public KafkaAdmin admin() {
  3. Map<String, Object> configs = new HashMap<>();
  4. configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_URL);
  5. return new KafkaAdmin(configs);
  6. }
  7. @Bean
  8. public NewTopic topicToTarget() {
  9. return new NewTopic(Constant.Topic.PUBLISH_MESSAGE_TOPIC_NAME, <no. of partitions>, (short) <replication factor>);
  10. }

要将消息发送到不同的分区,请使用partitioner接口

  1. @Bean
  2. public Map<String, Object> producerConfigs() {
  3. Map<String, Object> props = new HashMap<>();
  4. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_URL);
  5. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  6. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  7. props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, <your custom Partitioner implementation>);
  8. return props;
  9. }

使用单个使用者使用来自多个分区的消息(来自不同分区的每条消息将生成新线程,并并行调用使用者方法)

  1. @KafkaListener(topicPartitions = {
  2. @TopicPartition(
  3. topic = Constant.Topic.PUBLISH_MESSAGE_TOPIC_NAME,
  4. partitions = "#{kafkaGateway.createPartitionArray()}"
  5. )
  6. }, groupId = "group.processor")
  7. public void consumeWriteRequest(@Payload String data) {
  8. //your code
  9. }

在这里,使用者(如果启动了多个示例)属于同一个组,因此每个消息将调用其中一个使用者。

展开查看全部
pjngdqdw

pjngdqdw2#

问题在于我们在SpringKafka中使用ApacheKafka消费者追求的一致性。在提供的主题中,这种并发性分布在分区之间。如果只有一个主题和一个分区,那么就不会有任何并发性。关键是使用同一线程中一个分区的所有记录。
文件中有一些关于此事的信息:https://docs.spring.io/spring-kafka/docs/2.1.7.release/reference/html/_reference.html#_concurrentmessagelistenercontainer
假设提供了6个主题分区,并发性为3;每个容器将得到2个分区。对于5个主题分区,2个容器将获得2个分区,第3个容器将获得1个分区。如果并发性大于topicpartitions的数量,那么并发性将被向下调整,这样每个容器将获得一个分区。
还有javadocs:

  1. /**
  2. * The maximum number of concurrent {@link KafkaMessageListenerContainer}s running.
  3. * Messages from within the same partition will be processed sequentially.
  4. * @param concurrency the concurrency.
  5. */
  6. public void setConcurrency(int concurrency) {

相关问题