我正在使用storm实现的滑动窗口:
从这里开始
这是我的拓扑结构:
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("rabbitSpout", new RabbitMQSpout());
builder.setBolt("filterBolt", new FilteringBolt()).shuffleGrouping("rabbitSpout");
builder.setBolt("HourStatisticsBolt", new SlidingWindowStatisticsBolt()
.withWindow(new BaseWindowedBolt.Duration(60, TimeUnit.MINUTES),
new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS))
.withTimestampField("timestamp")).shuffleGrouping("filterBolt");
在我的slidingwindowstatisticsbolt的execute方法中,我想得到windows开始或结束的时间戳。在我的螺栓我怎么能得到窗口长度和滑动持续时间?
1条答案
按热度按时间zpgglvta1#
因为您使用的是事件时间(
withTimestampField
)窗口是基于周期性水印计算的。现在,tuplewindow中没有显示窗口开始/结束时间。在风暴最新的主分支中,tuplewindow有一个
getTimestamp
方法,该方法返回窗口结束时间戳,并适用于基于处理和事件时间的窗口。这将在storm的未来版本(2.0版本)中提供。如果你想在下一个1.x版本中使用它,你可以在这里提交一个jira