我试图使用SpringKafka编写的应用程序来消费记录。我面临着非常独特的情况,无法理解为什么会发生这种情况?
我的使用者应用程序是以2个并发运行的,这意味着订阅主题的2个使用者线程有两个分区。我使用upsert with offset、partitions和insert timestamp消费记录并将其放入表中。
我在表中看到具有相同偏移量和分区的重复值,这是不应该发生的。时间戳值没有差异,意味着插入同时发生。我不知道怎么可能?我在日志中也没有看到任何问题。我不确定在生产者端发生了什么,但无论如何我们不能在同一偏移处有两个值,所以不确定这是否是生产者端的消费者端的问题。有什么建议或想法可以帮助我对这个问题进行分类吗?
Kafka日志:
我在日志中也没有发现任何异常。
14:29:56.318 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.4.0
14:29:56.318 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 77a89fcf8d7fa018
14:29:56.318 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1604154596318
14:29:56.319 [main] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Subscribed to topic(s): kaas.pe.enrollment.csp.ts2
14:29:57.914 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Cluster ID: 6cbv7QOaSW6j1vXrOCE4jA
14:29:57.914 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Cluster ID: 6cbv7QOaSW6j1vXrOCE4jA
14:29:57.915 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Discovered group coordinator apslp1563.uhc.com:9093 (id: 2147483574 rack: null)
14:29:57.915 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Discovered group coordinator apslp1563.uhc.com:9093 (id: 2147483574 rack: null)
14:29:57.923 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] (Re-)joining group
14:29:57.924 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] (Re-)joining group
14:29:58.121 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] (Re-)joining group
14:29:58.125 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] (Re-)joining group
14:30:13.127 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Finished assignment for group at generation 23: {consumer-csp-prov-emerald-test-1-19d92ba5-5dc3-433d-b967-3cf1ce1b4174=org.apache.kafka.clients.consumer.ConsumerPartitionAssignor$Assignment@d7e2a1f, consumer-csp-prov-emerald-test-2-5833c212-7031-4ab1-944b-7e26f7d7a293=org.apache.kafka.clients.consumer.ConsumerPartitionAssignor$Assignment@53c3aad3}
14:30:13.131 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Successfully joined group with generation 23
14:30:13.131 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Successfully joined group with generation 23
14:30:13.134 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Adding newly assigned partitions: kaas.pe.enrollment.csp.ts2-1
14:30:13.134 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Adding newly assigned partitions: kaas.pe.enrollment.csp.ts2-0
14:30:13.143 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Setting offset for partition kaas.pe.enrollment.csp.ts2-0 to the committed offset FetchPosition{offset=500387, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=apslp1559.uhc.com:9093 (id: 69 rack: null), epoch=37}}
14:30:13.143 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Setting offset for partition kaas.pe.enrollment.csp.ts2-1 to the committed offset FetchPosition{offset=499503, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=apslp1562.uhc.com:9093 (id: 72 rack: null), epoch=36}}
代码:
@KafkaListener(topics = "${app.topic}", groupId = "${app.group_id_config}")
public void run(ConsumerRecord<String, GenericRecord> record, Acknowledgment acknowledgement) throws Exception {
try {
prov_tin_number = record.value().get("providerTinNumber").toString();
prov_tin_type = record.value().get("providerTINType").toString();
enroll_type = record.value().get("enrollmentType").toString();
vcp_prov_choice_ind = record.value().get("vcpProvChoiceInd").toString();
error_flag = "";
dataexecutor.peStremUpsertTbl(prov_tin_number, prov_tin_type, enroll_type, vcp_prov_choice_ind, error_flag,
record.partition(), record.offset());
acknowledgement.acknowledge();
}catch (Exception ex) {
System.out.println(record);
System.out.println(ex.getMessage());
}
}
暂无答案!
目前还没有任何答案,快来回答吧!