From 02fc3664f4ad97e03c8f3a641e7322362f78721c Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Mon, 19 Jul 2021 09:49:16 +0300 Subject: Worker watcher interface update. Signed-off-by: Valery Piashchynski --- pkg/pool/static_pool.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) (limited to 'pkg/pool/static_pool.go') diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index f2f19795..037294ea 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -142,7 +142,7 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) { } ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout) defer cancel() - w, err := sp.getWorker(ctxGetFree, op) + w, err := sp.takeWorker(ctxGetFree, op) if err != nil { return payload.Payload{}, errors.E(op, err) } @@ -163,7 +163,7 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) { return rsp, nil } // return worker back - sp.ww.Push(w) + sp.ww.Release(w) return rsp, nil } @@ -176,7 +176,7 @@ func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (paylo ctxAlloc, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout) defer cancel() - w, err := sp.getWorker(ctxAlloc, op) + w, err := sp.takeWorker(ctxAlloc, op) if err != nil { return payload.Payload{}, errors.E(op, err) } @@ -198,7 +198,7 @@ func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (paylo } // return worker back - sp.ww.Push(w) + sp.ww.Release(w) return rsp, nil } @@ -216,16 +216,16 @@ func (sp *StaticPool) stopWorker(w worker.BaseProcess) { func (sp *StaticPool) checkMaxJobs(w worker.BaseProcess) { if w.State().NumExecs() >= sp.cfg.MaxJobs { w.State().Set(worker.StateMaxJobsReached) - sp.ww.Push(w) + sp.ww.Release(w) return } - sp.ww.Push(w) + sp.ww.Release(w) } -func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) { +func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) { // Get function consumes context with timeout - w, err := sp.ww.Get(ctxGetFree) + w, err := sp.ww.Take(ctxGetFree) if err != nil { // if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout if errors.Is(errors.NoFreeWorkers, err) { @@ -265,7 +265,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { } } else { sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err}) - sp.ww.Push(w) + sp.ww.Release(w) } } -- cgit v1.2.3