The same offset and partition record is consumed twice, causing duplicates

Posted by dauxcl2d on 2021-06-04 in Kafka

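I am trying to consume records with an application written using Spring Kafka. I am facing a very unusual situation and cannot understand why it is happening.
My consumer application runs with a concurrency of 2, which means there are 2 consumer threads subscribed to a topic that has two partitions. I consume the records and write them to a table with an upsert, storing the offset, the partition, and an insert timestamp.
I see duplicate values in the table with the same offset and partition, which should not happen. The timestamp values do not differ, meaning the inserts happened at the same time. I do not know how this is possible, and I do not see any problems in the logs. I am not sure what happened on the producer side, but in any case there cannot be two values at the same offset, so I cannot tell whether this is a producer-side or a consumer-side problem. Are there any suggestions or ideas that could help me triage this issue?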
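One thing worth checking is what the upsert is actually keyed on. If the table had a unique key on (partition, offset), a second write for the same record would overwrite the existing row rather than add a new one. The sketch below illustrates that behavior with an in-memory map standing in for the table. The class `OffsetKeyedTable` and its methods are hypothetical names used only for illustration, not part of the original application or of any library.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for a table with a unique key on (partition, offset).
final class OffsetKeyedTable {
    // Rows keyed by "partition:offset"; ConcurrentHashMap tolerates two writer threads.
    private final Map<String, String> rows = new ConcurrentHashMap<>();

    // Upsert: writing the same (partition, offset) again replaces the row instead of adding one.
    void upsert(int partition, long offset, String payload) {
        rows.put(partition + ":" + offset, payload);
    }

    // Number of distinct (partition, offset) rows currently stored.
    int rowCount() {
        return rows.size();
    }

    public static void main(String[] args) {
        OffsetKeyedTable table = new OffsetKeyedTable();
        table.upsert(0, 500387L, "first delivery");
        table.upsert(0, 500387L, "second delivery of the same record");
        table.upsert(1, 499503L, "record from the other partition");
        System.out.println("rows stored: " + table.rowCount());
    }
}
```

If the real table allows two rows with the same partition and offset, the upsert is probably matching on some other column set, and any redelivery of a record will show up as a duplicate.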

Kafka logs:
I did not find any exceptions in the logs either.

  14:29:56.318 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.4.0
  14:29:56.318 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 77a89fcf8d7fa018
  14:29:56.318 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1604154596318
  14:29:56.319 [main] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Subscribed to topic(s): kaas.pe.enrollment.csp.ts2
  14:29:57.914 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Cluster ID: 6cbv7QOaSW6j1vXrOCE4jA
  14:29:57.914 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Cluster ID: 6cbv7QOaSW6j1vXrOCE4jA
  14:29:57.915 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Discovered group coordinator apslp1563.uhc.com:9093 (id: 2147483574 rack: null)
  14:29:57.915 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Discovered group coordinator apslp1563.uhc.com:9093 (id: 2147483574 rack: null)
  14:29:57.923 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] (Re-)joining group
  14:29:57.924 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] (Re-)joining group
  14:29:58.121 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] (Re-)joining group
  14:29:58.125 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] (Re-)joining group
  14:30:13.127 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Finished assignment for group at generation 23: {consumer-csp-prov-emerald-test-1-19d92ba5-5dc3-433d-b967-3cf1ce1b4174=org.apache.kafka.clients.consumer.ConsumerPartitionAssignor$Assignment@d7e2a1f, consumer-csp-prov-emerald-test-2-5833c212-7031-4ab1-944b-7e26f7d7a293=org.apache.kafka.clients.consumer.ConsumerPartitionAssignor$Assignment@53c3aad3}
  14:30:13.131 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Successfully joined group with generation 23
  14:30:13.131 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Successfully joined group with generation 23
  14:30:13.134 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Adding newly assigned partitions: kaas.pe.enrollment.csp.ts2-1
  14:30:13.134 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Adding newly assigned partitions: kaas.pe.enrollment.csp.ts2-0
  14:30:13.143 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Setting offset for partition kaas.pe.enrollment.csp.ts2-0 to the committed offset FetchPosition{offset=500387, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=apslp1559.uhc.com:9093 (id: 69 rack: null), epoch=37}}
  14:30:13.143 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Setting offset for partition kaas.pe.enrollment.csp.ts2-1 to the committed offset FetchPosition{offset=499503, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=apslp1562.uhc.com:9093 (id: 72 rack: null), epoch=36}}
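The last two log lines show the committed offset each consumer resumed from after startup. For triage, it may help to pull those values out of the log and compare them with the duplicated offsets in the table: a duplicate at or just after a committed offset would point toward a record being delivered again after a restart or rebalance. Below is a small sketch of such a parser. The class name `CommittedOffsetLine` is hypothetical, and the regular expression assumes the exact message format shown in the log above.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper that extracts topic, partition, and committed offset from a consumer log line.
final class CommittedOffsetLine {
    // Assumes the "Setting offset for partition <topic>-<partition> ..." format seen in the log.
    private static final Pattern LINE = Pattern.compile(
            "Setting offset for partition (\\S+)-(\\d+) to the committed offset FetchPosition\\{offset=(\\d+)");

    final String topic;
    final int partition;
    final long offset;

    private CommittedOffsetLine(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }

    // Returns the parsed values, or an empty Optional when the line has a different format.
    static Optional<CommittedOffsetLine> parse(String line) {
        Matcher m = LINE.matcher(line);
        if (!m.find()) {
            return Optional.empty();
        }
        return Optional.of(new CommittedOffsetLine(
                m.group(1), Integer.parseInt(m.group(2)), Long.parseLong(m.group(3))));
    }

    public static void main(String[] args) {
        String line = "Setting offset for partition kaas.pe.enrollment.csp.ts2-0 to the committed offset "
                + "FetchPosition{offset=500387, offsetEpoch=Optional.empty}";
        parse(line).ifPresent(p ->
                System.out.println(p.topic + " partition " + p.partition + " resumes at " + p.offset));
    }
}
```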

Code:

  @KafkaListener(topics = "${app.topic}", groupId = "${app.group_id_config}")
  public void run(ConsumerRecord<String, GenericRecord> record, Acknowledgment acknowledgement) throws Exception {
      try {
          // Extract the fields of interest from the Avro record.
          prov_tin_number = record.value().get("providerTinNumber").toString();
          prov_tin_type = record.value().get("providerTINType").toString();
          enroll_type = record.value().get("enrollmentType").toString();
          vcp_prov_choice_ind = record.value().get("vcpProvChoiceInd").toString();
          error_flag = "";
          // Upsert the row together with the record's partition and offset.
          dataexecutor.peStremUpsertTbl(prov_tin_number, prov_tin_type, enroll_type, vcp_prov_choice_ind, error_flag,
                  record.partition(), record.offset());
          // Acknowledge only after the upsert has returned.
          acknowledgement.acknowledge();
      } catch (Exception ex) {
          // The exception is only printed; the record is not acknowledged on this path.
          System.out.println(record);
          System.out.println(ex.getMessage());
      }
  }
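The listener writes to the table first and acknowledges afterwards, which is the usual at-least-once pattern: if the consumer stops or loses its partition after the write but before the offset is committed, the same record is read again from the last committed offset. Whether that is what happened here cannot be told from the post. The sketch below only simulates the general mechanism in plain Java, without Kafka. `RedeliveryDemo`, `consume`, and the offsets used are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of "write, then commit" consumption with a stop in between.
final class RedeliveryDemo {
    // Consumes offsets [committed, endExclusive) and returns the new committed offset.
    // When stopBeforeCommitAt is reached, the write has happened but the commit has not.
    static long consume(long committed, long endExclusive, long stopBeforeCommitAt, List<Long> sink) {
        for (long offset = committed; offset < endExclusive; offset++) {
            sink.add(offset);                 // side effect, like the table write
            if (offset == stopBeforeCommitAt) {
                return committed;             // simulated stop: this offset was never committed
            }
            committed = offset + 1;           // like acknowledging: next offset to read
        }
        return committed;
    }

    // Runs once with a stop at offset 2, then restarts from the committed offset.
    static List<Long> run() {
        List<Long> sink = new ArrayList<>();
        long committed = consume(0, 5, 2, sink);
        consume(committed, 5, -1, sink);      // -1 means no simulated stop on the second run
        return sink;
    }

    public static void main(String[] args) {
        System.out.println("offsets written: " + run());
    }
}
```

In this simulation offset 2 is written twice, once before the stop and once after the restart. A sink that is idempotent on (partition, offset) would hide such a redelivery, while a plain insert would store it as a duplicate row.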
