summaryrefslogtreecommitdiff
path: root/static_pool.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-11-27 00:35:15 +0300
committerValery Piashchynski <[email protected]>2020-11-27 00:35:15 +0300
commitfa9a7e319b5ac6d98fd18d1e4986de35fde254e5 (patch)
tree559908f3a491a15bb4926f79dbfde350ec7d4c40 /static_pool.go
parent46ae5dcc22d971b0f909bce23ec8fdef26811ed6 (diff)
Add new pool event: EventNoFreeWorkers which indicates than RR can't get
worker from the stack during the allowed allocate timeout.
Diffstat (limited to 'static_pool.go')
-rwxr-xr-xstatic_pool.go59
1 files changed, 34 insertions, 25 deletions
diff --git a/static_pool.go b/static_pool.go
index d5511018..b626a499 100755
--- a/static_pool.go
+++ b/static_pool.go
@@ -82,7 +82,7 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Poo
}
// put stack in the pool
- err = p.ww.AddToWatch(ctx, workers)
+ err = p.ww.AddToWatch(workers)
if err != nil {
return nil, errors.E(op, err)
}
@@ -132,16 +132,18 @@ func (sp *StaticPool) Workers() (workers []WorkerBase) {
return sp.ww.WorkersList()
}
-func (sp *StaticPool) RemoveWorker(ctx context.Context, wb WorkerBase) error {
- return sp.ww.RemoveWorker(ctx, wb)
+func (sp *StaticPool) RemoveWorker(wb WorkerBase) error {
+ return sp.ww.RemoveWorker(wb)
}
func (sp *StaticPool) Exec(p Payload) (Payload, error) {
- const op = errors.Op("Exec")
+ const op = errors.Op("exec")
if sp.cfg.Debug {
return sp.execDebug(p)
}
- w, err := sp.ww.GetFreeWorker(context.Background())
+ ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout)
+ defer cancel()
+ w, err := sp.getWorker(ctxGetFree, op)
if err != nil {
return EmptyPayload, errors.E(op, err)
}
@@ -171,7 +173,7 @@ func (sp *StaticPool) Exec(p Payload) (Payload, error) {
}
if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs {
- err = sp.ww.AllocateNew(bCtx)
+ err = sp.ww.AllocateNew()
if err != nil {
return EmptyPayload, errors.E(op, err)
}
@@ -189,14 +191,17 @@ func (sp *StaticPool) Exec(p Payload) (Payload, error) {
}
func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
- const op = errors.Op("Exec with context")
- w, err := sp.ww.GetFreeWorker(context.Background())
+ const op = errors.Op("exec with context")
+ ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout)
+ defer cancel()
+ w, err := sp.getWorker(ctxGetFree, op)
if err != nil {
return EmptyPayload, errors.E(op, err)
}
sw := w.(SyncWorker)
+ // apply all before function
if len(sp.before) > 0 {
for i := 0; i < len(sp.before); i++ {
rqs = sp.before[i](rqs)
@@ -220,7 +225,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload
}
if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs {
- err = sp.ww.AllocateNew(bCtx)
+ err = sp.ww.AllocateNew()
if err != nil {
return EmptyPayload, errors.E(op, err)
}
@@ -228,6 +233,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload
sp.ww.PushWorker(sw)
}
+ // apply all after functions
if len(sp.after) > 0 {
for i := 0; i < len(sp.after); i++ {
rsp = sp.after[i](rqs, rsp)
@@ -237,6 +243,21 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload
return rsp, nil
}
+func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (WorkerBase, error) {
+ // GetFreeWorker function consumes context with timeout
+ w, err := sp.ww.GetFreeWorker(ctxGetFree)
+ if err != nil {
+ // if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout
+ if errors.Is(errors.NoFreeWorkers, err) {
+ sp.events.Push(PoolEvent{Event: EventNoFreeWorkers, Payload: errors.E(op, err)})
+ return nil, errors.E(op, err)
+ }
+ // else if err not nil - return error
+ return nil, errors.E(op, err)
+ }
+ return w, nil
+}
+
// Destroy all underlying stack (but let them to complete the task).
func (sp *StaticPool) Destroy(ctx context.Context) {
sp.ww.Destroy(ctx)
@@ -246,11 +267,11 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
return func(err error, w WorkerBase) (Payload, error) {
const op = errors.Op("error encoder")
// soft job errors are allowed
- if errors.Is(errors.Exec, err) {
+ if errors.Is(errors.ErrSoftJob, err) {
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
- err = sp.ww.AllocateNew(bCtx)
+ err = sp.ww.AllocateNew()
if err != nil {
- sp.events.Push(PoolEvent{Event: EventPoolError, Payload: errors.E(op, err)})
+ sp.events.Push(PoolEvent{Event: EventWorkerConstruct, Payload: errors.E(op, err)})
}
w.State().Set(StateInvalid)
@@ -318,22 +339,10 @@ func (sp *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]
w, err := sp.factory.SpawnWorkerWithContext(ctx, sp.cmd())
if err != nil {
cancel()
- return nil, errors.E(op, err)
+ return nil, errors.E(op, errors.WorkerAllocate, err)
}
workers = append(workers, w)
cancel()
}
return workers, nil
}
-
-func (sp *StaticPool) checkMaxJobs(ctx context.Context, w WorkerBase) error {
- const op = errors.Op("check max jobs")
- if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
- err := sp.ww.AllocateNew(ctx)
- if err != nil {
- sp.events.Push(PoolEvent{Event: EventPoolError, Payload: err})
- return errors.E(op, err)
- }
- }
- return nil
-}