Spring Kafka与主题消费者数量

esyap4oy  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(474)

在我的spring boot/kafka项目中,我有以下使用者配置:

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(String.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties) {

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory(kafkaProperties));
        factory.setConcurrency(10);    
        return factory;
    }

    @Bean
    public ConsumerFactory<String, Post> postConsumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(Post.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {

        ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(postConsumerFactory(kafkaProperties));

        return factory;
    }

}

这是我的 PostConsumer :

@Component
public class PostConsumer {

    @Autowired
    private PostService postService;

    @KafkaListener(topics = "${kafka.topic.post.send}", containerFactory = "postKafkaListenerContainerFactory")
    public void sendPost(ConsumerRecord<String, Post> consumerRecord) {

        postService.sendPost(consumerRecord.value());

    }

}

以及application.properties:

spring.kafka.bootstrap-servers=${kafka.host}:${kafka.port}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=groupname
spring.kafka.consumer.enable-auto-commit=false
kafka.topic.post.send=post.send
kafka.topic.post.sent=post.sent
kafka.topic.post.error=post.error

如您所见,我添加了factory.setconcurrency(10);但它不起作用。所有的 PostConsumer.sendPost 在名为的同一线程中执行 org.springframework.kafka.KafkaListenerEndpointContainer#1-8-C-1 我希望能够控制并发的 PostConsumer.sendPost 为了让听众并行工作。请告诉我如何可以实现与Spring Boot 和 Spring Kafka。

xdnvmnnf

xdnvmnnf1#

要创建和管理分区主题,

@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_URL);
    return new KafkaAdmin(configs);
}

@Bean
public NewTopic topicToTarget() {
    return new NewTopic(Constant.Topic.PUBLISH_MESSAGE_TOPIC_NAME, <no. of partitions>, (short) <replication factor>);
}

要将消息发送到不同的分区,请使用partitioner接口

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_URL);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, <your custom Partitioner implementation>);

    return props;
}

使用单个使用者使用来自多个分区的消息(来自不同分区的每条消息将生成新线程,并并行调用使用者方法)

@KafkaListener(topicPartitions = {
        @TopicPartition(
                topic = Constant.Topic.PUBLISH_MESSAGE_TOPIC_NAME,
                partitions = "#{kafkaGateway.createPartitionArray()}"
        )
}, groupId = "group.processor")
public void consumeWriteRequest(@Payload String data) {
  //your code
}

在这里,使用者(如果启动了多个示例)属于同一个组,因此每个消息将调用其中一个使用者。

pjngdqdw

pjngdqdw2#

问题在于我们在SpringKafka中使用ApacheKafka消费者追求的一致性。在提供的主题中,这种并发性分布在分区之间。如果只有一个主题和一个分区,那么就不会有任何并发性。关键是使用同一线程中一个分区的所有记录。
文件中有一些关于此事的信息:https://docs.spring.io/spring-kafka/docs/2.1.7.release/reference/html/_reference.html#_concurrentmessagelistenercontainer
假设提供了6个主题分区,并发性为3;每个容器将得到2个分区。对于5个主题分区,2个容器将获得2个分区,第3个容器将获得1个分区。如果并发性大于topicpartitions的数量,那么并发性将被向下调整,这样每个容器将获得一个分区。
还有javadocs:

/**
 * The maximum number of concurrent {@link KafkaMessageListenerContainer}s running.
 * Messages from within the same partition will be processed sequentially.
 * @param concurrency the concurrency.
 */
public void setConcurrency(int concurrency) {

相关问题