避免死锁的Go生产者消费者

enxuqcxy  于 2023-01-22  发布在  Go
关注(0)|答案(1)|浏览(178)

我在go中有一个消费者和生产者的代码,虽然我问这个问题是为了代码审查here,而且这个想法的很大一部分是从这个线程here中衍生出来的,这里是playground中的code

  • 此代码有多个生产者和消费者共享同一个通道。
  • 这段代码有一个错误处理机制,如果任何一个工作者(生产者或消费者)出错,那么所有的工作者都应该停止。

我担心死锁情况,即所有消费者都关闭,但生产者仍在向共享通道添加数据。为了“缓解”这种情况,我在向数据队列添加数据之前添加了上下文检查-具体来说就是go playground中的第85行。
然而,如果--第85行的生产者检查context.Done(),然后上下文被取消,导致所有消费者关闭,然后生产者试图插入数据到队列中,那么死锁仍然可能吗?
如果是,如何缓解。
重新发布代码:

package main

import (
    "context"
    "fmt"
    "sync"
)

func main() {
    a1 := []int{1, 2, 3, 4, 5}
    a2 := []int{5, 4, 3, 1, 1}
    a3 := []int{6, 7, 8, 9}
    a4 := []int{1, 2, 3, 4, 5}
    a5 := []int{5, 4, 3, 1, 1}
    a6 := []int{6, 7, 18, 9}
    arrayOfArray := [][]int{a1, a2, a3, a4, a5, a6}

    ctx, cancel := context.WithCancel(context.Background())
    ch1 := read(ctx, arrayOfArray)

    messageCh := make(chan int)
    errCh := make(chan error)

    producerWg := &sync.WaitGroup{}
    for i := 0; i < 3; i++ {
        producerWg.Add(1)
        producer(ctx, producerWg, ch1, messageCh, errCh)
    }

    consumerWg := &sync.WaitGroup{}
    for i := 0; i < 3; i++ {
        consumerWg.Add(1)
        consumer(ctx, consumerWg, messageCh, errCh)
    }

    firstError := handleAllErrors(ctx, cancel, errCh)

    producerWg.Wait()
    close(messageCh)

    consumerWg.Wait()
    close(errCh)

    fmt.Println(<-firstError)
}

func read(ctx context.Context, arrayOfArray [][]int) <-chan []int {
    ch := make(chan []int)

    go func() {
        defer close(ch)

        for i := 0; i < len(arrayOfArray); i++ {
            select {
            case <-ctx.Done():
                return
            case ch <- arrayOfArray[i]:
            }
        }
    }()

    return ch
}

func producer(ctx context.Context, wg *sync.WaitGroup, in <-chan []int, messageCh chan<- int, errCh chan<- error) {
    go func() {
        defer wg.Done()
        for {
            select {
            case <-ctx.Done():
                return
            case arr, ok := <-in:
                if !ok {
                    return
                }

                for i := 0; i < len(arr); i++ {

                    // simulating an error.
                    //if arr[i] == 10 {
                    //  errCh <- fmt.Errorf("producer interrupted")
                    //}

                    select {
                    case <-ctx.Done():
                        return
                    case messageCh <- 2 * arr[i]:
                    }
                }
            }
        }
    }()
}

func consumer(ctx context.Context, wg *sync.WaitGroup, messageCh <-chan int, errCh chan<- error) {
    go func() {
        wg.Done()

        for {
            select {
            case <-ctx.Done():
                return
            case n, ok := <-messageCh:
                if !ok {
                    return
                }
                fmt.Println("consumed: ", n)

                // simulating erros
                //if n == 10 {
                //  errCh <- fmt.Errorf("output error during write")
                //}
            }
        }
    }()
}

func handleAllErrors(ctx context.Context, cancel context.CancelFunc, errCh chan error) <-chan error {
    firstErrCh := make(chan error, 1)
    isFirstError := true
    go func() {
        defer close(firstErrCh)
        for err := range errCh {
            select {
            case <-ctx.Done():
            default:
                cancel()
            }
            if isFirstError {
                firstErrCh <- err
                isFirstError = !isFirstError
            }
        }
    }()

    return firstErrCh
}
jogvjijk

jogvjijk1#

关于死锁:在你描述的场景中,这似乎是不可能的。如果生产者能够发送到通道,这意味着有一个消费者可以接收,这将原子地发生,所以生产者不可能决定发送到通道,但消费者停止。生产者发送到通道只有当消费者可以消费。
但是,当然还有可能的改进。既然你只对第一个错误感兴趣,你可以简单地做:

func main() {
  ...
  var firstErr error
  go func() {
      for err := range errCh {
         if firstErr!=nil {
           firstErr=err
         }
      }
  }()
  ...
  close(errCh)
  ...

相关问题