我在c++应用程序中使用librdkafka将键值对存储在kafka主题中。例如:<1, 100> <2, 101> <3, 200>但是,我可以通过向主题发送以下消息来更新键值对:<1, 103>如何确保消费者只消费<1103>而不是<1100>?
gojuced71#
你可以做一个 seek() 在消费者寻求一个特定的抵消。从这个偏移量,您可以轮询消息。轮询的消息可能同时包含 <1,100> 以及 <1,103> .因此,为了获得给定键的最新值,您应该维护一个数据结构(如Map),在该结构中存储键及其值,并在每次轮询时使用 put(key, value) 如果你打电话来 get(key) 您可以获得该键的最新值,该键在该时刻之前一直处于轮询状态。不过,你可以试着减少 segment.ms 以及 segment.bytes 对于您的Kafka主题,并将主题设置为压缩,您仍然可能会收到多个具有相同密钥的消息。此外,设置 segment.ms 或者 segment.bytes 也不建议值太小,因为它会导致不必要的新段滚动。简而言之,您不能确保使用者只使用最新的值。因为,kafka本身并不关心最新值,而是客户机应该读取消息并获取某个密钥的最新值。提示:如果您使用的是消费群体( subscribe() )然后可以使用持久Map存储所有以前轮询的键值对,并从最后提交的偏移量开始轮询。这样就避免了每次启动应用程序时都试图重新开始。
seek()
<1,100>
<1,103>
put(key, value)
get(key)
segment.ms
segment.bytes
subscribe()
1条答案
按热度按时间gojuced71#
你可以做一个
seek()
在消费者寻求一个特定的抵消。从这个偏移量,您可以轮询消息。轮询的消息可能同时包含<1,100>
以及<1,103>
.因此,为了获得给定键的最新值,您应该维护一个数据结构(如Map),在该结构中存储键及其值,并在每次轮询时使用
put(key, value)
如果你打电话来get(key)
您可以获得该键的最新值,该键在该时刻之前一直处于轮询状态。不过,你可以试着减少
segment.ms
以及segment.bytes
对于您的Kafka主题,并将主题设置为压缩,您仍然可能会收到多个具有相同密钥的消息。此外,设置segment.ms
或者segment.bytes
也不建议值太小,因为它会导致不必要的新段滚动。简而言之,您不能确保使用者只使用最新的值。因为,kafka本身并不关心最新值,而是客户机应该读取消息并获取某个密钥的最新值。
提示:如果您使用的是消费群体(
subscribe()
)然后可以使用持久Map存储所有以前轮询的键值对,并从最后提交的偏移量开始轮询。这样就避免了每次启动应用程序时都试图重新开始。