从goroutine通道阅读而不阻塞

o75abkj4  于 2023-04-09  发布在  Go
关注(0)|答案(4)|浏览(124)

我有两个goroutine:主worker和一个helper,它会为一些帮助而旋转。helper可能会遇到错误,所以我使用一个通道将错误从helper传递到worker

func helper(c chan <- error) (){
    //do some work
    c <- err // send errors/nil on c
}

下面是helper()的调用方式:

func worker() error {
    //do some work
    c := make(chan error, 1)
    go helper(c)
    err := <- c
    return err
}

问题:

  • 语句err := <- c阻塞了worker吗?我不这么认为,因为通道是缓冲的。
  • 如果它是阻塞的,我如何使它成为非阻塞的?我的要求是让worker和它的调用者继续剩下的工作,而不需要等待通道上出现值。

谢谢。

xtfmy6hx

xtfmy6hx1#

您可以轻松验证

func helper(c chan<- error) {
    time.Sleep(5 * time.Second)
    c <- errors.New("") // send errors/nil on c
}

func worker() error {
    fmt.Println("do one")

    c := make(chan error, 1)
    go helper(c)

    err := <-c
    fmt.Println("do two")

    return err
}

func main() {
    worker()
}

问:语句err:=〈- c会阻塞worker吗?我不这么认为,因为通道是缓冲的。

A:err := <- c会阻塞worker。

问:如果它是阻塞的,我如何使它成为非阻塞的?我的要求是让worker和它的调用者继续剩下的工作,而不等待值出现在通道上。

**A:**如果不想阻塞,就把err := <-c去掉,如果最后需要err,就把err := <-c移到最后。

你不能不阻塞地读通道,如果你不阻塞地读通道,就不能再执行这段代码,除非你的代码在循环中。

Loop:
    for {
        select {
        case <-c:
            break Loop
        default:
            //default will go through without blocking
        }
        // do something
    }

你见过errgroup或waitgroup吗?
它使用原子,取消上下文和同步。一次实现这一点。
https://github.com/golang/sync/blob/master/errgroup/errgroup.go
https://github.com/golang/go/blob/master/src/sync/waitgroup.go
或者你可以直接使用它,运行func,然后在任何你想要的地方等待错误。

ac1kyiln

ac1kyiln2#

在你的代码中,剩下的工作与helper是否遇到错误无关。你可以在剩下的工作完成后简单地从通道接收。

func worker() error {
    //do some work
    c := make(chan error, 1)
    go helper(c)
    //do rest of the work
    return <-c
}
kkbh8khc

kkbh8khc3#

以非阻塞方式从通道阅读的功能方式:

func CollectChanOne[T any](ch <-chan T) (T, bool) {
    select {
    case val, stillOpen := <-ch:
        return val, stillOpen
    default:
        var zeroT T
        return zeroT, false
    }
}


func worker() error {
    //do some work
    c := make(chan error, 1)
    go helper(c)
    err, _ := CollectChanOne(c) // Wont block worker
    return err
}

示例:https://go.dev/play/p/Njwyt32B4oT
注意:这个例子还有另一个方法CollectChanRemaining(),它读取通道中的所有缓冲元素。

smdncfj3

smdncfj34#

我想你需要这个密码。
运行此代码

package main

import (
    "log"
    "sync"
)

func helper(c chan<- error) {

    for {
        var err error = nil
        // do job

        if err != nil {
            c <- err // send errors/nil on c
            break
        }
    }

}

func worker(c chan error) error {
    log.Println("first log")

    go func() {
        helper(c)
    }()

    count := 1
    Loop:
        for {
            select {
            case err := <- c :
                return err
            default:
                log.Println(count, " log")
                count++
                isFinished := false
                // do your job
                if isFinished {
                    break Loop // remove this when you test

                }
            }
        }
    return nil
}

func main() {
    wg := sync.WaitGroup{}
    wg.Add(1)
    go func() {
        c := make(chan error, 1)
        worker(c)
        wg.Done()
    }()
    wg.Wait()
}

相关问题