diff options
Diffstat (limited to 'worker_watcher.go')
-rwxr-xr-x | worker_watcher.go | 36 |
1 files changed, 16 insertions, 20 deletions
diff --git a/worker_watcher.go b/worker_watcher.go index 3b83c8ff..8bc147d0 100755 --- a/worker_watcher.go +++ b/worker_watcher.go @@ -62,7 +62,7 @@ func (stack *Stack) Pop() (WorkerBase, bool) { type WorkerWatcher interface { // AddToWatch used to add stack to wait its state - AddToWatch(ctx context.Context, workers []WorkerBase) error + AddToWatch(workers []WorkerBase) error // GetFreeWorker provide first free worker GetFreeWorker(ctx context.Context) (WorkerBase, error) @@ -71,7 +71,7 @@ type WorkerWatcher interface { PushWorker(w WorkerBase) // AllocateNew used to allocate new worker and put in into the WorkerWatcher - AllocateNew(ctx context.Context) error + AllocateNew() error // Destroy destroys the underlying stack Destroy(ctx context.Context) @@ -80,7 +80,7 @@ type WorkerWatcher interface { WorkersList() []WorkerBase // RemoveWorker remove worker from the stack - RemoveWorker(ctx context.Context, wb WorkerBase) error + RemoveWorker(wb WorkerBase) error } // workerCreateFunc can be nil, but in that case, dead stack will not be replaced @@ -105,7 +105,7 @@ type workerWatcher struct { events util.EventsHandler } -func (ww *workerWatcher) AddToWatch(ctx context.Context, workers []WorkerBase) error { +func (ww *workerWatcher) AddToWatch(workers []WorkerBase) error { for i := 0; i < len(workers); i++ { sw, err := NewSyncWorker(workers[i]) if err != nil { @@ -115,14 +115,14 @@ func (ww *workerWatcher) AddToWatch(ctx context.Context, workers []WorkerBase) e sw.AddListener(ww.events.Push) go func(swc WorkerBase) { - ww.wait(ctx, swc) + ww.wait(swc) }(sw) } return nil } func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) { - const op = errors.Op("get_free_worker") + const op = errors.Op("GetFreeWorker") // thread safe operation w, stop := ww.stack.Pop() if stop { @@ -132,19 +132,15 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) // handle worker remove state // in this state worker is destroyed by supervisor if w != nil && w.State().Value() == StateRemove { - err := ww.RemoveWorker(ctx, w) + err := ww.RemoveWorker(w) if err != nil { return nil, err } // try to get next return ww.GetFreeWorker(ctx) } - // no free stack if w == nil { - // TODO allocate timeout - tout := time.NewTicker(time.Second * 180) - defer tout.Stop() for { select { default: @@ -157,8 +153,8 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) } ww.ReduceWorkersCount() return w, nil - case <-tout.C: - return nil, errors.E(op, errors.Str("no free workers in the stack")) + case <-ctx.Done(): + return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the stack, timeout exceed")) } } } @@ -167,12 +163,12 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) return w, nil } -func (ww *workerWatcher) AllocateNew(ctx context.Context) error { +func (ww *workerWatcher) AllocateNew() error { ww.stack.mutex.Lock() const op = errors.Op("allocate new worker") sw, err := ww.allocator() if err != nil { - return errors.E(op, err) + return errors.E(op, errors.WorkerAllocate, err) } ww.addToWatch(sw) @@ -187,7 +183,7 @@ func (ww *workerWatcher) AllocateNew(ctx context.Context) error { return nil } -func (ww *workerWatcher) RemoveWorker(ctx context.Context, wb WorkerBase) error { +func (ww *workerWatcher) RemoveWorker(wb WorkerBase) error { ww.stack.mutex.Lock() const op = errors.Op("remove worker") defer ww.stack.mutex.Unlock() @@ -275,7 +271,7 @@ func (ww *workerWatcher) WorkersList() []WorkerBase { return workersCopy } -func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) { +func (ww *workerWatcher) wait(w WorkerBase) { const op = errors.Op("process wait") err := w.Wait() if err != nil { @@ -300,7 +296,7 @@ func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) { ww.ReduceWorkersCount() ww.stack.mutex.Unlock() - err = ww.AllocateNew(ctx) + err = ww.AllocateNew() if err != nil { ww.events.Push(PoolEvent{ Event: EventPoolError, @@ -315,7 +311,7 @@ func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) { ww.stack.mutex.Unlock() // worker not in the stack (not returned), forget and allocate new - err = ww.AllocateNew(ctx) + err = ww.AllocateNew() if err != nil { ww.events.Push(PoolEvent{ Event: EventPoolError, @@ -329,6 +325,6 @@ func (ww *workerWatcher) addToWatch(wb WorkerBase) { ww.mutex.Lock() defer ww.mutex.Unlock() go func() { - ww.wait(context.Background(), wb) + ww.wait(wb) }() } |