多个goroutine从同一个通道阅读

wztqucjr  于 2023-04-03  发布在  Go
关注(0)|答案(1)|浏览(163)

查看生成多个goroutine来从同一个通道读取值。两个worker按预期生成,但只从通道读取一个项目并停止阅读。我希望goroutine继续从通道读取,直到它从发送值到通道的goroutine关闭为止。生成项目的goroutine没有关闭,尽管有东西阻止发送者发送。为什么每个worker只读取一个值并停止?
输出显示发送了两个值,每个工作goroutine阅读一个值。第三个值被发送,但没有从任何一个工作goroutine读取。

new worker
new worker
waiting
sending 0
sending 1
sending 2
running func 1
sending value out 1
running func 0
sending value out 0

Go Playground

package main

import (
    "fmt"
    "sync"
)

func workerPool(done <-chan bool, in <-chan int, numberOfWorkers int, fn func(int) int) chan int {
    out := make(chan int)
    var wg sync.WaitGroup

    for i := 0; i < numberOfWorkers; i++ {
        fmt.Println("new worker")
        wg.Add(1)
        // fan out worker goroutines reading from in channel and
        // send output into out channel
        go func() {
            defer wg.Done()
            for {
                select {
                case <-done:
                    fmt.Println("recieved done signal")
                    return
                case data, ok := <-in:
                    if !ok {
                        fmt.Println("no more items")
                        return
                    }
                    // fan-in job execution multiplexing results into the results channel
                    fmt.Println("running func", data)
                    value := fn(data)
                    fmt.Println("sending value out", value)
                    out <- value
                }
            }
        }()
    }

    fmt.Println("waiting")
    wg.Wait()
    fmt.Println("done waiting")
    close(out)
    return out
}

func main() {
    done := make(chan bool)
    defer close(done)

    in := make(chan int)

    go func() {
        for i := 0; i < 10; i++ {
            fmt.Println("sending", i)
            in <- i
        }
        close(in)
    }()

    out := workerPool(done, in, 2, func(i int) int {
        return i
    })

    for {
        select {
        case o, ok := <-out:
            if !ok {
                continue
            }

            fmt.Println("output", o)
        case <-done:
            return
        default:
        }
    }

}
jckbn6z7

jckbn6z71#

前面关于通道未缓冲的评论是正确的,但也存在其他同步问题。
无缓冲通道本质上意味着在写入值时,必须在任何其他写入发生之前接收该值。

  1. workerPool创建了一个无缓冲的通道out来存储结果,但是只有在所有结果都被写入out之后才返回。但是由于从out通道阅读是在workerPool返回之后发生的,并且out是无缓冲的,所以workerPool在尝试写入时会被阻塞,导致死锁。这就是为什么看起来每个worker只发送一个值的原因;实际上,在发送第一个值之后,所有的worker都会被阻塞,因为没有任何东西可以接收这个值(您可以通过在写入out之后移动print语句来看到这一点)。
    修复的选项包括使out具有大小为n = number of results(即out := make(chan int, n))的缓冲区,或者使out未缓冲,并在写入时从out进行阅读。
  2. done通道也没有被正确使用。mainworkerPool都依赖它来停止执行,但是没有任何东西被写入它!它也是无缓冲的,所以它也会遇到上面描述的死锁问题。
    要解决这个问题,首先可以从workerPool中删除case <-done:,并简单地遍历in,因为它在main中是关闭的。然后可以将done作为缓冲通道来解决死锁。
    结合这些修复程序,给予:
package main

import (
    "fmt"
    "sync"
)

func workerPool(done chan bool, in <-chan int, numberOfWorkers int, fn func(int) int) chan int {
    out := make(chan int, 100)
    var wg sync.WaitGroup

    for i := 0; i < numberOfWorkers; i++ {
        fmt.Println("new worker")
        wg.Add(1)
        // fan out worker goroutines reading from in channel and
        // send output into out channel
        go func() {
            defer wg.Done()
            for {
                select {
                case data, ok := <-in:
                    if !ok {
                        fmt.Println("no more items")
                        return
                    }
                    // fan-in job execution multiplexing results into the results channel
                    fmt.Println("running func", data)
                    value := fn(data)
                    fmt.Println("sending value out", value)
                    out <- value
                }
            }
        }()
    }

    fmt.Println("waiting")
    wg.Wait()
    fmt.Println("done waiting")
    close(out)
    done <- true
    close(done)
    return out
}

func main() {
    done := make(chan bool, 1)

    in := make(chan int)

    go func() {
        for i := 0; i < 10; i++ {
            fmt.Println("sending", i)
            in <- i
        }
        close(in)
    }()

    out := workerPool(done, in, 2, func(i int) int {
        return i
    })

    for {
        select {
        case o, ok := <-out:
            if !ok {
                continue
            }

            fmt.Println("output", o)
        case <-done:
            return
        }
    }

}

这解决了你的问题,但这不是使用通道的最佳方式!结构本身可以改变得更简单,不必依赖于缓冲通道。

相关问题