我在线程中创建了一个kafka consumer示例作为构造函数的一部分,在thread inside run方法中调用了不同的web服务,为了保持调用的非阻塞性,我正在使用completable future。我的问题是,我无法通过调用thenapply方法并传递kafka consumer示例来发出commit,因为这会给我一个错误,即kafka consumer不是线程安全的。虽然在我的commit方法中我已经编写了代码
synchronized(consumer) {
commitResponse();
}
我还是明白 ConcurrentModificationException
.
class KafkaConsumerThread implements Runnable {
KafkaConsumer<String, String> consumer;
public KafkaConsumerThread(Properties properties) {
consumer = new KafkaConsumer<String, String>(properties);
...
}
@Override
public void run() {
try {
// synchronized (consumer) {
consumer.subscribe(topics);
while (true) {
if (closed.get()) {
consumer.close();
}
ConsumerRecords<String, String> records = consumer.poll(120000);
for (ConsumerRecord<String, String> record : records) {
getAsyncClient().prepareGet(webServiceUrl)
.execute()
.toCompletableFuture()
.thenApply(resp -> callAnotherService1(resp))
.thenApply(resp -> callAnotherService2(resp))
.thenApply(resp -> commitResponse(resp, consumer));
}
}
}
} catch (Exception ex) {
...
}
在上面的代码中,我在commitresponse方法中得到一个异常,即“kafkaconsumer对于多线程访问不安全”。虽然在提交响应中,如果将提交包含在synchronized(consumer)中,我仍然会得到错误。
1条答案
按热度按时间qlzsbp2j1#
很可能是因为
poll
方法是不同步的,并且在异步get执行提交时执行(仍然保持内部kafka锁运行)。请参见对私有方法的引用:
org.apache.kafka.clients.consumer.KafkaConsumer.acquire()
以及org.apache.kafka.clients.consumer.KafkaConsumer.release()
在org.apache.kafka.clients.consumer.KafkaConsumer