假设我有一个有许多分区的主题。我写的k/v数据在那里,并希望聚合所说的数据在滚动窗口的关键。假设我已经启动了尽可能多的工作示例,并且每个工作示例都在一台单独的计算机上运行。如何确保结果聚合包含每个键的所有值?我不希望每个worker示例都有一些值的子集。这是statestore的用途吗?Kafka是自己解决的还是我需要想出一个方法?
5f0d552i1#
如何确保结果聚合包含每个键的所有值?我不希望每个worker示例都有一些值的子集。一般来说,kafka streams确保同一个键的所有值都将由同一个(并且只有一个)流任务处理,这也意味着只有一个应用程序示例(您所描述的“worker instance”)将处理该键的值。请注意,应用程序示例可以运行1+个流任务,但这些任务是独立的。这种行为是通过对数据进行分区来实现的,kafka streams确保一个分区总是由同一个且只有一个流任务来处理。到键/值的逻辑链接是,在kafka和kafka流中,一个键总是被发送到同一个分区(这里有一个gotcha,但我不确定详细讨论这个问题的范围是否有意义),因此在可能的多个分区中,一个特定分区包含同一个键的所有值。在某些情况下,例如连接两个流时 A 以及 B ,但您必须确保聚合将在同一个键上操作,以确保来自两个流的数据位于同一个流任务中——这同样是关于确保相关的输入流分区,从而匹配键(来自 A 以及 B ,分别在同一流任务中可用。您在这里使用的典型方法是 selectKey() . 完成后,kafka streams确保,为了连接两个流a和b以及创建连接的输出流,相同密钥的所有值将由相同的流任务处理,从而由相同的应用程序示例处理。例子:溪流 A 有钥匙 userId 有价值的 { georegion } .溪流 B 有钥匙 georegion 有价值的 { continent, description } .当两个流使用相同的密钥时,连接两个流才起作用(从kafka 0.10.0开始)。在本例中,这意味着您必须对流重新设置密钥(从而重新分区) A 因此结果键从 userId 至 georegion . 否则,从Kafka0.10开始,您不能加入 A 以及 B 因为数据不在负责实际执行连接的流任务中的同一位置。在本例中,您可以对流重新设置密钥/重新分区 A 通过:
A
B
selectKey()
userId
{ georegion }
georegion
{ continent, description }
// Kafka 0.10.0.x (latest stable release as of Sep 2016) A.map((userId, georegion) -> KeyValue.pair(georegion, userId)).through("rekeyed-topic") // Upcoming versions of Kafka (not released yet) A.map((userId, georegion) -> KeyValue.pair(georegion, userId))
这个 through() 只有在kafka 0.10.0中才需要调用来实际触发重新分区,而kafka的更高版本将自动为您执行这些操作(这个即将推出的功能已经在kafka中完成并可用) trunk ).这是statestore的用途吗?Kafka是自己解决的还是我需要想出一个方法?一般来说,不会。上面的行为是通过分区实现的,而不是通过状态存储实现的。有时由于您为流定义的操作而涉及状态存储,这可能解释了您为什么要问这个问题。例如,窗口操作将需要管理状态,因此将在后台创建状态存储。但您的实际问题——“确保结果聚合包含每个键的所有值”——与状态存储无关,而是与分区行为有关。
through()
trunk
jfgube3f2#
对于worker示例,我假设您是指kafka streams应用程序示例,对吗(因为在kafka流中没有主/工作模式——它是一个库而不是一个框架——所以我们不使用术语“工作者”。)如果您想按键共同定位数据,则需要按键对数据进行分区。因此,当数据从一开始就写入主题时,您的数据由外部生产者按键进行分区。或者在kafka streams应用程序中显式设置一个新密钥(例如 selectKey() 或者 map() )并通过呼叫 through() . (明确要求 through() 在以后的版本中不需要, 0.10.1 如果需要,kafka streams将自动重新分发记录。)如果应该对消息/记录进行分区,则不能使用键 null . 您还可以通过producer配置更改分区模式 partitioner.class (见https://kafka.apache.org/documentation.html#producerconfigs).分区完全独立于状态存储,即使状态存储通常用于分区数据之上。
map()
0.10.1
null
partitioner.class
2条答案
按热度按时间5f0d552i1#
如何确保结果聚合包含每个键的所有值?我不希望每个worker示例都有一些值的子集。
一般来说,kafka streams确保同一个键的所有值都将由同一个(并且只有一个)流任务处理,这也意味着只有一个应用程序示例(您所描述的“worker instance”)将处理该键的值。请注意,应用程序示例可以运行1+个流任务,但这些任务是独立的。
这种行为是通过对数据进行分区来实现的,kafka streams确保一个分区总是由同一个且只有一个流任务来处理。到键/值的逻辑链接是,在kafka和kafka流中,一个键总是被发送到同一个分区(这里有一个gotcha,但我不确定详细讨论这个问题的范围是否有意义),因此在可能的多个分区中,一个特定分区包含同一个键的所有值。
在某些情况下,例如连接两个流时
A
以及B
,但您必须确保聚合将在同一个键上操作,以确保来自两个流的数据位于同一个流任务中——这同样是关于确保相关的输入流分区,从而匹配键(来自A
以及B
,分别在同一流任务中可用。您在这里使用的典型方法是selectKey()
. 完成后,kafka streams确保,为了连接两个流a和b以及创建连接的输出流,相同密钥的所有值将由相同的流任务处理,从而由相同的应用程序示例处理。例子:
溪流
A
有钥匙userId
有价值的{ georegion }
.溪流
B
有钥匙georegion
有价值的{ continent, description }
.当两个流使用相同的密钥时,连接两个流才起作用(从kafka 0.10.0开始)。在本例中,这意味着您必须对流重新设置密钥(从而重新分区)
A
因此结果键从userId
至georegion
. 否则,从Kafka0.10开始,您不能加入A
以及B
因为数据不在负责实际执行连接的流任务中的同一位置。在本例中,您可以对流重新设置密钥/重新分区
A
通过:这个
through()
只有在kafka 0.10.0中才需要调用来实际触发重新分区,而kafka的更高版本将自动为您执行这些操作(这个即将推出的功能已经在kafka中完成并可用)trunk
).这是statestore的用途吗?Kafka是自己解决的还是我需要想出一个方法?
一般来说,不会。上面的行为是通过分区实现的,而不是通过状态存储实现的。
有时由于您为流定义的操作而涉及状态存储,这可能解释了您为什么要问这个问题。例如,窗口操作将需要管理状态,因此将在后台创建状态存储。但您的实际问题——“确保结果聚合包含每个键的所有值”——与状态存储无关,而是与分区行为有关。
jfgube3f2#
对于worker示例,我假设您是指kafka streams应用程序示例,对吗(因为在kafka流中没有主/工作模式——它是一个库而不是一个框架——所以我们不使用术语“工作者”。)
如果您想按键共同定位数据,则需要按键对数据进行分区。因此,当数据从一开始就写入主题时,您的数据由外部生产者按键进行分区。或者在kafka streams应用程序中显式设置一个新密钥(例如
selectKey()
或者map()
)并通过呼叫through()
. (明确要求through()
在以后的版本中不需要,0.10.1
如果需要,kafka streams将自动重新分发记录。)如果应该对消息/记录进行分区,则不能使用键
null
. 您还可以通过producer配置更改分区模式partitioner.class
(见https://kafka.apache.org/documentation.html#producerconfigs).分区完全独立于状态存储,即使状态存储通常用于分区数据之上。