如何在Go中从Kafka阅读并写入PostgreSQL时处理错误?

z5btuh9x  于 2023-08-01  发布在  Go
关注(0)|答案(2)|浏览(77)

我正在构建一个Go应用程序,它从Kafka主题中读取消息并将其写入PostgreSQL数据库。
我设置了一个循环,它使用kafka.Reader从Kafka读取消息,并使用sql.DB将它们插入数据库。如果在阅读消息或将其插入数据库时出错,我会记录错误并继续处理下一条消息。
但是,我不确定如何处理在将数据插入PostgreSQL数据库后提交Kafka消息时发生的错误。具体来说,如果手动提交导致错误,我应该怎么做?我应该重试提交操作吗?我应该记录错误并继续下一条消息吗?处理这些类型错误的最佳实践是什么?

for {
        kafkaMessage, err := kafkaReader.ReadMessage(context.Background())
        if err != nil {
            fmt.Printf("Failed to read message from Kafka: %s\n", err)
            continue
        }

        _, err = db.Exec("INSERT INTO mytable (payload) VALUES ($1)", kafkaMessage.Value)
        if err != nil {
            fmt.Printf("Failed to insert payload into database: %s\n", err)
            continue
        }

        // What should I do if the commit operation fails?
        err = kafkaReader.CommitMessages(context.Background(), kafkaMessage)
        if err != nil {
            // What's the best practice for handling this error?
        }
    }

字符串

flvlnr44

flvlnr441#

当遇到错误时,只需将continue循环到for循环的下一次迭代。
如果由于任何原因未能提交Kafka消息,Kafka将在下一个reader.ReadMessage(ctx)中再次返回相同的消息。
但是要确保你的代码不会继续徒劳地做同样失败的工作很多次,耗尽你的资源,用同样的错误消息淹没日志,等等。在每个错误后使用简单的sleep,或者如果确实需要,为您的功能使用断路器逻辑。

if err != nil {
   log.Errorf("...", ...)
   time.Sleep(5 * time.Second)
   continue
}

字符串

ymdaylpp

ymdaylpp2#

一般来说,业务逻辑应该依赖于数据最后一级的一致性。对于您的情况,我假设数据将在数据库中持久化,这是唯一重要的事情,那么您应该围绕数据库设计一个一致性模型,找到数据属性并设计一个适当的业务流程,以帮助您保证数据库的最终一致性。
对于你的问题,你是否处理错误真的不重要,在Kafka发生了一些意想不到的事情,你忽略它或者什么,如果提交(实际上)成功,Kafka应该继续下一条消息,或者如果提交(实际上)失败,停留在当前偏移量。这就是所谓的at-least-once传递,因此您的业务逻辑应该正确地处理重复的消息。
如果您想要exactly-once交付,那就麻烦多了,我不建议业务逻辑依赖于精确一次交付。例如,如果数据库插入了记录,但当响应传输回应用程序时网络失败,会发生什么?您会假设网络在数据库提交之后还是在数据库提交之前丢失?当你的请求到达Kafka服务器时,Kafka可以保证只传递一次,但是在Kafka服务器的范围之外,它不能帮助你太多。

相关问题