diff options
author | Valery Piashchynski <[email protected]> | 2021-07-19 09:49:16 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-19 09:49:16 +0300 |
commit | 02fc3664f4ad97e03c8f3a641e7322362f78721c (patch) | |
tree | c5cccffde3c9d087ac02846c593a63dfe38de98f /pkg | |
parent | 978451159855c155b2aecdca3c86f47e5c6dd1ff (diff) |
Worker watcher interface update.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/pool/interface.go | 8 | ||||
-rwxr-xr-x | pkg/pool/static_pool.go | 18 | ||||
-rwxr-xr-x | pkg/pool/supervisor_pool.go | 6 | ||||
-rw-r--r-- | pkg/worker_watcher/container/queue/queue.go | 4 | ||||
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 15 |
5 files changed, 28 insertions, 23 deletions
diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go index bbf7653e..a040d27a 100644 --- a/pkg/pool/interface.go +++ b/pkg/pool/interface.go @@ -33,11 +33,11 @@ type Watcher interface { // Watch used to add workers to the container Watch(workers []worker.BaseProcess) error - // Get provide first free worker - Get(ctx context.Context) (worker.BaseProcess, error) + // Take takes the first free worker + Take(ctx context.Context) (worker.BaseProcess, error) - // Push enqueues worker back - Push(w worker.BaseProcess) + // Release releases the worker putting it back to the queue + Release(w worker.BaseProcess) // Allocate - allocates new worker and put it into the WorkerWatcher Allocate() error diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index f2f19795..037294ea 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -142,7 +142,7 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) { } ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout) defer cancel() - w, err := sp.getWorker(ctxGetFree, op) + w, err := sp.takeWorker(ctxGetFree, op) if err != nil { return payload.Payload{}, errors.E(op, err) } @@ -163,7 +163,7 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) { return rsp, nil } // return worker back - sp.ww.Push(w) + sp.ww.Release(w) return rsp, nil } @@ -176,7 +176,7 @@ func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (paylo ctxAlloc, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout) defer cancel() - w, err := sp.getWorker(ctxAlloc, op) + w, err := sp.takeWorker(ctxAlloc, op) if err != nil { return payload.Payload{}, errors.E(op, err) } @@ -198,7 +198,7 @@ func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (paylo } // return worker back - sp.ww.Push(w) + sp.ww.Release(w) return rsp, nil } @@ -216,16 +216,16 @@ func (sp *StaticPool) stopWorker(w worker.BaseProcess) { func (sp *StaticPool) checkMaxJobs(w worker.BaseProcess) { if w.State().NumExecs() >= sp.cfg.MaxJobs { w.State().Set(worker.StateMaxJobsReached) - sp.ww.Push(w) + sp.ww.Release(w) return } - sp.ww.Push(w) + sp.ww.Release(w) } -func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) { +func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) { // Get function consumes context with timeout - w, err := sp.ww.Get(ctxGetFree) + w, err := sp.ww.Take(ctxGetFree) if err != nil { // if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout if errors.Is(errors.NoFreeWorkers, err) { @@ -265,7 +265,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { } } else { sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err}) - sp.ww.Push(w) + sp.ww.Release(w) } } diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go index 4b990dbe..aa6c7cfa 100755 --- a/pkg/pool/supervisor_pool.go +++ b/pkg/pool/supervisor_pool.go @@ -136,7 +136,7 @@ func (sp *supervised) control() { //nolint:gocognit /* worker at this point might be in the middle of request execution: - ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Push + ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release ^ TTL Reached, state - invalid | -----> Worker Stopped here @@ -156,7 +156,7 @@ func (sp *supervised) control() { //nolint:gocognit /* worker at this point might be in the middle of request execution: - ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Push + ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release ^ TTL Reached, state - invalid | -----> Worker Stopped here @@ -211,7 +211,7 @@ func (sp *supervised) control() { //nolint:gocognit /* worker at this point might be in the middle of request execution: - ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Push + ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release ^ TTL Reached, state - invalid | -----> Worker Stopped here diff --git a/pkg/worker_watcher/container/queue/queue.go b/pkg/worker_watcher/container/queue/queue.go index 4f611bbe..edf81d60 100644 --- a/pkg/worker_watcher/container/queue/queue.go +++ b/pkg/worker_watcher/container/queue/queue.go @@ -91,7 +91,9 @@ func (q *Queue) Pop(ctx context.Context) (worker.BaseProcess, error) { return w, nil } -func (q *Queue) Remove(_ int64) {} +func (q *Queue) Replace(oldPid int64, newWorker worker.BaseProcess) { + +} func (q *Queue) Destroy() {} diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index 6e343fff..ca026383 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -18,9 +18,12 @@ type Vector interface { // Pop used to get worker from the vector Pop(ctx context.Context) (worker.BaseProcess, error) // Remove worker with provided pid - Remove(pid int64) // TODO replace + Remove(pid int64) // Destroy used to stop releasing the workers Destroy() + + // TODO Add Replace method, and remove `Remove` method. Replace will do removal and allocation + // Replace(prevPid int64, newWorker worker.BaseProcess) } type workerWatcher struct { @@ -63,8 +66,8 @@ func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error { return nil } -// Get is not a thread safe operation -func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) { +// Take is not a thread safe operation +func (ww *workerWatcher) Take(ctx context.Context) (worker.BaseProcess, error) { const op = errors.Op("worker_watcher_get_free_worker") // thread safe operation @@ -141,7 +144,7 @@ func (ww *workerWatcher) Allocate() error { // unlock Allocate mutex ww.Unlock() // push the worker to the container - ww.Push(sw) + ww.Release(sw) return nil } @@ -164,8 +167,8 @@ func (ww *workerWatcher) Remove(wb worker.BaseProcess) { } } -// Push O(1) operation -func (ww *workerWatcher) Push(w worker.BaseProcess) { +// Release O(1) operation +func (ww *workerWatcher) Release(w worker.BaseProcess) { switch w.State().Value() { case worker.StateReady: ww.container.Push(w) |