我正在尝试使用pythonapi在flink上实现kmeans clustering算法。我在做一件事 key_by
基于第0个索引,然后尝试 reduce()
在每个组上获得一种计数聚合。
class CentroidAccumulator(ReduceFunction):
def reduce(self, val1, val2):
id1, point1, count1 = val1
id2, point2, count2 = val2
return (id1, point1.add(point2), count1+count2)
class Selector(KeySelector):
def getKey(self, value):
return value[0]
nearest_points = points \
.map(SelectNearestPoint(centroids)) \
.key_by(Selector()).reduce(CentroidAccumulator())
nearest_points.write_as_text("output.txt")
预期结果:
(1, <tuple>, count)
(2, <tuple>, count)
(3, <tuple>, count)
(4, <tuple>, count)
实际结果:
我得到了写入文件的所有迭代的输出(我测试的样本中有40个点,因此输出有40行)
(1, <kmeans_clustering.Point instance at 0x2>, 1)
(3, <kmeans_clustering.Point instance at 0x3>, 1)
(2, <kmeans_clustering.Point instance at 0x4>, 1)
(2, <kmeans_clustering.Point instance at 0x5>, 2)
.
.
.
(2, <kmeans_clustering.Point instance at 0x20>, 13)
(2, <kmeans_clustering.Point instance at 0x21>, 14)
(1, <kmeans_clustering.Point instance at 0x22>, 10)
(4, <kmeans_clustering.Point instance at 0x23>, 4)
(2, <kmeans_clustering.Point instance at 0x24>, 15)
(2, <kmeans_clustering.Point instance at 0x25>, 16)
(1, <kmeans_clustering.Point instance at 0x26>, 11)
(4, <kmeans_clustering.Point instance at 0x27>, 5)
(2, <kmeans_clustering.Point instance at 0x28>, 17)
(2, <kmeans_clustering.Point instance at 0x29>, 18)
问题是reduce是正确的,但是我只想得到每个组reduce转换的最后一个值(这是reduce应该如何工作的我的理解)。我做错什么了?
1条答案
按热度按时间c3frrgcw1#
你没有做错什么;这是流reduce函数的预期行为。从概念上讲,数据流是无穷无尽的数据流,因此“等到最后”才产生结果是没有意义的。流媒体程序的标准行为是为每个事件生成一个结果。
当然,这可能有点不方便。如果你只想看到最后的结果,那么就必须有某种方式来表明结局已经到来。对于批处理程序,这是很自然的。对于流式应用程序,有限的数据源发送一个带有值max\u水印的水印,该水印可用于检测输入是否已到达其终点--您可以在带有事件时间计时器的processfunction中捕获该水印,但这是一个有点复杂的解决方案。您还可以使用windows实现一种变通方法。