Go语言 如何正确使用sync.cond?

rta7y2nd  于 2023-06-27  发布在  Go
关注(0)|答案(9)|浏览(122)

我不知道如何正确使用sync.Cond。据我所知,在锁定Locker和调用条件的Wait方法之间存在争用条件。这个例子在主goroutine中的两行之间添加了一个人工延迟来模拟竞争条件:

package main

import (
    "sync"
    "time"
)

func main() {
    m := sync.Mutex{}
    c := sync.NewCond(&m)
    go func() {
        time.Sleep(1 * time.Second)
        c.Broadcast()
    }()
    m.Lock()
    time.Sleep(2 * time.Second)
    c.Wait()
}

[Run on the Go Playground]
这引起了恐慌:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [semacquire]:
sync.runtime_Syncsemacquire(0x10330208, 0x1)
    /usr/local/go/src/runtime/sema.go:241 +0x2e0
sync.(*Cond).Wait(0x10330200, 0x0)
    /usr/local/go/src/sync/cond.go:63 +0xe0
main.main()
    /tmp/sandbox301865429/main.go:17 +0x1a0

我做错了什么?如何避免这种明显的竞态条件?有没有更好的同步结构我应该使用?

    • 编辑:**我意识到我应该更好地解释我在这里试图解决的问题。我有一个长时间运行的goroutine,它下载一个大文件,还有许多其他的goroutine,当HTTP头可用时,它们需要访问HTTP头。这个问题比听起来难。

我不能使用channels,因为只有一个goroutine会收到这个值。而其他一些goroutine会在头文件已经可用很久之后才试图检索它们。
下载器goroutine可以简单地将HTTP头存储在一个变量中,并使用互斥锁来保护对它们的访问。然而,这并没有为其他goroutine提供一种“等待”它们变得可用的方法。
我曾认为sync.Mutexsync.Cond一起可以实现这个目标,但似乎这是不可能的。

ryhaxcpt

ryhaxcpt1#

OP回答了自己的问题,但没有直接回答原问题,我准备贴出如何正确使用sync.Cond
如果每次写和读都有一个goroutine,那么实际上并不需要sync.Cond--一个sync.Mutex就足以在它们之间进行通信。sync.Cond在多个读取器等待共享资源可用的情况下可能很有用。

var sharedRsc = make(map[string]interface{})
func main() {
    var wg sync.WaitGroup
    wg.Add(2)
    m := sync.Mutex{}
    c := sync.NewCond(&m)
    go func() {
        // this go routine wait for changes to the sharedRsc
        c.L.Lock()
        for len(sharedRsc) == 0 {
            c.Wait()
        }
        fmt.Println(sharedRsc["rsc1"])
        c.L.Unlock()
        wg.Done()
    }()

    go func() {
        // this go routine wait for changes to the sharedRsc
        c.L.Lock()
        for len(sharedRsc) == 0 {
            c.Wait()
        }
        fmt.Println(sharedRsc["rsc2"])
        c.L.Unlock()
        wg.Done()
    }()

    // this one writes changes to sharedRsc
    c.L.Lock()
    sharedRsc["rsc1"] = "foo"
    sharedRsc["rsc2"] = "bar"
    c.Broadcast()
    c.L.Unlock()
    wg.Wait()
}

Playground
话虽如此,如果情况允许,使用通道仍然是推荐的数据传递方式。
注意:这里的sync.WaitGroup只用于等待goroutine完成它们的执行。

u91tlkcl

u91tlkcl2#

您需要确保在调用c.Wait之后调用c.Broadcast。你的程序的正确版本应该是:

package main

import (
    "fmt"
    "sync"
)

func main() {
    m := &sync.Mutex{}
    c := sync.NewCond(m)
    m.Lock()
    go func() {
        m.Lock() // Wait for c.Wait()
        c.Broadcast()
        m.Unlock()
    }()
    c.Wait() // Unlocks m, waits, then locks m again
    m.Unlock()
}

https://play.golang.org/p/O1r8v8yW6h

uoifb46i

