summaryrefslogtreecommitdiff
path: root/workers_watcher.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-10-14 15:23:22 +0300
committerValery Piashchynski <[email protected]>2020-10-14 15:23:22 +0300
commita407be25f068a5c0a20a4cf96ddfaf4ccd3af739 (patch)
tree143e1ca6729d28b1d50e151a8d8050ac67e6f5ec /workers_watcher.go
parentb5f429667131ef91498d67d08242b9f46cc23d6d (diff)
Fixed: race conditions in tests, Handle_Dead test activated
Diffstat (limited to 'workers_watcher.go')
-rw-r--r--workers_watcher.go76
1 files changed, 38 insertions, 38 deletions
diff --git a/workers_watcher.go b/workers_watcher.go
index f8522c46..d9d27196 100644
--- a/workers_watcher.go
+++ b/workers_watcher.go
@@ -60,9 +60,9 @@ func (stack *Stack) Pop() (WorkerBase, bool) {
}
type WorkersWatcher struct {
- mutex sync.Mutex
+ mutex sync.RWMutex
stack *Stack
- allocator func(args ...interface{}) (*SyncWorker, error)
+ allocator func(args ...interface{}) (WorkerBase, error)
initialNumWorkers int64
actualNumWorkers int64
events chan PoolEvent
@@ -80,13 +80,13 @@ type WorkerWatcher interface {
// Destroy destroys the underlying stack
Destroy(ctx context.Context)
// WorkersList return all stack w/o removing it from internal storage
- WorkersList(ctx context.Context) []WorkerBase
+ WorkersList() []WorkerBase
// RemoveWorker remove worker from the stack
RemoveWorker(ctx context.Context, wb WorkerBase) error
}
// workerCreateFunc can be nil, but in that case, dead stack will not be replaced
-func NewWorkerWatcher(allocator func(args ...interface{}) (*SyncWorker, error), numWorkers int64, events chan PoolEvent) *WorkersWatcher {
+func NewWorkerWatcher(allocator func(args ...interface{}) (WorkerBase, error), numWorkers int64, events chan PoolEvent) *WorkersWatcher {
// todo check if events not nil
ww := &WorkersWatcher{
stack: NewWorkersStack(),
@@ -106,11 +106,10 @@ func (ww *WorkersWatcher) AddToWatch(ctx context.Context, workers []WorkerBase)
return err
}
ww.stack.Push(sw)
+ ww.watch(sw)
+
go func(swc WorkerBase) {
- //ww.mutex.Lock()
- ww.watch(&swc)
- ww.wait(ctx, &swc)
- //ww.mutex.Unlock()
+ ww.wait(ctx, swc)
}(sw)
}
return nil
@@ -146,14 +145,14 @@ func (ww *WorkersWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error)
if w == nil {
continue
}
- ww.actualNumWorkers--
+ ww.decreaseNumOfActualWorkers()
return w, nil
case <-tout.C:
return nil, errors.New("no free stack")
}
}
}
- ww.actualNumWorkers--
+ ww.decreaseNumOfActualWorkers()
return w, nil
}
@@ -163,9 +162,9 @@ func (ww *WorkersWatcher) AllocateNew(ctx context.Context) error {
if err != nil {
return err
}
- ww.addToWatch(*sw)
+ ww.addToWatch(sw)
ww.stack.mutex.Unlock()
- ww.PushWorker(*sw)
+ ww.PushWorker(sw)
return nil
}
@@ -202,9 +201,7 @@ func (ww *WorkersWatcher) PushWorker(w WorkerBase) {
}
func (ww *WorkersWatcher) ReduceWorkersCount() {
- ww.mutex.Unlock()
- ww.actualNumWorkers--
- ww.mutex.Lock()
+ ww.decreaseNumOfActualWorkers()
}
// Destroy all underlying stack (but let them to complete the task)
@@ -217,9 +214,12 @@ func (ww *WorkersWatcher) Destroy(ctx context.Context) {
for {
select {
case <-tt.C:
+ ww.stack.mutex.Lock()
if len(ww.stack.workers) != int(ww.actualNumWorkers) {
+ ww.stack.mutex.Unlock()
continue
}
+ ww.stack.mutex.Unlock()
// unnecessary mutex, but
// just to make sure. All stack at this moment are in the stack
// Pop operation is blocked, push can't be done, since it's not possible to pop
@@ -238,53 +238,62 @@ func (ww *WorkersWatcher) Destroy(ctx context.Context) {
}
// Warning, this is O(n) operation
-func (ww *WorkersWatcher) WorkersList(ctx context.Context) []WorkerBase {
+func (ww *WorkersWatcher) WorkersList() []WorkerBase {
return ww.stack.workers
}
-func (ww *WorkersWatcher) wait(ctx context.Context, w *WorkerBase) {
- err := (*w).Wait(ctx)
+func (ww *WorkersWatcher) wait(ctx context.Context, w WorkerBase) {
+ err := w.Wait(ctx)
if err != nil {
ww.events <- PoolEvent{Payload: WorkerEvent{
Event: EventWorkerError,
- Worker: *w,
+ Worker: w,
Payload: err,
}}
}
// If not destroyed, reallocate
- if (*w).State().Value() != StateDestroyed {
- pid := (*w).Pid()
+ if w.State().Value() != StateDestroyed {
+ pid := w.Pid()
+ ww.stack.mutex.Lock()
for i := 0; i < len(ww.stack.workers); i++ {
// worker in the stack, reallocating
if ww.stack.workers[i].Pid() == pid {
- ww.stack.mutex.Lock()
ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...)
-
ww.decreaseNumOfActualWorkers()
-
ww.stack.mutex.Unlock()
err = ww.AllocateNew(ctx)
if err != nil {
ww.events <- PoolEvent{Payload: WorkerEvent{
Event: EventWorkerError,
- Worker: *w,
+ Worker: w,
Payload: err,
}}
return
}
+ ww.events <- PoolEvent{Payload: WorkerEvent{
+ Event: EventWorkerConstruct,
+ Worker: nil,
+ Payload: nil,
+ }}
return
}
}
+ ww.stack.mutex.Unlock()
// worker not in the stack (not returned), forget and allocate new
err = ww.AllocateNew(ctx)
if err != nil {
ww.events <- PoolEvent{Payload: WorkerEvent{
Event: EventWorkerError,
- Worker: *w,
+ Worker: w,
Payload: err,
}}
return
}
+ ww.events <- PoolEvent{Payload: WorkerEvent{
+ Event: EventWorkerConstruct,
+ Worker: nil,
+ Payload: nil,
+ }}
}
return
}
@@ -293,30 +302,21 @@ func (ww *WorkersWatcher) addToWatch(wb WorkerBase) {
ww.mutex.Lock()
defer ww.mutex.Unlock()
go func() {
- ww.wait(context.Background(), &wb)
+ ww.wait(context.Background(), wb)
}()
}
-func (ww *WorkersWatcher) reallocate(wb *WorkerBase) error {
- sw, err := ww.allocator()
- if err != nil {
- return err
- }
- *wb = *sw
- return nil
-}
-
func (ww *WorkersWatcher) decreaseNumOfActualWorkers() {
ww.mutex.Lock()
ww.actualNumWorkers--
ww.mutex.Unlock()
}
-func (ww *WorkersWatcher) watch(swc *WorkerBase) {
+func (ww *WorkersWatcher) watch(swc WorkerBase) {
// todo make event to stop function
go func() {
select {
- case ev := <-(*swc).Events():
+ case ev := <-swc.Events():
ww.events <- PoolEvent{Payload: ev}
}
}()