我在go中有一个消费者和生产者的代码,虽然我问这个问题是为了代码审查here,而且这个想法的很大一部分是从这个线程here中衍生出来的,这里是playground中的code。
- 此代码有多个生产者和消费者共享同一个通道。
- 这段代码有一个错误处理机制,如果任何一个工作者(生产者或消费者)出错,那么所有的工作者都应该停止。
我担心死锁情况,即所有消费者都关闭,但生产者仍在向共享通道添加数据。为了“缓解”这种情况,我在向数据队列添加数据之前添加了上下文检查-具体来说就是go playground中的第85行。
然而,如果--第85行的生产者检查context.Done(),然后上下文被取消,导致所有消费者关闭,然后生产者试图插入数据到队列中,那么死锁仍然可能吗?
如果是,如何缓解。
重新发布代码:
package main
import (
"context"
"fmt"
"sync"
)
func main() {
a1 := []int{1, 2, 3, 4, 5}
a2 := []int{5, 4, 3, 1, 1}
a3 := []int{6, 7, 8, 9}
a4 := []int{1, 2, 3, 4, 5}
a5 := []int{5, 4, 3, 1, 1}
a6 := []int{6, 7, 18, 9}
arrayOfArray := [][]int{a1, a2, a3, a4, a5, a6}
ctx, cancel := context.WithCancel(context.Background())
ch1 := read(ctx, arrayOfArray)
messageCh := make(chan int)
errCh := make(chan error)
producerWg := &sync.WaitGroup{}
for i := 0; i < 3; i++ {
producerWg.Add(1)
producer(ctx, producerWg, ch1, messageCh, errCh)
}
consumerWg := &sync.WaitGroup{}
for i := 0; i < 3; i++ {
consumerWg.Add(1)
consumer(ctx, consumerWg, messageCh, errCh)
}
firstError := handleAllErrors(ctx, cancel, errCh)
producerWg.Wait()
close(messageCh)
consumerWg.Wait()
close(errCh)
fmt.Println(<-firstError)
}
func read(ctx context.Context, arrayOfArray [][]int) <-chan []int {
ch := make(chan []int)
go func() {
defer close(ch)
for i := 0; i < len(arrayOfArray); i++ {
select {
case <-ctx.Done():
return
case ch <- arrayOfArray[i]:
}
}
}()
return ch
}
func producer(ctx context.Context, wg *sync.WaitGroup, in <-chan []int, messageCh chan<- int, errCh chan<- error) {
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case arr, ok := <-in:
if !ok {
return
}
for i := 0; i < len(arr); i++ {
// simulating an error.
//if arr[i] == 10 {
// errCh <- fmt.Errorf("producer interrupted")
//}
select {
case <-ctx.Done():
return
case messageCh <- 2 * arr[i]:
}
}
}
}
}()
}
func consumer(ctx context.Context, wg *sync.WaitGroup, messageCh <-chan int, errCh chan<- error) {
go func() {
wg.Done()
for {
select {
case <-ctx.Done():
return
case n, ok := <-messageCh:
if !ok {
return
}
fmt.Println("consumed: ", n)
// simulating erros
//if n == 10 {
// errCh <- fmt.Errorf("output error during write")
//}
}
}
}()
}
func handleAllErrors(ctx context.Context, cancel context.CancelFunc, errCh chan error) <-chan error {
firstErrCh := make(chan error, 1)
isFirstError := true
go func() {
defer close(firstErrCh)
for err := range errCh {
select {
case <-ctx.Done():
default:
cancel()
}
if isFirstError {
firstErrCh <- err
isFirstError = !isFirstError
}
}
}()
return firstErrCh
}
1条答案
按热度按时间jogvjijk1#
关于死锁:在你描述的场景中,这似乎是不可能的。如果生产者能够发送到通道,这意味着有一个消费者可以接收,这将原子地发生,所以生产者不可能决定发送到通道,但消费者停止。生产者发送到通道只有当消费者可以消费。
但是,当然还有可能的改进。既然你只对第一个错误感兴趣,你可以简单地做: