summaryrefslogtreecommitdiff
path: root/pkg/pool/static_pool.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-19 09:49:16 +0300
committerValery Piashchynski <[email protected]>2021-07-19 09:49:16 +0300
commit02fc3664f4ad97e03c8f3a641e7322362f78721c (patch)
treec5cccffde3c9d087ac02846c593a63dfe38de98f /pkg/pool/static_pool.go
parent978451159855c155b2aecdca3c86f47e5c6dd1ff (diff)
Worker watcher interface update.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg/pool/static_pool.go')
-rwxr-xr-xpkg/pool/static_pool.go18
1 files changed, 9 insertions, 9 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index f2f19795..037294ea 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -142,7 +142,7 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
}
ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout)
defer cancel()
- w, err := sp.getWorker(ctxGetFree, op)
+ w, err := sp.takeWorker(ctxGetFree, op)
if err != nil {
return payload.Payload{}, errors.E(op, err)
}
@@ -163,7 +163,7 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
return rsp, nil
}
// return worker back
- sp.ww.Push(w)
+ sp.ww.Release(w)
return rsp, nil
}
@@ -176,7 +176,7 @@ func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (paylo
ctxAlloc, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout)
defer cancel()
- w, err := sp.getWorker(ctxAlloc, op)
+ w, err := sp.takeWorker(ctxAlloc, op)
if err != nil {
return payload.Payload{}, errors.E(op, err)
}
@@ -198,7 +198,7 @@ func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (paylo
}
// return worker back
- sp.ww.Push(w)
+ sp.ww.Release(w)
return rsp, nil
}
@@ -216,16 +216,16 @@ func (sp *StaticPool) stopWorker(w worker.BaseProcess) {
func (sp *StaticPool) checkMaxJobs(w worker.BaseProcess) {
if w.State().NumExecs() >= sp.cfg.MaxJobs {
w.State().Set(worker.StateMaxJobsReached)
- sp.ww.Push(w)
+ sp.ww.Release(w)
return
}
- sp.ww.Push(w)
+ sp.ww.Release(w)
}
-func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) {
+func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) {
// Get function consumes context with timeout
- w, err := sp.ww.Get(ctxGetFree)
+ w, err := sp.ww.Take(ctxGetFree)
if err != nil {
// if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout
if errors.Is(errors.NoFreeWorkers, err) {
@@ -265,7 +265,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
}
} else {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err})
- sp.ww.Push(w)
+ sp.ww.Release(w)
}
}