如何优雅地处理apachekafka producer中的错误

5lwkijsr  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(367)

我有一个电子商务应用程序,每次用户向购物车添加内容时,我都会向Kafka服务器发送一条消息。我可以发送消息并从客户端使用它,但是,我对错误处理很好奇。偶尔,我的go服务器会因为网络错误或其他原因而失败。添加到购物车的功能将是应用程序的重要组成部分,所以我不希望Kafka生产者失败的功能或成为依赖它。我试图通过为Kafka制作人创建一个单独的函数来将它们分开,我认为 kafka.Produce() 该函数是非阻塞的,所以即使失败,用户仍然应该能够将项目添加到购物车。下面是一个示例代码(我把kafka的完整代码放在了部分,但是为了可读性,我删去了添加到cart的实现)。有没有办法退出Kafka功能,如果出了什么问题,或如果它是超过几秒钟超时?因此,添加到购物车功能不会挂起或导致服务器失败。我对go中的通道和并发性不是很有经验,所以我不能确定这是否会成为当前设计的一个问题。
添加到购物车

func addToCart(c *context.Context, rw web.ResponseWriter, req *web.Request) {
    cartID := req.PathParams["id"]
    var items []map[string]interface{}
    if err := json.NewDecoder(req.Body).Decode(&items); err != nil {
        errors.Write(rw, 400, "Unable to parse request body JSON or invalid data format.")
        return
    }

    //MAKE SOME OPERATIONS AND SAVE IT TO DATABASE
    cart, jsonErr := saveToDB(c, cartID, items)
    if jsonErr != nil {
        jsonErr.Write(rw)
        return
    }

    webLib.Write204(rw)

    deliveryChan := make(chan kafka.Event)

    kafkaMessage("cart_topic", "sample-cart-event-message", deliveryChan, rw, rq)
    return
}

Kafka

func kafkaMessage(topic string, message []byte, deliveryChan chan kafka.Event, rw web.ResponseWriter, req *web.Request) {
    err := c.KafkaProducer.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
        Value:          message,
    }, deliveryChan)

    if err != nil {
        c.Log("error:%s", err)
        return
    }

    e, ok := <-deliveryChan
    if !ok{
        c.Log("Channel is closed for kafka producer")
        return
    }

    m, ok := e.(*kafka.Message)
    if !ok{
        c.Log("There has been an error obtaining the kafka message")
        return
    }

    if m.TopicPartition.Error != nil {
        fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
    } else {
        c.Log("Delivered message to topic %s [%d] at offset %v\n",
            *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
    }
}
wixjitnu

wixjitnu1#

因此,发送到kafka是异步的,但实际上,您正在通过等待“成功”消息将其转换为同步功能。
几个快速的选择。
1:通过传递 nil 通道组件 deliveryChan . 然后您就得到了一个“fire-and-forget”异步模型。听起来这可能就是你要找的。
2:你可以跑 kafkaMessage 在goroutine中

deliveryChan := make(chan kafka.Event)
go kafkaMessage("cart_topic", "sample-cart-event-message", deliveryChan, rw, rq)
return

然后您可以在该函数中等待消息、日志等。你甚至可以添加重试,如果你想!请注意,在这种情况下,您可能会得到等待响应消息/重试等的goroutine积压,因为您实际上是将操作作为goroutine排队。对于大多数应用程序来说,这不会是一个问题,因为您不会在处理过程中不断落后,但仍然需要注意监视!
这里有很多其他的模式可以遵循,但这些都是相当低的电梯和给你一些选择。

相关问题