我有一个电子商务应用程序,每次用户向购物车添加内容时,我都会向Kafka服务器发送一条消息。我可以发送消息并从客户端使用它,但是,我对错误处理很好奇。偶尔,我的go服务器会因为网络错误或其他原因而失败。添加到购物车的功能将是应用程序的重要组成部分,所以我不希望Kafka生产者失败的功能或成为依赖它。我试图通过为Kafka制作人创建一个单独的函数来将它们分开,我认为 kafka.Produce()
该函数是非阻塞的,所以即使失败,用户仍然应该能够将项目添加到购物车。下面是一个示例代码(我把kafka的完整代码放在了部分,但是为了可读性,我删去了添加到cart的实现)。有没有办法退出Kafka功能,如果出了什么问题,或如果它是超过几秒钟超时?因此,添加到购物车功能不会挂起或导致服务器失败。我对go中的通道和并发性不是很有经验,所以我不能确定这是否会成为当前设计的一个问题。
添加到购物车
func addToCart(c *context.Context, rw web.ResponseWriter, req *web.Request) {
cartID := req.PathParams["id"]
var items []map[string]interface{}
if err := json.NewDecoder(req.Body).Decode(&items); err != nil {
errors.Write(rw, 400, "Unable to parse request body JSON or invalid data format.")
return
}
//MAKE SOME OPERATIONS AND SAVE IT TO DATABASE
cart, jsonErr := saveToDB(c, cartID, items)
if jsonErr != nil {
jsonErr.Write(rw)
return
}
webLib.Write204(rw)
deliveryChan := make(chan kafka.Event)
kafkaMessage("cart_topic", "sample-cart-event-message", deliveryChan, rw, rq)
return
}
Kafka
func kafkaMessage(topic string, message []byte, deliveryChan chan kafka.Event, rw web.ResponseWriter, req *web.Request) {
err := c.KafkaProducer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: message,
}, deliveryChan)
if err != nil {
c.Log("error:%s", err)
return
}
e, ok := <-deliveryChan
if !ok{
c.Log("Channel is closed for kafka producer")
return
}
m, ok := e.(*kafka.Message)
if !ok{
c.Log("There has been an error obtaining the kafka message")
return
}
if m.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
} else {
c.Log("Delivered message to topic %s [%d] at offset %v\n",
*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
}
}
1条答案
按热度按时间wixjitnu1#
因此,发送到kafka是异步的,但实际上,您正在通过等待“成功”消息将其转换为同步功能。
几个快速的选择。
1:通过传递
nil
通道组件deliveryChan
. 然后您就得到了一个“fire-and-forget”异步模型。听起来这可能就是你要找的。2:你可以跑
kafkaMessage
在goroutine中然后您可以在该函数中等待消息、日志等。你甚至可以添加重试,如果你想!请注意,在这种情况下,您可能会得到等待响应消息/重试等的goroutine积压,因为您实际上是将操作作为goroutine排队。对于大多数应用程序来说,这不会是一个问题,因为您不会在处理过程中不断落后,但仍然需要注意监视!
这里有很多其他的模式可以遵循,但这些都是相当低的电梯和给你一些选择。