diff options
Diffstat (limited to 'pkg')
-rwxr-xr-x | pkg/pool/static_pool.go | 5 | ||||
-rwxr-xr-x | pkg/pool/static_pool_test.go | 3 | ||||
-rwxr-xr-x | pkg/worker/state.go | 5 | ||||
-rw-r--r-- | pkg/worker_watcher/container/interface.go | 9 | ||||
-rw-r--r-- | pkg/worker_watcher/container/stack.go (renamed from pkg/worker_watcher/stack.go) | 16 | ||||
-rw-r--r-- | pkg/worker_watcher/container/stack_test.go (renamed from pkg/worker_watcher/stack_test.go) | 2 | ||||
-rw-r--r-- | pkg/worker_watcher/container/vec.go | 45 | ||||
-rw-r--r-- | pkg/worker_watcher/interface.go | 10 | ||||
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 240 |
9 files changed, 226 insertions, 109 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index bb68151f..f1b20bb9 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -130,7 +130,8 @@ func (sp *StaticPool) Workers() (workers []worker.BaseProcess) { } func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error { - return sp.ww.Remove(wb) + sp.ww.Remove(wb) + return nil } // Be careful, sync Exec with ExecWithContext @@ -208,6 +209,8 @@ func (sp *StaticPool) stopWorker(w worker.BaseProcess) { func (sp *StaticPool) checkMaxJobs(w worker.BaseProcess) error { const op = errors.Op("static_pool_check_max_jobs") if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { + w.State().Set(worker.StateDestroyed) + sp.ww.Remove(w) err := sp.ww.Allocate() if err != nil { return errors.E(op, err) diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go index 4c1c90e5..44f5936c 100755 --- a/pkg/pool/static_pool_test.go +++ b/pkg/pool/static_pool_test.go @@ -148,8 +148,6 @@ func Test_StaticPool_JobError(t *testing.T) { cfg, ) assert.NoError(t, err) - defer p.Destroy(ctx) - assert.NotNil(t, p) res, err := p.Exec(payload.Payload{Body: []byte("hello")}) @@ -163,6 +161,7 @@ func Test_StaticPool_JobError(t *testing.T) { } assert.Contains(t, err.Error(), "hello") + p.Destroy(ctx) } func Test_StaticPool_Broken_Replace(t *testing.T) { diff --git a/pkg/worker/state.go b/pkg/worker/state.go index c5d70a21..176e151b 100755 --- a/pkg/worker/state.go +++ b/pkg/worker/state.go @@ -32,9 +32,6 @@ const ( // StateErrored - error StateImpl (can't be used). StateErrored - - // StateRemove - worker is killed and removed from the stack - StateRemove ) type StateImpl struct { @@ -70,8 +67,6 @@ func (s *StateImpl) String() string { return "errored" case StateDestroyed: return "destroyed" - case StateRemove: - return "remove" } return "undefined" diff --git a/pkg/worker_watcher/container/interface.go b/pkg/worker_watcher/container/interface.go new file mode 100644 index 00000000..bb66897f --- /dev/null +++ b/pkg/worker_watcher/container/interface.go @@ -0,0 +1,9 @@ +package container + +import "github.com/spiral/roadrunner/v2/pkg/worker" + +type Vector interface { + Enqueue(worker.BaseProcess) + Dequeue() (worker.BaseProcess, bool) + Destroy() +} diff --git a/pkg/worker_watcher/stack.go b/pkg/worker_watcher/container/stack.go index 69e2024b..91c71b24 100644 --- a/pkg/worker_watcher/stack.go +++ b/pkg/worker_watcher/container/stack.go @@ -1,4 +1,4 @@ -package worker_watcher //nolint:golint,stylecheck +package container //nolint:golint,stylecheck import ( "context" "runtime" @@ -32,8 +32,8 @@ func (stack *Stack) Reset() { stack.workers = nil } -// Push worker back to the stack -// If stack in destroy state, Push will provide 100ms window to unlock the mutex +// Push worker back to the vec +// If vec in destroy state, Push will provide 100ms window to unlock the mutex func (stack *Stack) Push(w worker.BaseProcess) { stack.Lock() defer stack.Unlock() @@ -51,7 +51,7 @@ func (stack *Stack) Pop() (worker.BaseProcess, bool) { stack.Lock() defer stack.Unlock() - // do not release new stack + // do not release new vec if stack.destroy { return nil, true } @@ -71,7 +71,7 @@ func (stack *Stack) FindAndRemoveByPid(pid int64) bool { stack.Lock() defer stack.Unlock() for i := 0; i < len(stack.workers); i++ { - // worker in the stack, reallocating + // worker in the vec, reallocating if stack.workers[i].Pid() == pid { stack.workers = append(stack.workers[:i], stack.workers[i+1:]...) stack.actualNumOfWorkers-- @@ -83,7 +83,7 @@ func (stack *Stack) FindAndRemoveByPid(pid int64) bool { return false } -// Workers return copy of the workers in the stack +// Workers return copy of the workers in the vec func (stack *Stack) Workers() []worker.BaseProcess { stack.Lock() defer stack.Unlock() @@ -124,11 +124,11 @@ func (stack *Stack) Destroy(_ context.Context) { } stack.Unlock() // unnecessary mutex, but - // just to make sure. All stack at this moment are in the stack + // just to make sure. All vec at this moment are in the vec // Pop operation is blocked, push can't be done, since it's not possible to pop stack.Lock() for i := 0; i < len(stack.workers); i++ { - // set state for the stack in the stack (unused at the moment) + // set state for the vec in the vec (unused at the moment) stack.workers[i].State().Set(worker.StateDestroyed) // kill the worker _ = stack.workers[i].Kill() diff --git a/pkg/worker_watcher/stack_test.go b/pkg/worker_watcher/container/stack_test.go index 769419e4..59c15773 100644 --- a/pkg/worker_watcher/stack_test.go +++ b/pkg/worker_watcher/container/stack_test.go @@ -1,4 +1,4 @@ -package worker_watcher //nolint:golint,stylecheck +package container //nolint:golint,stylecheck import ( "context" "os/exec" diff --git a/pkg/worker_watcher/container/vec.go b/pkg/worker_watcher/container/vec.go new file mode 100644 index 00000000..da9822ef --- /dev/null +++ b/pkg/worker_watcher/container/vec.go @@ -0,0 +1,45 @@ +package container //nolint:golint,stylecheck + +import ( + "sync/atomic" + + "github.com/spiral/roadrunner/v2/pkg/worker" +) + +type Vec struct { + destroy uint64 + workers chan worker.BaseProcess +} + +func NewVector(initialNumOfWorkers uint64) Vector { + vec := &Vec{ + destroy: 0, + workers: make(chan worker.BaseProcess, initialNumOfWorkers), + } + + return vec +} + +func (v *Vec) Enqueue(w worker.BaseProcess) { + v.workers <- w +} + +func (v *Vec) Dequeue() (worker.BaseProcess, bool) { + /* + if *addr == old { + *addr = new + return true + } + */ + if atomic.CompareAndSwapUint64(&v.destroy, 1, 1) { + return nil, true + } + + w := <-v.workers + + return w, false +} + +func (v *Vec) Destroy() { + atomic.StoreUint64(&v.destroy, 1) +} diff --git a/pkg/worker_watcher/interface.go b/pkg/worker_watcher/interface.go index a3552e7e..4625b7a7 100644 --- a/pkg/worker_watcher/interface.go +++ b/pkg/worker_watcher/interface.go @@ -8,7 +8,7 @@ import ( // Watcher is an interface for the Sync workers lifecycle type Watcher interface { - // Watch used to add workers to the stack + // Watch used to add workers to the container Watch(workers []worker.BaseProcess) error // Get provide first free worker @@ -20,12 +20,12 @@ type Watcher interface { // Allocate - allocates new worker and put it into the WorkerWatcher Allocate() error - // Destroy destroys the underlying stack + // Destroy destroys the underlying container Destroy(ctx context.Context) - // WorkersList return all stack w/o removing it from internal storage + // WorkersList return all container w/o removing it from internal storage List() []worker.BaseProcess - // RemoveWorker remove worker from the stack - Remove(wb worker.BaseProcess) error + // RemoveWorker remove worker from the container + Remove(wb worker.BaseProcess) } diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index 2380c190..3e0633a3 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -3,33 +3,42 @@ package worker_watcher //nolint:golint,stylecheck import ( "context" "sync" + "time" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/spiral/roadrunner/v2/pkg/worker_watcher/container" ) -// workerCreateFunc can be nil, but in that case, dead stack will not be replaced +// workerCreateFunc can be nil, but in that case, dead container will not be replaced func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) Watcher { ww := &workerWatcher{ - stack: NewWorkersStack(numWorkers), - allocator: allocator, - events: events, + container: container.NewVector(numWorkers), + numWorkers: numWorkers, + workers: make([]worker.BaseProcess, 0, numWorkers), + allocator: allocator, + events: events, } return ww } type workerWatcher struct { - mutex sync.RWMutex - stack *Stack - allocator worker.Allocator - events events.Handler + sync.RWMutex + container container.Vector + // used to control the Destroy stage (that all workers are in the container) + numWorkers uint64 + workers []worker.BaseProcess + allocator worker.Allocator + events events.Handler } func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error { for i := 0; i < len(workers); i++ { - ww.stack.Push(workers[i]) + ww.container.Enqueue(workers[i]) + // add worker to watch slice + ww.workers = append(ww.workers, workers[i]) go func(swc worker.BaseProcess) { ww.wait(swc) @@ -38,75 +47,96 @@ func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error { return nil } +// return value from Get +type get struct { + w worker.BaseProcess + err error +} + // Get is not a thread safe operation func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) { + c := make(chan get, 1) const op = errors.Op("worker_watcher_get_free_worker") - // FAST PATH - // thread safe operation - w, stop := ww.stack.Pop() - if stop { - return nil, errors.E(op, errors.WatcherStopped) - } - - // fast path, worker not nil and in the ReadyState - if w != nil && w.State().Value() == worker.StateReady { - return w, nil - } - // ========================================================= - // SLOW PATH - // Put worker back (no matter it's state, it will be killed next) - if w != nil { - ww.stack.Push(w) - } - // no free workers in the stack - // try to continuously get free one - for { - select { - default: - w, stop = ww.stack.Pop() - if stop { - return nil, errors.E(op, errors.WatcherStopped) + go func() { + // FAST PATH + // thread safe operation + w, stop := ww.container.Dequeue() + if stop { + c <- get{ + nil, + errors.E(op, errors.WatcherStopped), } - if w == nil { - continue + return + } + + // fast path, worker not nil and in the ReadyState + if w.State().Value() == worker.StateReady { + c <- get{ + w, + nil, } + return + } + // ========================================================= + // SLOW PATH + _ = w.Kill() + // no free workers in the container + // try to continuously get free one + for { + select { + default: + w, stop = ww.container.Dequeue() + if stop { + c <- get{ + nil, + errors.E(op, errors.WatcherStopped), + } + } - switch w.State().Value() { - // return only workers in the Ready state - // check first - case worker.StateReady: - return w, nil - case worker.StateRemove: - err := ww.Remove(w) - if err != nil { - return nil, errors.E(op, err) + switch w.State().Value() { + // return only workers in the Ready state + // check first + case worker.StateReady: + c <- get{ + w, + nil, + } + return + case worker.StateWorking: // how?? + ww.container.Enqueue(w) + continue + case + // all the possible wrong states + worker.StateInactive, + worker.StateDestroyed, + worker.StateErrored, + worker.StateStopped, + worker.StateInvalid, + worker.StateKilling, + worker.StateStopping: + // worker doing no work because it in the container + // so we can safely kill it (inconsistent state) + _ = w.Kill() + // try to get new worker + continue } - // try to get next - continue - case - // all the possible wrong states - worker.StateInactive, - worker.StateDestroyed, - worker.StateErrored, - worker.StateStopped, - worker.StateInvalid, - worker.StateKilling, - worker.StateWorking, // ??? how - worker.StateStopping: - // worker doing no work because it in the stack - // so we can safely kill it (inconsistent state) - _ = w.Kill() - // try to get new worker - continue } - case <-ctx.Done(): - return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the stack, timeout exceed")) } + }() + + select { + case r := <-c: + if r.err != nil { + return nil, r.err + } + return r.w, nil + case <-ctx.Done(): + return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the container, timeout exceed")) } } func (ww *workerWatcher) Allocate() error { - ww.mutex.Lock() + ww.Lock() const op = errors.Op("worker_watcher_allocate_new") sw, err := ww.allocator() if err != nil { @@ -114,47 +144,83 @@ func (ww *workerWatcher) Allocate() error { } ww.addToWatch(sw) - ww.mutex.Unlock() - ww.Push(sw) + ww.workers = append(ww.workers, sw) + + ww.Unlock() + ww.Push(sw) return nil } // Remove -func (ww *workerWatcher) Remove(wb worker.BaseProcess) error { - ww.mutex.Lock() - defer ww.mutex.Unlock() +func (ww *workerWatcher) Remove(wb worker.BaseProcess) { + ww.Lock() + defer ww.Unlock() - const op = errors.Op("worker_watcher_remove_worker") // set remove state - wb.State().Set(worker.StateRemove) - if ww.stack.FindAndRemoveByPid(wb.Pid()) { - err := wb.Kill() - if err != nil { - return errors.E(op, err) + pid := wb.Pid() + + // worker will be removed on the Get operation + for i := 0; i < len(ww.workers); i++ { + if ww.workers[i].Pid() == pid { + ww.workers = append(ww.workers[:i], ww.workers[i+1:]...) + // kill worker + _ = wb.Kill() + return } - return nil } - - return nil } // O(1) operation func (ww *workerWatcher) Push(w worker.BaseProcess) { - ww.mutex.Lock() - defer ww.mutex.Unlock() - ww.stack.Push(w) + ww.container.Enqueue(w) } -// Destroy all underlying stack (but let them to complete the task) +// Destroy all underlying container (but let them to complete the task) func (ww *workerWatcher) Destroy(ctx context.Context) { - // destroy stack, we don't use ww mutex here, since we should be able to push worker - ww.stack.Destroy(ctx) + // destroy container, we don't use ww mutex here, since we should be able to push worker + ww.Lock() + // do not release new workers + ww.container.Destroy() + ww.Unlock() + + tt := time.NewTicker(time.Millisecond * 500) + defer tt.Stop() + for { + select { + case <-tt.C: + ww.Lock() + // that might be one of the workers is working + if ww.numWorkers != uint64(len(ww.workers)) { + ww.Unlock() + continue + } + ww.Unlock() + // unnecessary mutex, but + // just to make sure. All container at this moment are in the container + // Pop operation is blocked, push can't be done, since it's not possible to pop + ww.Lock() + for i := 0; i < len(ww.workers); i++ { + ww.workers[i].State().Set(worker.StateDestroyed) + // kill the worker + _ = ww.workers[i].Kill() + } + return + } + } } // Warning, this is O(n) operation, and it will return copy of the actual workers func (ww *workerWatcher) List() []worker.BaseProcess { - return ww.stack.Workers() + ww.Lock() + defer ww.Unlock() + + base := make([]worker.BaseProcess, 0, len(ww.workers)) + for i := 0; i < len(ww.workers); i++ { + base = append(base, ww.workers[i]) + } + + return base } func (ww *workerWatcher) wait(w worker.BaseProcess) { @@ -174,7 +240,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { return } - _ = ww.stack.FindAndRemoveByPid(w.Pid()) + ww.Remove(w) err = ww.Allocate() if err != nil { ww.events.Push(events.PoolEvent{ |