spring boot kafka消息消费者和丢失消息

pepwfjgg  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(525)

我使用SpringBoot2.0.2和SpringKafka。我还使用了以下存储库中的kafka docker image 1.1.0:https://hub.docker.com/r/wurstmeister/kafka/tags/
这是我的Kafka配置:

/**
 * Empty configuration class; its only content is the {@code @EnableKafka}
 * annotation it carries.
 */
@Configuration
@EnableKafka
public class KafkaConfig {
}

/**
 * Consumer-side Kafka configuration built by hand: one shared property map
 * plus two consumer-factory / listener-container-factory pairs, one for
 * {@code String} values and one for {@code Post} values.
 *
 * <p>Only {@code spring.kafka.bootstrap-servers} and
 * {@code spring.kafka.consumer.group-id} are read from the environment here;
 * no other {@code spring.kafka.consumer.*} property is consulted by this class.
 */
@Configuration
public class KafkaConsumerConfig {

    /** Broker address list, injected from {@code spring.kafka.bootstrap-servers}. */
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /** Consumer group id, injected from {@code spring.kafka.consumer.group-id}. */
    @Value("${spring.kafka.consumer.group-id}")
    private String consumerGroupId;

    /**
     * Builds the property map shared by both consumer factories.
     *
     * <p>Exactly five keys are set: bootstrap servers, group id, key
     * deserializer, value deserializer and max poll interval. Nothing else
     * (for example an offset-reset policy) is put into the map.
     *
     * @return a new mutable map of Kafka consumer properties
     */
    @Bean
    public Map<String, Object> consumerConfigs() {

        Map<String, Object> props = new HashMap<>();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        // Ten minutes expressed in milliseconds, narrowed to int.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, (int) TimeUnit.MINUTES.toMillis(10));

        return props;
    }

    /**
     * Consumer factory for records with {@code String} keys and values.
     *
     * <p>Deserializer instances are passed explicitly in addition to the
     * deserializer classes already present in {@link #consumerConfigs()}.
     * NOTE(review): presumably the instances take precedence over the map
     * entries - confirm against the DefaultKafkaConsumerFactory documentation.
     *
     * @return a consumer factory backed by {@link #consumerConfigs()}
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(String.class));
    }

    /**
     * Listener container factory wired to {@link #consumerFactory()}.
     *
     * @return a container factory for {@code String}-valued records
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

    /**
     * Consumer factory for records with {@code String} keys and {@code Post}
     * values; it uses the same property map as {@link #consumerFactory()}.
     *
     * @return a consumer factory whose value deserializer targets {@code Post}
     */
    @Bean
    public ConsumerFactory<String, Post> postConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(Post.class));
    }

    /**
     * Listener container factory wired to {@link #postConsumerFactory()}.
     *
     * @return a container factory for {@code Post}-valued records
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(postConsumerFactory());

        return factory;
    }

}

/**
 * Producer-side Kafka configuration: a hand-built property map, a producer
 * factory for {@code Post} payloads and the {@code KafkaTemplate} on top of it.
 */
@Configuration
public class KafkaProducerConfig {

    /** Broker address list, injected from {@code spring.kafka.bootstrap-servers}. */
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * Assembles the producer properties: broker list, String key serializer,
     * JSON value serializer and a maximum request size of 15,000,000.
     *
     * @return a new mutable map of Kafka producer properties
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        final Map<String, Object> settings = new HashMap<>();

        // Serialization: String keys, JSON values.
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        // Connection and request sizing.
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        settings.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 15000000);

        return settings;
    }

    /**
     * Producer factory for {@code Post} values, configured from
     * {@link #producerConfigs()}.
     *
     * @return the producer factory
     */
    @Bean
    public ProducerFactory<String, Post> postProducerFactory() {
        final Map<String, Object> settings = producerConfigs();
        return new DefaultKafkaProducerFactory<>(settings);
    }

    /**
     * Template used to publish {@code Post} values, built on
     * {@link #postProducerFactory()}.
     *
     * @return the Kafka template
     */
    @Bean
    public KafkaTemplate<String, Post> postKafkaTemplate() {
        final ProducerFactory<String, Post> producerFactory = postProducerFactory();
        return new KafkaTemplate<>(producerFactory);
    }

}

这是Kafka application.properties :


# Kafka

spring.kafka.bootstrap-servers=${kafka.host}:${kafka.port}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=postfenix
kafka.topic.posts.create=posts.create

这是我的消息侦听器:

