Go语言 sync.WaitGroup相对于Channels的优势是什么?

v8wbuo2f  于 2023-06-27  发布在  Go
关注(0)|答案(7)|浏览(124)

我正在开发一个并发Go库,我偶然发现了两种不同的goroutine同步模式,它们的结果是相似的:
Waitgroup

package main

import (
    "fmt"
    "sync"
    "time"
)

var wg sync.WaitGroup

func main() {
    words := []string{"foo", "bar", "baz"}

    for _, word := range words {
        wg.Add(1)
        go func(word string) {
            time.Sleep(1 * time.Second)
            defer wg.Done()
            fmt.Println(word)
        }(word)
    }
    // do concurrent things here

    // blocks/waits for waitgroup
    wg.Wait()
}

Channel

package main

import (
    "fmt"
    "time"
)

func main() {
    words := []string{"foo", "bar", "baz"}
    done := make(chan bool)
    // defer close(done)
    for _, word := range words {
        // fmt.Println(len(done), cap(done))
        go func(word string) {
            time.Sleep(1 * time.Second)
            fmt.Println(word)
            done <- true
        }(word)
    }
    // Do concurrent things here

    // This blocks and waits for signal from channel
    for range words {
        <-done
    }
}

有人建议sync.WaitGroup的性能稍高一些,我也看到它被广泛使用。但是,我发现通道更惯用。使用sync.WaitGroup在通道上的真实的优势是什么?

vtwuwzda

vtwuwzda1#

与第二个例子的正确性无关(正如评论中所解释的,你没有做你想做的事情,但它很容易修复),我倾向于认为第一个例子更容易理解。
现在,我甚至不会说通道更惯用。通道是Go语言的一个标志性特性,但这并不意味着只要有可能就可以使用它们。在Go语言中,惯用的是使用最简单和最容易理解的解决方案:在这里,WaitGroup既传达了含义(您的主要功能是Wait ing for worker to be done),也传达了机制(worker在Done时通知)。
除非你在一个非常特殊的情况下,我不建议在这里使用通道解决方案。

az31mfrm

az31mfrm2#

对于您的简单示例(表示作业完成),WaitGroup是显而易见的选择。Go编译器非常友好,不会因为你使用通道来简单地发送完成任务的信号而责怪你,但有些代码审查者会这样做。
1.“一个WaitGroup等待一个goroutine集合完成。主goroutine调用Add(n)来设置要等待的goroutine的数量。然后每个goroutine运行并在完成时调用Done()。同时,Wait可以用来阻止所有goroutine完成。”

words := []string{"foo", "bar", "baz"}
var wg sync.WaitGroup
for _, word := range words {
    wg.Add(1)
    go func(word string) {
        defer wg.Done()
        time.Sleep(100 * time.Millisecond) // a job
        fmt.Println(word)
    }(word)
}
wg.Wait()

这些可能性只受你的想象力的限制:
1.通道可以缓冲

words := []string{"foo", "bar", "baz"}
done := make(chan struct{}, len(words))
for _, word := range words {
    go func(word string) {
        time.Sleep(100 * time.Millisecond) // a job
        fmt.Println(word)
        done <- struct{}{} // not blocking
    }(word)
}
for range words {
    <-done
}

1.信道可以是无缓冲的,您可以只使用信令信道(例如chan struct{}):

words := []string{"foo", "bar", "baz"}
done := make(chan struct{})
for _, word := range words {
    go func(word string) {
        time.Sleep(100 * time.Millisecond) // a job
        fmt.Println(word)
        done <- struct{}{} // blocking
    }(word)
}
for range words {
    <-done
}

1.您可以限制缓冲通道容量的并发作业数:

t0 := time.Now()
var wg sync.WaitGroup
words := []string{"foo", "bar", "baz"}
done := make(chan struct{}, 1) // set the number of concurrent job here
for _, word := range words {
    wg.Add(1)
    go func(word string) {
        done <- struct{}{}
        time.Sleep(100 * time.Millisecond) // job
        fmt.Println(word, time.Since(t0))
        <-done
        wg.Done()
    }(word)
}
wg.Wait()

1.您可以使用以下渠道发送消息:

done := make(chan string)
go func() {
    for _, word := range []string{"foo", "bar", "baz"} {
        done <- word
    }
    close(done)
}()
for word := range done {
    fmt.Println(word)
}

基准:

go test -benchmem -bench . -args -n 0
# BenchmarkEvenWaitgroup-8  1827517   652 ns/op    0 B/op  0 allocs/op
# BenchmarkEvenChannel-8    1000000  2373 ns/op  520 B/op  1 allocs/op
    go test -benchmem -bench .
# BenchmarkEvenWaitgroup-8  1770260   678 ns/op    0 B/op  0 allocs/op
# BenchmarkEvenChannel-8    1560124  1249 ns/op  158 B/op  0 allocs/op

代码(main_test.go):

package main

import (
    "flag"
    "fmt"
    "os"
    "sync"
    "testing"
)

