kafka消费者不返回任何记录

xkftehaa  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(412)

我正试着和Kafka搞个小游戏。但是,在java中创建使用者时,该使用者不会得到任何消息。即使当我使用相同的url/主题启动kafka-console-consumer.sh时,我也会收到消息。有人知道我会做错什么吗?此代码由get api调用。

  1. public List<KafkaTextMessage> receiveMessages() {
  2. log.info("Retrieving messages from kafka");
  3. val props = new Properties();
  4. // See https://kafka.apache.org/documentation/#consumerconfigs
  5. props.put("bootstrap.servers", "my-cluster-kafka-bootstrap:9092");
  6. //props.put("client.id", "my-topic consumer");
  7. props.put("group.id", "test");
  8. props.put("enable.auto.commit", "false");
  9. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  10. props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  11. ImmutableList.Builder<KafkaTextMessage> builder;
  12. try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
  13. consumer.subscribe(Collections.singletonList(TEXT_MESSAGE_TOPIC));
  14. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
  15. builder = ImmutableList.builder();
  16. for (ConsumerRecord<String, String> record : records) {
  17. builder.add(new KafkaTextMessage(record.value()));
  18. log.info("We got at position: {} key:{} value: {}", record.offset(), record.key(), record.value());
  19. consumer.commitSync();
  20. }
  21. }
  22. return builder.build();
  23. }
mgdq6dx1

mgdq6dx11#

尝试添加 auto.offset.reset=earliest 你的消费者财产。默认值设置为 latest . 我这么说是因为我看到你的 group.id 设置为 test ,可能已在以前的测试中使用的值。

相关问题