Go语言 正在等待同步,超时的条件

bq8i3lrv  于 2024-01-04  发布在  Go
关注(0)|答案(6)|浏览(151)

有没有可能以某种简单的方式来实现Java的

wait(long timeMillis)

字符串
它会在一个监视器上等待一段指定的时间(大概是mutex+cond),如果没有收到信号就返回?
我在文档或谷歌上找不到任何关于这一点的东西,虽然当然可以玩一些游戏,制作一个WaitGroup并拥有一个定时器goroutine pop,但仅仅获得这个简单的功能似乎很乏味/烦人/效率低下(顺便说一下,我遇到的任何底层系统线程库都直接支持它)
编辑:是的,我们都读过http://www.golang-book.com/10/index.htmhttps://blog.golang.org/pipelines-再次,创建更多的线程是一个“坏”(非性能)解决方案,和通道也不太适合这一点。想象一下一个典型的并发服务器Join()方法的用例。(请不要告诉我反转控件并使用一个API模式来代替。你并不总是有奢侈来改变你正在使用的API.)

6yoyoihd

6yoyoihd1#

你可以实现一个条件变量,它只支持广播(无信号),有一个频道。https://gist.github.com/zviadm/c234426882bfc8acba88f3503edaaa36#file-cond2-go
你也可以在代码中使用这种替换通道并关闭旧通道的技术。Gist中的代码使用unsafe.Pointer和atomic操作来允许调用'Broadcast'而不获取主sync. Locker。然而在你自己的代码中,通常情况下,你应该从获取锁中广播,这样你就不需要做任何不安全/原子的事情。
当这个方法工作时,你可能还想 checkout :https://godoc.org/golang.org/x/sync/semaphore。如果你做了一个限制为1的加权信号量,那也会给你给予你需要的所有能力,这也是公平的。

r6vfmomb

r6vfmomb2#

不。没有简单的方法来做到这一点,并根据该线程,他们不会添加一个。(虽然也许与他们讨论它可能会让你在某处)
但总有一条艰难的路。两个选择:
1.运行您自己的具有此功能的Cond。(请参见https://golang.org/src/sync/cond.go
1.通过系统调用使用操作系统级别的功能。(可能是futex?)
这里的挑战--也是为什么它不是微不足道的原因--是goroutine不是线程。Go有它自己的自定义调度程序。创建自己的Cond将涉及到修补运行时的部分,而这些部分并不真正意味着要修补。(但是,就像我说的,这是可能的)
如果这是限制的话,很抱歉。大多数go都很简单--你经常可以毫不费力地跳到下层。但是调度器不是这样的。它很神奇。
这个魔法适用于大多数东西,他们在sync中添加了一些东西来覆盖一些已知的情况。如果你觉得你找到了另一个,也许你可以说服他们添加它。(但这不仅仅是从另一种编程语言复制一个API,或者公开一个底层API的问题)

daolsyd0

daolsyd03#

从Go 1.21开始,有一种方法可以用新的context.AfterFunc来实现。甚至有这样的例子:

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

func main() {
    waitOnCond := func(ctx context.Context, cond *sync.Cond, conditionMet func() bool) error {
        stopf := context.AfterFunc(ctx, func() {
            // We need to acquire cond.L here to be sure that the Broadcast
            // below won't occur before the call to Wait, which would result
            // in a missed signal (and deadlock).
            cond.L.Lock()
            defer cond.L.Unlock()

            // If multiple goroutines are waiting on cond simultaneously,
            // we need to make sure we wake up exactly this one.
            // That means that we need to Broadcast to all of the goroutines,
            // which will wake them all up.
            //
            // If there are N concurrent calls to waitOnCond, each of the goroutines
            // will spuriously wake up O(N) other goroutines that aren't ready yet,
            // so this will cause the overall CPU cost to be O(N²).
            cond.Broadcast()
        })
        defer stopf()

        // Since the wakeups are using Broadcast instead of Signal, this call to
        // Wait may unblock due to some other goroutine's context becoming done,
        // so to be sure that ctx is actually done we need to check it in a loop.
        for !conditionMet() {
            cond.Wait()
            if ctx.Err() != nil {
                return ctx.Err()
            }
        }

        return nil
    }

    cond := sync.NewCond(new(sync.Mutex))

    var wg sync.WaitGroup
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()

            ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
            defer cancel()

            cond.L.Lock()
            defer cond.L.Unlock()

            err := waitOnCond(ctx, cond, func() bool { return false })
            fmt.Println(err)
        }()
    }
    wg.Wait()
}

字符串

2wnc66cl

2wnc66cl4#

