我正在使用kafkautils.createdirectstream kafka api使用消息,然后处理要存储在配置单元中的消息。由于访问配置单元元存储失败,有时作业会失败。重新启动时,我看到它总是从“最新”消息开始读取(因为在我的代码auto.offset.reset=latest中),而不是从最后提交的消息读取。在这种情况下,它正在失去补偿。因此,我希望在整个处理完成后存储偏移量,以便在检索消息失败时检索它。
请建议如何使用kafkautils.createdirectstream将偏移量存储在kafka中,以确保完成整个kafka作业。
暂无答案!
目前还没有任何答案,快来回答吧!