我遵循flink的快速入门示例:监视wikipedia编辑流。
这个例子是用java编写的,我用scala实现它,如下所示:
/**
* Wikipedia Edit Monitoring
*/
object WikipediaEditMonitoring {
def main(args: Array[String]) {
// set up the execution environment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val edits: DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource)
val result = edits.keyBy( _.getUser )
.timeWindow(Time.seconds(5))
.fold(("", 0L)) {
(acc: (String, Long), event: WikipediaEditEvent) => {
(event.getUser, acc._2 + event.getByteDiff)
}
}
result.print
// execute program
env.execute("Wikipedia Edit Monitoring")
}
}
然而 fold
flink中的函数已被弃用,并且 aggregate
建议使用函数。
但是我没有找到关于如何转换不推荐的 fold
至 aggregrate
.
你知道怎么做吗?可能不仅仅是申请 aggregrate
.
更新
我还有另一个实现,如下所示:
/**
* Wikipedia Edit Monitoring
*/
object WikipediaEditMonitoring {
def main(args: Array[String]) {
// set up the execution environment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val edits: DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource)
val result = edits
.map( e => UserWithEdits(e.getUser, e.getByteDiff) )
.keyBy( "user" )
.timeWindow(Time.seconds(5))
.sum("edits")
result.print
// execute program
env.execute("Wikipedia Edit Monitoring")
}
/**Data type for words with count */
case class UserWithEdits(user: String, edits: Long)
}
我也想知道如何使用自定义的实现 AggregateFunction
.
更新
我遵循了以下文档:aggregatefunction,但有以下问题:
在接口的源代码中 AggregateFunction
对于1.3版,您将看到 add
真的回来了 void
:
void add(IN value, ACC accumulator);
但是对于版本1.4 AggregateFunction
,正在返回:
ACC add(IN value, ACC accumulator);
我该怎么处理?
我使用的flink版本是 1.3.2
这个版本的文档没有 AggregateFunction
,但artifactory中还没有1.4版。
2条答案
按热度按时间cwtwac6a1#
你会发现一些文档
AggregateFunction
在Flink1.4文档中,包括一个示例。1.3.2中包含的版本仅限于与可变累加器类型一起使用,其中add操作修改累加器。Flink1.4已经修复了这个问题,但是还没有发布。
13z8s7eq2#