summaryrefslogtreecommitdiff
path: root/worker_watcher.go
diff options
context:
space:
mode:
Diffstat (limited to 'worker_watcher.go')
-rwxr-xr-xworker_watcher.go59
1 files changed, 32 insertions, 27 deletions
diff --git a/worker_watcher.go b/worker_watcher.go
index 25c88a1a..0eb8152b 100755
--- a/worker_watcher.go
+++ b/worker_watcher.go
@@ -2,14 +2,14 @@ package roadrunner
import (
"context"
- "errors"
"sync"
"time"
+ "github.com/spiral/roadrunner/v2/errors"
"github.com/spiral/roadrunner/v2/util"
)
-var ErrWatcherStopped = errors.New("watcher stopped")
+//var = errors.New("watcher stopped")
type Stack struct {
workers []WorkerBase
@@ -85,11 +85,7 @@ type WorkerWatcher interface {
}
// workerCreateFunc can be nil, but in that case, dead stack will not be replaced
-func newWorkerWatcher(
- allocator func(args ...interface{}) (WorkerBase, error),
- numWorkers int64,
- events *util.EventHandler,
-) *workerWatcher {
+func newWorkerWatcher(allocator func(args ...interface{}) (WorkerBase, error), numWorkers int64, events *util.EventHandler) *workerWatcher {
ww := &workerWatcher{
stack: NewWorkersStack(),
allocator: allocator,
@@ -127,10 +123,11 @@ func (ww *workerWatcher) AddToWatch(ctx context.Context, workers []WorkerBase) e
}
func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) {
+ const op = errors.Op("get_free_worker")
// thread safe operation
w, stop := ww.stack.Pop()
if stop {
- return nil, ErrWatcherStopped
+ return nil, errors.E(op, errors.ErrWatcherStopped)
}
// handle worker remove state
@@ -146,6 +143,7 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error)
// no free stack
if w == nil {
+ // TODO allocate timeout
tout := time.NewTicker(time.Second * 180)
defer tout.Stop()
for {
@@ -153,20 +151,20 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error)
default:
w, stop = ww.stack.Pop()
if stop {
- return nil, ErrWatcherStopped
+ return nil, errors.E(op, errors.ErrWatcherStopped)
}
if w == nil {
continue
}
- ww.decreaseNumOfActualWorkers()
+ ww.ReduceWorkersCount()
return w, nil
case <-tout.C:
- return nil, errors.New("no free stack")
+ return nil, errors.Str("no free stack")
}
}
}
- ww.decreaseNumOfActualWorkers()
+ ww.ReduceWorkersCount()
return w, nil
}
@@ -198,10 +196,10 @@ func (ww *workerWatcher) RemoveWorker(ctx context.Context, wb WorkerBase) error
// found in the stack
// remove worker
ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...)
- ww.decreaseNumOfActualWorkers()
+ ww.ReduceWorkersCount()
wb.State().Set(StateInvalid)
- err := wb.Kill(ctx)
+ err := wb.Kill()
if err != nil {
return err
}
@@ -215,14 +213,19 @@ func (ww *workerWatcher) RemoveWorker(ctx context.Context, wb WorkerBase) error
// O(1) operation
func (ww *workerWatcher) PushWorker(w WorkerBase) {
- ww.mutex.Lock()
- ww.actualNumWorkers++
- ww.mutex.Unlock()
+ ww.IncreaseWorkersCount()
ww.stack.Push(w)
}
func (ww *workerWatcher) ReduceWorkersCount() {
- ww.decreaseNumOfActualWorkers()
+ ww.mutex.Lock()
+ ww.actualNumWorkers--
+ ww.mutex.Unlock()
+}
+func (ww *workerWatcher) IncreaseWorkersCount() {
+ ww.mutex.Lock()
+ ww.actualNumWorkers++
+ ww.mutex.Unlock()
}
// Destroy all underlying stack (but let them to complete the task)
@@ -258,9 +261,17 @@ func (ww *workerWatcher) Destroy(ctx context.Context) {
}
}
-// Warning, this is O(n) operation
+// Warning, this is O(n) operation, and it will return copy of the actual workers
func (ww *workerWatcher) WorkersList() []WorkerBase {
- return ww.stack.workers
+ ww.stack.mutex.Lock()
+ defer ww.stack.mutex.Unlock()
+ workersCopy := make([]WorkerBase, 0, 1)
+ for _, v := range ww.stack.workers {
+ sw := v.(SyncWorker)
+ workersCopy = append(workersCopy, sw)
+ }
+
+ return workersCopy
}
func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) {
@@ -284,7 +295,7 @@ func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) {
// worker in the stack, reallocating
if ww.stack.workers[i].Pid() == pid {
ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...)
- ww.decreaseNumOfActualWorkers()
+ ww.ReduceWorkersCount()
ww.stack.mutex.Unlock()
err = ww.AllocateNew(ctx)
@@ -321,9 +332,3 @@ func (ww *workerWatcher) addToWatch(wb WorkerBase) {
ww.wait(context.Background(), wb)
}()
}
-
-func (ww *workerWatcher) decreaseNumOfActualWorkers() {
- ww.mutex.Lock()
- ww.actualNumWorkers--
- ww.mutex.Unlock()
-}