我在今年的GopherCon演讲中概述了几种可能的替代方案(参见https://drive.google.com/file/d/1nPdvhB0PutEJzdCq5ms6UI58dp50fcAN/view)。“条件变量”部分从第37张幻灯片开始,但这种特定模式在备份幻灯片(101-105)中有更详细的介绍。
正如zviadm所指出的,一种选择(https://play.golang.org/p/tWVvXOs87HX)是关闭一个通道。
另一个选项(https://play.golang.org/p/uRwV_i0v13T)是让每个服务员分配一个1缓冲的频道,并让广播公司向缓冲区发送一个令牌进行广播。
如果该事件是一个持续的条件,如“队列为空”,第三个选项(https://play.golang.org/p/uvx8vFSQ2f0)是使用1缓冲通道,并让每个接收器重新填充缓冲区,只要条件持续。

w51jfk4q

w51jfk4q5#

https://gitlab.com/jonas.jasas/condchan允许等待超时。请看一个例子:

package main

import (
    "fmt"
    "sync"
    "time"
    "gitlab.com/jonas.jasas/condchan"
)

func main() {
    cc := condchan.New(&sync.Mutex{})
    timeoutChan := time.After(time.Second)

    cc.L.Lock()
    // Passing func that gets channel c that signals when
    // Signal or Broadcast is called on CondChan
    cc.Select(func(c <-chan struct{}) { // Waiting with select
        select {
        case <-c: // Never ending wait
        case <-timeoutChan:
            fmt.Println("Hooray! Just escaped from eternal wait.")
        }
    })
    cc.L.Unlock()
}

字符串

wswtfjt7

wswtfjt76#

我遇到了同样的问题,事实证明,使用一个通道很容易解决。

  • 信号是在信道上发送的信号
  • 等待只是在通道上等待消息。
  • 带有超时的等待只是对计时器和消息的选择。
  • 广播是一个循环发送消息,直到没有人留下来听。

与任何条件变量一样,当您等待时,它需要持有互斥锁,并且强烈建议在您发出信号时持有它。
我写了一个遵循Cond协议的in实现,并添加了一个WaitOrbit。如果成功,它将返回true,如果超时,则返回false。
这是我的代码沿着一些测试用例!免责声明:这看起来工作正常,但还没有经过彻底的测试。而且,不保证公平性。等待的线程按照调度程序认为合适的顺序释放,不一定是先到先服务。
https://play.golang.org/p/K1veAOGbWZ

package main

import (
    "sync"
    "time"
    "fmt"
)

type TMOCond struct {
    L    sync.Locker
    ch      chan bool
}

func NewTMOCond(l sync.Locker) *TMOCond {
    return &TMOCond{ ch: make(chan bool), L: l }
}

func (t *TMOCond) Wait() {
    t.L.Unlock()
    <-t.ch
    t.L.Lock()
}

func (t *TMOCond) WaitOrTimeout(d time.Duration) bool {
    tmo := time.NewTimer(d)
    t.L.Unlock()
    var r bool
    select {
    case <-tmo.C:
    r = false
    case <-t.ch:
        r = true
    }
    if !tmo.Stop() {
        select {
        case <- tmo.C:
        default:
        }
    }
    t.L.Lock()
    return r
}

func (t *TMOCond) Signal() {
    t.signal()
}

func (t *TMOCond) Broadcast() {
    for {
        // Stop when we run out of waiters
        //
        if !t.signal() {
            return
        }
    }
}

func (t *TMOCond) signal() bool {
    select {
    case t.ch <- true:
        return true
    default:
        return false
    }
}

// **** TEST CASES ****
func lockAndSignal(t *TMOCond) {
    t.L.Lock()
    t.Signal()
    t.L.Unlock()
}

func waitAndPrint(t *TMOCond, i int) {
    t.L.Lock()
    fmt.Println("Goroutine", i, "waiting...")
    ok := t.WaitOrTimeout(10 * time.Second)
    t.L.Unlock()
    fmt.Println("This is goroutine", i, "ok:", ok)
}

func main() {
    var m sync.Mutex
    t := NewTMOCond(&m)

    // Simple wait
    //
    t.L.Lock()
    go lockAndSignal(t)
    t.Wait()
    t.L.Unlock()
    fmt.Println("Simple wait finished.")

    // Wait that times out
    //
    t.L.Lock()
    ok := t.WaitOrTimeout(100 * time.Millisecond)
    t.L.Unlock()
    fmt.Println("Timeout wait finished. Timeout:", !ok)

    // Broadcast. All threads should finish.
    //
    for i := 0; i < 10; i++ {
        go waitAndPrint(t, i)
    }
    time.Sleep(1 * time.Second) 
    t.L.Lock()
    fmt.Println("About to signal")
    t.Broadcast()
    t.L.Unlock()
    time.Sleep(10 * time.Second)
}

字符串

相关问题