我正在尝试使用scheduledexecutorservice计划kafkaconsumer.poll()。
轮询():
订阅topic,初始化调度程序,在每次执行完成后以1秒的延迟运行第一个lambda。停止计划任务有两个选项:
第二个lambda(可调用对象)变为true—直到每次执行时存储到hashmap中的轮询记录都包含带有我要查找的键的消息。
计划达到1分钟的超时。
public void poll(String topic, String key) {
log.info("Polling...");
consumer.subscribe(Collections.singleton(topic));
Scheduler scheduler = new Scheduler(1000);
scheduler.fixedDelaySchedule(
() -> {
ConsumerRecords<String, E> records = consumer.poll(Duration.ofMillis(4000));
records.forEach(record -> cache.put(record.key(), record));
log.info(cache.toString());
consumer.commitSync();
},
() -> cache.containsKey(key));
}
scheduler.fixeddelayschedule():
private static final BiFunction<ScheduledExecutorService, Callable<Boolean>, Runnable> cancelFunction =
(service, condition) -> {
return new Runnable() {
@Override
@SneakyThrows
public void run() {
if (condition.call()) {
service.shutdown();
}
}
};
};
@SneakyThrows
public void fixedDelaySchedule(Runnable function, Callable<Boolean> stopCondition) {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(0);
Runnable canceller = cancelFunction.apply(scheduler, stopCondition);
scheduler.scheduleWithFixedDelay(function, 0, interval, MILLISECONDS);
scheduler.scheduleWithFixedDelay(canceller, 0, interval, MILLISECONDS);
scheduler.awaitTermination(60, SECONDS);
}
因此,我在1分钟内没有收到来自Kafka的任何消息:
22-04-2021 15:32:45.750 [pool-13-thread-1] INFO c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - Polling...
22-04-2021 15:32:47.301 [pool-13-thread-1] INFO org.apache.kafka.clients.Metadata.update - [Consumer instanceId=ESO_retry_test_instance_id_0423764828, clientId=consumer-ies-slc_7691520826-ESO_retry_test_instance_id_0423764828, groupId=ies-slc_7691520826] Cluster ID: nBDatdysQ2KAKeiwGNEinA
22-04-2021 15:32:47.305 [pool-13-thread-1] INFO o.a.k.c.c.i.AbstractCoordinator.onSuccess - [Consumer instanceId=ESO_retry_test_instance_id_0423764828, clientId=consumer-ies-slc_7691520826-ESO_retry_test_instance_id_0423764828, groupId=ies-slc_7691520826] Discovered group coordinator brook-c.nonprod.us-west-2.aws.proton.nordstrom.com:9094 (id: 2147483641 rack: null)
22-04-2021 15:32:47.318 [pool-13-thread-1] INFO o.a.k.c.c.i.AbstractCoordinator.sendJoinGroupRequest - [Consumer instanceId=ESO_retry_test_instance_id_0423764828, clientId=consumer-ies-slc_7691520826-ESO_retry_test_instance_id_0423764828, groupId=ies-slc_7691520826] (Re-)joining group 22-04-2021 15:32:49.753 [pool-13-thread-1] INFO c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - {}
22-04-2021 15:32:50.754 [pool-13-thread-1] INFO c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - Polling...
22-04-2021 15:32:51.858 [pool-13-thread-1] INFO o.a.k.c.c.i.ConsumerCoordinator.performAssignment - [Consumer instanceId=ESO_retry_test_instance_id_0423764828, clientId=consumer-ies-slc_7691520826-ESO_retry_test_instance_id_0423764828, groupId=ies-slc_7691520826] Finished assignment for group at generation 1: {ESO_retry_test_instance_id_0423764828-4482ea43-5f87-48a4-8ef4-1ad7483d5fee=Assignment(partitions=[merch_inv_retry_orch_avro_test-0, merch_inv_retry_orch_avro_test-1])}
22-04-2021 15:32:52.030 [pool-13-thread-1] INFO o.a.k.c.c.i.AbstractCoordinator.onSuccess - [Consumer instanceId=ESO_retry_test_instance_id_0423764828, clientId=consumer-ies-slc_7691520826-ESO_retry_test_instance_id_0423764828, groupId=ies-slc_7691520826] Successfully joined group with generation 1
22-04-2021 15:32:52.047 [pool-13-thread-1] INFO o.a.k.c.c.i.ConsumerCoordinator.invokePartitionsAssigned - [Consumer instanceId=ESO_retry_test_instance_id_0423764828, clientId=consumer-ies-slc_7691520826-ESO_retry_test_instance_id_0423764828, groupId=ies-slc_7691520826] Adding newly assigned partitions: merch_inv_retry_orch_avro_test-0, merch_inv_retry_orch_avro_test-1
22-04-2021 15:32:52.233 [pool-13-thread-1] INFO o.a.k.c.c.i.ConsumerCoordinator.handle - [Consumer instanceId=ESO_retry_test_instance_id_0423764828, clientId=consumer-ies-slc_7691520826-ESO_retry_test_instance_id_0423764828, groupId=ies-slc_7691520826] Found no committed offset for partition merch_inv_retry_orch_avro_test-0
22-04-2021 15:32:52.233 [pool-13-thread-1] INFO o.a.k.c.c.i.ConsumerCoordinator.handle - [Consumer instanceId=ESO_retry_test_instance_id_0423764828, clientId=consumer-ies-slc_7691520826-ESO_retry_test_instance_id_0423764828, groupId=ies-slc_7691520826] Found no committed offset for partition merch_inv_retry_orch_avro_test-1
22-04-2021 15:32:53.934 [pool-13-thread-1] INFO o.a.k.c.c.i.SubscriptionState.maybeSeekUnvalidated - [Consumer instanceId=ESO_retry_test_instance_id_0423764828, clientId=consumer-ies-slc_7691520826-ESO_retry_test_instance_id_0423764828, groupId=ies-slc_7691520826] Resetting offset for partition merch_inv_retry_orch_avro_test-1 to offset 818824. 22-04-2021 15:32:53.941 [pool-13-thread-1] INFO o.a.k.c.c.i.SubscriptionState.maybeSeekUnvalidated - [Consumer instanceId=ESO_retry_test_instance_id_0423764828, clientId=consumer-ies-slc_7691520826-ESO_retry_test_instance_id_0423764828, groupId=ies-slc_7691520826] Resetting offset for partition merch_inv_retry_orch_avro_test-0 to offset 962365. 22-04-2021 15:32:54.754 [pool-13-thread-1] INFO c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - {}
22-04-2021 15:32:55.929 [pool-13-thread-1] INFO c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - Polling...
22-04-2021 15:32:59.929 [pool-13-thread-1] INFO c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - {}
22-04-2021 15:33:01.106 [pool-13-thread-1] INFO c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - Polling...
22-04-2021 15:33:05.107 [pool-13-thread-1] INFO c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - {}
22-04-2021 15:33:06.280 [pool-13-thread-1] INFO c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - Polling...
22-04-2021 15:33:10.281 [pool-13-thread-1] INFO c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - {}
22-04-2021 15:33:11.457 [pool-13-thread-1] INFO c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - Polling...
22-04-2021 15:33:15.457 [pool-13-thread-1] INFO c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - {}
22-04-2021 15:33:16.632 [pool-13-thread-1] INFO c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - Polling...
22-04-2021 15:33:20.633 [pool-13-thread-1] INFO c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - {}
22-04-2021 15:33:21.834 [pool-13-thread-1] INFO c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - Polling...
22-04-2021 15:33:25.834 [pool-13-thread-1] INFO c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - {}
22-04-2021 15:33:27.007 [pool-13-thread-1] INFO c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - Polling...
22-04-2021 15:33:31.007 [pool-13-thread-1] INFO c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - {}
22-04-2021 15:33:32.185 [pool-13-thread-1] INFO c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - Polling...
22-04-2021 15:33:36.186 [pool-13-thread-1] INFO c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - {}
22-04-2021 15:33:37.390 [pool-13-thread-1] INFO c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - Polling...
22-04-2021 15:33:41.391 [pool-13-thread-1] INFO c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - {}
22-04-2021 15:33:42.565 [pool-13-thread-1] INFO c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - Polling...
然后由我处理异常:
java.util.NoSuchElementException: Topic message with key [c17b6b3c-d84b-47a2-857d-ba836be96732-[0]-INVENTORY_ADJUSTMENT] wasn't found
暂无答案!
目前还没有任何答案,快来回答吧!