kafkaconsumer和scheduledexecutorservice不工作

jmp7cifd  于 2021-07-09  发布在  Java
关注(0)|答案(0)|浏览(417)

我正在尝试使用scheduledexecutorservice计划kafkaconsumer.poll()。
轮询():
订阅topic,初始化调度程序,在每次执行完成后以1秒的延迟运行第一个lambda。停止计划任务有两个选项:
第二个lambda(可调用对象)变为true—直到每次执行时存储到hashmap中的轮询记录都包含带有我要查找的键的消息。
计划达到1分钟的超时。

public void poll(String topic, String key) {
    log.info("Polling...");
    consumer.subscribe(Collections.singleton(topic));
    Scheduler scheduler = new Scheduler(1000);
    scheduler.fixedDelaySchedule(
        () -> {
            ConsumerRecords<String, E> records = consumer.poll(Duration.ofMillis(4000));
            records.forEach(record -> cache.put(record.key(), record));
            log.info(cache.toString());
            consumer.commitSync();
        },
        () -> cache.containsKey(key));
}

scheduler.fixeddelayschedule():

private static final BiFunction<ScheduledExecutorService, Callable<Boolean>, Runnable> cancelFunction =
          (service, condition) -> {
            return new Runnable() {
              @Override
              @SneakyThrows
              public void run() {
                if (condition.call()) {
                  service.shutdown();
                }
              }
            };
          };

@SneakyThrows
public void fixedDelaySchedule(Runnable function, Callable<Boolean> stopCondition) {
    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(0);
    Runnable canceller = cancelFunction.apply(scheduler, stopCondition);
    scheduler.scheduleWithFixedDelay(function, 0, interval, MILLISECONDS);
    scheduler.scheduleWithFixedDelay(canceller, 0, interval, MILLISECONDS);
    scheduler.awaitTermination(60, SECONDS);
}

因此,我在1分钟内没有收到来自Kafka的任何消息:

