我有要求重新处理所有关于Kafka主题的消息在一天结束的工作。有没有最好的方式来实现Kafka主题?
我已经用StateStore实现了,但StateStore有自己的挑战,而且产品还没有准备好投入生产。
另一种方法是使用Kafka-Connect将数据保存到RDBMS中,然后重新处理它们,但尝试使用Kafka主题构建一些东西,并避免使用任何其他交互
avkwfej41#
在最底层,您可以只使用带有偏移量策略auto.offset.reset=earliest的KafkaConsumer(如果您确实想处理所有消息)。
auto.offset.reset=earliest
此外,如果您更愿意执行最后一次处理,还可以使用timestamp -> offset Consumer API(或者,您可以将保留时间设置为24小时,但根据您的服务器不同,保留时间并不精确,因此我会发现纯API方法更可靠/更可预测)。
timestamp -> offset
1条答案
按热度按时间avkwfej41#
在最底层,您可以只使用带有偏移量策略
auto.offset.reset=earliest
的KafkaConsumer(如果您确实想处理所有消息)。此外,如果您更愿意执行最后一次处理,还可以使用
timestamp -> offset
Consumer API(或者,您可以将保留时间设置为24小时,但根据您的服务器不同,保留时间并不精确,因此我会发现纯API方法更可靠/更可预测)。