我有两个java代码来使用来自kafka的消息。
使用高级使用者api
camel-kafka'from(endpoint)'语法
当我在camel kafka中禁用autocommit时 java.util.concurrent.BrokenBarrierException
,为什么它在获取下一批之前强制提交使用者偏移量?因为它不会发生在高级api实现中。
异常跟踪是:
ERROR - KafkaConsumer -
java.util.concurrent.BrokenBarrierException
at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:200)
at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:427)
at org.apache.camel.component.kafka.KafkaConsumer$BatchingConsumerTask.run(KafkaConsumer.java:165)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
我只想读取来自kafka代理的所有消息,而不提交偏移量。是否有任何配置参数需要传入终结点?
暂无答案!
目前还没有任何答案,快来回答吧!