/**
 * Kafka listener for the post-creation topic. As written it only logs the
 * received payload; nothing in this class stores it anywhere.
 */
@Component
public class PostConsumer {

    static final Logger logger = LoggerFactory.getLogger(PostConsumer.class);

    /**
     * Handles one record from the topic named by {@code kafka.topic.posts.create}.
     *
     * @param consumerRecord the received record; its value is the {@code Post}
     */
    @KafkaListener(containerFactory = "postKafkaListenerContainerFactory", topics = "${kafka.topic.posts.create}")
    public void createPost(ConsumerRecord<String, Post> consumerRecord) {
        logger.info("Received message for post creation: {}", consumerRecord.value());
    }

}

我还实现了 PostService,它负责将 Post 发送到 Kafka 主题:

/**
 * {@code PostService} implementation that hands posts to Kafka through the
 * injected {@code KafkaTemplate}.
 */
@Service
public class PostServiceImpl implements PostService {

    static final Logger logger = LoggerFactory.getLogger(PostServiceImpl.class);

    @Autowired
    private KafkaTemplate<String, Post> postKafkaTemplate;

    /** Destination topic, injected from {@code kafka.topic.posts.create}. */
    @Value("${kafka.topic.posts.create}")
    private String kafkaTopicPostsCreate;

    /**
     * Sends the post to the configured topic and logs it.
     *
     * <p>The value returned by {@code send} is not inspected; the log line is
     * written right after the call returns.
     *
     * @param post the payload to publish
     */
    @Override
    public void sendPost(Post post) {
        final String destinationTopic = kafkaTopicPostsCreate;
        postKafkaTemplate.send(destinationTopic, post);
        logger.info("Message sent to the post creation queue: {}", post);
    }

}

我还实现了springboot测试:

/**
 * Spring Boot integration test for {@code PostService}: sends one post and
 * waits for the message repository to contain exactly one entry.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = { TestApplication.class })
public class PostServiceIT {

    @Autowired
    private PostService postService;

    @Autowired
    private MessageRepository messageRepository;

    /** Clears the message repository before every test. */
    @Before
    public void setUp() {
        messageRepository.deleteAll();
    }

    /**
     * Verifies the repository starts empty, sends a post, then waits up to
     * 60 seconds (with a 1000 ms poll delay) for the repository size to
     * become 1.
     */
    @Test
    public void testCreatePost() throws InterruptedException {

        // Precondition: setUp() left the repository empty.
        assertEquals(0, messageRepository.findAll().size());

        Post post = new Post();

        // Population of the post's fields is elided in the original listing.
        ...

        postService.sendPost(post);

        // This is the line the author reports as failing on the first run.
        await().atMost(60, SECONDS).pollDelay(1000, MILLISECONDS).until(() -> messageRepository.findAll().size() == 1);
    }

}

这是日志:

2018-06-09 16:12:37.547  INFO 17824 --- [           main] org.quartz.impl.StdSchedulerFactory      : Quartz scheduler 'schedulerFactoryBean' initialized from an externally provided properties instance.
2018-06-09 16:12:37.547  INFO 17824 --- [           main] org.quartz.impl.StdSchedulerFactory      : Quartz scheduler version: 2.3.0
2018-06-09 16:12:37.548  INFO 17824 --- [           main] org.quartz.core.QuartzScheduler          : JobFactory set to: org.springframework.scheduling.quartz.AdaptableJobFactory@7a3e5cd3
2018-06-09 16:12:38.967  INFO 17824 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483547
2018-06-09 16:12:38.997  INFO 17824 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [127.0.0.1:9093]
    check.crcs = true
    client.id = 
    connections.max.idle.ms = 540000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = postfenix
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 600000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.springframework.kafka.support.serializer.JsonDeserializer

