scala—如何在数据流上迭代

rsaldnfx  于 2021-06-21  发布在  Flink
关注(0)|答案(2)|浏览(333)

我是斯卡拉的新手。我有一个自定义的类analytics.scala,它有几个变量(var a,var b,var c)。我在测试用例中得到了一个类型分析的数据流,我想为每个对象将var c的值设置为“0”。
我试过在datastream上使用map函数,但没用。我还尝试将流转换为列表,然后在该列表上迭代,但也不起作用。
流的类型为datastream[analytics]。这就是我尝试过的:

stream.map(x => x.c=0)
val a = DataStreamUtils.collect(stream.javaStream).asScala.toArray.iterator
a.foreach(x => x.c=0)

在我的测试用例中,var c的值没有变为0。

xj3cbfub

xj3cbfub1#

一般来说,flink数据流不是一个有限的集合,你可以迭代一次就可以完成——它是一个潜在的无限流,只需要不断地拥有更多的数据。
使用Map是正确的方法。但是当你把Map应用到一个流中,比如

stream.map(x => x.c=0)

您正在描述流转换,而不是修改流本身。你应该试试

streamWhereCisZero = stream.map(x => x.c=0)

这将创建一个新的流,其中每个元素都将c设置为零。

tv6aics1

tv6aics12#

我就是这样迭代的。不确定这是不是最好的解决方案。

val collection = DataStreamUtils.collect(stream.javaStream)
val results: Seq[Analytics] = collection.asScala.toSeq
for (result <- results){
    result.c=0
}

相关问题