diff options
author | Valery Piashchynski <[email protected]> | 2020-10-19 14:01:59 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-10-19 14:01:59 +0300 |
commit | 77670fb7af0c892c9b3a589fd424534fad288e7a (patch) | |
tree | 3adcaa85db664a355abe2b28f1d7e4a3fc45689f /static_pool.go | |
parent | 16fbf3104c3c34bd9355593052b686acd26a8efe (diff) |
Update according activity worker
Diffstat (limited to 'static_pool.go')
-rw-r--r-- | static_pool.go | 77 |
1 files changed, 69 insertions, 8 deletions
diff --git a/static_pool.go b/static_pool.go index 2e72864d..bc990da5 100644 --- a/static_pool.go +++ b/static_pool.go @@ -14,6 +14,8 @@ const ( StopRequest = "{\"stop\":true}" ) +var bCtx = context.Background() + // StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack. type StaticPool struct { // pool behaviour @@ -40,9 +42,7 @@ type PoolEvent struct { // supervisor Supervisor, todo: think about it // stack func() (WorkerBase, error), func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg *Config) (Pool, error) { - if err := cfg.Valid(); err != nil { - return nil, errors.Wrap(err, "config") - } + cfg.InitDefaults() p := &StaticPool{ cfg: cfg, @@ -92,8 +92,63 @@ func (p *StaticPool) RemoveWorker(ctx context.Context, wb WorkerBase) error { return p.ww.RemoveWorker(ctx, wb) } +func (p *StaticPool) Exec(rqs Payload) (Payload, error) { + w, err := p.ww.GetFreeWorker(context.Background()) + if err != nil && errors.Is(err, ErrWatcherStopped) { + return EmptyPayload, ErrWatcherStopped + } else if err != nil { + return EmptyPayload, err + } + + sw := w.(SyncWorker) + + rsp, err := sw.Exec(rqs) + if err != nil { + errJ := p.checkMaxJobs(bCtx, w) + if errJ != nil { + return EmptyPayload, fmt.Errorf("%v, %v", err, errJ) + } + // soft job errors are allowed + if _, jobError := err.(TaskError); jobError { + p.ww.PushWorker(w) + return EmptyPayload, err + } + + sw.State().Set(StateInvalid) + errS := w.Stop(bCtx) + if errS != nil { + return EmptyPayload, fmt.Errorf("%v, %v", err, errS) + } + + return EmptyPayload, err + } + + // worker want's to be terminated + if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest { + w.State().Set(StateInvalid) + err = w.Stop(bCtx) + if err != nil { + return EmptyPayload, err + } + return p.ExecWithContext(bCtx, rqs) + } + + if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { + err = p.ww.AllocateNew(bCtx) + if err != nil { + return EmptyPayload, err + } + } else { + p.muw.Lock() + p.ww.PushWorker(w) + p.muw.Unlock() + } + return rsp, nil +} + // Exec one task with given payload and context, returns result or error. -func (p *StaticPool) Exec(ctx context.Context, rqs Payload) (Payload, error) { +func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) { + // todo: why TODO passed here? getWorkerCtx, cancel := context.WithTimeout(context.TODO(), p.cfg.AllocateTimeout) defer cancel() w, err := p.ww.GetFreeWorker(getWorkerCtx) @@ -105,10 +160,16 @@ func (p *StaticPool) Exec(ctx context.Context, rqs Payload) (Payload, error) { sw := w.(SyncWorker) - execCtx, cancel2 := context.WithTimeout(context.TODO(), p.cfg.ExecTTL) - defer cancel2() + var execCtx context.Context + if p.cfg.ExecTTL != 0 { + var cancel2 context.CancelFunc + execCtx, cancel2 = context.WithTimeout(context.TODO(), p.cfg.ExecTTL) + defer cancel2() + } else { + execCtx = ctx + } - rsp, err := sw.Exec(execCtx, rqs) + rsp, err := sw.ExecWithContext(execCtx, rqs) if err != nil { errJ := p.checkMaxJobs(ctx, w) if errJ != nil { @@ -136,7 +197,7 @@ func (p *StaticPool) Exec(ctx context.Context, rqs Payload) (Payload, error) { if err != nil { return EmptyPayload, err } - return p.Exec(ctx, rqs) + return p.ExecWithContext(ctx, rqs) } if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { |