我正在运行一个Golang TCP服务器,它接收一个连接,生成一个goroutine来处理这个连接,然后生成一个新的goroutine来从这个连接中读取数据并将数据写入一个通道。还有一个简单的函数从通道读取数据。
这个想法是,如果客户端关闭连接,ReadFromConngoroutine将在阅读时收到错误,然后返回。defer代码将写入done通道,并关闭done和queue通道。如果done通道有数据,消费者代码将停止处理,并返回当时的任何结果。
另一种情况是消费者可以决定它有足够的数据来做出决定并返回。这将导致执行返回到Handle函数。当该函数返回时,Handle函数上的defer将关闭连接,导致ReadFromConngoroutine在阅读连接时收到错误,并关闭所有挂起的通道。
这工作得很好,但我正在做一些负载测试,并注意到测试完成后Golang服务器的内存使用量并没有像应用程序的所有其他部分那样在负载停止时减少。我拿了一些身份证查了日志。我看到有时(不是所有时候)会显示ERROR阅读FROM CONN消息,但没有READ_FROM_CONN DEFER日志,所以我认为defer从未被调用。因此,ProcessQueue挂起阅读通道,因为通道从未关闭,并且Handle函数的CLOSING...日志也丢失。
至少,这是我认为正在发生的事情,我相信这就是为什么当负载测试结束时内存消耗从未下降的原因,因为代码仍然在运行一些goroutine,从应该由ReadFromConn上的defer代码关闭的通道读取。这种行为是不可预测的;并不是所有的连接都会发生这种情况,所以我不知道可能出了什么问题。
下面是我的Golang服务器的简化版本:
package main
import (
"os"
"net"
"fmt"
"io"
)
type CustomStruct struct {
Type string
Stop bool
}
func main() {
// Creates server
server, err := net.Listen("tcp", "0.0.0.0:80")
if err != nil {
fmt.Println("failed to bind listener to socket", err)
}
defer server.Close()
fmt.Println("Listening new connections V2")
// Starts reading from the server
for {
conn, err := server.Accept()
if err != nil {
fmt.Println("failed to accept new connection:", err)
continue
}
go Handle(conn)
}
}
func Handle(conn net.Conn) {
defer conn.Close()
id := "some uuid for each conn"
// Creates channels
queue := make(chan []byte, 512)
done := make(chan bool)
// Starts reading from the server
go ReadFromConn(id, conn, queue, done)
result := ProcessQueue(id, queue, done)
fmt.Println(id, "CLOSING...")
// Do stuffs with result...
fmt.Println(id, result)
}
func ReadFromConn(
id string,
conn io.Reader,
queue chan []byte,
done chan bool,
) {
defer func() {
done <- true
close(queue)
close(done)
fmt.Println(id, "READ_FROM_CONN DEFER")
}()
tmp := make([]byte, 256)
for {
_, err := conn.Read(tmp)
if err != nil {
fmt.Println(id, "ERROR READING FROM CONN " + err.Error())
return
}
if (tmp[0] == 0x00) {
return
}
queue <- tmp
}
}
func ProcessQueue(
id string,
queue chan []byte,
done chan bool,
) CustomStruct {
defer fmt.Println(id, "GET_TRANSCRIPTION_RESULT ENDED")
fmt.Println(id, "GET_TRANSCRIPTION_RESULT STARTED")
result := CustomStruct{
Type: "transcription",
Stop: false,
}
for {
select {
case <-done:
fmt.Println(id, "DONE DETECTED")
return result
default:
fmt.Println(id, "DEFAULT")
payload, open := <-queue
if open == false {
fmt.Println(id, "QUEUE IS CLOSED")
return result
} else {
fmt.Println(id, "QUEUE IS OPEN")
}
// ... Do stuffs with payload, if certain condition is met, of the result of processing payload, return
if (payload[0] == 0x01) {
return result
}
}
}
return result
}
字符串
1条答案
按热度按时间neskvpey1#
不太清楚问题是什么,但是使用上下文对象可以更干净地控制读取器的寿命。上下文避免了必须密切管理通道对象,并提供了一种干净的方式来报告goroutine中的错误,如果需要的话,使用
context.Cause(ctx)
。示例设置可能如下所示:
字符串
defer调用可能是在上下文cause中放置一个错误,这会导致
ctx.Done
方法现在返回一个具有当前值的通道。这个方法也是可重复的,这意味着当使用简单通道方法时,多个goroutine都可以接收一个值,而不仅仅是一个值。当发送到队列时,您也可以选择
ctx.Done
,以防止永远等待不再被读取的队列。类似地,您也可以从队列中阅读。