summaryrefslogtreecommitdiff
path: root/worker_watcher.go
diff options
context:
space:
mode:
Diffstat (limited to 'worker_watcher.go')
-rwxr-xr-xworker_watcher.go36
1 files changed, 16 insertions, 20 deletions
diff --git a/worker_watcher.go b/worker_watcher.go
index 3b83c8ff..8bc147d0 100755
--- a/worker_watcher.go
+++ b/worker_watcher.go
@@ -62,7 +62,7 @@ func (stack *Stack) Pop() (WorkerBase, bool) {
type WorkerWatcher interface {
// AddToWatch used to add stack to wait its state
- AddToWatch(ctx context.Context, workers []WorkerBase) error
+ AddToWatch(workers []WorkerBase) error
// GetFreeWorker provide first free worker
GetFreeWorker(ctx context.Context) (WorkerBase, error)
@@ -71,7 +71,7 @@ type WorkerWatcher interface {
PushWorker(w WorkerBase)
// AllocateNew used to allocate new worker and put in into the WorkerWatcher
- AllocateNew(ctx context.Context) error
+ AllocateNew() error
// Destroy destroys the underlying stack
Destroy(ctx context.Context)
@@ -80,7 +80,7 @@ type WorkerWatcher interface {
WorkersList() []WorkerBase
// RemoveWorker remove worker from the stack
- RemoveWorker(ctx context.Context, wb WorkerBase) error
+ RemoveWorker(wb WorkerBase) error
}
// workerCreateFunc can be nil, but in that case, dead stack will not be replaced
@@ -105,7 +105,7 @@ type workerWatcher struct {
events util.EventsHandler
}
-func (ww *workerWatcher) AddToWatch(ctx context.Context, workers []WorkerBase) error {
+func (ww *workerWatcher) AddToWatch(workers []WorkerBase) error {
for i := 0; i < len(workers); i++ {
sw, err := NewSyncWorker(workers[i])
if err != nil {
@@ -115,14 +115,14 @@ func (ww *workerWatcher) AddToWatch(ctx context.Context, workers []WorkerBase) e
sw.AddListener(ww.events.Push)
go func(swc WorkerBase) {
- ww.wait(ctx, swc)
+ ww.wait(swc)
}(sw)
}
return nil
}
func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) {
- const op = errors.Op("get_free_worker")
+ const op = errors.Op("GetFreeWorker")
// thread safe operation
w, stop := ww.stack.Pop()
if stop {
@@ -132,19 +132,15 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error)
// handle worker remove state
// in this state worker is destroyed by supervisor
if w != nil && w.State().Value() == StateRemove {
- err := ww.RemoveWorker(ctx, w)
+ err := ww.RemoveWorker(w)
if err != nil {
return nil, err
}
// try to get next
return ww.GetFreeWorker(ctx)
}
-
// no free stack
if w == nil {
- // TODO allocate timeout
- tout := time.NewTicker(time.Second * 180)
- defer tout.Stop()
for {
select {
default:
@@ -157,8 +153,8 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error)
}
ww.ReduceWorkersCount()
return w, nil
- case <-tout.C:
- return nil, errors.E(op, errors.Str("no free workers in the stack"))
+ case <-ctx.Done():
+ return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the stack, timeout exceed"))
}
}
}
@@ -167,12 +163,12 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error)
return w, nil
}
-func (ww *workerWatcher) AllocateNew(ctx context.Context) error {
+func (ww *workerWatcher) AllocateNew() error {
ww.stack.mutex.Lock()
const op = errors.Op("allocate new worker")
sw, err := ww.allocator()
if err != nil {
- return errors.E(op, err)
+ return errors.E(op, errors.WorkerAllocate, err)
}
ww.addToWatch(sw)
@@ -187,7 +183,7 @@ func (ww *workerWatcher) AllocateNew(ctx context.Context) error {
return nil
}
-func (ww *workerWatcher) RemoveWorker(ctx context.Context, wb WorkerBase) error {
+func (ww *workerWatcher) RemoveWorker(wb WorkerBase) error {
ww.stack.mutex.Lock()
const op = errors.Op("remove worker")
defer ww.stack.mutex.Unlock()
@@ -275,7 +271,7 @@ func (ww *workerWatcher) WorkersList() []WorkerBase {
return workersCopy
}
-func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) {
+func (ww *workerWatcher) wait(w WorkerBase) {
const op = errors.Op("process wait")
err := w.Wait()
if err != nil {
@@ -300,7 +296,7 @@ func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) {
ww.ReduceWorkersCount()
ww.stack.mutex.Unlock()
- err = ww.AllocateNew(ctx)
+ err = ww.AllocateNew()
if err != nil {
ww.events.Push(PoolEvent{
Event: EventPoolError,
@@ -315,7 +311,7 @@ func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) {
ww.stack.mutex.Unlock()
// worker not in the stack (not returned), forget and allocate new
- err = ww.AllocateNew(ctx)
+ err = ww.AllocateNew()
if err != nil {
ww.events.Push(PoolEvent{
Event: EventPoolError,
@@ -329,6 +325,6 @@ func (ww *workerWatcher) addToWatch(wb WorkerBase) {
ww.mutex.Lock()
defer ww.mutex.Unlock()
go func() {
- ww.wait(context.Background(), wb)
+ ww.wait(wb)
}()
}