我试图用Kafka蒸汽来减少一系列的数字,我只想在数据发生变化时记录出来。它工作得很完美,但问题是,如果运行代码的服务已关闭,它就无法从kafka获取数据。所以我猜答案是错的?我的代码:
KGroupedStream<String, JsonNode> groupedStream = filteredStream.groupByKey( Serdes.String(), jsonSerde);
KTable<String, JsonNode> reducedTable = groupedStream.reduce(
(aggValue, newValue) -> Calculate.newValue( newValue, aggValue, logger) ,/* adder */
"reduced-stream-store" /* state store name */);
KStream<String, JsonNode> reducedStream = reducedTable.toStream();
“计算”方法:
if (value != oldValue)
return value
else return null.
如果你有意见/建议,谢谢
1条答案
按热度按时间shstlldc1#
return null
在您的代码中,将从结果表中删除条目。因此,您的代码没有达到预期的效果。事实上,dsl操作符发出“on update”而不是“on change”,因此您不能将dsl用于您的用例。有一张罚单建议添加“emit on change”语义(https://issues.apache.org/jira/browse/kafka-8770).
作为解决方法,您需要使用自定义
transform()
用stat store代替。对于每个输入记录,检查它是否存在于存储中。如果没有,则发出记录并将其放入存储区。如果真的存在并且是相同的,不要发射任何东西。如果不同,则发出并更新存储。