From 9c51360f9119a4114bdcc21c8e61f0908a3c876d Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sun, 18 Jul 2021 11:32:44 +0300 Subject: Started beanstalk driver. Add new Queue impl (not finished yet). Fix bugs in the AMQP, update proto-api Signed-off-by: Valery Piashchynski --- pkg/worker_watcher/worker_watcher.go | 59 ++++++++++++++++++++---------------- 1 file changed, 33 insertions(+), 26 deletions(-) (limited to 'pkg/worker_watcher/worker_watcher.go') diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index b2d61d48..6e343fff 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -8,45 +8,51 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/worker" - "github.com/spiral/roadrunner/v2/pkg/worker_watcher/container" + "github.com/spiral/roadrunner/v2/pkg/worker_watcher/container/channel" ) // Vector interface represents vector container type Vector interface { - // Enqueue used to put worker to the vector - Enqueue(worker.BaseProcess) - // Dequeue used to get worker from the vector - Dequeue(ctx context.Context) (worker.BaseProcess, error) + // Push used to put worker to the vector + Push(worker.BaseProcess) + // Pop used to get worker from the vector + Pop(ctx context.Context) (worker.BaseProcess, error) + // Remove worker with provided pid + Remove(pid int64) // TODO replace // Destroy used to stop releasing the workers Destroy() } +type workerWatcher struct { + sync.RWMutex + container Vector + // used to control the Destroy stage (that all workers are in the container) + numWorkers uint64 + + workers []worker.BaseProcess + + allocator worker.Allocator + events events.Handler +} + // NewSyncWorkerWatcher is a constructor for the Watcher func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) *workerWatcher { ww := &workerWatcher{ - container: container.NewVector(numWorkers), + container: channel.NewVector(numWorkers), numWorkers: numWorkers, - workers: make([]worker.BaseProcess, 0, numWorkers), - allocator: allocator, - events: events, + + workers: make([]worker.BaseProcess, 0, numWorkers), + + allocator: allocator, + events: events, } return ww } -type workerWatcher struct { - sync.RWMutex - container Vector - // used to control the Destroy stage (that all workers are in the container) - numWorkers uint64 - workers []worker.BaseProcess - allocator worker.Allocator - events events.Handler -} - func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error { for i := 0; i < len(workers); i++ { - ww.container.Enqueue(workers[i]) + ww.container.Push(workers[i]) // add worker to watch slice ww.workers = append(ww.workers, workers[i]) @@ -62,7 +68,7 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) { const op = errors.Op("worker_watcher_get_free_worker") // thread safe operation - w, err := ww.container.Dequeue(ctx) + w, err := ww.container.Pop(ctx) if errors.Is(errors.WatcherStopped, err) { return nil, errors.E(op, errors.WatcherStopped) } @@ -78,11 +84,11 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) { // ========================================================= // SLOW PATH - _ = w.Kill() // how the worker get here??????? - // no free workers in the container + _ = w.Kill() + // no free workers in the container or worker not in the ReadyState (TTL-ed) // try to continuously get free one for { - w, err = ww.container.Dequeue(ctx) + w, err = ww.container.Pop(ctx) if errors.Is(errors.WatcherStopped, err) { return nil, errors.E(op, errors.WatcherStopped) @@ -98,7 +104,7 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) { case worker.StateReady: return w, nil case worker.StateWorking: // how?? - ww.container.Enqueue(w) // put it back, let worker finish the work + ww.container.Push(w) // put it back, let worker finish the work continue case // all the possible wrong states @@ -162,7 +168,7 @@ func (ww *workerWatcher) Remove(wb worker.BaseProcess) { func (ww *workerWatcher) Push(w worker.BaseProcess) { switch w.State().Value() { case worker.StateReady: - ww.container.Enqueue(w) + ww.container.Push(w) default: _ = w.Kill() } @@ -232,6 +238,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { return } + w.State().Set(worker.StateStopped) ww.Remove(w) err = ww.Allocate() if err != nil { -- cgit v1.2.3 From 02fc3664f4ad97e03c8f3a641e7322362f78721c Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Mon, 19 Jul 2021 09:49:16 +0300 Subject: Worker watcher interface update. Signed-off-by: Valery Piashchynski --- pkg/worker_watcher/worker_watcher.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) (limited to 'pkg/worker_watcher/worker_watcher.go') diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index 6e343fff..ca026383 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -18,9 +18,12 @@ type Vector interface { // Pop used to get worker from the vector Pop(ctx context.Context) (worker.BaseProcess, error) // Remove worker with provided pid - Remove(pid int64) // TODO replace + Remove(pid int64) // Destroy used to stop releasing the workers Destroy() + + // TODO Add Replace method, and remove `Remove` method. Replace will do removal and allocation + // Replace(prevPid int64, newWorker worker.BaseProcess) } type workerWatcher struct { @@ -63,8 +66,8 @@ func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error { return nil } -// Get is not a thread safe operation -func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) { +// Take is not a thread safe operation +func (ww *workerWatcher) Take(ctx context.Context) (worker.BaseProcess, error) { const op = errors.Op("worker_watcher_get_free_worker") // thread safe operation @@ -141,7 +144,7 @@ func (ww *workerWatcher) Allocate() error { // unlock Allocate mutex ww.Unlock() // push the worker to the container - ww.Push(sw) + ww.Release(sw) return nil } @@ -164,8 +167,8 @@ func (ww *workerWatcher) Remove(wb worker.BaseProcess) { } } -// Push O(1) operation -func (ww *workerWatcher) Push(w worker.BaseProcess) { +// Release O(1) operation +func (ww *workerWatcher) Release(w worker.BaseProcess) { switch w.State().Value() { case worker.StateReady: ww.container.Push(w) -- cgit v1.2.3 From c90c11b92e229280477a9b049e65ca1048825dd4 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sun, 1 Aug 2021 19:12:40 +0300 Subject: Rework vec based on the channel. Use select statement with the default branch to handle dead workers inside the channel. Update docker-compose.yaml used for the tests. Update rabbitmq to 3.9.1. Replace third-party amqp091 with the official implementation. Signed-off-by: Valery Piashchynski --- pkg/worker_watcher/worker_watcher.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'pkg/worker_watcher/worker_watcher.go') diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index ca026383..348be199 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -29,7 +29,7 @@ type Vector interface { type workerWatcher struct { sync.RWMutex container Vector - // used to control the Destroy stage (that all workers are in the container) + // used to control Destroy stage (that all workers are in the container) numWorkers uint64 workers []worker.BaseProcess @@ -235,14 +235,18 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { }) } + // remove worker + ww.Remove(w) + if w.State().Value() == worker.StateDestroyed { // worker was manually destroyed, no need to replace ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) return } + // set state as stopped w.State().Set(worker.StateStopped) - ww.Remove(w) + err = ww.Allocate() if err != nil { ww.events.Push(events.PoolEvent{ -- cgit v1.2.3