为什么这段代码没有给予任何东西?如果我改为TumblingProcessingTimeWindows
-所有工作正常。
我没有在文档中找到我还必须添加什么?触发器?驱逐器?允许迟到?
WatermarkStrategy<UserModel> strategy = WatermarkStrategy.<UserModel>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withTimestampAssigner((i, timestamp) -> Timestamp.valueOf(i.dt).getTime());
ds.assignTimestampsAndWatermarks(strategy)
.windowAll(TumblingEventTimeWindows.of(Time.seconds(10))).reduce((acc, i) -> {
acc.count += i.count;
acc.dt = i.dt;
return acc;
}).addSink(new PrintSinkFunction());
输入:
{"userId":1,"count":11,"dt":"2023-04-11T09:29:12.244"}
系统时间=输入时间
更新二:
1.我在withTimestampAssigner
中添加了一些打印信息-它在每个事件中都被调用。
1.我添加了OutputTag用于捕获丢弃的事件-它很清楚。
return new OutputTag(){};
1.我添加了debug print internal来减少函数-它在每个事件上都被调用。
但是print(sink)对于关闭输出窗口没有=(
所有代码:
private static void m4(DataStream<UserModel> ds) {
WatermarkStrategy<UserModel> strategy = WatermarkStrategy.<UserModel>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withTimestampAssigner((i, timestamp) -> {
long time = i.dt.toInstant(ZoneOffset.UTC).toEpochMilli();
System.out.println(i.dt + " is: " + time + " dont know: " + timestamp);
return time;
});
OutputTag<UserModel> lateTag = new OutputTag<UserModel>("late"){};
SingleOutputStreamOperator<UserModel> reduce = ds.assignTimestampsAndWatermarks(strategy)
.windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
.sideOutputLateData(lateTag)
.reduce((acc, i) -> {
System.out.println(i.dt + " reDUCE:");
acc.count += i.count;
acc.dt = i.dt;
return acc;
});
reduce.getSideOutput(lateTag).print();
reduce.addSink(new PrintSinkFunction());
}
更新3:
有趣的是,TumblingEventTimeWindows
需要ProcessAllWindowFunction
,尽管TumblingProcessingTimeWindows
没有它也能工作。
所以我添加了ProcessAllWindowFunction
,但还是没有结果,我添加了print来调试,这部分代码没有调用。
有趣的一点,如果我改变到TumblingProcessingTimeWindows
,窗口关闭和下沉即使没有ProcessAllWindowFunction
x1c 0d1x
所有代码:
public class Rich extends ProcessAllWindowFunction<UserModel, UserModelEx, TimeWindow> {
@Override
public void process(ProcessAllWindowFunction<UserModel, UserModelEx, TimeWindow>.Context context, Iterable<UserModel> iterable, Collector<UserModelEx> collector) throws Exception {
UserModel um = iterable.iterator().next();
System.out.println(um.count + " rich:" + um.dt);
collector.collect(new UserModelEx() {{
userId = um.userId;
count = um.count;
wStart = LocalDateTime.ofInstant(Instant.ofEpochMilli(context.window().getStart()), ZoneOffset.UTC);
wEnd = LocalDateTime.ofInstant(Instant.ofEpochMilli(context.window().getEnd()), ZoneOffset.UTC);
}});
}
}
private static void m4(DataStream<UserModel> ds) {
WatermarkStrategy<UserModel> strategy = WatermarkStrategy.<UserModel>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withTimestampAssigner((i, timestamp) -> {
long time = i.dt.toInstant(ZoneOffset.UTC).toEpochMilli();
System.out.println(i.dt + " assignEvent: " + time + " : " + timestamp);
return time;
});
SingleOutputStreamOperator<UserModelEx> reduce = ds.assignTimestampsAndWatermarks(strategy)
.windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
.reduce((acc, i) -> {
acc.count += i.count;
acc.dt = i.dt;
System.out.println(acc.dt + " reduce:" + acc.count);
return acc;
}, new Rich());
reduce.print();
//reduce.addSink(new PrintSinkFunction<UserModelEx>());
}
2条答案
按热度按时间46scxncf1#
所以4天后我发现了问题,但我不能解释为什么它是必要的。
env.setParallelism(1);
-解决了我的问题。我正在从topic partition = 1阅读Kafka,即默认情况下并行度应该是1。
我希望Maven们能解释一下为什么我的例子需要这个参数...
qncylg1j2#
通常情况下,你会提供一个ProcessAllWindowFunction与reduce调用,请参阅此API。ProcessAllWindowFunction.process方法将为每个窗口调用,并由迭代器提供单个元素(所有reduce调用的结果)。