I'm writing a worker pool implementation and I've run into a problem when stopping the workers. I get a panic when the for _, worker := range p.workers {...} loop runs. The panic only started showing up after I added debug logging, and I can't understand why. Before that, the loop in workersShutdown seemed to be skipped entirely; I put fmt.Print calls in to debug it, but the code still didn't work, and a defer func() with recover gave me nothing. Add debug logging and suddenly I can get the panic?? I can't figure out how or why. When I call the Stop method in my test to terminate the Pool, I get the panic. Below is the log from running the test.
{"level":"info","service":"workerPool","workerPoolName":"test-pool","service":"Worker","method":"Start","message":"worker finished processing the task"}
{"level":"debug","service":"workerPool","workerPoolName":"test-pool","method":"Stop","message":"stop worker pool with all worker"}
{"level":"debug","service":"workerPool","workerPoolName":"test-pool","method":"shutdown","message":"shutdown worker pool"}
{"level":"debug","service":"workerPool","workerPoolName":"test-pool","method":"workersShutdown","message":"stop all workers"}
{"level":"debug","service":"workerPool","workerPoolName":"test-pool","method":"workersShutdown","message":"stop each worker"}
{"level":"debug","service":"workerPool","workerPoolName":"test-pool","method":"workersShutdown","message":"worker count all: 8"}
{"level":"error","service":"workerPool","workerPoolName":"test-pool","method":"workersShutdown","error":"runtime error: invalid memory address or nil pointer dereference","message":"workersShutdown panic"}
How should I rewrite the Pool.Stop() and Worker.Stop methods so that I can still guarantee that every worker has finished?
Pool source code:
// Option represents an option that can be passed when instantiating the work pool
type Option func(pool *Pool)
// CreateContext initializes the context and contextCancelFunc function for the pool.
func CreateContext(ctx context.Context) Option {
    return func(pool *Pool) {
        pool.ctx, pool.contextCancelFunc = context.WithCancel(ctx)
    }
}
// Pool worker controller
type Pool struct {
    // Parent context for the pool.
    parentCtx context.Context
    // Pool's context.
    ctx context.Context
    // Cancel function for the pool's context.
    contextCancelFunc context.CancelFunc
    // Name of the worker pool. More convenient for logging.
    name string
    // Channel to signal stopping the pool.
    stopCh chan struct{}
    // Channel for queueing jobs.
    collector chan *Job
    // Slice to hold worker instances.
    workers []*Worker
    // Maximum number of workers in the pool.
    maxWorkersCount int
    // Number of currently running workers.
    workerConcurrency int32
    // Timeout for individual workers.
    workerTimeOut time.Duration
    // Mutex for locking access to the pool.
    mutex sync.Mutex
    // Used for a one-time action (starting the pool).
    onceStart sync.Once
    // Used for a one-time action (stopping the pool).
    onceStop sync.Once
    // Wait group for tracking running workers.
    wg sync.WaitGroup
    // Logger for the pool.
    logger *zerolog.Logger
    stopped bool
}
// NewPool creates a new worker pool.
func NewPool(parentCtx context.Context, queue chan *Job, workerConcurrency int, name string, logger *zerolog.Logger, workerTimeOut time.Duration) *Pool {
    l := logger.With().Str("service", "workerPool").Str("workerPoolName", name).Logger()
    if workerConcurrency == 0 {
        workerConcurrency = runtime.NumCPU() * 2
    }
    pool := &Pool{
        parentCtx: parentCtx,
        name: name,
        stopCh: make(chan struct{}, 1),
        collector: queue,
        workers: make([]*Worker, workerConcurrency),
        maxWorkersCount: workerConcurrency,
        workerTimeOut: workerTimeOut,
        logger: &l,
        stopped: false,
    }
    CreateContext(context.Background())(pool)
    return pool
}
// Run starts the worker pool and worker goroutines. It creates and launches a specified number of worker goroutines,
// each of which is responsible for processing jobs from a shared collector. This method also continuously listens for stop signals
// or context cancellation and reacts accordingly, ensuring a clean and controlled shutdown of the worker pool.
func (p *Pool) Run() {
    p.onceStart.Do(func() {
        // Add one to the wait group for the main pool loop.
        p.wg.Add(1)
        // Start the main pool loop in a goroutine.
        go p.loop()
    })
}
func (p *Pool) AddWorker(collector chan *Job, workerID int64, workerTimeout time.Duration) (err error) {
    l := p.logger.With().Str("method", "AddWorker").Logger()
    defer func() {
        // Recover from any panic in the job and report it.
        if rec := recover(); rec != nil {
            err = common.GetRecoverError(rec)
            if err != nil {
                l.Error().Err(err).Msgf("add worker panic, pool name: %s", p.name)
            }
        }
    }()
    if !p.incrementWorkerCount() {
        return fmt.Errorf("can't create more worker")
    }
    p.mutex.Lock()
    defer p.mutex.Unlock()
    worker := NewWorker(p.ctx, p, collector, workerID, workerTimeout, p.logger)
    // Add the worker to the workers slice.
    p.workers = append(p.workers, worker)
    // Add one to the wait group to track the new worker.
    p.wg.Add(1)
    go worker.Start(&p.wg)
    return nil
}
// loop is the main worker pool loop. It creates and launches worker goroutines, listens for stop signals,
// and ensures a clean and controlled shutdown of the worker pool.
func (p *Pool) loop() {
    l := p.logger.With().Str("method", "loop").Logger()
    defer func() {
        // Recover from any panic in the job and report it.
        if rec := recover(); rec != nil {
            err := common.GetRecoverError(rec)
            if err != nil {
                l.Error().Err(err).Msgf("pool catch panic, pool name: %s", p.name)
            }
        }
        p.wg.Done()
        // Cancel the pool's context.
        p.contextCancelFunc()
    }()
    for {
        select {
        case _, ok := <-p.stopCh:
            if !ok {
                l.Error().Msg("stop channel is close")
            }
            // This option suits us as it guarantees that the channel was closed when the Stop method was called,
            // which means that the pool should stop working.
            l.Info().Msg("stop pool")
            return
        case <-p.parentCtx.Done():
            // Log that the pool's context is closing.
            l.Info().Msg("parent context is close")
            // Acquire a read lock on the pool to prevent other modifications.
            p.mutex.Lock()
            // Trigger the pool shutdown sequence.
            p.shutdown()
            // Release the read lock.
            p.mutex.Unlock()
            return
        case <-p.ctx.Done():
            l.Info().Msg("context is close") // Log that the pool's context is closing.
            return
        }
    }
}
// RunningWorkers returns the number of running worker goroutines.
func (p *Pool) RunningWorkers() int {
    return int(atomic.LoadInt32(&p.workerConcurrency))
}
// incrementWorkerCount attempts to increment the worker count if it's below the maximum limit.
// It protects the worker count and associated wait group using a mutex to ensure
// thread-safety while managing the pool's worker count.
// If the current worker count is less than the maximum allowed workers, it increments
// the worker count and adds one to the wait group, signifying that a new worker is being started.
// If the maximum worker limit has been reached, it returns false to indicate that no more workers can be started.
// NOTE: Perhaps in the next iteration we will add simple workers to not create a large number of workers,
// but to make them flexible, to have a minimum number of workers,
// and a maximum number of workers to be able to automatically manage the pool of workers,
// without downtime or oversupply of workers.
func (p *Pool) incrementWorkerCount() bool {
    // Lock the mutex to protect the worker count and wait group.
    p.mutex.Lock()
    defer p.mutex.Unlock()
    // Get the current count of running workers.
    counter := p.RunningWorkers()
    if counter >= p.maxWorkersCount {
        // The maximum worker limit has been reached, no more workers can be started.
        return false
    }
    // Increment the worker counter.
    atomic.AddInt32(&p.workerConcurrency, 1)
    return true
}
func (p *Pool) decrementWorkerCount() {
    p.logger.Info().Msg("decrement worker count")
    atomic.AddInt32(&p.workerConcurrency, -1)
}
// Stop stops the worker pool.
func (p *Pool) Stop() {
    l := p.logger.With().Str("method", "Stop").Logger()
    l.Debug().Msg("stop worker pool with all worker")
    p.mutex.Lock()
    p.mutex.Unlock()
    defer func() {
        // Wait for all workers to finish.
        p.wg.Wait()
    }()
    p.shutdown()
    // Cancel the pool's context.
    p.contextCancelFunc()
    // Signal for stopping the pool.
    p.stopCh <- struct{}{}
    // Close the stop channel.
    close(p.stopCh)
    // Trigger the pool shutdown sequence.
}
// shutdown stops the worker pool.
func (p *Pool) shutdown() {
    l := p.logger.With().Str("method", "shutdown").Logger()
    l.Debug().Msg("shutdown worker pool")
    // Stop all workers.
    <-p.workersShutdown()
    return
}
// workersShutdown stops all worker goroutines.
func (p *Pool) workersShutdown() <-chan struct{} {
    l := p.logger.With().Str("method", "workersShutdown").Logger()
    l.Debug().Msg("stop all workers")
    doneCh := make(chan struct{})
    defer func() {
        if r := recover(); r != nil {
            l.Error().Err(common.GetRecoverError(r)).Msg("workersShutdown panic")
            return
        }
    }()
    // If we are already stopped, then there is nothing to do
    l.Debug().Msg("stop each worker")
    l.Debug().Msgf("worker count all: %d", p.workerConcurrency)
    doneChs := make([]<-chan struct{}, p.workerConcurrency)
    for _, worker := range p.workers {
        l.Info().Msgf("stop worker: %d", worker.id)
        doneChs = append(doneChs, worker.Stop())
        l.Debug().Msgf("status: %d", worker.GetStatus())
    }
    go func() {
        l.Info().Msg("wait stop worker")
        // wait for all channels to be closed
        for _, ch := range doneChs {
            <-ch
            p.decrementWorkerCount()
        }
        close(doneCh)
    }()
    return doneCh
}
Worker source code:
// Status represents the current state of a worker.
type Status int
// The possible values for WorkerStatus.
const (
    WorkerIdle Status = iota // Worker is waiting for a job.
    WorkerRunning            // Worker is running a job.
    WorkerStopped            // Worker has stopped.
)
type Worker struct {
    // The worker pool that this worker belongs to.
    pool *Pool
    // worker id
    id int64
    // The context of the parent worker pool.
    workerContext context.Context
    // A mutex used to protect the worker's state.
    mutex sync.Mutex
    // A channel used to signal to the worker that it should stop.
    stopCh chan struct{}
    // A channel from which the worker receives jobs.
    collector <-chan *Job
    // The job that the worker is currently processing.
    process *Job
    // The current status of the worker.
    status Status
    timeout time.Duration
    logger *zerolog.Logger
}
// NewWorker creates a new worker.
func NewWorker(parentCtx context.Context, pool *Pool, queue chan *Job, id int64, timeout time.Duration, logger *zerolog.Logger) *Worker {
    l := logger.With().Str("service", "Worker").Logger()
    return &Worker{
        pool: pool,
        workerContext: parentCtx,
        collector: queue,
        id: id,
        timeout: timeout,
        logger: &l,
        stopCh: make(chan struct{}, 1),
    }
}
// Start method is a goroutine that continuously extracts and processes tasks assigned to a worker, ensuring it's always ready to handle incoming tasks.
// We create a specific logger for the worker to track its activities.
// The method operates in a loop, constantly handling various scenarios to exit gracefully, such as receiving stop signals or context termination.
// When a task is available, the worker retrieves and logs it.
// The worker sets its status to 'WorkerRunning,' indicating active task processing.
// It executes the task using the Run method, passing worker-specific context.
// Any task execution errors are logged for debugging.
// After task completion, the worker resets its status to 'WorkerIdle,' indicating readiness for more tasks.
// A log message signals the task processing completion.
// This method guarantees continuous task processing while maintaining detailed logs for debugging and monitoring.
func (w *Worker) Start(wg *sync.WaitGroup) {
    // Create a logger for the worker.
    l := w.logger.With().Str("method", "Start").Logger()
    // Log that the worker is starting.
    l.Debug().Msg("worker runner")
    // As soon as a worker is created, it is necessarily in the status of WorkerIdle.
    w.setStatus(WorkerIdle)
    // Start a goroutine to process jobs.
    defer func() {
        if rec := recover(); rec != nil {
            err := common.GetRecoverError(rec)
            if err != nil {
                l.Error().Err(err).Msgf("worker pool panic: workerID [%d], workerStatus [%d], workerPoolName [%s]", w.id, w.status, w.pool.name)
            }
        }
        w.setStatus(WorkerStopped)
        wg.Done()
    }()
    for {
        select {
        // If the worker receives a stop signal, exit the goroutine.
        case <-w.stopCh:
            // The worker has completed from outside, which means the worker pool completes the job.
            w.setStatus(WorkerStopped)
            l.Info().Msg("stop channel is close")
            return
        // If the parent context is done, exit the goroutine.
        case <-w.workerContext.Done():
            w.setStatus(WorkerStopped)
            l.Info().Msg("parent context is close")
            return
        default:
        }
        select {
        // If the worker receives a stop signal, exit the goroutine.
        // https://stackoverflow.com/questions/16105325/how-to-check-a-channel-is-closed-or-not-without-reading-it
        // I couldn't figure out why, but in some tests, I was getting an error that the channel is closed,
        // for what reason I don't know, so I added this check in case the channel is closed, I should clearly return an error in the log that the channel is closed.
        case _, ok := <-w.stopCh:
            if !ok {
                w.setStatus(WorkerStopped)
                l.Error().Msgf("stop channel is close, workerID [%d], workerStatus [%d], workerPoolName [%s]", w.id, w.status, w.pool.name)
                return
            }
            w.setStatus(WorkerStopped)
            l.Error().Msgf("stop channel is close, workerID [%d], workerStatus [%d], workerPoolName [%s]", w.id, w.status, w.pool.name)
            return
        case <-w.workerContext.Done():
            w.setStatus(WorkerStopped)
            l.Info().Msg("parent context is close")
            return
        // If a job is available, process it.
        case task, ok := <-w.collector:
            if ok {
                if task != nil {
                    l.Info().Msgf("worker: [%d] get task: [%s]", w.id, task.name)
                    err := w.processTask(task)
                    if err != nil {
                        l.Error().Err(err).Msgf("failed run task: workerID [%d], workerStatus [%d], workerPoolName [%s]", w.id, w.status, w.pool.name)
                    }
                    // Log that the worker finished processing the task.
                    l.Info().Msg("worker finished processing the task")
                }
            } else {
                // if the job channel is closed, there is no point in further work of the worker
                w.setStatus(WorkerStopped)
                l.Info().Msgf("job collector is close: workerID [%d], workerStatus [%d], workerPoolName [%s]", w.id, w.status, w.pool.name)
                return
            }
        }
    }
}
// processTask processes the given task.
func (w *Worker) processTask(task *Job) error {
    w.setStatus(WorkerRunning)
    w.process = task
    // Process the job.
    task.Run(w.timeout, w.id, w.pool.name)
    taskErr := task.GetError()
    if taskErr != nil {
        return taskErr
    }
    // Set the worker status to idle.
    w.process = nil
    w.setStatus(WorkerIdle)
    return nil
}
func (w *Worker) Stop() <-chan struct{} {
    l := w.logger.With().Str("method", "Stop").Logger()
    l.Debug().Msg("stop worker")
    defer func() {
        if r := recover(); r != nil {
            l.Error().Err(common.GetRecoverError(r)).Msg("stop worker panic")
            return
        }
    }()
    doneCh := make(chan struct{})
    go func() {
        w.mutex.Lock()
        defer w.mutex.Unlock()
        select {
        case <-w.stopCh:
            close(doneCh)
            close(w.errCh)
            return
        default:
            // Close the stopCh to signal to the worker that it should stop.
            // This will cause the worker to exit its loop and finish processing the current job.
            w.stopCh <- struct{}{}
            close(w.stopCh)
            close(w.errCh)
            // Set the worker status to stopped.
            w.setStatus(WorkerStopped)
            // If there is a task in processing at the time of worker termination, it must be terminated
            if w.process != nil {
                w.process.Stop()
            }
            close(doneCh)
        }
    }()
    return doneCh
}
// setStatus sets the worker's status.
func (w *Worker) setStatus(status Status) {
    // Lock the worker's mutex to protect its state.
    w.mutex.Lock()
    defer w.mutex.Unlock()
    // Set the worker's status.
    w.status = status
}
// GetStatus returns the current status of the worker.
func (w *Worker) GetStatus() Status {
    return w.status
}
Test case code:
t.Run("worker pool", func(t *testing.T) {
    counter := 0
    workerCount := runtime.NumCPU()
    mut := new(sync.Mutex)
    mockMainFunc := func(ctx context.Context, task *model.Task) {
        mut.Lock()
        defer mut.Unlock()
        counter++
        fmt.Printf("Main func fileID: %d, volumeID: %d\n", task.FileID, task.VolumeID)
    }
    // Create a mock error function for the job.
    mockErrFunc := func(ctx context.Context, task *model.Task) {
        // Simulate error function logic here.
        fmt.Printf("Err func fileID: %d, volumeID: %d\n", task.FileID, task.VolumeID)
    }
    var wg sync.WaitGroup
    parentCtx := context.Background()
    logger := zerolog.New(os.Stdout)
    task := make(chan *Job, 100)
    pool := NewPool(parentCtx, task, workerCount, "test-pool", &logger, 3*time.Second)
    assert.Equal(t, 0, pool.RunningWorkers())
    go func() {
        pool.Run()
    }()
    for w := 1; w <= workerCount; w++ {
        _ = pool.AddWorker(task, int64(w), 3*time.Second)
    }
    time.Sleep(1 * time.Second)
    assert.Equal(t, workerCount, pool.RunningWorkers())
    for i := 0; i < 100; i++ {
        wg.Add(1)
        j := NewJob(parentCtx, &model.Task{FileID: 900000, VolumeID: int32(890 + i)}, -1*time.Second, fmt.Sprintf("job:%d", i))
        j.SetMainFunc(mockMainFunc)
        j.SetErrorFunc(mockErrFunc)
        j.SetWaitGroup(&wg)
        task <- j
    }
    time.Sleep(2 * time.Second)
    wg.Add(1)
    go func() {
        defer wg.Done()
        pool.Stop()
    }()
    wg.Wait()
    assert.Equal(t, 100, counter)
    fmt.Print("COUNT worker: ", pool.RunningWorkers())
})
1 Answer
This is a bit hard to answer because you have provided quite a lot of code, but it is incomplete (so we can't easily run it ourselves) - a minimal, reproducible, example would make it easier to answer. Having said that, based on the log you provided, the problem occurs between these two log entries:
{"level":"debug","service":"workerPool","workerPoolName":"test-pool","method":"workersShutdown","message":"worker count all: 8"}
{"level":"error","service":"workerPool","workerPoolName":"test-pool","method":"workersShutdown","error":"runtime error: invalid memory address or nil pointer dereference","message":"workersShutdown panic"}
This makes it very likely that the problem is nil entries in p.workers. Looking at how you initialise it, we find:
workers: make([]*Worker, workerConcurrency),
and items are then added with:
p.workers = append(p.workers, worker)
The syntax of make is make([]T, length, capacity). So you are creating a slice with workerConcurrency elements (each set to the default value, nil), and AddWorker then appends new entries onto the end of that slice. This is probably easiest to understand with a basic example:
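(The example code did not survive in this copy of the answer; the following is an illustrative sketch of the same behaviour, using a trivial item type rather than the pool's Worker.)
package main

import "fmt"

type item struct{ id int }

func main() {
    // A length of 3 creates three elements, each initialised to the zero value (nil for a pointer type).
    items := make([]*item, 3)
    // append then places the new entry after the three existing nil ones.
    items = append(items, &item{id: 1})
    for i, it := range items {
        fmt.Println(i, it) // the first three entries print <nil>
    }
}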
Note that there are three nil entries, followed by the entry added with append. The result of this is that the first worker in for _, worker := range p.workers { will be nil (causing the panic when worker.id is accessed).
The simplest solution is to initialise workers to nil (or []*Worker{} if you prefer). If you really do want to allocate the memory in advance, you can use make([]*Worker, 0, workerConcurrency) (length = 0, capacity = workerConcurrency).
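Applied to your NewPool constructor, a minimal sketch of the fix might look like this (only the workers field changes; everything else stays as in your code):
pool := &Pool{
    parentCtx: parentCtx,
    name: name,
    stopCh: make(chan struct{}, 1),
    collector: queue,
    // length 0, capacity workerConcurrency: no nil placeholders, AddWorker's append fills the slice
    workers: make([]*Worker, 0, workerConcurrency),
    maxWorkersCount: workerConcurrency,
    workerTimeOut: workerTimeOut,
    logger: &l,
    stopped: false,
}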
Caveat: I'm not saying this is the only problem (it isn't - for example, Pool.Stop() locks the mutex and then immediately unlocks it; I suspect you intended to use a defer), but hopefully this will allow you to move forward.
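For what it's worth, here is a sketch of what I suspect Pool.Stop() was meant to look like - the only change is deferring the unlock, so treat it as a guess at your intent rather than a complete fix:
func (p *Pool) Stop() {
    l := p.logger.With().Str("method", "Stop").Logger()
    l.Debug().Msg("stop worker pool with all worker")
    p.mutex.Lock()
    // Hold the lock for the rest of Stop instead of releasing it immediately.
    defer p.mutex.Unlock()
    defer func() {
        // Wait for all workers to finish.
        p.wg.Wait()
    }()
    p.shutdown()
    // Cancel the pool's context.
    p.contextCancelFunc()
    // Signal for stopping the pool.
    p.stopCh <- struct{}{}
    // Close the stop channel.
    close(p.stopCh)
}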