假设我们有一个包含5个分区的indata主题,其中包含contract数据和constract作为键。我有3个kafka streams应用程序示例,它统计了合同的数量。
现在,我想在kafka streams应用程序中实现一个契约总数。现在我了解到每个流应用程序只分配给一个分区。也就是说,kafka streams应用程序的每个示例只有每个分区的计数?
如何计算执行的合同总数?我需要一个只有一个分区的中间主题吗?是否可以使用全球表实现?
假设我们有一个包含5个分区的indata主题,其中包含contract数据和constract作为键。我有3个kafka streams应用程序示例,它统计了合同的数量。
现在,我想在kafka streams应用程序中实现一个契约总数。现在我了解到每个流应用程序只分配给一个分区。也就是说,kafka streams应用程序的每个示例只有每个分区的计数?
如何计算执行的合同总数?我需要一个只有一个分区的中间主题吗?是否可以使用全球表实现?
1条答案
按热度按时间7eumitmz1#
使用
GlobalKTable
或者全局状态存储无法工作(至少不能直接工作),因为两者都只能存储来自某个主题的未修改数据,但是,您需要进行一些处理(即计数)。如果你想数一数
contactId
您应该首先将数据加载到KTable
(通过builder.table()
)然后做一个groupBy().count()
--在groupBy()
将所有记录Map到同一个新键。因为所有记录都Map到同一个键,所以它们将被重新分区到同一个主题分区,从而得到一个全局计数。