我有一个SpringBootKafka监听器,它只听一个主题和消费者列表。现在这个spring启动应用程序现在有3个节点。现在我需要确保列表消息的数量均衡地分配到所有3个节点。如何在我的应用程序中实现这一点?
zc0qhyus1#
在consumer中,group.id(consumerconfig.group\u id\u config)config提供负载平衡。发布到主题的每条记录都会传递到每个订阅使用者组中的一个使用者示例。[组id][1]:标识此使用者所属的使用者组的唯一字符串。如果使用者使用subscribe(主题)的组管理功能或基于kafka的偏移管理策略,则需要此属性。
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer; import org.springframework.kafka.support.serializer.JsonDeserializer; import java.util.HashMap; import java.util.Map; @Configuration public class KafkaConsumerConfiguration { private Map<String, Object> consumerConfigs() { final Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "your brokers"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumeer-group-id"); return props; } @Bean public ConsumerFactory<String, YourClass> kafkaListenerConsumerFactory() { final ErrorHandlingDeserializer<YourClass> errorHandlingDeserializer = new ErrorHandlingDeserializer<>(new JsonDeserializer<>(YourClass.class, false)); return new DefaultKafkaConsumerFactory<>(this.consumerConfigs(), new StringDeserializer(), errorHandlingDeserializer); } @Bean public ConcurrentKafkaListenerContainerFactory<String, YourClass> kafkaListenerContainerFactory() { final ConcurrentKafkaListenerContainerFactory<String, YourClass> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(this.kafkaListenerConsumerFactory()); return factory; } } [1]: https://kafka.apache.org/documentation/#consumerconfigs_group.id
1条答案
按热度按时间zc0qhyus1#
在consumer中,group.id(consumerconfig.group\u id\u config)config提供负载平衡。发布到主题的每条记录都会传递到每个订阅使用者组中的一个使用者示例。
[组id][1]:
标识此使用者所属的使用者组的唯一字符串。如果使用者使用subscribe(主题)的组管理功能或基于kafka的偏移管理策略,则需要此属性。