我有一个用例,
我需要从数据源读取记录并将其写入多个接收器,包括kafka,以及一些聚合。
下面是我的伪代码的样子,
Dataset<Row> dataset = spark.readStream()......
dataset.writeStream().foreachBatch(
// do some processing, including aggregations
// write it to multiple sinks
batch.write().format('kafka').save();
).start().awaitTermination();
当我在foreach中尝试某种聚合方法时,默认情况下,它采用append模式并删除旧的聚合。因此,输出只包含当前批处理的结果。
我的要求是,当第二批数据到达时,它应该与第一批数据的结果合并。
例如:对于查询, dataset.groupBy("id").count(value)
如果第一批输入是:{“id”:1,“value”:1},{“id”:1,“value”:1}
输出:{“id”:1,“value”:“2”}
第二批输入:{“id”:1,“value”:3},{“id”:1,“value”:2}
输出:{“id”:1,“value”:5}
预期输出:{“id”:1,“value”:7}
如何在spark中实现这一点?
提前谢谢。
1条答案
按热度按时间z8dt9xmd1#
上面的示例是dstream示例,而不是结构化流。您需要将spark结构化流看作是将数据加载到一个无界表中。
假设数据源是kafka,下面是结构化流的一个基本示例。请注意,使用readstream和writestream api无法进行模式推断。模式需要来自数据源连接器,在本例中是kafka。
使用.trigger()函数创建微批,outputmode保存每个微批的结果。在这个例子中,我每10秒创建一个微批,
.trigger(ProcessingTime("10 second"))
以及将流中的每个事件作为一行附加到parquet文件.outputMode(OutputMode.Append())
在您的例子中,您需要使用.trigger(),并选择一个微批处理间隔.outputMode(outputMode.Update())
插入具有值的新键或使用递增值更新现有键。下面的部分是聚合逻辑的发展方向。您可以将聚合逻辑分解为单独的Dataframe,并将Dataframe作为流写入,而不是为了可读性而进行链接。
结构化流媒体的另一个例子。