如何在kafka中实现timeseries汇总?

xzlaal3s  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(443)

我想用Kafka在一家公司内部分发高频金融市场价格。数据以每秒2000-3000个数字的速度从不同的提供商处传入。消费者对最新价格感兴趣,因为这是最新的价格,然而,他们通常也对获取价格的历史感兴趣。
现在,像美元/欧元汇率(eurusd)这样的高流动性系列可能会导致每秒最多100条消息。当消费者想要历史数据时,他们想要的是一个采样序列,而不是整个消息日志,因为这将是巨大的。例如,他们可能只需要每5分钟返回一次的价格历史记录,比如说,10天,也就是说,在过去的8600万个滴答声中(10天24小时3600秒100/秒=8640万条消息)只需要日志中的每30000条消息(10060*5)。
每30000天解析一次整个10天的日志,肯定是一个超级昂贵的操作。显然,我可以让一个消费者这样做,然后每5分钟重新发布到另一个主题,但现在我有两个不同的主题为同一个股票代码(eurusd),这又引入了一种“批量与实时”架构。而且,我不想这么快用完空间。每秒存储100个滴答声实在太多了。同时,我也希望最新的价格可以不运行两个主题。
这个问题怎么解决?理想情况下,我希望实时价格在任何时候都被公布,而且,当回到日志中,只有每5分钟左右的历史信息。这是否可行/可行,无需昂贵的扫描?Kafka能推出那些没有存储在日志中的消息(比如说,丢失并不是什么大不了的消息),但是每5分钟存储一条吗?怎么做?

xfb7svmp

xfb7svmp1#

您可以使用offsetsfortime获取所需分区的偏移Map,并从中查找。据我所知,这是通过引入基于时间的索引实现的(参见https://cwiki.apache.org/confluence/display/kafka/kip-33+-+add+a+time+based+log+index#kip-33 addatimebasedlogindex强制基于时间的日志保留),因此我假设它尽可能地高效。
但你不能告诉Kafka根据时间戳有选择地存储。如果一个主题只包含那些选定的消息,那么应该复制到一个新主题

相关问题