Go语言 工作池使用通道停止时发生 panic

yquaqz18  于 2024-01-04  发布在  Go

我正在编写一个工作池(worker pool)实现,在停止 Worker 时遇到了一个问题:执行 for _, worker := range p.workers {...} 循环时会发生 panic。我是在添加调试日志之后才开始看到这个 panic 的,原因我并不明白。在此之前,workersShutdown 看起来像是直接跳过了这个循环;我放了 fmt.Print 来调试,但不起作用,又设置了带 recover 的 defer func(),同样什么都没有捕获到。加上调试日志之后,反而能拿到 panic 了??我弄不清楚这是怎么发生的、又是为什么。当我在测试中调用 Stop 方法终止 Pool 时就会出现 panic,下面是运行测试后的日志。

{"level":"info","service":"workerPool","workerPoolName":"test-pool","service":"Worker","method":"Start","message":"worker finished processing the task"}
{"level":"debug","service":"workerPool","workerPoolName":"test-pool","method":"Stop","message":"stop worker pool with all worker"}
{"level":"debug","service":"workerPool","workerPoolName":"test-pool","method":"shutdown","message":"shutdown worker pool"}
{"level":"debug","service":"workerPool","workerPoolName":"test-pool","method":"workersShutdown","message":"stop all workers"}
{"level":"debug","service":"workerPool","workerPoolName":"test-pool","method":"workersShutdown","message":"stop each worker"}
{"level":"debug","service":"workerPool","workerPoolName":"test-pool","method":"workersShutdown","message":"worker count all: 8"}
{"level":"error","service":"workerPool","workerPoolName":"test-pool","method":"workersShutdown","error":"runtime error: invalid memory address or nil pointer dereference","message":"workersShutdown panic"}


// Option represents an option that can be passed when instantiating the work pool
type Option func(pool *Pool)

