diff options
author | Valery Piashchynski <[email protected]> | 2020-10-14 15:23:22 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-10-14 15:23:22 +0300 |
commit | a407be25f068a5c0a20a4cf96ddfaf4ccd3af739 (patch) | |
tree | 143e1ca6729d28b1d50e151a8d8050ac67e6f5ec /workers_watcher.go | |
parent | b5f429667131ef91498d67d08242b9f46cc23d6d (diff) |
Fixed: race conditions in tests, Handle_Dead test activated
Diffstat (limited to 'workers_watcher.go')
-rw-r--r-- | workers_watcher.go | 76 |
1 files changed, 38 insertions, 38 deletions
diff --git a/workers_watcher.go b/workers_watcher.go index f8522c46..d9d27196 100644 --- a/workers_watcher.go +++ b/workers_watcher.go @@ -60,9 +60,9 @@ func (stack *Stack) Pop() (WorkerBase, bool) { } type WorkersWatcher struct { - mutex sync.Mutex + mutex sync.RWMutex stack *Stack - allocator func(args ...interface{}) (*SyncWorker, error) + allocator func(args ...interface{}) (WorkerBase, error) initialNumWorkers int64 actualNumWorkers int64 events chan PoolEvent @@ -80,13 +80,13 @@ type WorkerWatcher interface { // Destroy destroys the underlying stack Destroy(ctx context.Context) // WorkersList return all stack w/o removing it from internal storage - WorkersList(ctx context.Context) []WorkerBase + WorkersList() []WorkerBase // RemoveWorker remove worker from the stack RemoveWorker(ctx context.Context, wb WorkerBase) error } // workerCreateFunc can be nil, but in that case, dead stack will not be replaced -func NewWorkerWatcher(allocator func(args ...interface{}) (*SyncWorker, error), numWorkers int64, events chan PoolEvent) *WorkersWatcher { +func NewWorkerWatcher(allocator func(args ...interface{}) (WorkerBase, error), numWorkers int64, events chan PoolEvent) *WorkersWatcher { // todo check if events not nil ww := &WorkersWatcher{ stack: NewWorkersStack(), @@ -106,11 +106,10 @@ func (ww *WorkersWatcher) AddToWatch(ctx context.Context, workers []WorkerBase) return err } ww.stack.Push(sw) + ww.watch(sw) + go func(swc WorkerBase) { - //ww.mutex.Lock() - ww.watch(&swc) - ww.wait(ctx, &swc) - //ww.mutex.Unlock() + ww.wait(ctx, swc) }(sw) } return nil @@ -146,14 +145,14 @@ func (ww *WorkersWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) if w == nil { continue } - ww.actualNumWorkers-- + ww.decreaseNumOfActualWorkers() return w, nil case <-tout.C: return nil, errors.New("no free stack") } } } - ww.actualNumWorkers-- + ww.decreaseNumOfActualWorkers() return w, nil } @@ -163,9 +162,9 @@ func (ww *WorkersWatcher) AllocateNew(ctx context.Context) error { if err != nil { return err } - ww.addToWatch(*sw) + ww.addToWatch(sw) ww.stack.mutex.Unlock() - ww.PushWorker(*sw) + ww.PushWorker(sw) return nil } @@ -202,9 +201,7 @@ func (ww *WorkersWatcher) PushWorker(w WorkerBase) { } func (ww *WorkersWatcher) ReduceWorkersCount() { - ww.mutex.Unlock() - ww.actualNumWorkers-- - ww.mutex.Lock() + ww.decreaseNumOfActualWorkers() } // Destroy all underlying stack (but let them to complete the task) @@ -217,9 +214,12 @@ func (ww *WorkersWatcher) Destroy(ctx context.Context) { for { select { case <-tt.C: + ww.stack.mutex.Lock() if len(ww.stack.workers) != int(ww.actualNumWorkers) { + ww.stack.mutex.Unlock() continue } + ww.stack.mutex.Unlock() // unnecessary mutex, but // just to make sure. All stack at this moment are in the stack // Pop operation is blocked, push can't be done, since it's not possible to pop @@ -238,53 +238,62 @@ func (ww *WorkersWatcher) Destroy(ctx context.Context) { } // Warning, this is O(n) operation -func (ww *WorkersWatcher) WorkersList(ctx context.Context) []WorkerBase { +func (ww *WorkersWatcher) WorkersList() []WorkerBase { return ww.stack.workers } -func (ww *WorkersWatcher) wait(ctx context.Context, w *WorkerBase) { - err := (*w).Wait(ctx) +func (ww *WorkersWatcher) wait(ctx context.Context, w WorkerBase) { + err := w.Wait(ctx) if err != nil { ww.events <- PoolEvent{Payload: WorkerEvent{ Event: EventWorkerError, - Worker: *w, + Worker: w, Payload: err, }} } // If not destroyed, reallocate - if (*w).State().Value() != StateDestroyed { - pid := (*w).Pid() + if w.State().Value() != StateDestroyed { + pid := w.Pid() + ww.stack.mutex.Lock() for i := 0; i < len(ww.stack.workers); i++ { // worker in the stack, reallocating if ww.stack.workers[i].Pid() == pid { - ww.stack.mutex.Lock() ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...) - ww.decreaseNumOfActualWorkers() - ww.stack.mutex.Unlock() err = ww.AllocateNew(ctx) if err != nil { ww.events <- PoolEvent{Payload: WorkerEvent{ Event: EventWorkerError, - Worker: *w, + Worker: w, Payload: err, }} return } + ww.events <- PoolEvent{Payload: WorkerEvent{ + Event: EventWorkerConstruct, + Worker: nil, + Payload: nil, + }} return } } + ww.stack.mutex.Unlock() // worker not in the stack (not returned), forget and allocate new err = ww.AllocateNew(ctx) if err != nil { ww.events <- PoolEvent{Payload: WorkerEvent{ Event: EventWorkerError, - Worker: *w, + Worker: w, Payload: err, }} return } + ww.events <- PoolEvent{Payload: WorkerEvent{ + Event: EventWorkerConstruct, + Worker: nil, + Payload: nil, + }} } return } @@ -293,30 +302,21 @@ func (ww *WorkersWatcher) addToWatch(wb WorkerBase) { ww.mutex.Lock() defer ww.mutex.Unlock() go func() { - ww.wait(context.Background(), &wb) + ww.wait(context.Background(), wb) }() } -func (ww *WorkersWatcher) reallocate(wb *WorkerBase) error { - sw, err := ww.allocator() - if err != nil { - return err - } - *wb = *sw - return nil -} - func (ww *WorkersWatcher) decreaseNumOfActualWorkers() { ww.mutex.Lock() ww.actualNumWorkers-- ww.mutex.Unlock() } -func (ww *WorkersWatcher) watch(swc *WorkerBase) { +func (ww *WorkersWatcher) watch(swc WorkerBase) { // todo make event to stop function go func() { select { - case ev := <-(*swc).Events(): + case ev := <-swc.Events(): ww.events <- PoolEvent{Payload: ev} } }() |