author    Valery Piashchynski <[email protected]>  2021-09-16 21:46:50 +0300
committer GitHub <[email protected]>  2021-09-16 21:46:50 +0300
commit    3581b45f237a3f7aa29591ceb2bf6f4a4642a2f5 (patch)
tree      e723b19ec1ac16b7ccc7b3c2da69d4a416d63d81 /pkg/worker_watcher
parent    337d292dd2d6ff0a555098b1970d8194d8df8bc2 (diff)
parent    823d831b57b75f70c7c3bbbee355f2016633bb3b (diff)

[#803]: feat(plugins): move plugins to a separate repository (tag: v2.5.0-alpha.2)
Diffstat (limited to 'pkg/worker_watcher')
-rw-r--r--  pkg/worker_watcher/container/channel/vec.go   107
-rw-r--r--  pkg/worker_watcher/container/queue/queue.go   102
-rwxr-xr-x  pkg/worker_watcher/worker_watcher.go          318
3 files changed, 0 insertions, 527 deletions
diff --git a/pkg/worker_watcher/container/channel/vec.go b/pkg/worker_watcher/container/channel/vec.go
deleted file mode 100644
index 5605f1e0..00000000
--- a/pkg/worker_watcher/container/channel/vec.go
+++ /dev/null
@@ -1,107 +0,0 @@
-package channel
-
-import (
- "context"
- "sync"
- "sync/atomic"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/worker"
-)
-
-type Vec struct {
- sync.RWMutex
- // destroy signal
- destroy uint64
- // channel with the workers
- workers chan worker.BaseProcess
-}
-
-func NewVector(len uint64) *Vec {
- vec := &Vec{
- destroy: 0,
- workers: make(chan worker.BaseProcess, len),
- }
-
- return vec
-}
-
-// Push is an O(1) operation.
-// With TTL and a full channel it is O(n) in the worst case, where n is the channel length.
-func (v *Vec) Push(w worker.BaseProcess) {
- // Non-blocking channel send
- select {
- case v.workers <- w:
- // the default select branch is only possible when dealing with TTL,
- // because in that case workers in the v.workers channel can be TTL-ed and killed
- // but still present in the channel
- default:
- // Stop Pop operations
- v.Lock()
- defer v.Unlock()
-
- /*
- we can end up in the default branch for the following reasons:
- 1. TTL is set and there were no requests during the TTL window
- 2. a violated Get <-> Release sequence (how ??)
- */
- for i := 0; i < len(v.workers); i++ {
- /*
- We need to drain the vector until we find a worker in the Invalid/Killing/Killed/etc. state.
- */
- wrk := <-v.workers
- switch wrk.State().Value() {
- // skip good states, put worker back
- case worker.StateWorking, worker.StateReady:
- // put the worker back
- // generally, while send and receive operations are concurrent (from the channel), the channel behaves
- // like a FIFO, but when re-sending from the same goroutine it behaves like a FILO
- v.workers <- wrk
- continue
- /*
- Bad states are here.
- */
- default:
- // kill the current worker (just to be sure it's dead)
- if wrk != nil {
- _ = wrk.Kill()
- }
- // replace with the new one and return from the loop
- // the new worker can itself be TTL-ed at this moment; it's possible to replace a TTL-ed worker with another TTL-ed one,
- // But this case will be handled in the worker_watcher::Get
- v.workers <- w
- return
- }
- }
- }
-}
-
-func (v *Vec) Remove(_ int64) {}
-
-func (v *Vec) Pop(ctx context.Context) (worker.BaseProcess, error) {
- /*
- fast path: the destroy flag is set atomically by Destroy (see
- atomic.StoreUint64 below). Once it is set, Pop stops handing out
- workers and reports WatcherStopped instead of blocking on the
- channel.
- */
-
- if atomic.LoadUint64(&v.destroy) == 1 {
- return nil, errors.E(errors.WatcherStopped)
- }
-
- // the read lock is needed only for the TTL case: Push takes the write lock while draining TTL-ed workers
- v.RLock()
- defer v.RUnlock()
-
- select {
- case w := <-v.workers:
- return w, nil
- case <-ctx.Done():
- return nil, errors.E(ctx.Err(), errors.NoFreeWorkers)
- }
-}
-
-func (v *Vec) Destroy() {
- atomic.StoreUint64(&v.destroy, 1)
-}
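For context, a minimal sketch of how this channel-backed container was driven before its removal. The helper names and the two-second timeout are illustrative assumptions; Pop, Push and the import paths come from the deleted file above.

package main

import (
	"context"
	"time"

	"github.com/spiral/roadrunner/v2/pkg/worker"
	"github.com/spiral/roadrunner/v2/pkg/worker_watcher/container/channel"
)

// takeWorker is a hypothetical helper: it borrows a worker from the Vec,
// waiting at most two seconds for one to become available.
func takeWorker(vec *channel.Vec) (worker.BaseProcess, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	// Pop blocks on the buffered channel until a worker is pushed back,
	// the context expires, or Destroy has been called.
	return vec.Pop(ctx)
}

// releaseWorker hands the worker back; Push never blocks the caller, because
// a full channel is drained of dead (TTL-ed) workers instead.
func releaseWorker(vec *channel.Vec, w worker.BaseProcess) {
	vec.Push(w)
}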
diff --git a/pkg/worker_watcher/container/queue/queue.go b/pkg/worker_watcher/container/queue/queue.go
deleted file mode 100644
index edf81d60..00000000
--- a/pkg/worker_watcher/container/queue/queue.go
+++ /dev/null
@@ -1,102 +0,0 @@
-package queue
-
-import (
- "context"
- "sync"
- "sync/atomic"
-
- "github.com/spiral/roadrunner/v2/pkg/worker"
-)
-
-const (
- initialSize = 1
- maxInitialSize = 8
- maxInternalSliceSize = 10
-)
-
-type Node struct {
- w []worker.BaseProcess
- // next node in the linked list
- n *Node
-}
-
-type Queue struct {
- mu sync.Mutex
-
- head *Node
- tail *Node
-
- curr uint64
- len uint64
-
- sliceSize uint64
-}
-
-func NewQueue() *Queue {
- q := &Queue{
- mu: sync.Mutex{},
- head: nil,
- tail: nil,
- curr: 0,
- len: 0,
- sliceSize: 0,
- }
-
- return q
-}
-
-func (q *Queue) Push(w worker.BaseProcess) {
- q.mu.Lock()
-
- if q.head == nil {
- h := newNode(initialSize)
- q.head = h
- q.tail = h
- q.sliceSize = maxInitialSize
- } else if uint64(len(q.tail.w)) >= atomic.LoadUint64(&q.sliceSize) {
- n := newNode(maxInternalSliceSize)
- q.tail.n = n
- q.tail = n
- q.sliceSize = maxInternalSliceSize
- }
-
- q.tail.w = append(q.tail.w, w)
-
- atomic.AddUint64(&q.len, 1)
-
- q.mu.Unlock()
-}
-
-func (q *Queue) Pop(ctx context.Context) (worker.BaseProcess, error) {
- q.mu.Lock()
-
- if q.head == nil {
- // release the mutex before the early return, otherwise the next Push/Pop deadlocks
- q.mu.Unlock()
- return nil, nil
- }
-
- w := q.head.w[q.curr]
- q.head.w[q.curr] = nil
- atomic.AddUint64(&q.len, ^uint64(0))
- atomic.AddUint64(&q.curr, 1)
-
- if atomic.LoadUint64(&q.curr) >= uint64(len(q.head.w)) {
- n := q.head.n
- q.head.n = nil
- q.head = n
- q.curr = 0
- }
-
- q.mu.Unlock()
-
- return w, nil
-}
-
-func (q *Queue) Replace(oldPid int64, newWorker worker.BaseProcess) {
-
-}
-
-func (q *Queue) Destroy() {}
-
-func newNode(capacity int) *Node {
- return &Node{w: make([]worker.BaseProcess, 0, capacity)}
-}
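A short sketch of how this alternative queue container behaves. It relies only on what the deleted code above shows: Push/Pop/NewQueue exist, Pop does not block and signals an empty queue with a nil worker, and the storage grows as a linked list of small slices (the head node holds up to 8 workers, every following node up to 10). The drainFIFO helper and its names are illustrative assumptions.

package main

import (
	"context"
	"fmt"

	"github.com/spiral/roadrunner/v2/pkg/worker"
	"github.com/spiral/roadrunner/v2/pkg/worker_watcher/container/queue"
)

// drainFIFO pushes the given workers and pops them back in FIFO order.
func drainFIFO(ctx context.Context, workers []worker.BaseProcess) {
	q := queue.NewQueue()
	for i := 0; i < len(workers); i++ {
		q.Push(workers[i])
	}

	for {
		w, err := q.Pop(ctx)
		if err != nil || w == nil { // an empty queue is signalled by a nil worker
			return
		}
		fmt.Println("popped worker pid:", w.Pid())
	}
}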
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
deleted file mode 100755
index 83f8e627..00000000
--- a/pkg/worker_watcher/worker_watcher.go
+++ /dev/null
@@ -1,318 +0,0 @@
-package worker_watcher //nolint:stylecheck
-
-import (
- "context"
- "sync"
- "sync/atomic"
- "time"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/pkg/worker"
- "github.com/spiral/roadrunner/v2/pkg/worker_watcher/container/channel"
- "github.com/spiral/roadrunner/v2/utils"
-)
-
-// Vector interface represents vector container
-type Vector interface {
- // Push puts a worker into the vector
- Push(worker.BaseProcess)
- // Pop gets a worker from the vector
- Pop(ctx context.Context) (worker.BaseProcess, error)
- // Remove removes the worker with the provided pid
- Remove(pid int64)
- // Destroy stops releasing the workers
- Destroy()
-
- // TODO Add Replace method, and remove `Remove` method. Replace will do removal and allocation
- // Replace(prevPid int64, newWorker worker.BaseProcess)
-}
-
-type workerWatcher struct {
- sync.RWMutex
- container Vector
- // used to control the Destroy stage (i.e. that all workers are back in the container)
- numWorkers *uint64
-
- workers []worker.BaseProcess
-
- allocator worker.Allocator
- allocateTimeout time.Duration
- events events.Handler
-}
-
-// NewSyncWorkerWatcher is a constructor for the Watcher
-func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler, allocateTimeout time.Duration) *workerWatcher {
- ww := &workerWatcher{
- container: channel.NewVector(numWorkers),
-
- // pass a ptr to the number of workers to avoid blocking in the TTL loop
- numWorkers: utils.Uint64(numWorkers),
- allocateTimeout: allocateTimeout,
- workers: make([]worker.BaseProcess, 0, numWorkers),
-
- allocator: allocator,
- events: events,
- }
-
- return ww
-}
-
-func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error {
- for i := 0; i < len(workers); i++ {
- ww.container.Push(workers[i])
- // add worker to watch slice
- ww.workers = append(ww.workers, workers[i])
-
- go func(swc worker.BaseProcess) {
- ww.wait(swc)
- }(workers[i])
- }
- return nil
-}
-
-// Take is not a thread-safe operation by itself; the container Pop it relies on is
-func (ww *workerWatcher) Take(ctx context.Context) (worker.BaseProcess, error) {
- const op = errors.Op("worker_watcher_get_free_worker")
-
- // thread safe operation
- w, err := ww.container.Pop(ctx)
- if err != nil {
- if errors.Is(errors.WatcherStopped, err) {
- return nil, errors.E(op, errors.WatcherStopped)
- }
-
- return nil, errors.E(op, err)
- }
-
- // fast path, worker not nil and in the ReadyState
- if w.State().Value() == worker.StateReady {
- return w, nil
- }
-
- // =========================================================
- // SLOW PATH
- _ = w.Kill()
- // no free workers in the container, or the worker is not in the Ready state (TTL-ed);
- // keep trying to get a free one
- for {
- w, err = ww.container.Pop(ctx)
- if err != nil {
- if errors.Is(errors.WatcherStopped, err) {
- return nil, errors.E(op, errors.WatcherStopped)
- }
- return nil, errors.E(op, err)
- }
-
- switch w.State().Value() {
- // return only workers in the Ready state
- // check first
- case worker.StateReady:
- return w, nil
- case worker.StateWorking: // how??
- ww.container.Push(w) // put it back, let worker finish the work
- continue
- case
- // all the possible wrong states
- worker.StateInactive,
- worker.StateDestroyed,
- worker.StateErrored,
- worker.StateStopped,
- worker.StateInvalid,
- worker.StateKilling,
- worker.StateStopping:
- // the worker was doing no work because it was sitting in the container,
- // so we can safely kill it (inconsistent state)
- _ = w.Kill()
- // try to get new worker
- continue
- }
- }
-}
-
-func (ww *workerWatcher) Allocate() error {
- const op = errors.Op("worker_watcher_allocate_new")
-
- sw, err := ww.allocator()
- if err != nil {
- // log incident
- ww.events.Push(
- events.WorkerEvent{
- Event: events.EventWorkerError,
- Payload: errors.E(op, errors.Errorf("can't allocate worker: %v", err)),
- })
-
- // if no timeout, return error immediately
- if ww.allocateTimeout == 0 {
- return errors.E(op, errors.WorkerAllocate, err)
- }
-
- // retry every half second
- allocateFreq := time.NewTicker(time.Millisecond * 500)
-
- tt := time.After(ww.allocateTimeout)
- for {
- select {
- case <-tt:
- // reduce number of workers
- atomic.AddUint64(ww.numWorkers, ^uint64(0))
- allocateFreq.Stop()
- // timeout exceeded, the worker can't be allocated
- return errors.E(op, errors.WorkerAllocate, err)
-
- case <-allocateFreq.C:
- sw, err = ww.allocator()
- if err != nil {
- // log incident
- ww.events.Push(
- events.WorkerEvent{
- Event: events.EventWorkerError,
- Payload: errors.E(op, errors.Errorf("can't allocate worker, retry attempt failed: %v", err)),
- })
- continue
- }
-
- // reallocated
- allocateFreq.Stop()
- goto done
- }
- }
- }
-
-done:
- // start waiting on the new worker (wait goroutine)
- ww.addToWatch(sw)
-
- ww.Lock()
- // add new worker to the workers slice (to get information about workers in parallel)
- ww.workers = append(ww.workers, sw)
- ww.Unlock()
-
- // push the worker to the container
- ww.Release(sw)
- return nil
-}
-
-// Remove worker
-func (ww *workerWatcher) Remove(wb worker.BaseProcess) {
- ww.Lock()
- defer ww.Unlock()
-
- // look the worker up by its pid
- pid := wb.Pid()
-
- // and remove it from the watch slice below
- for i := 0; i < len(ww.workers); i++ {
- if ww.workers[i].Pid() == pid {
- ww.workers = append(ww.workers[:i], ww.workers[i+1:]...)
- // kill worker, just to be sure it's dead
- _ = wb.Kill()
- return
- }
- }
-}
-
-// Release is an O(1) operation
-func (ww *workerWatcher) Release(w worker.BaseProcess) {
- switch w.State().Value() {
- case worker.StateReady:
- ww.container.Push(w)
- default:
- _ = w.Kill()
- }
-}
-
-// Destroy all underlying workers (but let them complete their current task)
-func (ww *workerWatcher) Destroy(_ context.Context) {
- // mark the container as destroyed so it stops handing out workers; the lock is
- // held only briefly, so finished workers can still be released back afterwards
- ww.Lock()
- ww.container.Destroy()
- ww.Unlock()
-
- tt := time.NewTicker(time.Millisecond * 100)
- defer tt.Stop()
- for { //nolint:gosimple
- select {
- case <-tt.C:
- ww.Lock()
- // one of the workers might still be working
- if atomic.LoadUint64(ww.numWorkers) != uint64(len(ww.workers)) {
- ww.Unlock()
- continue
- }
- // at this moment all workers are back in the container;
- // Pop is blocked (destroy flag is set), and no new Push can happen since no worker can be taken out
- for i := 0; i < len(ww.workers); i++ {
- ww.workers[i].State().Set(worker.StateDestroyed)
- // kill the worker
- _ = ww.workers[i].Kill()
- }
- return
- }
- }
-}
-
-// List is an O(n) operation; it returns a copy of the current workers slice
-func (ww *workerWatcher) List() []worker.BaseProcess {
- ww.RLock()
- defer ww.RUnlock()
-
- if len(ww.workers) == 0 {
- return nil
- }
-
- base := make([]worker.BaseProcess, 0, len(ww.workers))
- for i := 0; i < len(ww.workers); i++ {
- base = append(base, ww.workers[i])
- }
-
- return base
-}
-
-func (ww *workerWatcher) wait(w worker.BaseProcess) {
- const op = errors.Op("worker_watcher_wait")
- err := w.Wait()
- if err != nil {
- ww.events.Push(events.WorkerEvent{
- Event: events.EventWorkerError,
- Worker: w,
- Payload: errors.E(op, err),
- })
- }
-
- // remove worker
- ww.Remove(w)
-
- if w.State().Value() == worker.StateDestroyed {
- // worker was manually destroyed, no need to replace
- ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
- return
- }
-
- // set state as stopped
- w.State().Set(worker.StateStopped)
-
- err = ww.Allocate()
- if err != nil {
- ww.events.Push(events.PoolEvent{
- Event: events.EventPoolError,
- Payload: errors.E(op, err),
- })
-
- // no workers at all, panic
- if len(ww.workers) == 0 && atomic.LoadUint64(ww.numWorkers) == 0 {
- panic(errors.E(op, errors.WorkerAllocate, errors.Errorf("can't allocate workers: %v", err)))
- }
- }
-}
-
-func (ww *workerWatcher) addToWatch(wb worker.BaseProcess) {
- go func() {
- ww.wait(wb)
- }()
-}
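Taken together, the watcher above was wired roughly as follows. This is a sketch only: the runPool helper, the five-second Take timeout and the one-minute allocate timeout are assumptions, while NewSyncWorkerWatcher, Watch, Take, Release and Destroy (and the pre-removal import paths) come from the deleted file.

package main

import (
	"context"
	"time"

	"github.com/spiral/roadrunner/v2/pkg/events"
	"github.com/spiral/roadrunner/v2/pkg/worker"
	workerWatcher "github.com/spiral/roadrunner/v2/pkg/worker_watcher"
)

// runPool shows the typical lifecycle: watch the initial workers, borrow one,
// hand it back, then shut everything down.
func runPool(allocator worker.Allocator, eh events.Handler, initial []worker.BaseProcess) error {
	// one container slot per initial worker; failed re-allocations are retried
	// for up to a minute before the watcher gives up
	ww := workerWatcher.NewSyncWorkerWatcher(allocator, uint64(len(initial)), eh, time.Minute)

	if err := ww.Watch(initial); err != nil {
		return err
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Take returns a worker in the Ready state, or fails once the context expires
	w, err := ww.Take(ctx)
	if err != nil {
		return err
	}

	// Release puts a Ready worker back into the container (anything else is killed)
	ww.Release(w)

	// Destroy waits until every worker is back in the container, then kills them
	ww.Destroy(context.Background())
	return nil
}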
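The Allocate method combines one immediate attempt with a ticker-driven retry loop bounded by allocateTimeout. The same pattern in isolation: the 500 ms interval and the try-once-then-poll structure mirror the code above, while retryAllocate, allocFn and the error wrapping are illustrative assumptions (the real allocator signature may differ).

package main

import (
	"fmt"
	"time"

	"github.com/spiral/roadrunner/v2/pkg/worker"
)

// allocFn is a stand-in for the watcher's allocator callback.
type allocFn func() (worker.BaseProcess, error)

// retryAllocate tries to allocate once, and on failure retries every 500ms
// until the timeout elapses, returning the last observed error on timeout.
func retryAllocate(alloc allocFn, timeout time.Duration) (worker.BaseProcess, error) {
	w, err := alloc()
	if err == nil {
		return w, nil
	}
	if timeout == 0 {
		// no timeout configured: surface the error immediately
		return nil, err
	}

	tick := time.NewTicker(500 * time.Millisecond)
	defer tick.Stop()
	deadline := time.After(timeout)

	for {
		select {
		case <-deadline:
			// the deadline fired before any retry succeeded
			return nil, fmt.Errorf("allocate timeout exceeded: %w", err)
		case <-tick.C:
			if w, err = alloc(); err == nil {
				return w, nil
			}
			// otherwise keep retrying until the deadline fires
		}
	}
}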