reduce()生成增量结果,而不是最终值

xzlaal3s  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(246)

我正在尝试使用pythonapi在flink上实现kmeans clustering算法。我在做一件事 key_by 基于第0个索引,然后尝试 reduce() 在每个组上获得一种计数聚合。

class CentroidAccumulator(ReduceFunction):                                                                                                                                       
    def reduce(self, val1, val2):                                                                                                                                                
        id1, point1, count1 =  val1                                                                                                                                              
        id2, point2, count2 =  val2                                                                                                                                              
        return (id1, point1.add(point2), count1+count2)   

class Selector(KeySelector):                                                                                                                                                     
    def getKey(self, value):                                                                                                                                                     
        return value[0]   

nearest_points = points \                                                                                                                                                
                .map(SelectNearestPoint(centroids)) \                                                                                                                            
                .key_by(Selector()).reduce(CentroidAccumulator()) 
nearest_points.write_as_text("output.txt")

预期结果:

(1, <tuple>, count)
(2, <tuple>, count)
(3, <tuple>, count)
(4, <tuple>, count)

实际结果:
我得到了写入文件的所有迭代的输出(我测试的样本中有40个点,因此输出有40行)

(1, <kmeans_clustering.Point instance at 0x2>, 1)                                                                                                                                
(3, <kmeans_clustering.Point instance at 0x3>, 1)                                                                                                                                
(2, <kmeans_clustering.Point instance at 0x4>, 1)                                                                                                                                
(2, <kmeans_clustering.Point instance at 0x5>, 2)                                                                                                                                
.
.
.                                                                                                                
(2, <kmeans_clustering.Point instance at 0x20>, 13)                                                                                                                              
(2, <kmeans_clustering.Point instance at 0x21>, 14)                                                                                                                              
(1, <kmeans_clustering.Point instance at 0x22>, 10)                                                                                                                              
(4, <kmeans_clustering.Point instance at 0x23>, 4)                                                                                                                               
(2, <kmeans_clustering.Point instance at 0x24>, 15)                                                                                                                              
(2, <kmeans_clustering.Point instance at 0x25>, 16)                                                                                                                              
(1, <kmeans_clustering.Point instance at 0x26>, 11)                                                                                                                              
(4, <kmeans_clustering.Point instance at 0x27>, 5)                                                                                                                               
(2, <kmeans_clustering.Point instance at 0x28>, 17)                                                                                                                              
(2, <kmeans_clustering.Point instance at 0x29>, 18)

问题是reduce是正确的,但是我只想得到每个组reduce转换的最后一个值(这是reduce应该如何工作的我的理解)。我做错什么了?

c3frrgcw

c3frrgcw1#

你没有做错什么;这是流reduce函数的预期行为。从概念上讲,数据流是无穷无尽的数据流,因此“等到最后”才产生结果是没有意义的。流媒体程序的标准行为是为每个事件生成一个结果。
当然,这可能有点不方便。如果你只想看到最后的结果,那么就必须有某种方式来表明结局已经到来。对于批处理程序,这是很自然的。对于流式应用程序,有限的数据源发送一个带有值max\u水印的水印,该水印可用于检测输入是否已到达其终点--您可以在带有事件时间计时器的processfunction中捕获该水印,但这是一个有点复杂的解决方案。您还可以使用windows实现一种变通方法。

相关问题