// CreateContext initializes the context and contextCancelFunc function for the pool.
func CreateContext(ctx context.Context) Option {
    return func(pool *Pool) {
        pool.ctx, pool.contextCancelFunc = context.WithCancel(ctx)

// Pool worker controller
type Pool struct {
    // Parent context for the pool.
    parentCtx context.Context
    // Pool's context.
    ctx context.Context
    // Cancel function for the pool's context.
    contextCancelFunc context.CancelFunc
    // Name of the worker pool. More convenient for logging.
    name string
    // Channel to signal stopping the pool.
    stopCh chan struct{}
    // Channel for queueing jobs.
    collector chan *Job
    // Slice to hold worker instances.
    workers []*Worker
    // Maximum number of workers in the pool.
    maxWorkersCount int
    // Number of currently running workers.
    workerConcurrency int32
    // Timeout for individual workers.
    workerTimeOut time.Duration
    // Mutex for locking access to the pool.
    mutex sync.Mutex
    // Used for a one-time action (starting the pool).
    onceStart sync.Once
    // Used for a one-time action (stopping the pool).
    onceStop sync.Once
    // Wait group for tracking running workers.
    wg sync.WaitGroup
    // Logger for the pool.
    logger *zerolog.Logger

    stopped bool

// NewPool creates a new worker pool.
func NewPool(parentCtx context.Context, queue chan *Job, workerConcurrency int, name string, logger *zerolog.Logger, workerTimeOut time.Duration) *Pool {
    l := logger.With().Str("service", "workerPool").Str("workerPoolName", name).Logger()

    if workerConcurrency == 0 {
        workerConcurrency = runtime.NumCPU() * 2

    pool := &Pool{
        parentCtx:       parentCtx,
        name:            name,
        stopCh:          make(chan struct{}, 1),
        collector:       queue,
        workers:         make([]*Worker, workerConcurrency),
        maxWorkersCount: workerConcurrency,
        workerTimeOut:   workerTimeOut,
        logger:          &l,
        stopped:         false,


    return pool

// Run starts the worker pool and worker goroutines. It creates and launches a specified number of worker goroutines,
// each of which is responsible for processing jobs from a shared collector. This method also continuously listens for stop signals
// or context cancellation and reacts accordingly, ensuring a clean and controlled shutdown of the worker pool.
func (p *Pool) Run() {
    p.onceStart.Do(func() {
        // Add one to the wait group for the main pool loop.
        // Start the main pool loop in a goroutine.
        go p.loop()

func (p *Pool) AddWorker(collector chan *Job, workerID int64, workerTimeout time.Duration) (err error) {
    l := p.logger.With().Str("method", "AddWorker").Logger()

    defer func() {
        // Recover from any panic in the job and report it.
        if rec := recover(); rec != nil {
            err = common.GetRecoverError(rec)
            if err != nil {
                l.Error().Err(err).Msgf("add worker panic, pool name: %s", p.name)

    if !p.incrementWorkerCount() {
        return fmt.Errorf("can't create more worker")

    defer p.mutex.Unlock()
    worker := NewWorker(p.ctx, p, collector, workerID, workerTimeout, p.logger)
    // Add the worker to the workers slice.
    p.workers = append(p.workers, worker)
    // Add one to the wait group to track the new worker.
    go worker.Start(&p.wg)

    return nil

// loop is the main worker pool loop. It creates and launches worker goroutines, listens for stop signals,
// and ensures a clean and controlled shutdown of the worker pool.
func (p *Pool) loop() {
    l := p.logger.With().Str("method", "loop").Logger()

    defer func() {
        // Recover from any panic in the job and report it.
        if rec := recover(); rec != nil {
            err := common.GetRecoverError(rec)
            if err != nil {
                l.Error().Err(err).Msgf("pool catch panic, pool name: %s", p.name)

        // Cancel the pool's context.

    for {
        select {
        case _, ok := <-p.stopCh:
            if !ok {
                l.Error().Msg("stop channel is close")
            // This option suits us as it guarantees that the channel was closed when the Stop method was called,
            // which means that the pool should stop working.
            l.Info().Msg("stop pool")

        case <-p.parentCtx.Done():
            // Log that the pool's context is closing.
            l.Info().Msg("parent context is close")
            // Acquire a read lock on the pool to prevent other modifications.
            // Trigger the pool shutdown sequence.
            // Release the read lock.

        case <-p.ctx.Done():
            l.Info().Msg("context is close") // Log that the pool's context is closing.

// RunningWorkers returns the number of running worker goroutines.
func (p *Pool) RunningWorkers() int {
    return int(atomic.LoadInt32(&p.workerConcurrency))

// incrementWorkerCount attempts to increment the worker count if it's below the maximum limit.
// It protects the worker count and associated wait group using a mutex to ensure
// thread-safety while managing the pool's worker count.
// If the current worker count is less than the maximum allowed workers, it increments
// the worker count and adds one to the wait group, signifying that a new worker is being started.
// If the maximum worker limit has been reached, it returns false to indicate that no more workers can be started.
// NOTE: Perhaps in the next iteration we will add simple workers to not create a large number of workers,
// but to make them flexible, to have a minimum number of workers,
// and a maximum number of workers to be able to automatically manage the pool of workers,
// without downtime or oversupply of workers.
func (p *Pool) incrementWorkerCount() bool {
    // Lock the mutex to protect the worker count and wait group.
    defer p.mutex.Unlock()
    // Get the current count of running workers.
    counter := p.RunningWorkers()

    if counter >= p.maxWorkersCount {
        // The maximum worker limit has been reached, no more workers can be started.
        return false
    // Increment the worker counter.
    atomic.AddInt32(&p.workerConcurrency, 1)

    return true

func (p *Pool) decrementWorkerCount() {
    p.logger.Info().Msg("decrement worker count")
    atomic.AddInt32(&p.workerConcurrency, -1)

// Stop stops the worker pool.
func (p *Pool) Stop() {
    l := p.logger.With().Str("method", "Stop").Logger()
    l.Debug().Msg("stop worker pool with all worker")

    defer func() {
        // Wait for all workers to finish.

    // Cancel the pool's context.
    // Signal for stopping the pool.
    p.stopCh <- struct{}{}
    // Close the stop channel.
    // Trigger the pool shutdown sequence.

// shutdown stops the worker pool.
func (p *Pool) shutdown() {
    l := p.logger.With().Str("method", "shutdown").Logger()
    l.Debug().Msg("shutdown worker pool")
    // Stop all workers.

// workersShutdown stops all worker goroutines.
func (p *Pool) workersShutdown() <-chan struct{} {
    l := p.logger.With().Str("method", "workersShutdown").Logger()
    l.Debug().Msg("stop all workers")

    doneCh := make(chan struct{})

    defer func() {
        if r := recover(); r != nil {
            l.Error().Err(common.GetRecoverError(r)).Msg("workersShutdown panic")

    // If we are already stopped, then there is nothing to do

    l.Debug().Msg("stop each worker")
    l.Debug().Msgf("worker count all: %d", p.workerConcurrency)
    doneChs := make([]<-chan struct{}, p.workerConcurrency)
    for _, worker := range p.workers {
        l.Info().Msgf("stop worker: %d", worker.id)
        doneChs = append(doneChs, worker.Stop())
        l.Debug().Msgf("status: %d", worker.GetStatus())

    go func() {
        l.Info().Msg("wait stop worker")
        // wait for all channels to be closed
        for _, ch := range doneChs {

    return doneCh


// Status represents the current state of a worker.
type Status int

// The possible values for WorkerStatus.
const (
    WorkerIdle    Status = iota // Worker is waiting for a job.
    WorkerRunning               // Worker is running a job.
    WorkerStopped               // Worker has stopped.

type Worker struct {
    // The worker pool that this worker belongs to.
    pool *Pool

    // worker id
    id int64

    // The context of the parent worker pool.
    workerContext context.Context

    // A mutex used to protect the worker's state.
    mutex sync.Mutex

    // A channel used to signal to the worker that it should stop.
    stopCh chan struct{}

    // A channel from which the worker receives jobs.
    collector <-chan *Job

    // The job that the worker is currently processing.
    process *Job

    // The current status of the worker.
    status Status

    timeout time.Duration

    logger *zerolog.Logger

// NewWorker creates a new worker.
func NewWorker(parentCtx context.Context, pool *Pool, queue chan *Job, id int64, timeout time.Duration, logger *zerolog.Logger) *Worker {
    l := logger.With().Str("service", "Worker").Logger()
    return &Worker{
        pool:          pool,
        workerContext: parentCtx,
        collector:     queue,
        id:            id,
        timeout:       timeout,
        logger:        &l,
        stopCh:        make(chan struct{}, 1),

// Start method is a goroutine that continuously extracts and processes tasks assigned to a worker, ensuring it's always ready to handle incoming tasks.
// We create a specific logger for the worker to track its activities.
// The method operates in a loop, constantly handling various scenarios to exit gracefully, such as receiving stop signals or context termination.
// When a task is available, the worker retrieves and logs it.
// The worker sets its status to 'WorkerRunning,' indicating active task processing.
// It executes the task using the Run method, passing worker-specific context.
// Any task execution errors are logged for debugging.
// After task completion, the worker resets its status to 'WorkerIdle,' indicating readiness for more tasks.
// A log message signals the task processing completion.
// This method guarantees continuous task processing while maintaining detailed logs for debugging and monitoring.
func (w *Worker) Start(wg *sync.WaitGroup) {
    // Create a logger for the worker.
    l := w.logger.With().Str("method", "Start").Logger()
    // Log that the worker is starting.
    l.Debug().Msg("worker runner")

    // As soon as a worker is created, it is necessarily in the status of WorkerIdle.

    // Start a goroutine to process jobs.
    defer func() {
        if rec := recover(); rec != nil {
            err := common.GetRecoverError(rec)
            if err != nil {
                l.Error().Err(err).Msgf("worker pool panic: workerID [%d], workerStatus [%d], workerPoolName [%s]", w.id, w.status, w.pool.name)


    for {
        select {
        // If the worker receives a stop signal, exit the goroutine.
        case <-w.stopCh:
            // The worker has completed from outside, which means the worker pool completes the job.
            l.Info().Msg("stop channel is close")

        // If the parent context is done, exit the goroutine.
        case <-w.workerContext.Done():
            l.Info().Msg("parent context is close")

        select {
        // If the worker receives a stop signal, exit the goroutine.
        // https://stackoverflow.com/questions/16105325/how-to-check-a-channel-is-closed-or-not-without-reading-it
        // I couldn't figure out why, but in some tests, I was getting an error that the channel is closed,
        // for what reason I don't know, so I added this check in case the channel is closed, I should clearly return an error in the log that the channel is closed.
        case _, ok := <-w.stopCh:
            if !ok {
                l.Error().Msgf("stop channel is close, workerID [%d], workerStatus [%d], workerPoolName [%s]", w.id, w.status, w.pool.name)

            l.Error().Msgf("stop channel is close, workerID [%d], workerStatus [%d], workerPoolName [%s]", w.id, w.status, w.pool.name)

        case <-w.workerContext.Done():
            l.Info().Msg("parent context is close")

        // If a job is available, process it.
        case task, ok := <-w.collector:
            if ok {
                if task != nil {
                    l.Info().Msgf("worker: [%d] get task: [%s]", w.id, task.name)
                    err := w.processTask(task)
                    if err != nil {
                        l.Error().Err(err).Msgf("failed run task: workerID [%d], workerStatus [%d], workerPoolName [%s]", w.id, w.status, w.pool.name)

                    // Log that the worker finished processing the task.
                    l.Info().Msg("worker finished processing the task")
            } else {
                // if the job channel is closed, there is no point in further work of the worker
                l.Info().Msgf("job collector is close: workerID [%d], workerStatus [%d], workerPoolName [%s]", w.id, w.status, w.pool.name)


// processTask processes the given task.
func (w *Worker) processTask(task *Job) error {
    w.process = task

    // Process the job.
    task.Run(w.timeout, w.id, w.pool.name)
    taskErr := task.GetError()
    if taskErr != nil {
        return taskErr

    // Set the worker status to idle.
    w.process = nil

    return nil

func (w *Worker) Stop() <-chan struct{} {
    l := w.logger.With().Str("method", "Stop").Logger()
    l.Debug().Msg("stop worker")

    defer func() {
        if r := recover(); r != nil {
            l.Error().Err(common.GetRecoverError(r)).Msg("stop worker panic")

    doneCh := make(chan struct{})
    go func() {
        defer w.mutex.Unlock()

        select {
        case <-w.stopCh:

            // Close the stopCh to signal to the worker that it should stop.
            // This will cause the worker to exit its loop and finish processing the current job.
            w.stopCh <- struct{}{}
            // Set the worker status to stopped.
            // If there is a task in processing at the time of worker termination, it must be terminated
            if w.process != nil {


    return doneCh

// setStatus sets the worker's status.
func (w *Worker) setStatus(status Status) {
    // Lock the worker's mutex to protect its state.
    defer w.mutex.Unlock()

    // Set the worker's status.
    w.status = status

// GetStatus returns the current status of the worker.
func (w *Worker) GetStatus() Status {
    return w.status


t.Run("worker pool", func(t *testing.T) {
        counter := 0
        workerCount := runtime.NumCPU()
        mut := new(sync.Mutex)
        mockMainFunc := func(ctx context.Context, task *model.Task) {
            defer mut.Unlock()
            fmt.Printf("Main func fileID: %d, volumeID: %d\n", task.FileID, task.VolumeID)

        // Create a mock error function for the job.
        mockErrFunc := func(ctx context.Context, task *model.Task) {
            // Simulate error function logic here.
            fmt.Printf("Err func fileID: %d, volumeID: %d\n", task.FileID, task.VolumeID)

        var wg sync.WaitGroup
        parentCtx := context.Background()

        logger := zerolog.New(os.Stdout)
        task := make(chan *Job, 100)
        pool := NewPool(parentCtx, task, workerCount, "test-pool", &logger, 3*time.Second)
        assert.Equal(t, 0, pool.RunningWorkers())
        go func() {

        for w := 1; w <= workerCount; w++ {
            _ = pool.AddWorker(task, int64(w), 3*time.Second)

        time.Sleep(1 * time.Second)
        assert.Equal(t, workerCount, pool.RunningWorkers())

        for i := 0; i < 100; i++ {
            j := NewJob(parentCtx, &model.Task{FileID: 900000, VolumeID: int32(890 + i)}, -1*time.Second, fmt.Sprintf("job:%d", i))
            task <- j

        time.Sleep(2 * time.Second)
        go func() {
            defer wg.Done()
        assert.Equal(t, 100, counter)
        fmt.Print("COUNT worker: ", pool.RunningWorkers())



这个问题有点难回答:你提供了相当多的代码,但它并不完整(所以我们无法轻易地自行运行)——如果能提供一个最小可复现示例(minimal, reproducible example),会更容易回答。话虽如此,根据你提供的日志,问题出在下面这两条日志语句之间:

l.Debug().Msgf("worker count all: %d", p.workerConcurrency)
    doneChs := make([]<-chan struct{}, p.workerConcurrency)
    for _, worker := range p.workers {
        l.Info().Msgf("stop worker: %d", worker.id)


pool := &Pool{
    workers:         make([]*Worker, workerConcurrency),


func (p *Pool) AddWorker(collector chan *Job, workerID int64, workerTimeout time.Duration) (err error) {
...    l := p.logger.With().Str("method", "AddWorker").Logger()
    p.workers = append(p.workers, worker)

make 的语法是 make([]T, length, capacity)。因此,你创建的是一个长度为 workerConcurrency 的切片(其中每个元素都是零值 nil),之后 AddWorker 又把新条目追加到切片的末尾。
用一个简单的例子可能最容易理解这一点:

func main() {
    a := 8
    x := make([]*int, 3)
    x = append(x, &a)
    fmt.Println(x) // sample output [<nil> <nil> <nil> 0xc000118000]

这样做的结果是,for _, worker := range p.workers { 循环中取到的第一个 worker 就是 nil(访问 worker.id 时触发 panic,即日志中的 nil pointer dereference)。
最简单的解决方案是将 workers 初始化为 nil(或者 []*Worker{},如果你喜欢的话)。如果确实想预先分配内存,可以使用 make([]*Worker, 0, workerConcurrency)(length = 0,capacity = workerConcurrency)。
