如果这是个愚蠢的问题,我很抱歉。
我有一个场景,其中我有3个来自上游服务的主题(没有键控)。不幸的是,我无法改变这三个主题的行为。
上游服务在一天结束时批量发布所有消息,我需要获得事务的累积视图,因为事务的顺序对下游服务很重要。
我知道我不能对主题的不同分区中的消息重新排序,所以我想如果我可以累积它们,我的服务就可以在处理之前获取累积的结果并重新排序。
然而,我注意到一个奇怪的行为,我希望有人能澄清我错过了什么。
当我使用1到500个帐户执行此操作时,我看到在输出主题中累积并显示了500条消息。
但是,当我对10000个帐户尝试相同的操作时,我看到的输出比应该的多(关于输出主题的13000条消息)。
KStream<String, TransactionAccumulator> transactions =
disbursements
.merge(repayments)
.merge(fees)
.groupBy(
(k, v) -> v.getAccountId(),
with(
String(),
serdeFrom(
new JsonSerializer<>(mapper),
new JsonDeserializer<>(Transaction.class, mapper))))
.windowedBy(SessionWindows.with(Duration.of(1, ChronoUnit.MINUTES)))
.aggregate(
TransactionAccumulator::new,
(key, value, aggregate) -> aggregate.add(value),
(aggKey, aggOne, aggTwo) -> aggOne.merge(aggTwo),
Materialized.with(
String(),
serdeFrom(
new JsonSerializer<>(mapper),
new JsonDeserializer<>(TransactionAccumulator.class, mapper))))
.toStream((key, value) -> key.key());
如前所述,上游服务在一天结束时批量发布所有事件(而不是实时发布)。
我很感激我在这里错过了什么,因为对于较小的卷,它似乎工作。
更新1
我尝试了使用抑制的建议,试图只发送最后一个窗口。
但是,当使用它时,它基本上不会向输出主题发布任何消息,尽管我看到“ktable suppress state store”中有消息
带有suppress的更新代码如下所示。
disbursements
.merge(repayments)
.merge(fees)
.groupBy(
(key, value) -> value.getAccountId(),
Grouped.with(
Serdes.String(),
Serdes.serdeFrom(
new JsonSerializer<>(mapper),
new JsonDeserializer<>(Transaction.class, mapper))))
.windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMinutes(1)))
.aggregate(
TransactionAccumulator::new,
(key, value, aggregate) -> aggregate.add(value),
Materialized.with(
Serdes.String(),
Serdes.serdeFrom(
new JsonSerializer<>(mapper),
new JsonDeserializer<>(TransactionAccumulator.class, mapper))))
.suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
.mapValues(
value -> {
LOGGER.info(
"Sending {} Transactions for {}",
value.getTransactions().size(),
value.getAccountId());
return value;
})
.toStream((key, value) -> key.key());
我也没有看到日志消息的介绍。为了清楚起见,我在这个实验中使用了springcloudstream,我在stream应用程序上看到的最终日志条目如下所示。
INFO 23436 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams : stream-client [StreamConsumer-consume-applicationId-de25a238-5f0f-4d84-9bd2-3e7b01b7f0b3] State transition from REBALANCING to RUNNING
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
1条答案
按热度按时间uqdfh47h1#
对不起,我还不能评论,但这是我的两分钱:
KGroupedStream.aggregate()
:kafka stream使用记录缓存来控制从的物化视图(或ktable)发出聚合更新的速率aggregate
状态存储和下游处理器。e、 g带信息:以及你的字数结构:
您可能会收到如下下游消息:
因此,我猜测输入主题可能包含单个accountid的多个事务,并且记录缓存在缓存被刷新时被刷新(
cache.max.bytes.buffering
)已满或commit.interval.ms
他很满足。如果你的sink是幂等的,你可以重写你的sink
TransactionAccumulator
或者您可以使用KTable.suppress()
如这里所述,只发出聚合窗口的最后一条消息。