我正在用Java构建一个事件驱动的软件来收听Kafka的主题,并将消息从我的应用程序发送到其他服务器。如果我的应用程序不能成功地将数据发送到第二个服务器,我希望Kafka消费者继续轮询相同的消息。为此,我设置了手动提交偏移量,并且仅在消息成功发送到第二个服务器时才增加偏移量,但只有在应用程序(使用者)重新启动时,代理才会重新发送消息。这是一个问题,因为我不希望我的应用程序重新启动。如果你对这个问题有什么解决办法,请告诉我。
我正在用Java构建一个事件驱动的软件来收听Kafka的主题,并将消息从我的应用程序发送到其他服务器。如果我的应用程序不能成功地将数据发送到第二个服务器,我希望Kafka消费者继续轮询相同的消息。为此,我设置了手动提交偏移量,并且仅在消息成功发送到第二个服务器时才增加偏移量,但只有在应用程序(使用者)重新启动时,代理才会重新发送消息。这是一个问题,因为我不希望我的应用程序重新启动。如果你对这个问题有什么解决办法,请告诉我。
1条答案
按热度按时间jmp7cifd1#
您需要跟踪并手动
seek
使用者到每个主题分区的最后一个未处理的偏移量。您可能还希望pause()
使用者并停止任何轮询循环,直到处理完每条记录