uoifb46i3#

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    m := sync.Mutex{}
    m.Lock() // main gouroutine is owner of lock
    c := sync.NewCond(&m)
    go func() {
        m.Lock() // obtain a lock
        defer m.Unlock()
        fmt.Println("3. goroutine is owner of lock")
        time.Sleep(2 * time.Second) // long computing - because you are the owner, you can change state variable(s)
        c.Broadcast()               // State has been changed, publish it to waiting goroutines
        fmt.Println("4. goroutine will release lock soon (deffered Unlock")
    }()
    fmt.Println("1. main goroutine is owner of lock")
    time.Sleep(1 * time.Second) // initialization
    fmt.Println("2. main goroutine is still lockek")
    c.Wait() // Wait temporarily release a mutex during wating and give opportunity to other goroutines to change the state.
    // Because you don't know, whether this is state, that you are waiting for, is usually called in loop.
    m.Unlock()
    fmt.Println("Done")
}

http://play.golang.org/p/fBBwoL7_pm

pobjuy32

pobjuy324#

看起来你c.等待广播,这在你的时间间隔中是永远不会发生的。与

time.Sleep(3 * time.Second) //Broadcast after any Wait for it
c.Broadcast()

你的片段似乎工作http://play.golang.org/p/OE8aP4i6gY .或者我错过了你试图实现的东西?

mtb9vblg

mtb9vblg5#

我终于发现了一种方法来做到这一点,它根本不涉及sync.Cond-只是互斥。

type Task struct {
    m       sync.Mutex
    headers http.Header
}

func NewTask() *Task {
    t := &Task{}
    t.m.Lock()
    go func() {
        defer t.m.Unlock()
        // ...do stuff...
    }()
    return t
}

func (t *Task) WaitFor() http.Header {
    t.m.Lock()
    defer t.m.Unlock()
    return t.headers
}

这是怎么回事?
互斥锁在任务开始时被锁定,确保任何调用WaitFor()的操作都会被阻塞。一旦头可用并且goroutine解锁了互斥锁,每个对WaitFor()的调用将一次执行一个。所有未来的调用(即使在goroutine结束之后)都不会有锁定互斥锁的问题,因为它总是处于解锁状态。

n3ipq98p

n3ipq98p6#

是的,你可以使用一个通道将Header传递给多个Go例程。

headerChan := make(chan http.Header)

go func() { // This routine can be started many times
    header := <-headerChan  // Wait for header
    // Do things with the header
}()

// Feed the header to all waiting go routines
for more := true; more; {
    select {
    case headerChan <- r.Header:
    default: more = false
    }
}
ars1skjm

ars1skjm7#

这可以很容易地用通道完成,代码也很干净。下面是一个例子。希望这有帮助!

package main

import (
    "fmt"
    "net/http"
    "sync"
)

func main() {
    done := make(chan struct{})
    var wg sync.WaitGroup
    // fork required number of goroutines
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            <-done
            fmt.Println("read the http headers from here")
        }()
    }
    time.Sleep(1) //download your large file here
    fmt.Println("Unblocking goroutines...")
    close(done) // this will unblock all the goroutines
    wg.Wait()
}
p4tfgftt

p4tfgftt8#

在优秀的书“Concurrency in Go”中,他们提供了以下简单的解决方案,同时利用了关闭的通道将释放所有等待的客户端的事实。

package main
import (
    "fmt"
    "time"
)
func main() {
    httpHeaders := []string{}
    headerChan := make(chan interface{})
    var consumerFunc= func(id int, stream <-chan interface{}, funcHeaders *[]string)         
    {
        <-stream
        fmt.Println("Consumer ",id," got headers:", funcHeaders )   
    }
    for i:=0;i<3;i++ {
        go consumerFunc(i, headerChan, &httpHeaders)
    }
    fmt.Println("Getting headers...")
    time.Sleep(2*time.Second)
    httpHeaders=append(httpHeaders, "test1");
    fmt.Println("Publishing headers...")
    close(headerChan )
    time.Sleep(5*time.Second)
}

https://play.golang.org/p/cE3SiKWNRIt

aij0ehis

aij0ehis9#

代码中的问题是,信号只发出一次,而接收go例程还没有准备好,所以信号被错过了。你应该做循环广播。

package main
    
    import (
        "sync"
        "time"
    )
    
    func main() {
        m := sync.Mutex{}
        c := sync.NewCond(&m)
        go func() {
            time.Sleep(1 * time.Second)
            for range time.Tick(time.Millisecond) {
                c.Broadcast()
            }
        }()
        m.Lock()
        time.Sleep(2 * time.Second)
        c.Wait()
        m.Unlock()
    
        //do stuff
    }

相关问题