I have a 1-minute window in which I process events coming through one of my topics. I want to take the result of that window, transform it one more time to get my final result, and push it to Flume. These are my DSL calls to perform the aggregation:
val sessionProcessorStream = builder.stream("collector-prod", Consumed.`with`(Serdes.String, Serdes.String)) // [String, String]
  .filter((_, value) => filterRequest(value))
  .transform(valTransformer, "valTransformState") // [String, String]
  .groupByKey()
  .windowedBy(SessionWindows.`with`(TimeUnit.MINUTES.toMillis(1))) // [Windowed[String], String]
  .aggregate(sessionInitializer, sessionAggregator, sessionMerger, sessionMaterialized) // [Windowed[String], SessionEvent]
  .toStream
  .transform(sessionTransformer, "sessionTransformState") // [String, Long]
When I run this and two events are processed within the same window, I get the following error:
Exception in thread "9-fc11122f-0db3-401b-bdaa-2480eacb8e74-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [1_0] Failed to flush state store session-agg-store7
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:245)
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:196)
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:327)
at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:307)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:302)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:292)
at org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:87)
at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:452)
at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:381)
at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:310)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1018)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:835)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Caused by: java.lang.NullPointerException
at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:857)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3079)
at SessionEventDeserializer.deserialize(SessionEventDeserializer.scala:17)
at SessionEventDeserializer.deserialize(SessionEventDeserializer.scala:8)
at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:158)
at org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:173)
at org.apache.kafka.streams.state.internals.CachingSessionStore.access$000(CachingSessionStore.java:38)
at org.apache.kafka.streams.state.internals.CachingSessionStore$1.apply(CachingSessionStore.java:88)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127)
at org.apache.kafka.streams.state.internals.CachingSessionStore.flush(CachingSessionStore.java:196)
at org.apache.kafka.streams.state.internals.MeteredSessionStore.flush(MeteredSessionStore.java:165)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:242)
... 14 more
I'm not sure of the exact cause of the exception, but I do think there is one problem: the key of the stream going into the final transform is Windowed[String], whereas I expected it to be a plain String.
I tried running a .map after toStream:
.map { (_, v) => {
  new KeyValue[String, SessionEvent](v.name, v)
}}
However, I get a compile-time error that I can't figure out:
Error:(106, 8) no type parameters for method map: (x$1: org.apache.kafka.streams.kstream.KeyValueMapper[_ >: org.apache.kafka.streams.kstream.Windowed[String], _ >: SessionEvent, _ <: org.apache.kafka.streams.KeyValue[_ <: KR, _ <: VR]])org.apache.kafka.streams.kstream.KStream[KR,VR] exist so that it can be applied to arguments (org.apache.kafka.streams.kstream.KeyValueMapper[org.apache.kafka.streams.kstream.Windowed[String],SessionEvent,org.apache.kafka.streams.KeyValue[String,SessionEvent]])
--- because ---
argument expression's type is not compatible with formal parameter type;
found : org.apache.kafka.streams.kstream.KeyValueMapper[org.apache.kafka.streams.kstream.Windowed[String],SessionEvent,org.apache.kafka.streams.KeyValue[String,SessionEvent]]
required: org.apache.kafka.streams.kstream.KeyValueMapper[_ >: org.apache.kafka.streams.kstream.Windowed[String], _ >: SessionEvent, _ <: org.apache.kafka.streams.KeyValue[_ <: ?KR, _ <: ?VR]]
.map { (_,v) => {
Error:(106, 20) type mismatch;
found : org.apache.kafka.streams.kstream.KeyValueMapper[org.apache.kafka.streams.kstream.Windowed[String],SessionEvent,org.apache.kafka.streams.KeyValue[String,SessionEvent]]
required: org.apache.kafka.streams.kstream.KeyValueMapper[_ >: org.apache.kafka.streams.kstream.Windowed[String], _ >: SessionEvent, _ <: org.apache.kafka.streams.KeyValue[_ <: KR, _ <: VR]]
.map { (_,v) => {
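The inference failure comes from scalac having to deduce KR and VR through the wildcard-bounded Java signature of KStream.map. A common workaround is to supply the type parameters explicitly, e.g. `.map[String, SessionEvent]((windowedKey, v) => new KeyValue(windowedKey.key(), v))`, which also unwraps the Windowed[String] key via `key()`. The stand-in below (all names illustrative; no Kafka dependency) mirrors the shape of the Java signature to show why explicit type parameters help:

```scala
object InferenceDemo {
  // Java-style SAM and pair types, invariant like the real Kafka classes.
  trait KeyValueMapper[K, V, R] { def apply(k: K, v: V): R }
  final class KV[K, V](val key: K, val value: V)

  class MiniStream[K, V] {
    // Same wildcard shape as the Java API's map: KR and VR must be inferred
    // through `_ <: KV[_ <: KR, _ <: VR]`, which scalac often cannot do on
    // its own -- hence the "no type parameters for method map" error.
    def map[KR, VR](m: KeyValueMapper[_ >: K, _ >: V, _ <: KV[_ <: KR, _ <: VR]]): MiniStream[KR, VR] =
      new MiniStream[KR, VR]
  }

  def demo(): MiniStream[String, Long] = {
    val s = new MiniStream[String, Long]
    // Supplying the type parameters explicitly sidesteps the inference failure;
    // without `[String, Long]` this call fails to compile.
    s.map[String, Long](new KeyValueMapper[String, Long, KV[String, Long]] {
      def apply(k: String, v: Long): KV[String, Long] = new KV(k, v)
    })
  }
}
```

The same pattern applied to the real stream would be `.map[String, SessionEvent](...)` right after `.toStream`.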
What is the correct way to transform the results of a windowed aggregation? I'd appreciate any help and am happy to clarify anything further.
1 Answer
The error originates from the flush, so I suspect it is not the transformer after the aggregation but the aggregation itself. I think you have a bug in your SessionEventDeserializer, since it throws a NullPointerException: do you handle null correctly? A deserializer must be able to accept null and return null for that case.
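The fix the answer points at can be sketched as a guard at the top of deserialize. The snippet below is a minimal, self-contained illustration with a made-up comma-separated encoding rather than the Jackson-based original; the essential part is the null check:

```scala
case class SessionEvent(name: String, count: Long)

object SessionEventCodec {
  // Kafka Streams can hand the deserializer null bytes (e.g. when the
  // session-store cache flushes an entry with no prior value). Per the
  // Deserializer contract it must then return null, not try to parse.
  def deserialize(data: Array[Byte]): SessionEvent = {
    if (data == null) return null // guard that prevents the NullPointerException
    val parts = new String(data, "UTF-8").split(",")
    SessionEvent(parts(0), parts(1).toLong)
  }
}
```

In the real SessionEventDeserializer the same guard would go before the ObjectMapper.readValue call, which is what currently receives null and throws.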