kafka ktable-跨机器共享聚合

7qhs6swi  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(442)

假设我有一个有许多分区的主题。我写的k/v数据在那里,并希望聚合所说的数据在滚动窗口的关键。
假设我已经启动了尽可能多的工作示例,并且每个工作示例都在一台单独的计算机上运行。
如何确保结果聚合包含每个键的所有值?我不希望每个worker示例都有一些值的子集。
这是statestore的用途吗?Kafka是自己解决的还是我需要想出一个方法?

5f0d552i

5f0d552i1#

如何确保结果聚合包含每个键的所有值?我不希望每个worker示例都有一些值的子集。
一般来说,kafka streams确保同一个键的所有值都将由同一个(并且只有一个)流任务处理,这也意味着只有一个应用程序示例(您所描述的“worker instance”)将处理该键的值。请注意,应用程序示例可以运行1+个流任务,但这些任务是独立的。
这种行为是通过对数据进行分区来实现的,kafka streams确保一个分区总是由同一个且只有一个流任务来处理。到键/值的逻辑链接是,在kafka和kafka流中,一个键总是被发送到同一个分区(这里有一个gotcha,但我不确定详细讨论这个问题的范围是否有意义),因此在可能的多个分区中,一个特定分区包含同一个键的所有值。
在某些情况下,例如连接两个流时 A 以及 B ,但您必须确保聚合将在同一个键上操作,以确保来自两个流的数据位于同一个流任务中——这同样是关于确保相关的输入流分区,从而匹配键(来自 A 以及 B ,分别在同一流任务中可用。您在这里使用的典型方法是 selectKey() . 完成后,kafka streams确保,为了连接两个流a和b以及创建连接的输出流,相同密钥的所有值将由相同的流任务处理,从而由相同的应用程序示例处理。
例子:
溪流 A 有钥匙 userId 有价值的 { georegion } .
溪流 B 有钥匙 georegion 有价值的 { continent, description } .
当两个流使用相同的密钥时,连接两个流才起作用(从kafka 0.10.0开始)。在本例中,这意味着您必须对流重新设置密钥(从而重新分区) A 因此结果键从 userIdgeoregion . 否则,从Kafka0.10开始,您不能加入 A 以及 B 因为数据不在负责实际执行连接的流任务中的同一位置。
在本例中,您可以对流重新设置密钥/重新分区 A 通过:

// Kafka 0.10.0.x (latest stable release as of Sep 2016)
A.map((userId, georegion) -> KeyValue.pair(georegion, userId)).through("rekeyed-topic")

// Upcoming versions of Kafka (not released yet)
A.map((userId, georegion) -> KeyValue.pair(georegion, userId))

这个 through() 只有在kafka 0.10.0中才需要调用来实际触发重新分区,而kafka的更高版本将自动为您执行这些操作(这个即将推出的功能已经在kafka中完成并可用) trunk ).
这是statestore的用途吗?Kafka是自己解决的还是我需要想出一个方法?
一般来说,不会。上面的行为是通过分区实现的,而不是通过状态存储实现的。
有时由于您为流定义的操作而涉及状态存储,这可能解释了您为什么要问这个问题。例如,窗口操作将需要管理状态,因此将在后台创建状态存储。但您的实际问题——“确保结果聚合包含每个键的所有值”——与状态存储无关,而是与分区行为有关。

jfgube3f

jfgube3f2#

对于worker示例,我假设您是指kafka streams应用程序示例,对吗(因为在kafka流中没有主/工作模式——它是一个库而不是一个框架——所以我们不使用术语“工作者”。)
如果您想按键共同定位数据,则需要按键对数据进行分区。因此,当数据从一开始就写入主题时,您的数据由外部生产者按键进行分区。或者在kafka streams应用程序中显式设置一个新密钥(例如 selectKey() 或者 map() )并通过呼叫 through() . (明确要求 through() 在以后的版本中不需要, 0.10.1 如果需要,kafka streams将自动重新分发记录。)
如果应该对消息/记录进行分区,则不能使用键 null . 您还可以通过producer配置更改分区模式 partitioner.class (见https://kafka.apache.org/documentation.html#producerconfigs).
分区完全独立于状态存储,即使状态存储通常用于分区数据之上。

相关问题