path: root/worker_watcher/worker_watcher.go
author    Valery Piashchynski <[email protected]>  2021-09-16 21:46:50 +0300
committer GitHub <[email protected]>  2021-09-16 21:46:50 +0300
commit    3581b45f237a3f7aa29591ceb2bf6f4a4642a2f5 (patch)
tree      e723b19ec1ac16b7ccc7b3c2da69d4a416d63d81 /worker_watcher/worker_watcher.go
parent    337d292dd2d6ff0a555098b1970d8194d8df8bc2 (diff)
parent    823d831b57b75f70c7c3bbbee355f2016633bb3b (diff)
[#803]: feat(plugins): move plugins to a separate repository (tag: v2.5.0-alpha.2)
Diffstat (limited to 'worker_watcher/worker_watcher.go')
-rwxr-xr-x  worker_watcher/worker_watcher.go  318
1 file changed, 318 insertions, 0 deletions
diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go
new file mode 100755
index 00000000..78bae778
--- /dev/null
+++ b/worker_watcher/worker_watcher.go
@@ -0,0 +1,318 @@
+package worker_watcher //nolint:stylecheck
+
+import (
+ "context"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/events"
+ "github.com/spiral/roadrunner/v2/utils"
+ "github.com/spiral/roadrunner/v2/worker"
+ "github.com/spiral/roadrunner/v2/worker_watcher/container/channel"
+)
+
+// Vector interface represents vector container
+type Vector interface {
+ // Push is used to put a worker into the vector
+ Push(worker.BaseProcess)
+ // Pop is used to get a worker from the vector
+ Pop(ctx context.Context) (worker.BaseProcess, error)
+ // Remove deletes the worker with the provided pid
+ Remove(pid int64)
+ // Destroy is used to stop releasing workers from the vector
+ Destroy()
+
+ // TODO Add Replace method, and remove `Remove` method. Replace will do removal and allocation
+ // Replace(prevPid int64, newWorker worker.BaseProcess)
+}
+
+type workerWatcher struct {
+ sync.RWMutex
+ container Vector
+ // used to control the Destroy stage (checks that all workers are back in the container)
+ numWorkers *uint64
+
+ workers []worker.BaseProcess
+
+ allocator worker.Allocator
+ allocateTimeout time.Duration
+ events events.Handler
+}
+
+// NewSyncWorkerWatcher is a constructor for the Watcher
+func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler, allocateTimeout time.Duration) *workerWatcher {
+ ww := &workerWatcher{
+ container: channel.NewVector(numWorkers),
+
+ // pass a ptr to the number of workers to avoid blocking in the TTL loop
+ numWorkers: utils.Uint64(numWorkers),
+ allocateTimeout: allocateTimeout,
+ workers: make([]worker.BaseProcess, 0, numWorkers),
+
+ allocator: allocator,
+ events: events,
+ }
+
+ return ww
+}
+
+func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error {
+ for i := 0; i < len(workers); i++ {
+ ww.container.Push(workers[i])
+ // add worker to watch slice
+ ww.workers = append(ww.workers, workers[i])
+
+ go func(swc worker.BaseProcess) {
+ ww.wait(swc)
+ }(workers[i])
+ }
+ return nil
+}
+
+// Take is not a thread-safe operation
+func (ww *workerWatcher) Take(ctx context.Context) (worker.BaseProcess, error) {
+ const op = errors.Op("worker_watcher_get_free_worker")
+
+ // Pop from the container is a thread-safe operation
+ w, err := ww.container.Pop(ctx)
+ if err != nil {
+ if errors.Is(errors.WatcherStopped, err) {
+ return nil, errors.E(op, errors.WatcherStopped)
+ }
+
+ return nil, errors.E(op, err)
+ }
+
+ // fast path, worker not nil and in the ReadyState
+ if w.State().Value() == worker.StateReady {
+ return w, nil
+ }
+
+ // =========================================================
+ // SLOW PATH
+ _ = w.Kill()
+ // no free workers in the container, or the worker is not in the ReadyState (TTL-ed)
+ // continuously try to get a free one
+ for {
+ w, err = ww.container.Pop(ctx)
+ if err != nil {
+ if errors.Is(errors.WatcherStopped, err) {
+ return nil, errors.E(op, errors.WatcherStopped)
+ }
+ return nil, errors.E(op, err)
+ }
+
+ switch w.State().Value() {
+ // return only workers in the Ready state
+ case worker.StateReady:
+ return w, nil
+ case worker.StateWorking: // should not happen, but the worker is still busy
+ ww.container.Push(w) // put it back and let the worker finish its work
+ continue
+ case
+ // all the possible wrong states
+ worker.StateInactive,
+ worker.StateDestroyed,
+ worker.StateErrored,
+ worker.StateStopped,
+ worker.StateInvalid,
+ worker.StateKilling,
+ worker.StateStopping:
+ // the worker is not doing any work because it was sitting in the container,
+ // so we can safely kill it (inconsistent state)
+ _ = w.Kill()
+ // try to get new worker
+ continue
+ }
+ }
+}
+
+func (ww *workerWatcher) Allocate() error {
+ const op = errors.Op("worker_watcher_allocate_new")
+
+ sw, err := ww.allocator()
+ if err != nil {
+ // log incident
+ ww.events.Push(
+ events.WorkerEvent{
+ Event: events.EventWorkerError,
+ Payload: errors.E(op, errors.Errorf("can't allocate worker: %v", err)),
+ })
+
+ // if no timeout, return error immediately
+ if ww.allocateTimeout == 0 {
+ return errors.E(op, errors.WorkerAllocate, err)
+ }
+
+ // retry every half a second
+ allocateFreq := time.NewTicker(time.Millisecond * 500)
+
+ tt := time.After(ww.allocateTimeout)
+ for {
+ select {
+ case <-tt:
+ // reduce number of workers
+ atomic.AddUint64(ww.numWorkers, ^uint64(0))
+ allocateFreq.Stop()
+ // timeout exceeded, the worker can't be allocated
+ return errors.E(op, errors.WorkerAllocate, err)
+
+ case <-allocateFreq.C:
+ sw, err = ww.allocator()
+ if err != nil {
+ // log incident
+ ww.events.Push(
+ events.WorkerEvent{
+ Event: events.EventWorkerError,
+ Payload: errors.E(op, errors.Errorf("can't allocate worker, retry attempt failed: %v", err)),
+ })
+ continue
+ }
+
+ // reallocated
+ allocateFreq.Stop()
+ goto done
+ }
+ }
+ }
+
+done:
+ // start waiting on the new worker (watch for its exit)
+ ww.addToWatch(sw)
+
+ ww.Lock()
+ // add the new worker to the workers slice (so information about workers can be read in parallel)
+ ww.workers = append(ww.workers, sw)
+ ww.Unlock()
+
+ // push the worker to the container
+ ww.Release(sw)
+ return nil
+}
+
+// Remove worker
+func (ww *workerWatcher) Remove(wb worker.BaseProcess) {
+ ww.Lock()
+ defer ww.Unlock()
+
+ pid := wb.Pid()
+
+ // find the worker in the watch slice and remove it
+ for i := 0; i < len(ww.workers); i++ {
+ if ww.workers[i].Pid() == pid {
+ ww.workers = append(ww.workers[:i], ww.workers[i+1:]...)
+ // kill worker, just to be sure it's dead
+ _ = wb.Kill()
+ return
+ }
+ }
+}
+
+// Release O(1) operation
+func (ww *workerWatcher) Release(w worker.BaseProcess) {
+ switch w.State().Value() {
+ case worker.StateReady:
+ ww.container.Push(w)
+ default:
+ _ = w.Kill()
+ }
+}
+
+// Destroy all underlying workers (but let them complete the current task)
+func (ww *workerWatcher) Destroy(_ context.Context) {
+ // destroy the container under the watcher mutex
+ ww.Lock()
+ // do not release new workers
+ ww.container.Destroy()
+ ww.Unlock()
+
+ tt := time.NewTicker(time.Millisecond * 100)
+ defer tt.Stop()
+ for { //nolint:gosimple
+ select {
+ case <-tt.C:
+ ww.Lock()
+ // some workers might still be working
+ if atomic.LoadUint64(ww.numWorkers) != uint64(len(ww.workers)) {
+ ww.Unlock()
+ continue
+ }
+ // at this moment all workers are back in the container
+ // Pop is blocked after Destroy, and Push can't happen since no worker can be popped
+ for i := 0; i < len(ww.workers); i++ {
+ ww.workers[i].State().Set(worker.StateDestroyed)
+ // kill the worker
+ _ = ww.workers[i].Kill()
+ }
+ return
+ }
+ }
+}
+
+// List is an O(n) operation that returns a copy of the actual workers
+func (ww *workerWatcher) List() []worker.BaseProcess {
+ ww.RLock()
+ defer ww.RUnlock()
+
+ if len(ww.workers) == 0 {
+ return nil
+ }
+
+ base := make([]worker.BaseProcess, 0, len(ww.workers))
+ for i := 0; i < len(ww.workers); i++ {
+ base = append(base, ww.workers[i])
+ }
+
+ return base
+}
+
+func (ww *workerWatcher) wait(w worker.BaseProcess) {
+ const op = errors.Op("worker_watcher_wait")
+ err := w.Wait()
+ if err != nil {
+ ww.events.Push(events.WorkerEvent{
+ Event: events.EventWorkerError,
+ Worker: w,
+ Payload: errors.E(op, err),
+ })
+ }
+
+ // remove worker
+ ww.Remove(w)
+
+ if w.State().Value() == worker.StateDestroyed {
+ // worker was manually destroyed, no need to replace
+ ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
+ return
+ }
+
+ // set state as stopped
+ w.State().Set(worker.StateStopped)
+
+ err = ww.Allocate()
+ if err != nil {
+ ww.events.Push(events.PoolEvent{
+ Event: events.EventWorkerProcessExit,
+ Error: errors.E(op, err),
+ })
+
+ // no workers at all, panic
+ if len(ww.workers) == 0 && atomic.LoadUint64(ww.numWorkers) == 0 {
+ panic(errors.E(op, errors.WorkerAllocate, errors.Errorf("can't allocate workers: %v", err)))
+ }
+ }
+}
+
+func (ww *workerWatcher) addToWatch(wb worker.BaseProcess) {
+ go func() {
+ ww.wait(wb)
+ }()
+}
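
The Allocate method above retries a failed allocation on a 500ms ticker until the overall allocateTimeout deadline fires. Below is a minimal, standalone sketch of that same ticker-plus-deadline retry pattern; allocateWithRetry and its integer-returning allocate callback are stand-ins invented for illustration and are not part of roadrunner.

// Standalone sketch of the retry pattern used by Allocate:
// retry a failing allocation every 500ms until an overall deadline expires.
package main

import (
	"errors"
	"fmt"
	"time"
)

// allocateWithRetry is a hypothetical helper: it calls allocate once and, on
// failure, keeps retrying on a ticker until the timeout channel fires.
func allocateWithRetry(allocate func() (int, error), timeout time.Duration) (int, error) {
	v, err := allocate()
	if err == nil {
		return v, nil
	}

	// retry every half a second, the same cadence as Allocate above
	tick := time.NewTicker(time.Millisecond * 500)
	defer tick.Stop()
	deadline := time.After(timeout)

	for {
		select {
		case <-deadline:
			// deadline exceeded, give up and report the last error
			return 0, fmt.Errorf("allocation timed out: %w", err)
		case <-tick.C:
			v, err = allocate()
			if err != nil {
				continue
			}
			return v, nil
		}
	}
}

func main() {
	attempts := 0
	// stand-in allocator: fails twice, then succeeds
	allocate := func() (int, error) {
		attempts++
		if attempts < 3 {
			return 0, errors.New("not ready yet")
		}
		return attempts, nil
	}

	v, err := allocateWithRetry(allocate, time.Second*5)
	fmt.Println(v, err) // prints: 3 <nil>
}

Compared to Allocate, the sketch drops the event logging and the atomic worker-count decrement on timeout, but the select over the deadline channel and the ticker has the same shape, and it leaves the retry loop on the first successful allocation just as Allocate does via its done label.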