我正在运行结构化的流媒体管道,使用hdfs作为源,hive作为接收器。我使用的spark版本是spark2.4.0-cdh6.2.1。我需要在5分钟的滚动/静态窗口大小内消除流式Dataframe中的重复数据。但是,我的Dataframe不包含任何事件时间列。我为实现预期结果而编写的代码是-
String eventTimeWatermarkingColumn = "custom_eventTime";
Dataset<Row> intermediateDSWithWatermarkingColumn = inputFileDS.withColumn(eventTimeWatermarkingColumn,
functions.current_timestamp());
Dataset<Row> intermediateDS = intermediateDSWithWatermarkingColumn
.withWatermark(eventTimeWatermarkingColumn, "5 minutes")
.dropDuplicates(new String[]{"colA","colB","colC"});
在代码中,我在dataframe中添加了一个自定义timestamp列,该列的值为current timestamp,并将其用作withwatermark方法的参数。
期望的行为:使用上面的代码,我的期望是每5分钟的窗口帧,应该删除重复的记录(如果有的话),一旦5分钟的窗口时间过去,应该允许任何重复的记录。确切地说,我想水印应该使存储状态的数据每5分钟窗口时间只有一次,一旦5分钟过去之前的状态存储数据应该被删除。
应用程序行为:然而,应用程序并没有按预期工作,相反,水印并没有起任何作用,在5分钟的窗口帧内或超过5分钟的窗口帧内插入的每个重复记录都会被应用程序删除。我无法理解这种行为。这是危险的,因为这可能会导致对象在某个时间点之后出现内存不足错误。
发生这种情况是因为我们引入了自定义时间列吗?如果是这样的话,如何将自定义eventtime列添加到dataframe(在模式中没有eventtime列)以使水印正常工作?
暂无答案!
目前还没有任何答案,快来回答吧!