kafka消费者补偿提交在completable future中

nwsw7zdq  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(885)

我在线程中创建了一个kafka consumer示例作为构造函数的一部分,在thread inside run方法中调用了不同的web服务,为了保持调用的非阻塞性,我正在使用completable future。我的问题是,我无法通过调用thenapply方法并传递kafka consumer示例来发出commit,因为这会给我一个错误,即kafka consumer不是线程安全的。虽然在我的commit方法中我已经编写了代码

synchronized(consumer) {
  commitResponse();
}

我还是明白 ConcurrentModificationException .

class KafkaConsumerThread implements Runnable {

  KafkaConsumer<String, String> consumer;

  public KafkaConsumerThread(Properties properties) {
    consumer = new KafkaConsumer<String, String>(properties);    
    ...
  }

  @Override
  public void run() {
    try {
      // synchronized (consumer) {
      consumer.subscribe(topics);
      while (true) {
        if (closed.get()) {
          consumer.close();
        }
        ConsumerRecords<String, String> records = consumer.poll(120000);
        for (ConsumerRecord<String, String> record : records) {
          getAsyncClient().prepareGet(webServiceUrl)
              .execute()
              .toCompletableFuture()
              .thenApply(resp -> callAnotherService1(resp))
              .thenApply(resp -> callAnotherService2(resp))
              .thenApply(resp -> commitResponse(resp, consumer));
          }
        }
      }
    } catch (Exception ex) {
      ...
    }

在上面的代码中,我在commitresponse方法中得到一个异常,即“kafkaconsumer对于多线程访问不安全”。虽然在提交响应中,如果将提交包含在synchronized(consumer)中,我仍然会得到错误。

qlzsbp2j

qlzsbp2j1#

很可能是因为 poll 方法是不同步的,并且在异步get执行提交时执行(仍然保持内部kafka锁运行)。
请参见对私有方法的引用: org.apache.kafka.clients.consumer.KafkaConsumer.acquire() 以及 org.apache.kafka.clients.consumer.KafkaConsumer.release()org.apache.kafka.clients.consumer.KafkaConsumer

相关问题