Spring Boot Kafka message consumer and lost messages

pepwfjgg · posted 2021-06-08 in Kafka

I am using Spring Boot 2.0.2 and Spring Kafka. I am also using the Kafka 1.1.0 Docker image from the following repository: https://hub.docker.com/r/wurstmeister/kafka/tags/
Here is my Kafka configuration:

    @Configuration
    @EnableKafka
    public class KafkaConfig {
    }

    @Configuration
    public class KafkaConsumerConfig {

        @Value("${spring.kafka.bootstrap-servers}")
        private String bootstrapServers;

        @Value("${spring.kafka.consumer.group-id}")
        private String consumerGroupId;

        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, (int) TimeUnit.MINUTES.toMillis(10));
            return props;
        }

        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(String.class));
        }

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }

        @Bean
        public ConsumerFactory<String, Post> postConsumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(Post.class));
        }

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(postConsumerFactory());
            return factory;
        }
    }

    @Configuration
    public class KafkaProducerConfig {

        @Value("${spring.kafka.bootstrap-servers}")
        private String bootstrapServers;

        @Bean
        public Map<String, Object> producerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
            props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 15000000);
            return props;
        }

        @Bean
        public ProducerFactory<String, Post> postProducerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }

        @Bean
        public KafkaTemplate<String, Post> postKafkaTemplate() {
            return new KafkaTemplate<>(postProducerFactory());
        }
    }
  42. @Configuration
  43. public class KafkaProducerConfig {
  44. @Value("${spring.kafka.bootstrap-servers}")
  45. private String bootstrapServers;
  46. @Bean
  47. public Map<String, Object> producerConfigs() {
  48. Map<String, Object> props = new HashMap<>();
  49. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  50. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  51. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
  52. props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 15000000);
  53. return props;
  54. }
  55. @Bean
  56. public ProducerFactory<String, Post> postProducerFactory() {
  57. return new DefaultKafkaProducerFactory<>(producerConfigs());
  58. }
  59. @Bean
  60. public KafkaTemplate<String, Post> postKafkaTemplate() {
  61. return new KafkaTemplate<>(postProducerFactory());
  62. }
  63. }

Here is the Kafka part of application.properties:

    # Kafka
    spring.kafka.bootstrap-servers=${kafka.host}:${kafka.port}
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.group-id=postfenix
    kafka.topic.posts.create=posts.create

Here is my message listener:

    @Component
    public class PostConsumer {

        static final Logger logger = LoggerFactory.getLogger(PostConsumer.class);

        @KafkaListener(topics = "${kafka.topic.posts.create}", containerFactory = "postKafkaListenerContainerFactory")
        public void createPost(ConsumerRecord<String, Post> consumerRecord) {
            Post post = consumerRecord.value();
            logger.info("Received message for post creation: {}", post);
        }
    }

I have also implemented a PostService which should send a Post to the Kafka topic:

    @Service
    public class PostServiceImpl implements PostService {

        static final Logger logger = LoggerFactory.getLogger(PostServiceImpl.class);

        @Value("${kafka.topic.posts.create}")
        private String kafkaTopicPostsCreate;

        @Autowired
        private KafkaTemplate<String, Post> postKafkaTemplate;

        @Override
        public void sendPost(Post post) {
            postKafkaTemplate.send(kafkaTopicPostsCreate, post);
            logger.info("Message sent to the post creation queue: {}", post);
        }
    }
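
Note that KafkaTemplate.send() is asynchronous: it returns as soon as the record is handed to the producer's buffer, so the log line above does not prove the broker ever acknowledged the message. To rule out producer-side loss, here is a minimal sketch of a delivery-confirming variant of sendPost, assuming the ListenableFuture-based API of Spring Kafka 2.x (the version Spring Boot 2.0.2 pulls in):

    // Additional imports at the top of PostServiceImpl:
    import org.springframework.kafka.support.SendResult;
    import org.springframework.util.concurrent.ListenableFuture;
    import org.springframework.util.concurrent.ListenableFutureCallback;

    @Override
    public void sendPost(Post post) {
        // send() completes in the background; the future is resolved once the
        // broker acknowledges (or rejects) the record.
        ListenableFuture<SendResult<String, Post>> future =
                postKafkaTemplate.send(kafkaTopicPostsCreate, post);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Post>>() {
            @Override
            public void onSuccess(SendResult<String, Post> result) {
                logger.info("Post acknowledged at offset {}: {}",
                        result.getRecordMetadata().offset(), post);
            }
            @Override
            public void onFailure(Throwable ex) {
                logger.error("Sending post failed: {}", post, ex);
            }
        });
    }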

I have also implemented a Spring Boot test:

    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = { TestApplication.class })
    public class PostServiceIT {

        @Autowired
        private PostService postService;

        @Autowired
        private MessageRepository messageRepository;

        @Before
        public void setUp() {
            messageRepository.deleteAll();
        }

        @Test
        public void testCreatePost() throws InterruptedException {
            assertEquals(0, messageRepository.findAll().size());
            Post post = new Post();
            ...
            postService.sendPost(post);
            await().atMost(60, SECONDS).pollDelay(1000, MILLISECONDS).until(() -> messageRepository.findAll().size() == 1);
        }
    }

Here is the log:

    2018-06-09 16:12:37.547 INFO 17824 --- [ main] org.quartz.impl.StdSchedulerFactory : Quartz scheduler 'schedulerFactoryBean' initialized from an externally provided properties instance.
    2018-06-09 16:12:37.547 INFO 17824 --- [ main] org.quartz.impl.StdSchedulerFactory : Quartz scheduler version: 2.3.0
    2018-06-09 16:12:37.548 INFO 17824 --- [ main] org.quartz.core.QuartzScheduler : JobFactory set to: org.springframework.scheduling.quartz.AdaptableJobFactory@7a3e5cd3
    2018-06-09 16:12:38.967 INFO 17824 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 2147483547
    2018-06-09 16:12:38.997 INFO 17824 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
        auto.commit.interval.ms = 5000
        auto.offset.reset = latest
        bootstrap.servers = [127.0.0.1:9093]
        check.crcs = true
        client.id =
        connections.max.idle.ms = 540000
        enable.auto.commit = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = postfenix
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 600000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 305000
        retry.backoff.ms = 100
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.springframework.kafka.support.serializer.JsonDeserializer
    2018-06-09 16:12:39.095 INFO 17824 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.1.0
    2018-06-09 16:12:39.095 INFO 17824 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : fdcf75ea326b8e07
    2018-06-09 16:12:39.100 INFO 17824 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService
    2018-06-09 16:12:39.104 INFO 17824 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 2147483647
    2018-06-09 16:12:39.104 INFO 17824 --- [ main] o.s.s.quartz.SchedulerFactoryBean : Starting Quartz Scheduler now
    2018-06-09 16:12:39.104 INFO 17824 --- [ main] org.quartz.core.QuartzScheduler : Scheduler schedulerFactoryBean_$_NON_CLUSTERED started.
    2018-06-09 16:12:39.111 INFO 17824 --- [SchedulerThread] c.n.quartz.mongodb.dao.TriggerDao : Found 0 triggers which are eligible to be run.
    2018-06-09 16:12:39.119 INFO 17824 --- [ main] com.postfenix.domain.post.PostServiceIT : Started PostServiceIT in 5.094 seconds (JVM running for 5.74)
    2018-06-09 16:12:39.121 INFO 17824 --- [ main] c.p.d.configuration.TestApplication : Initializing application...
    2018-06-09 16:12:39.258 INFO 17824 --- [ main] org.mongodb.driver.connection : Opened connection [connectionId{localValue:4, serverValue:4}] to localhost:27018
    2018-06-09 16:12:39.338 WARN 17824 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-1, groupId=postfenix] Error while fetching metadata with correlation id 2 : {posts.create=LEADER_NOT_AVAILABLE}
    2018-06-09 16:12:39.339 INFO 17824 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata : Cluster ID: BYqDmOq_SDCll0ILZI_KoA
    2018-06-09 16:12:39.392 INFO 17824 --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
        acks = 1
        batch.size = 16384
        bootstrap.servers = [127.0.0.1:9093]
        buffer.memory = 33554432
        client.id =
        compression.type = none
        connections.max.idle.ms = 540000
        enable.idempotence = false
        interceptor.classes = []
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 15000000
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 0
        retry.backoff.ms = 100
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer
    2018-06-09 16:12:39.419 INFO 17824 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.1.0
    2018-06-09 16:12:39.419 INFO 17824 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : fdcf75ea326b8e07
    2018-06-09 16:12:39.437 WARN 17824 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {posts.create=LEADER_NOT_AVAILABLE}
    2018-06-09 16:12:39.437 INFO 17824 --- [ad | producer-1] org.apache.kafka.clients.Metadata : Cluster ID: BYqDmOq_SDCll0ILZI_KoA
    2018-06-09 16:12:39.454 WARN 17824 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-1, groupId=postfenix] Error while fetching metadata with correlation id 4 : {posts.create=LEADER_NOT_AVAILABLE}
    2018-06-09 16:12:39.565 WARN 17824 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 3 : {posts.create=LEADER_NOT_AVAILABLE}
    2018-06-09 16:12:39.590 WARN 17824 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-1, groupId=postfenix] Error while fetching metadata with correlation id 6 : {posts.create=LEADER_NOT_AVAILABLE}
    2018-06-09 16:12:39.704 INFO 17824 --- [ main] c.p.domain.service.post.PostServiceImpl : Message sent to the post creation queue: Post [chatId=@name, parseMode=HTML]
    2018-06-09 16:12:40.229 INFO 17824 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=postfenix] Discovered group coordinator 10.0.75.1:9093 (id: 2147482646 rack: null)
    2018-06-09 16:12:40.232 INFO 17824 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=postfenix] Revoking previously assigned partitions []
    2018-06-09 16:12:40.233 INFO 17824 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked: []
    2018-06-09 16:12:40.233 INFO 17824 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=postfenix] (Re-)joining group
    2018-06-09 16:12:40.295 INFO 17824 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=postfenix] Successfully joined group with generation 1
    2018-06-09 16:12:40.297 INFO 17824 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=postfenix] Setting newly assigned partitions [posts.create-0]
    2018-06-09 16:12:40.313 INFO 17824 --- [ntainer#0-0-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-1, groupId=postfenix] Resetting offset for partition posts.create-0 to offset 1.
    2018-06-09 16:12:40.315 INFO 17824 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [posts.create-0]

Now my test fails at the following line:

    await().atMost(60, SECONDS).pollDelay(1000, MILLISECONDS).until(() -> messageRepository.findAll().size() == 1);

On the first test run the message is, for some reason, never delivered to the PostConsumer.createPost method. However, if I run the same test again against the same Kafka Docker instance, the message from the previous test run is delivered to PostConsumer.createPost successfully. What am I doing wrong? Why is the message not delivered on the first test run, and how can I fix it?
UPDATE
Here is my latest KafkaConsumerConfig:

    @Configuration
    public class KafkaConsumerConfig {

        @Bean
        public ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
            return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(String.class));
        }

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }

        @Bean
        public ConsumerFactory<String, Post> postConsumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(Post.class));
        }

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(postConsumerFactory());
            return factory;
        }
    }

Now I have two compilation errors, in the kafkaListenerContainerFactory and postConsumerFactory methods, because the consumerConfigs() method no longer exists and the consumerFactory method called inside kafkaListenerContainerFactory now requires a KafkaProperties argument.

Answer 1, by ig9co6j1:

    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.group-id=postfenix

You are not using these Boot properties, because you are creating your own consumer configs. You can see this in your log, which shows auto.offset.reset = latest: the consumer is only assigned its partitions after the test has already sent the message, so with "latest" it starts reading from the end of the partition and skips that first message.
You should replace this:

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String consumerGroupId;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, (int) TimeUnit.MINUTES.toMillis(10));
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(String.class));
    }

with this:

    @Bean
    public ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(),
                new StringDeserializer(), new JsonDeserializer<>(String.class));
    }

EDIT:

    @Bean
    public ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(String.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory(kafkaProperties));
        return factory;
    }

    @Bean
    public ConsumerFactory<String, Post> postConsumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(Post.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
        ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(postConsumerFactory(kafkaProperties));
        return factory;
    }
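
Independently of the offset-reset fix, the LEADER_NOT_AVAILABLE warnings in your log show the topic was still being auto-created while the test was already producing. If you want the test itself to be robust against that race, you can block until the listener container has been assigned its partitions before sending. A sketch of such a guard for the test, assuming the spring-kafka-test artifact is on the test classpath and that posts.create has a single partition (both assumptions, not part of the original setup):

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
    import org.springframework.kafka.listener.MessageListenerContainer;
    import org.springframework.kafka.test.utils.ContainerTestUtils;

    @Autowired
    private KafkaListenerEndpointRegistry registry; // holds the running @KafkaListener containers

    @Before
    public void waitForAssignment() throws Exception {
        for (MessageListenerContainer container : registry.getListenerContainers()) {
            // Blocks until the container owns its partitions, so a record sent
            // afterwards cannot land before the consumer's starting offset is set.
            ContainerTestUtils.waitForAssignment(container, 1); // 1 = assumed partition count
        }
    }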
