查看生成多个goroutine来从同一个通道读取值。两个worker按预期生成,但只从通道读取一个项目并停止阅读。我希望goroutine继续从通道读取,直到它从发送值到通道的goroutine关闭为止。生成项目的goroutine没有关闭,尽管有东西阻止发送者发送。为什么每个worker只读取一个值并停止?
输出显示发送了两个值,每个工作goroutine阅读一个值。第三个值被发送,但没有从任何一个工作goroutine读取。
new worker
new worker
waiting
sending 0
sending 1
sending 2
running func 1
sending value out 1
running func 0
sending value out 0
package main
import (
"fmt"
"sync"
)
func workerPool(done <-chan bool, in <-chan int, numberOfWorkers int, fn func(int) int) chan int {
out := make(chan int)
var wg sync.WaitGroup
for i := 0; i < numberOfWorkers; i++ {
fmt.Println("new worker")
wg.Add(1)
// fan out worker goroutines reading from in channel and
// send output into out channel
go func() {
defer wg.Done()
for {
select {
case <-done:
fmt.Println("recieved done signal")
return
case data, ok := <-in:
if !ok {
fmt.Println("no more items")
return
}
// fan-in job execution multiplexing results into the results channel
fmt.Println("running func", data)
value := fn(data)
fmt.Println("sending value out", value)
out <- value
}
}
}()
}
fmt.Println("waiting")
wg.Wait()
fmt.Println("done waiting")
close(out)
return out
}
func main() {
done := make(chan bool)
defer close(done)
in := make(chan int)
go func() {
for i := 0; i < 10; i++ {
fmt.Println("sending", i)
in <- i
}
close(in)
}()
out := workerPool(done, in, 2, func(i int) int {
return i
})
for {
select {
case o, ok := <-out:
if !ok {
continue
}
fmt.Println("output", o)
case <-done:
return
default:
}
}
}
1条答案
按热度按时间jckbn6z71#
前面关于通道未缓冲的评论是正确的,但也存在其他同步问题。
无缓冲通道本质上意味着在写入值时,必须在任何其他写入发生之前接收该值。
workerPool
创建了一个无缓冲的通道out
来存储结果,但是只有在所有结果都被写入out之后才返回。但是由于从out通道阅读是在workerPool
返回之后发生的,并且out
是无缓冲的,所以workerPool
在尝试写入时会被阻塞,导致死锁。这就是为什么看起来每个worker只发送一个值的原因;实际上,在发送第一个值之后,所有的worker都会被阻塞,因为没有任何东西可以接收这个值(您可以通过在写入out
之后移动print语句来看到这一点)。修复的选项包括使
out
具有大小为n = number of results
(即out := make(chan int, n)
)的缓冲区,或者使out
未缓冲,并在写入时从out
进行阅读。done
通道也没有被正确使用。main
和workerPool
都依赖它来停止执行,但是没有任何东西被写入它!它也是无缓冲的,所以它也会遇到上面描述的死锁问题。要解决这个问题,首先可以从
workerPool
中删除case <-done:
,并简单地遍历in
,因为它在main
中是关闭的。然后可以将done
作为缓冲通道来解决死锁。结合这些修复程序,给予:
这解决了你的问题,但这不是使用通道的最佳方式!结构本身可以改变得更简单,不必依赖于缓冲通道。