在研究斯卡拉·Kafka的时候 KeyValueMapper
实现我得到以下错误。我不知道到底有什么区别。谢谢你的帮助。
代码:
我创造了一个 KTable
从主题开始。
val creducer: Reducer[java.lang.Long] =
(v1, v2) => if (v1 > v2) v1 else v2
val deduplicationWindow = TimeWindows
.of(60000L * 10)
.advanceBy(60000L)
.until(60000L * 10)
val ktwindow: KTable[Windowed[String], java.lang.Long] =
ipandTime
.groupByKey(Serdes.String(), Serdes.Long())
.reduce(creducer, deduplicationWindow, "ktwindow-query")
当我尝试创建键为的流时,使用selectkey方法时出错 Windowed[String]
. 在java中类似的实现工作得很好。
val fStream = ktwindow
.toStream()
.selectKey(
new KeyValueMapper[Windowed[String],
java.lang.Long,
KeyValue[String, java.lang.Long]] {
override def apply(
key: Windowed[String],
value: java.lang.Long): KeyValue[String, java.lang.Long] = {
new KeyValue(key.key(), value)
}
}
)
[error] found : org.apache.kafka.streams.kstream.KeyValueMapper[org.apache.kafka.streams.kstream.Windowed[String],Long,org.apache.kafka.streams.KeyValue[String,Long]]
[error] required: org.apache.kafka.streams.kstream.KeyValueMapper[_ >: org.apache.kafka.streams.kstream.Windowed[String], _ >: Long, _ <: KR]
1条答案
按热度按时间yyhrrdl81#
变量
ipandTime
找不到了,所以我把它换成了???
,但这与实际问题无关。如前所述,如果java使用站点通配符的类型推断失败,那么只需添加显式类型参数。这是为Kafka1.1.0编写的:
这个
selectKey
方法需要泛型类型参数KR
,所以我只给出了具体的类型KeyValue[String, java.lang.Long]
对它,然后它就工作了。