事情是这样的--我必须从Kafka读取一条消息,并检查某个特定的头是否有date〉= than now ---〉,然后相应地处理它,否则我应该能够根据需要多次获得相同的消息,以使上面的条件成为真。当然,同一主题中的其他消息不应该被屏蔽。我该怎么做?到目前为止,我脑子里想的是把同样的信息重新发布给Kafka,次数不限,只要满足条件就行。但这种做法好吗?难道它不会简单地复制Kafka中的消息并锁定kafka的存储吗?
事情是这样的--我必须从Kafka读取一条消息,并检查某个特定的头是否有date〉= than now ---〉,然后相应地处理它,否则我应该能够根据需要多次获得相同的消息,以使上面的条件成为真。当然,同一主题中的其他消息不应该被屏蔽。我该怎么做?到目前为止,我脑子里想的是把同样的信息重新发布给Kafka,次数不限,只要满足条件就行。但这种做法好吗?难道它不会简单地复制Kafka中的消息并锁定kafka的存储吗?
1条答案
按热度按时间0x6upsns1#
应该能够根据需要多次获得相同的消息,以使上述条件变为真
可以。禁用使用者组自动提交。在循环中使用主题,并使用
seek
函数从某个偏移量开始重复。如果找到所需的数据,则中断循环并提交偏移量。同一主题中的其他消息不应被阻止
他们永远不会
重新发布相同的消息。..它会使Kafka的仓库被锁上吗?
Kafka会在一周后删除旧的片段,所以不会,除非你在此之前填满磁盘,导致服务器崩溃。
最终,这对Kafka来说不是一个好的用例。将数据转储到某个时间序列数据库中,然后从那里查询。