我试图删除压缩主题中的一些记录(tombstone),方法是发出一个(“key”,null)键值对,压缩一段时间后就会删除这些记录。
但是,我得到以下例外:
Exception in thread "SV-6c606e52-46eb-4a59-a006-27ce6ce1a603-StreamThread-1" java.lang.NullPointerException
at magnolia.Magnolia$$anon$5.dereference(magnolia.scala:537)
at com.sksamuel.avro4s.Encoder$$anon$15.$anonfun$encode$10(Encoder.scala:393)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at com.sksamuel.avro4s.Encoder$$anon$15.encode(Encoder.scala:360)
at com.sksamuel.avro4s.ToRecord$$anon$1.to(ToRecord.scala:24)
at com.sksamuel.avro4s.RecordFormat$$anon$1.to(RecordFormat.scala:23)
at com.ovoenergy.globaltopics.serdes.SerdeProvider$$anon$2.serialize(SerdeProvider.scala:67)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
at com.ovoenergy.globaltopics.serdes.SerdeProvider$$anon$2.serialize(SerdeProvider.scala:64)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:176)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:111)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:92)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:104)
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$3(StreamTask.java:383)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:383)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
你知道吗?
谢谢你,尼古拉
暂无答案!
目前还没有任何答案,快来回答吧!