22-04-2021 15:32:45.750 [pool-13-thread-1] INFO  c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - Polling... 
22-04-2021 15:32:47.301 [pool-13-thread-1] INFO  org.apache.kafka.clients.Metadata.update - [Consumer instanceId=ESO_retry_test_instance_id_0423764828, clientId=consumer-ies-slc_7691520826-ESO_retry_test_instance_id_0423764828, groupId=ies-slc_7691520826] Cluster ID: nBDatdysQ2KAKeiwGNEinA 
22-04-2021 15:32:47.305 [pool-13-thread-1] INFO  o.a.k.c.c.i.AbstractCoordinator.onSuccess - [Consumer instanceId=ESO_retry_test_instance_id_0423764828, clientId=consumer-ies-slc_7691520826-ESO_retry_test_instance_id_0423764828, groupId=ies-slc_7691520826] Discovered group coordinator brook-c.nonprod.us-west-2.aws.proton.nordstrom.com:9094 (id: 2147483641 rack: null) 
22-04-2021 15:32:47.318 [pool-13-thread-1] INFO o.a.k.c.c.i.AbstractCoordinator.sendJoinGroupRequest - [Consumer instanceId=ESO_retry_test_instance_id_0423764828, clientId=consumer-ies-slc_7691520826-ESO_retry_test_instance_id_0423764828, groupId=ies-slc_7691520826] (Re-)joining group 22-04-2021 15:32:49.753 [pool-13-thread-1] INFO  c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - {} 
22-04-2021 15:32:50.754 [pool-13-thread-1] INFO  c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - Polling... 
22-04-2021 15:32:51.858 [pool-13-thread-1] INFO  o.a.k.c.c.i.ConsumerCoordinator.performAssignment - [Consumer instanceId=ESO_retry_test_instance_id_0423764828, clientId=consumer-ies-slc_7691520826-ESO_retry_test_instance_id_0423764828, groupId=ies-slc_7691520826] Finished assignment for group at generation 1: {ESO_retry_test_instance_id_0423764828-4482ea43-5f87-48a4-8ef4-1ad7483d5fee=Assignment(partitions=[merch_inv_retry_orch_avro_test-0, merch_inv_retry_orch_avro_test-1])} 
22-04-2021 15:32:52.030 [pool-13-thread-1] INFO  o.a.k.c.c.i.AbstractCoordinator.onSuccess - [Consumer instanceId=ESO_retry_test_instance_id_0423764828, clientId=consumer-ies-slc_7691520826-ESO_retry_test_instance_id_0423764828, groupId=ies-slc_7691520826] Successfully joined group with generation 1 
22-04-2021 15:32:52.047 [pool-13-thread-1] INFO  o.a.k.c.c.i.ConsumerCoordinator.invokePartitionsAssigned - [Consumer instanceId=ESO_retry_test_instance_id_0423764828, clientId=consumer-ies-slc_7691520826-ESO_retry_test_instance_id_0423764828, groupId=ies-slc_7691520826] Adding newly assigned partitions: merch_inv_retry_orch_avro_test-0, merch_inv_retry_orch_avro_test-1 
22-04-2021 15:32:52.233 [pool-13-thread-1] INFO  o.a.k.c.c.i.ConsumerCoordinator.handle - [Consumer instanceId=ESO_retry_test_instance_id_0423764828, clientId=consumer-ies-slc_7691520826-ESO_retry_test_instance_id_0423764828, groupId=ies-slc_7691520826] Found no committed offset for partition merch_inv_retry_orch_avro_test-0 
22-04-2021 15:32:52.233 [pool-13-thread-1] INFO  o.a.k.c.c.i.ConsumerCoordinator.handle - [Consumer instanceId=ESO_retry_test_instance_id_0423764828, clientId=consumer-ies-slc_7691520826-ESO_retry_test_instance_id_0423764828, groupId=ies-slc_7691520826] Found no committed offset for partition merch_inv_retry_orch_avro_test-1 
22-04-2021 15:32:53.934 [pool-13-thread-1] INFO  o.a.k.c.c.i.SubscriptionState.maybeSeekUnvalidated - [Consumer instanceId=ESO_retry_test_instance_id_0423764828, clientId=consumer-ies-slc_7691520826-ESO_retry_test_instance_id_0423764828, groupId=ies-slc_7691520826] Resetting offset for partition merch_inv_retry_orch_avro_test-1 to offset 818824. 22-04-2021 15:32:53.941 [pool-13-thread-1] INFO  o.a.k.c.c.i.SubscriptionState.maybeSeekUnvalidated - [Consumer instanceId=ESO_retry_test_instance_id_0423764828, clientId=consumer-ies-slc_7691520826-ESO_retry_test_instance_id_0423764828, groupId=ies-slc_7691520826] Resetting offset for partition merch_inv_retry_orch_avro_test-0 to offset 962365. 22-04-2021 15:32:54.754 [pool-13-thread-1] INFO  c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - {} 
22-04-2021 15:32:55.929 [pool-13-thread-1] INFO  c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - Polling... 
22-04-2021 15:32:59.929 [pool-13-thread-1] INFO  c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - {} 
22-04-2021 15:33:01.106 [pool-13-thread-1] INFO  c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - Polling... 
22-04-2021 15:33:05.107 [pool-13-thread-1] INFO  c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - {} 
22-04-2021 15:33:06.280 [pool-13-thread-1] INFO  c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - Polling... 
22-04-2021 15:33:10.281 [pool-13-thread-1] INFO  c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - {} 
22-04-2021 15:33:11.457 [pool-13-thread-1] INFO  c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - Polling... 
22-04-2021 15:33:15.457 [pool-13-thread-1] INFO  c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - {} 
22-04-2021 15:33:16.632 [pool-13-thread-1] INFO  c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - Polling... 
22-04-2021 15:33:20.633 [pool-13-thread-1] INFO  c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - {} 
22-04-2021 15:33:21.834 [pool-13-thread-1] INFO  c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - Polling... 
22-04-2021 15:33:25.834 [pool-13-thread-1] INFO  c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - {} 
22-04-2021 15:33:27.007 [pool-13-thread-1] INFO  c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - Polling... 
22-04-2021 15:33:31.007 [pool-13-thread-1] INFO  c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - {} 
22-04-2021 15:33:32.185 [pool-13-thread-1] INFO  c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - Polling... 
22-04-2021 15:33:36.186 [pool-13-thread-1] INFO  c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - {} 
22-04-2021 15:33:37.390 [pool-13-thread-1] INFO  c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - Polling... 
22-04-2021 15:33:41.391 [pool-13-thread-1] INFO  c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - {} 
22-04-2021 15:33:42.565 [pool-13-thread-1] INFO  c.n.k.c.impl.KafkaConsumerClient.lambda$poll$1 - Polling...

然后由我处理异常:

java.util.NoSuchElementException: Topic message with key [c17b6b3c-d84b-47a2-857d-ba836be96732-[0]-INVENTORY_ADJUSTMENT] wasn't found

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题