func BenchmarkEvenWaitgroup(b *testing.B) {
    evenWaitgroup(b.N)
}
func BenchmarkEvenChannel(b *testing.B) {
    evenChannel(b.N)
}
func evenWaitgroup(n int) {
    if n%2 == 1 { // make it even:
        n++
    }
    for i := 0; i < n; i++ {
        wg.Add(1)
        go func(n int) {
            select {
            case ch <- n: // tx if channel is empty
            case i := <-ch: // rx if channel is not empty
                // fmt.Println(n, i)
                _ = i
            }
            wg.Done()
        }(i)
    }
    wg.Wait()
}
func evenChannel(n int) {
    if n%2 == 1 { // make it even:
        n++
    }
    for i := 0; i < n; i++ {
        go func(n int) {
            select {
            case ch <- n: // tx if channel is empty
            case i := <-ch: // rx if channel is not empty
                // fmt.Println(n, i)
                _ = i
            }
            done <- struct{}{}
        }(i)
    }
    for i := 0; i < n; i++ {
        <-done
    }
}
func TestMain(m *testing.M) {
    var n int // We use TestMain to set up the done channel.
    flag.IntVar(&n, "n", 1_000_000, "chan cap")
    flag.Parse()
    done = make(chan struct{}, n)
    fmt.Println("n=", n)
    os.Exit(m.Run())
}

var (
    done chan struct{}
    ch   = make(chan int)
    wg   sync.WaitGroup
)
4si2a6ki

4si2a6ki3#

这取决于用例。如果您要分派并行运行的一次性作业,而不需要知道每个作业的结果,那么可以使用WaitGroup。但是如果你需要从goroutine中收集结果,那么你应该使用通道。
因为一个频道是双向的,所以我几乎总是使用一个频道。
另一方面,正如注解中指出的,您的通道示例没有正确实现。您需要一个单独的通道来指示没有更多的作业要做(一个示例是here)。在您的例子中,由于您事先知道单词的数量,因此可以只使用一个缓冲通道并接收固定的次数,以避免声明关闭通道。

2ledvvac

2ledvvac4#

如果你特别坚持只使用通道,那么它需要以不同的方式完成(如果我们使用你的例子,正如@Not_a_Golfer指出的那样,它会产生不正确的结果)。
一种方法是创建一个int类型的通道。在worker进程中,每次它完成作业时发送一个数字(这也可以是唯一的作业id,如果你愿意,你可以在接收器中跟踪它)。
在接收器主go例程中(它将知道提交的作业的确切数量)-在通道上进行范围循环,计数直到提交的作业数量未完成,并在所有作业完成时中断循环。如果您想跟踪每个作业的完成情况(如果需要的话,也许可以做一些事情),这是一个很好的方法。
下面是代码供您参考。递减totalJobsLeft是安全的,因为它只会在通道的范围循环中完成!

//This is just an illustration of how to sync completion of multiple jobs using a channel
//A better way many a times might be to use wait groups

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func main() {

    comChannel := make(chan int)
    words := []string{"foo", "bar", "baz"}

    totalJobsLeft := len(words)

    //We know how many jobs are being sent

    for j, word := range words {
        jobId := j + 1
        go func(word string, jobId int) {

            fmt.Println("Job ID:", jobId, "Word:", word)
            //Do some work here, maybe call functions that you need
            //For emulating this - Sleep for a random time upto 5 seconds
            randInt := rand.Intn(5)
            //fmt.Println("Got random number", randInt)
            time.Sleep(time.Duration(randInt) * time.Second)
            comChannel <- jobId
        }(word, jobId)
    }

    for j := range comChannel {
        fmt.Println("Got job ID", j)
        totalJobsLeft--
        fmt.Println("Total jobs left", totalJobsLeft)
        if totalJobsLeft == 0 {
            break
        }
    }
    fmt.Println("Closing communication channel. All jobs completed!")
    close(comChannel)

}
i86rm4rw

i86rm4rw5#

我经常使用通道来收集可能产生错误的goroutine的错误消息。下面是一个简单的例子:

func couldGoWrong() (err error) {
    errorChannel := make(chan error, 3)

    // start a go routine
    go func() (err error) {
        defer func() { errorChannel <- err }()

        for c := 0; c < 10; c++ {
            _, err = fmt.Println(c)
            if err != nil {
                return
            }
        }

        return
    }()

    // start another go routine
    go func() (err error) {
        defer func() { errorChannel <- err }()

        for c := 10; c < 100; c++ {
            _, err = fmt.Println(c)
            if err != nil {
                return
            }
        }

        return
    }()

    // start yet another go routine
    go func() (err error) {
        defer func() { errorChannel <- err }()

        for c := 100; c < 1000; c++ {
            _, err = fmt.Println(c)
            if err != nil {
                return
            }
        }

        return
    }()

    // synchronize go routines and collect errors here
    for c := 0; c < cap(errorChannel); c++ {
        err = <-errorChannel
        if err != nil {
            return
        }
    }

    return
}
f87krz0w

f87krz0w6#

这里已经有很好的答案,通道并不总是惯用的。例如,当实现工作池时,使用等待组更清楚。
还注意到您的通道实现不正确,因为它在第一个条目后退出,而不是最后一个条目。
我决定修复它:

package main

import (
    "fmt"
    "time"
)

func main() {
    words := []string{"foo", "bar", "baz", "fax", "bor", "far"}
    workersCount := len(words)
    workersChan := make(chan bool, workersCount)

    for _, word := range words {
        go func(word string) {
            time.Sleep(1 * time.Second)
            fmt.Println(word)
            workersChan <- true
        }(word)
    }

    for i := 0; i != workersCount; i++ {
        <-workersChan
    }
}
lx0bsm1f

lx0bsm1f7#

还建议使用waitgroup,但你仍然想用channel来做,那么下面我提到一个简单的使用channel的方法

package main

import (
    "fmt"
    "time"
)

func main() {
    c := make(chan string)
    words := []string{"foo", "bar", "baz"}

    go printWordrs(words, c)

    for j := range c {
        fmt.Println(j)
    }
}

func printWordrs(words []string, c chan string) {
    defer close(c)
    for _, word := range words {
        time.Sleep(1 * time.Second)
        c <- word
    }   
}

相关问题