2018-06-09 16:12:39.095  INFO 17824 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.1.0
2018-06-09 16:12:39.095  INFO 17824 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : fdcf75ea326b8e07
2018-06-09 16:12:39.100  INFO 17824 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService 
2018-06-09 16:12:39.104  INFO 17824 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
2018-06-09 16:12:39.104  INFO 17824 --- [           main] o.s.s.quartz.SchedulerFactoryBean        : Starting Quartz Scheduler now
2018-06-09 16:12:39.104  INFO 17824 --- [           main] org.quartz.core.QuartzScheduler          : Scheduler schedulerFactoryBean_$_NON_CLUSTERED started.
2018-06-09 16:12:39.111  INFO 17824 --- [SchedulerThread] c.n.quartz.mongodb.dao.TriggerDao        : Found 0 triggers which are eligible to be run.
2018-06-09 16:12:39.119  INFO 17824 --- [           main] com.postfenix.domain.post.PostServiceIT  : Started PostServiceIT in 5.094 seconds (JVM running for 5.74)
2018-06-09 16:12:39.121  INFO 17824 --- [           main] c.p.d.configuration.TestApplication      : Initializing application...
2018-06-09 16:12:39.258  INFO 17824 --- [           main] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:4, serverValue:4}] to localhost:27018
2018-06-09 16:12:39.338  WARN 17824 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=postfenix] Error while fetching metadata with correlation id 2 : {posts.create=LEADER_NOT_AVAILABLE}
2018-06-09 16:12:39.339  INFO 17824 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata        : Cluster ID: BYqDmOq_SDCll0ILZI_KoA
2018-06-09 16:12:39.392  INFO 17824 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    batch.size = 16384
    bootstrap.servers = [127.0.0.1:9093]
    buffer.memory = 33554432
    client.id = 
    compression.type = none
    connections.max.idle.ms = 540000
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 15000000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer

2018-06-09 16:12:39.419  INFO 17824 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.1.0
2018-06-09 16:12:39.419  INFO 17824 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : fdcf75ea326b8e07
2018-06-09 16:12:39.437  WARN 17824 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {posts.create=LEADER_NOT_AVAILABLE}
2018-06-09 16:12:39.437  INFO 17824 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : Cluster ID: BYqDmOq_SDCll0ILZI_KoA
2018-06-09 16:12:39.454  WARN 17824 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=postfenix] Error while fetching metadata with correlation id 4 : {posts.create=LEADER_NOT_AVAILABLE}
2018-06-09 16:12:39.565  WARN 17824 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Error while fetching metadata with correlation id 3 : {posts.create=LEADER_NOT_AVAILABLE}
2018-06-09 16:12:39.590  WARN 17824 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=postfenix] Error while fetching metadata with correlation id 6 : {posts.create=LEADER_NOT_AVAILABLE}
2018-06-09 16:12:39.704  INFO 17824 --- [           main] c.p.domain.service.post.PostServiceImpl  : Message sent to the post creation queue: Post [chatId=@name, parseMode=HTML]
2018-06-09 16:12:40.229  INFO 17824 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=postfenix] Discovered group coordinator 10.0.75.1:9093 (id: 2147482646 rack: null)
2018-06-09 16:12:40.232  INFO 17824 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=postfenix] Revoking previously assigned partitions []
2018-06-09 16:12:40.233  INFO 17824 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked: []
2018-06-09 16:12:40.233  INFO 17824 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=postfenix] (Re-)joining group
2018-06-09 16:12:40.295  INFO 17824 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=postfenix] Successfully joined group with generation 1
2018-06-09 16:12:40.297  INFO 17824 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=postfenix] Setting newly assigned partitions [posts.create-0]
2018-06-09 16:12:40.313  INFO 17824 --- [ntainer#0-0-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-1, groupId=postfenix] Resetting offset for partition posts.create-0 to offset 1.
2018-06-09 16:12:40.315  INFO 17824 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [posts.create-0]

现在我的测试在以下几行失败了:

await().atMost(60, SECONDS).pollDelay(1000, MILLISECONDS).until(() -> messageRepository.findAll().size() == 1);

因为在第一次测试运行之后,消息由于某种原因没有传递到 PostConsumer.createPost 方法。但是,如果我在同一个 Kafka docker 实例上再次运行相同的测试,那么上一次测试运行发送的消息会成功传递到 PostConsumer.createPost。我做错了什么?为什么第一次测试运行后消息没有被传递?该如何修复?
更新
这是我更新后的 KafkaConsumerConfig:

/**
 * Partially migrated consumer configuration: {@code consumerFactory} now takes
 * {@code KafkaProperties}, while the remaining beans still use the old calls.
 */
@Configuration
public class KafkaConsumerConfig {

    /**
     * Consumer factory for {@code String} values, configured from
     * {@code kafkaProperties.buildConsumerProperties()}.
     *
     * @param kafkaProperties source of the consumer properties
     * @return the consumer factory
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(String.class));
    }

    /** Listener container factory for {@code String}-valued records. */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        // NOTE: no-argument call, but the only consumerFactory declared in this
        // class requires a KafkaProperties argument.
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

    /** Consumer factory for {@code Post}-valued records. */
    @Bean
    public ConsumerFactory<String, Post> postConsumerFactory() {
        // NOTE: consumerConfigs() is not declared anywhere in this class.
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(Post.class));
    }

    /** Listener container factory wired to {@link #postConsumerFactory()}. */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(postConsumerFactory());

        return factory;
    }

}

