We use Apache Flink to read from a Kafka topic whose messages contain an Id and a list of objects, like this:
{
  Id: "12345",
  Objects: [
    {
      fatherId: "1a",
      id: "111",
      name: "aabc"
    },
    {
      fatherId: "1b",
      id: "222",
      name: "abffc"
    },
    {
      fatherId: "1a",
      id: "333",
      name: "gfds"
    },
    ...
  ]
}
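For context, here is a minimal sketch of what the POJOs behind this JSON might look like, inferred from the pipeline code below. The field names and types (a UUID transactionId, an Integer negotiationId) are assumptions, and the remaining getters used later (getProductGtin, getState, getObservation, getRetailerCode) are omitted:

import java.util.List;
import java.util.UUID;

// Hypothetical POJOs inferred from the pipeline code below, not taken from
// the original project. The JSON example above is simplified: the code uses
// a UUID transactionId (the "Id") and an Integer negotiationId (the "fatherId").
class ListNegotiationProduct {
    private UUID transactionId;                // "Id" in the JSON
    private List<NegotiationProduct> products; // "Objects" in the JSON

    public UUID getTransactionId() { return transactionId; }
    public List<NegotiationProduct> getProducts() { return products; }
}

class NegotiationProduct {
    private Integer negotiationId;             // "fatherId" in the JSON
    private String id;
    private String name;
    // getProductGtin(), getState(), getObservation(), getRetailerCode()
    // are also used by the pipeline but omitted here for brevity.

    public Integer getNegotiationId() { return negotiationId; }
}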
We then transform it into a data stream of tuples of the listId and each product. Finally we apply a keyBy and a 10-second TumblingProcessingTimeWindows to group the data by listId and fatherId, and turn each group into a string like this: "{ [ {fatherId: "1a", id: "111", name: "aabc"}, {fatherId: "1a", id: "333", name: "gfds"} ], [ {fatherId: "1b", id: "222", name: "abffc"} ] }"
The problem is that in some tests we send 5 lists, each containing 128,000 records, so the expected result is 5 strings, but sometimes we receive 6 because one of the messages gets split. With the example above, the result looks like this: string 1: "{ [ {fatherId: "1a", id: "111", name: "aabc"} ], [ {fatherId: "1b", id: "222", name: "abffc"} ] }" string 2: "{ [ {fatherId: "1a", id: "333", name: "gfds"} ] }", when a single string was expected.
Why does this happen?
The pipeline code is as follows:
DataStream<Result> sourceNegotiation = listNegotiationProducts
        // 1) Flatten each incoming list into (transactionId, product) tuples.
        .flatMap(new FlatMapFunction<ListNegotiationProduct, Tuple2<UUID, NegotiationProduct>>() {
            @Override
            public void flatMap(ListNegotiationProduct listNegotiationProduct,
                                Collector<Tuple2<UUID, NegotiationProduct>> out) throws Exception {
                listNegotiationProduct.getProducts().forEach(lnp -> {
                    Tuple2<UUID, NegotiationProduct> response =
                            new Tuple2<>(listNegotiationProduct.getTransactionId(), lnp);
                    out.collect(response);
                });
            }
        })
        // 2) Key by (transactionId, negotiationId) and window for 10 seconds.
        .keyBy(new KeySelector<Tuple2<UUID, NegotiationProduct>, Tuple2<UUID, Integer>>() {
            @Override
            public Tuple2<UUID, Integer> getKey(Tuple2<UUID, NegotiationProduct> value) throws Exception {
                return Tuple2.of(value.f0, value.f1.getNegotiationId());
            }
        })
        .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
        .allowedLateness(Time.seconds(1))
        // 3) Collapse each (transactionId, negotiationId) window into one Negotiation.
        .apply(new WindowFunction<Tuple2<UUID, NegotiationProduct>, Tuple2<UUID, Negotiation>, Tuple2<UUID, Integer>, TimeWindow>() {
            @Override
            public void apply(Tuple2<UUID, Integer> uuidIntegerTuple2, TimeWindow window,
                              Iterable<Tuple2<UUID, NegotiationProduct>> iterable,
                              Collector<Tuple2<UUID, Negotiation>> collector) throws Exception {
                Negotiation negotiation = new Negotiation();
                Tuple2<UUID, Negotiation> response = new Tuple2<>();
                List<Product> productList = new ArrayList<>();
                iterable.iterator().forEachRemaining(negotiationProduct -> {
                    negotiation.setNegotiationId(negotiationProduct.f1.getNegotiationId());
                    response.setField(negotiationProduct.f0, 0);
                    List<String> observationList = new ArrayList<>();
                    observationList.add(negotiationProduct.f1.getObservation());
                    productList.add(Product.builder()
                            .productGtin(negotiationProduct.f1.getProductGtin())
                            .state(negotiationProduct.f1.getState())
                            .observation(observationList)
                            .retailerCode(negotiationProduct.f1.getRetailerCode())
                            .build());
                });
                negotiation.setNegotiationProgressProducts(productList);
                response.setField(negotiation, 1);
                collector.collect(response);
            }
        })
        // 4) Key by transactionId alone and window again to gather all
        //    Negotiations belonging to the same list into one Result.
        .keyBy(t -> t.f0)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
        .allowedLateness(Time.seconds(1))
        .apply(new WindowFunction<Tuple2<UUID, Negotiation>, Result, UUID, TimeWindow>() {
            @Override
            public void apply(UUID uuid, TimeWindow window,
                              Iterable<Tuple2<UUID, Negotiation>> iterable,
                              Collector<Result> collector) throws Exception {
                List<Negotiation> negotiations = new ArrayList<>();
                iterable.iterator().forEachRemaining(n -> negotiations.add(n.f1));
                collector.collect(BuildResult.build(new Payload(negotiations), uuid));
            }
        })
        .returns(Result.class);
2 Answers
yftpprvb1#
I think the problem is that TumblingProcessingTimeWindows are aligned with the system clock. Sometimes you get unlucky: an event arrives at 11:48:09.988 (for example), just before one window ends and the next begins, so some of the tuples created from it land in one window while the rest land in the next. Flink's DataStream window API is not a good fit for this use case; it's easier to solve this by moving up or down the API stack.
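To make the alignment concrete, here is a small sketch of the start-time arithmetic tumbling windows use (the same logic as Flink's TimeWindow.getWindowStartWithOffset with the default offset of zero); the timestamp value is made up for illustration:

// Tumbling windows align to the epoch, not to the first element: an element
// with processing time t is assigned to the window [start, start + size).
long size = 10_000L;             // 10-second window, in milliseconds
long t = 1_700_000_009_988L;     // e.g. an event arriving at ...09.988
long start = t - (t % size);     // => ...00.000; this window ends at ...10.000
// Tuples flatMapped from the same Kafka message a few milliseconds later
// (at ...10.001) fall into the NEXT window [...10.000, ...20.000).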
If you go up a level to the Table API, you can use over windows, which allow grouping over time intervals relative to a specific event.
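A hedged sketch, not the poster's code, of what an over window looks like in the Java Table API; the table and column names ("products", "listId", "fatherId", and the processing-time attribute "proctime") are assumptions, and the aggregate is a simple count for illustration:

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Table;

// Given a StreamTableEnvironment tableEnv: an over window aggregates a range
// relative to each row (here, the 10 seconds preceding it), so the interval
// follows the data instead of the wall clock.
Table grouped = tableEnv.from("products")
        .window(
                Over.partitionBy($("listId"), $("fatherId"))
                        .orderBy($("proctime"))
                        .preceding(lit(10).seconds())
                        .as("w"))
        .select(
                $("listId"),
                $("fatherId"),
                // count() stands in for whatever aggregation builds the string
                $("name").count().over($("w")).as("productsSeen"));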
If you go down a level and use a KeyedProcessFunction, you can work directly with state and timers. For example, you could key the stream the way you do now, and then collect all events that arrive within a 10-second interval into ListState, as in the sketch below.
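A minimal sketch of that approach, under the assumption that the interval should be measured from the first element seen for each key (class and state names are made up; NegotiationProduct is the question's own type):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Buffers all elements for a key in ListState and emits them 10 seconds
// after the FIRST element for that key arrives, so the interval is relative
// to the data rather than to clock-aligned window boundaries.
public class CollectFor10Seconds extends KeyedProcessFunction<
        Tuple2<UUID, Integer>,            // key: (transactionId, negotiationId)
        Tuple2<UUID, NegotiationProduct>, // input
        List<NegotiationProduct>> {       // output: everything seen in the interval

    private transient ListState<NegotiationProduct> buffer;
    private transient ValueState<Long> timerState;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffer", NegotiationProduct.class));
        timerState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("timer", Long.class));
    }

    @Override
    public void processElement(Tuple2<UUID, NegotiationProduct> value,
                               Context ctx,
                               Collector<List<NegotiationProduct>> out) throws Exception {
        buffer.add(value.f1);
        // Register a timer only once per key, 10s after its first element.
        if (timerState.value() == null) {
            long fireAt = ctx.timerService().currentProcessingTime() + 10_000L;
            ctx.timerService().registerProcessingTimeTimer(fireAt);
            timerState.update(fireAt);
        }
    }

    @Override
    public void onTimer(long timestamp,
                        OnTimerContext ctx,
                        Collector<List<NegotiationProduct>> out) throws Exception {
        // Emit everything buffered for this key, then reset its state.
        List<NegotiationProduct> result = new ArrayList<>();
        buffer.get().forEach(result::add);
        out.collect(result);
        buffer.clear();
        timerState.clear();
    }
}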
k5ifujac2#
Try .allowedLateness(Time.seconds(2)) after window().
Edit: see David's comment below.