summaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-26 00:10:45 +0300
committerValery Piashchynski <[email protected]>2021-06-26 00:10:45 +0300
commit7c0b63a9d0cee75e8bdedd2dcd0c20139f3ffd3f (patch)
treedbc09a665fb75d51f4b01a83cd80b7f2ddab0aae /pkg
parentae58af3ca1d37cb61f106146e4bf9bd1d033e8b3 (diff)
- Better channel handling
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg')
-rw-r--r--pkg/worker_watcher/container/interface.go4
-rw-r--r--pkg/worker_watcher/container/vec.go21
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go105
3 files changed, 64 insertions, 66 deletions
diff --git a/pkg/worker_watcher/container/interface.go b/pkg/worker_watcher/container/interface.go
index 0e648d34..e10ecdae 100644
--- a/pkg/worker_watcher/container/interface.go
+++ b/pkg/worker_watcher/container/interface.go
@@ -1,6 +1,8 @@
package container
import (
+ "context"
+
"github.com/spiral/roadrunner/v2/pkg/worker"
)
@@ -9,7 +11,7 @@ type Vector interface {
// Enqueue used to put worker to the vector
Enqueue(worker.BaseProcess)
// Dequeue used to get worker from the vector
- Dequeue() (worker.BaseProcess, bool)
+ Dequeue(ctx context.Context) (worker.BaseProcess, error)
// Destroy used to stop releasing the workers
Destroy()
}
diff --git a/pkg/worker_watcher/container/vec.go b/pkg/worker_watcher/container/vec.go
index 1ab9d073..8072af10 100644
--- a/pkg/worker_watcher/container/vec.go
+++ b/pkg/worker_watcher/container/vec.go
@@ -1,20 +1,20 @@
package container
import (
+ "context"
"sync/atomic"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/worker"
)
type Vec struct {
- wqLen uint64
destroy uint64
workers chan worker.BaseProcess
}
func NewVector(initialNumOfWorkers uint64) Vector {
vec := &Vec{
- wqLen: 0,
destroy: 0,
workers: make(chan worker.BaseProcess, initialNumOfWorkers),
}
@@ -23,11 +23,10 @@ func NewVector(initialNumOfWorkers uint64) Vector {
}
func (v *Vec) Enqueue(w worker.BaseProcess) {
- atomic.AddUint64(&v.wqLen, 1)
v.workers <- w
}
-func (v *Vec) Dequeue() (worker.BaseProcess, bool) {
+func (v *Vec) Dequeue(ctx context.Context) (worker.BaseProcess, error) {
/*
if *addr == old {
*addr = new
@@ -36,15 +35,17 @@ func (v *Vec) Dequeue() (worker.BaseProcess, bool) {
*/
if atomic.CompareAndSwapUint64(&v.destroy, 1, 1) {
- return nil, true
+ return nil, errors.E(errors.WatcherStopped)
}
- if num := atomic.LoadUint64(&v.wqLen); num > 0 {
- atomic.AddUint64(&v.wqLen, ^uint64(0))
- return <-v.workers, false
+ for {
+ select {
+ case w := <-v.workers:
+ return w, nil
+ case <-ctx.Done():
+ return nil, errors.E(ctx.Err(), errors.NoFreeWorkers)
+ }
}
-
- return nil, false
}
func (v *Vec) Destroy() {
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 9d66a75c..f82de958 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -51,64 +51,59 @@ func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error {
func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) {
const op = errors.Op("worker_watcher_get_free_worker")
+ // thread safe operation
+ w, err := ww.container.Dequeue(ctx)
+ if errors.Is(errors.WatcherStopped, err) {
+ return nil, errors.E(op, errors.WatcherStopped)
+ }
+
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // fast path, worker not nil and in the ReadyState
+ if w.State().Value() == worker.StateReady {
+ return w, nil
+ }
+
+ // =========================================================
+ // SLOW PATH
+ _ = w.Kill() // how the worker get here???????
+ // no free workers in the container
+ // try to continuously get free one
for {
- select {
- case <-ctx.Done():
- return nil, errors.E(op, errors.NoFreeWorkers)
- default:
- // thread safe operation
- w, stop := ww.container.Dequeue()
- if stop {
- return nil, errors.E(op, errors.WatcherStopped)
- }
+ w, err = ww.container.Dequeue(ctx)
- if w == nil {
- continue
- }
+ if errors.Is(errors.WatcherStopped, err) {
+ return nil, errors.E(op, errors.WatcherStopped)
+ }
- // fast path, worker not nil and in the ReadyState
- if w.State().Value() == worker.StateReady {
- return w, nil
- }
- // =========================================================
- // SLOW PATH
- _ = w.Kill() // how the worker get here???????
- // no free workers in the container
- // try to continuously get free one
- for {
- w, stop = ww.container.Dequeue()
- if stop {
- return nil, errors.E(op, errors.WatcherStopped)
- }
-
- if w == nil {
- continue
- }
-
- switch w.State().Value() {
- // return only workers in the Ready state
- // check first
- case worker.StateReady:
- return w, nil
- case worker.StateWorking: // how??
- ww.container.Enqueue(w) // put it back, let worker finish the work
- continue
- case
- // all the possible wrong states
- worker.StateInactive,
- worker.StateDestroyed,
- worker.StateErrored,
- worker.StateStopped,
- worker.StateInvalid,
- worker.StateKilling,
- worker.StateStopping:
- // worker doing no work because it in the container
- // so we can safely kill it (inconsistent state)
- _ = w.Kill()
- // try to get new worker
- continue
- }
- }
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ switch w.State().Value() {
+ // return only workers in the Ready state
+ // check first
+ case worker.StateReady:
+ return w, nil
+ case worker.StateWorking: // how??
+ ww.container.Enqueue(w) // put it back, let worker finish the work
+ continue
+ case
+ // all the possible wrong states
+ worker.StateInactive,
+ worker.StateDestroyed,
+ worker.StateErrored,
+ worker.StateStopped,
+ worker.StateInvalid,
+ worker.StateKilling,
+ worker.StateStopping:
+ // worker doing no work because it in the container
+ // so we can safely kill it (inconsistent state)
+ _ = w.Kill()
+ // try to get new worker
+ continue
}
}
}