我的问题与flink中如何支持多个keyby非常相似,只是这个问题是针对java的,我需要scala中的答案。我在intellij中复制粘贴了提供的解决方案,它自动将复制粘贴的代码段转换为scala,然后我对其进行编辑以适合我的代码。我仍然会遇到编译错误(甚至在compilation intellij能够检测到代码的问题之前)。基本上,提供给keyby的参数(keyselector的getkey函数的返回值)与任何重载版本的keyby函数所期望的参数都不匹配。
查找了许多返回复合键的keyselector的scala代码示例,没有找到任何示例。
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.java.tuple.Tuple2
import org.myorg.aarna.AAPerMinData
val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy(new
KeySelector[AAPerMinData, Tuple2[String, String]]() {
@throws[Exception]
override def getKey(value: AAPerMinData): Tuple2[String, String] =
Tuple2.of(value.field1, value.field2)
})
编译代码时出现以下错误:
Error:(213, 64) overloaded method value keyBy with alternatives:
[K](fun: org.myorg.aarna.AAPerMinData => K)(implicit evidence $2:org.apache.flink.api.common.typeinfo.TypeInformation[K])org.apache.flink.streaming.api.scala.KeyedStream[org.myorg.aarna.AAPerMinData,K] <and>
(firstField: String,otherFields:
String*)org.apache.flink.streaming.api.scala.KeyedStream[org.myorg.aarna.AAPerMinData,org.apache.flink.api.java.tuple.Tuple] <and>
(fields: Int*)org.apache.flink.streaming.api.scala.KeyedStream[org.myorg.aarna.AAPerMinData,org.apache.flink.api.java.tuple.Tuple]
cannot be applied to (org.apache.flink.api.java.functions.KeySelector[org.myorg.aarna.AAPerMinData,org.apache.flink.api.java.tuple.Tuple2[String,String]])
val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy(new KeySelector[AAPerMinData, Tuple2[String, String]]() {
我不确定我在语法中遗漏了什么导致了这个错误。非常感谢您的帮助。下一步,一旦解决了这个问题,就基于复合键执行基于tumblingwindow的摘要。
更新1(12/29/2018):将代码更改为使用keyselector格式将简单的字符串类型字段用作键(我知道这可以用更简单的方法实现,我这样做只是为了让基本的keyselector工作)。
import org.apache.flink.api.java.functions.KeySelector
import org.myorg.aarna.AAPerMinData
val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy(new KeySelector[AAPerMinData, String]() {
@throws[Exception]
override def getKey(value: AAPerMinData): String = value.set1.sEntId
})
下面是我得到的错误截图(即intellij在鼠标上显示)。
更新2(12/29/2018)
这是有效的(对于单键情况)
val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy[String]
(_.set1.sEntId)
这不起作用(对于复合键情况)
val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy([String, String)](_.set1.sEntId, _.set1.field2)
更新3(12/29/2018)尝试了以下操作,无法使其正常工作。请参见错误屏幕截图。
val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy[(String, String)]((_.set1.sEntId, _.set1.field2))
更新4(12/30/2018)现已解决,见接受答案。对于可能感兴趣的人,这是最后的工作代码,包括使用复合键进行聚合:
// Composite key
val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy[(String, String)](x => (x.set1.sEntId, x.set1.field2))
// Tumbling window
val aggr_keyed_stream = aa_stats_keyed_stream.window(TumblingEventTimeWindows.of(Time.seconds(60)))
// all set for window based aggregation of a "composite keyed" stream
val aggr_stream = aggr_keyed_stream.apply { (key: (String, String), window: TimeWindow, events: Iterable[AAPerMinData],
out: Collector[AAPerMinDataAggr]) =>
out.collect(AAPerMinDataAggrWrapper(key._1 + key._2, // composite
key._1, key._2, // also needed individual pieces
window,
events,
stream_deferred_live_duration_in_seconds*1000).getAAPerMinDataAggr)}
// print the "mapped" stream for debugging purposes
aggr_stream.print()
1条答案
按热度按时间sg2wtvxw1#
首先,虽然没有必要,但是继续使用scala元组。总的来说,这会使事情变得更简单,除非出于某种原因必须与java元组进行互操作。
然后,不要使用org.apache.flink.api.java.functions.keyselector。您想使用org.apache.flink.streaming.api.scala.datastream中的这个keyby:
换句话说,只需传递一个将流元素转换为键值的函数(一般来说,flink的scalaapi尽量做到习惯用法)。所以像这样的事情应该可以做到:
更新:
对于复合键的情况,使用