kafka&spring批处理-如何只读取来自同一主题的未提交消息?

tuwxkamq  于 2021-07-24  发布在  Java
关注(0)|答案(1)|浏览(698)

我正在使用spring批处理和kafka处理一个小批处理,它从kafka主题读取json数据,将其转换为student对象,更改值并将其发送回kafka主题。一切都很好,但我唯一的问题是,我的消费者总是从乞讨的主题阅读。我需要它来读最后一条未被消费的信息。我已经添加了这些属性:

  1. ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to earliest
  2. ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false
  3. ConsumerConfig.GROUP_ID_CONFIG to a random value

但这似乎不起作用,在消费者启动时,它处理所有的信息。有人知道怎么用Spring批和Kafka吗?这是我的密码:
batchstudent.java文件:

  1. @SpringBootApplication
  2. @EnableBatchProcessing
  3. @RequiredArgsConstructor
  4. public class BatchStudent {
  5. public static void main(String[] args) {
  6. SpringApplication.run(BatchStudent.class, args);
  7. }
  8. private final JobBuilderFactory jobBuilderFactory;
  9. private final StepBuilderFactory stepBuilderFactory;
  10. private final KafkaTemplate<Integer, Student> template;
  11. private final KafkaProperties properties;
  12. @Value("${kafka.topic.consumer}")
  13. private String topic;
  14. @Bean
  15. public ItemProcessor<Student, Student> customItemProcessor() {
  16. return new CustomProcessor();
  17. }
  18. @Bean
  19. Job job() {
  20. return this.jobBuilderFactory.get("job")
  21. .start(start())
  22. .incrementer(new RunIdIncrementer())
  23. .build();
  24. }
  25. @Bean
  26. KafkaItemWriter<Integer, Student> writer() {
  27. return new KafkaItemWriterBuilder<Integer, Student>()
  28. .kafkaTemplate(template)
  29. .itemKeyMapper(Student::getId)
  30. .build();
  31. }
  32. @Bean
  33. public KafkaItemReader<Integer, Student> reader() {
  34. Properties props = new Properties();
  35. props.putAll(this.properties.buildConsumerProperties());
  36. return new KafkaItemReaderBuilder<Integer, Student>()
  37. .partitions(0)
  38. .consumerProperties(props)
  39. .name("students-consumer-reader")
  40. .saveState(true)
  41. .topic(topic)
  42. .build();
  43. }
  44. @Bean
  45. Step start() {
  46. return this.stepBuilderFactory
  47. .get("step")
  48. .<Student, Student>chunk(10)
  49. .writer(writer())
  50. .processor(customItemProcessor())
  51. .reader(reader())
  52. .build();
  53. }
  54. }

应用程序yml

  1. spring.batch.initialize-schema: always
  2. # Conf Kafka Consumer
  3. spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
  4. spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
  5. # spring.kafka.consumer.group-id: student-group
  6. spring.kafka.consumer.properties.spring.json.trusted.packages: '*'
  7. spring.kafka.consumer.properties.spring.json.value.default.type: com.org.model.Student
  8. # Conf Kafka Producer
  9. spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
  10. spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
  11. spring.kafka.producer.bootstrap-servers: localhost:9092
  12. # Conf topics
  13. spring.kafka.template.default-topic: producer.student
  14. kafka.topic.consumer: consumer.student

学生.java

  1. @Data
  2. @NoArgsConstructor
  3. @AllArgsConstructor
  4. public class Student {
  5. Integer id;
  6. Integer count;
  7. }

自定义处理器.java

  1. @NoArgsConstructor
  2. public class CustomProcessor implements ItemProcessor<Student, Student> {
  3. @Override
  4. public Student process(Student studentRecieved) {
  5. final Student studentSent = new Student();
  6. studentSent.setId(studentRecieved.getId());
  7. studentSent.setCount(200);
  8. return studentSent;
  9. }
  10. }

谢谢你的帮助!

ldioqlga

ldioqlga1#

一切都很好,但我唯一的问题是,我的消费者总是从乞讨的主题阅读。我需要它来读最后一条未被消费的信息。
springbatch4.3引入了一种使用kafka中存储的偏移量记录的方法。我在去年的springone演讲中谈到了这个特性:springbatch4.3有什么新特性?。您可以使用setPartitionOffset在每个分区中为kafka读取器配置一个自定义的起始偏移量:

  1. Setter for partition offsets. This mapping tells the reader the offset to start reading
  2. from in each partition. This is optional, defaults to starting from offset 0 in each
  3. partition. Passing an empty map makes the reader start from the offset stored in Kafka
  4. for the consumer group ID.

您可以在这个测试用例中找到一个完整的示例。

相关问题