现在 kafkaListenerContainerFactory 和 postConsumerFactory 方法中各有一个编译错误:consumerConfigs() 方法已不存在,而 kafkaListenerContainerFactory 中调用的 consumerFactory 方法需要一个 KafkaProperties 参数。

ig9co6j1

ig9co6j11#

spring.kafka.consumer.auto-offset-reset=earliest 和 spring.kafka.consumer.group-id=postfenix:您并没有用到这些 Spring Boot 属性,因为您自己创建了消费者配置。
您应该替换掉下面这段代码:

// Broker address list, injected from spring.kafka.bootstrap-servers.
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

// Consumer group id, injected from spring.kafka.consumer.group-id.
@Value("${spring.kafka.consumer.group-id}")
private String consumerGroupId;

/**
 * Hand-built consumer property map. Exactly five keys are set: bootstrap
 * servers, group id, key deserializer, value deserializer and max poll
 * interval; no offset-reset policy is put into the map.
 *
 * @return a new mutable map of Kafka consumer properties
 */
@Bean
public Map<String, Object> consumerConfigs() {

    Map<String, Object> props = new HashMap<>();

    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    // Ten minutes expressed in milliseconds, narrowed to int.
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, (int) TimeUnit.MINUTES.toMillis(10));

    return props;
}

/**
 * Consumer factory for {@code String} values, configured from
 * {@link #consumerConfigs()}.
 *
 * @return the consumer factory
 */
@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(String.class));
}

替换为

/**
 * Consumer factory for {@code String} values whose properties come from
 * {@code kafkaProperties.buildConsumerProperties()} rather than a hand-built map.
 *
 * @param kafkaProperties source of the consumer properties
 * @return the consumer factory
 */
@Bean
public ConsumerFactory<String, String> consumerFactory(
         KafkaProperties kafkaProperties) {
    StringDeserializer keyDeserializer = new StringDeserializer();
    JsonDeserializer<String> valueDeserializer = new JsonDeserializer<>(String.class);
    return new DefaultKafkaConsumerFactory<>(
         kafkaProperties.buildConsumerProperties(), keyDeserializer, valueDeserializer);
}

编辑

/**
 * Consumer factory for {@code String} keys and values, configured from
 * {@code kafkaProperties.buildConsumerProperties()}.
 *
 * @param kafkaProperties source of the consumer properties
 * @return the consumer factory
 */
@Bean
public ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
    StringDeserializer keyDeserializer = new StringDeserializer();
    JsonDeserializer<String> valueDeserializer = new JsonDeserializer<>(String.class);
    return new DefaultKafkaConsumerFactory<>(
            kafkaProperties.buildConsumerProperties(),
            keyDeserializer,
            valueDeserializer);
}

/**
 * Listener container factory for {@code String}-valued records, backed by
 * {@code consumerFactory(kafkaProperties)}.
 *
 * @param kafkaProperties forwarded to {@code consumerFactory}
 * @return the configured container factory
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
    ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    containerFactory.setConsumerFactory(consumerFactory(kafkaProperties));
    return containerFactory;
}

/**
 * Consumer factory for {@code String} keys and {@code Post} values,
 * configured from {@code kafkaProperties.buildConsumerProperties()}.
 *
 * @param kafkaProperties source of the consumer properties
 * @return the consumer factory
 */
@Bean
public ConsumerFactory<String, Post> postConsumerFactory(KafkaProperties kafkaProperties) {
    StringDeserializer keyDeserializer = new StringDeserializer();
    JsonDeserializer<Post> valueDeserializer = new JsonDeserializer<>(Post.class);
    return new DefaultKafkaConsumerFactory<>(
            kafkaProperties.buildConsumerProperties(),
            keyDeserializer,
            valueDeserializer);
}

/**
 * Listener container factory for {@code Post}-valued records, backed by
 * {@code postConsumerFactory(kafkaProperties)}.
 *
 * @param kafkaProperties forwarded to {@code postConsumerFactory}
 * @return the configured container factory
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
    ConcurrentKafkaListenerContainerFactory<String, Post> containerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    containerFactory.setConsumerFactory(postConsumerFactory(kafkaProperties));
    return containerFactory;
}

相关问题