summaryrefslogtreecommitdiff
path: root/static_pool.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-10-19 14:01:59 +0300
committerValery Piashchynski <[email protected]>2020-10-19 14:01:59 +0300
commit77670fb7af0c892c9b3a589fd424534fad288e7a (patch)
tree3adcaa85db664a355abe2b28f1d7e4a3fc45689f /static_pool.go
parent16fbf3104c3c34bd9355593052b686acd26a8efe (diff)
Update according activity worker
Diffstat (limited to 'static_pool.go')
-rw-r--r--static_pool.go77
1 files changed, 69 insertions, 8 deletions
diff --git a/static_pool.go b/static_pool.go
index 2e72864d..bc990da5 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -14,6 +14,8 @@ const (
StopRequest = "{\"stop\":true}"
)
+var bCtx = context.Background()
+
// StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack.
type StaticPool struct {
// pool behaviour
@@ -40,9 +42,7 @@ type PoolEvent struct {
// supervisor Supervisor, todo: think about it
// stack func() (WorkerBase, error),
func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg *Config) (Pool, error) {
- if err := cfg.Valid(); err != nil {
- return nil, errors.Wrap(err, "config")
- }
+ cfg.InitDefaults()
p := &StaticPool{
cfg: cfg,
@@ -92,8 +92,63 @@ func (p *StaticPool) RemoveWorker(ctx context.Context, wb WorkerBase) error {
return p.ww.RemoveWorker(ctx, wb)
}
+func (p *StaticPool) Exec(rqs Payload) (Payload, error) {
+ w, err := p.ww.GetFreeWorker(context.Background())
+ if err != nil && errors.Is(err, ErrWatcherStopped) {
+ return EmptyPayload, ErrWatcherStopped
+ } else if err != nil {
+ return EmptyPayload, err
+ }
+
+ sw := w.(SyncWorker)
+
+ rsp, err := sw.Exec(rqs)
+ if err != nil {
+ errJ := p.checkMaxJobs(bCtx, w)
+ if errJ != nil {
+ return EmptyPayload, fmt.Errorf("%v, %v", err, errJ)
+ }
+ // soft job errors are allowed
+ if _, jobError := err.(TaskError); jobError {
+ p.ww.PushWorker(w)
+ return EmptyPayload, err
+ }
+
+ sw.State().Set(StateInvalid)
+ errS := w.Stop(bCtx)
+ if errS != nil {
+ return EmptyPayload, fmt.Errorf("%v, %v", err, errS)
+ }
+
+ return EmptyPayload, err
+ }
+
+ // worker want's to be terminated
+ if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
+ w.State().Set(StateInvalid)
+ err = w.Stop(bCtx)
+ if err != nil {
+ return EmptyPayload, err
+ }
+ return p.ExecWithContext(bCtx, rqs)
+ }
+
+ if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
+ err = p.ww.AllocateNew(bCtx)
+ if err != nil {
+ return EmptyPayload, err
+ }
+ } else {
+ p.muw.Lock()
+ p.ww.PushWorker(w)
+ p.muw.Unlock()
+ }
+ return rsp, nil
+}
+
// Exec one task with given payload and context, returns result or error.
-func (p *StaticPool) Exec(ctx context.Context, rqs Payload) (Payload, error) {
+func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
+ // todo: why TODO passed here?
getWorkerCtx, cancel := context.WithTimeout(context.TODO(), p.cfg.AllocateTimeout)
defer cancel()
w, err := p.ww.GetFreeWorker(getWorkerCtx)
@@ -105,10 +160,16 @@ func (p *StaticPool) Exec(ctx context.Context, rqs Payload) (Payload, error) {
sw := w.(SyncWorker)
- execCtx, cancel2 := context.WithTimeout(context.TODO(), p.cfg.ExecTTL)
- defer cancel2()
+ var execCtx context.Context
+ if p.cfg.ExecTTL != 0 {
+ var cancel2 context.CancelFunc
+ execCtx, cancel2 = context.WithTimeout(context.TODO(), p.cfg.ExecTTL)
+ defer cancel2()
+ } else {
+ execCtx = ctx
+ }
- rsp, err := sw.Exec(execCtx, rqs)
+ rsp, err := sw.ExecWithContext(execCtx, rqs)
if err != nil {
errJ := p.checkMaxJobs(ctx, w)
if errJ != nil {
@@ -136,7 +197,7 @@ func (p *StaticPool) Exec(ctx context.Context, rqs Payload) (Payload, error) {
if err != nil {
return EmptyPayload, err
}
- return p.Exec(ctx, rqs)
+ return p.ExecWithContext(ctx, rqs)
}
if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {