diff options
author | Valery Piashchynski <[email protected]> | 2020-10-27 15:16:55 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-10-27 15:16:55 +0300 |
commit | d199ef71b9644afbbba064c317cd0991be1c2443 (patch) | |
tree | f777eb90f10ca0e7dbc46227fc76c61f02111946 /static_pool.go | |
parent | 91cf918b30938129609323ded53e190385e019a6 (diff) |
Supervised pool
Diffstat (limited to 'static_pool.go')
-rwxr-xr-x | static_pool.go | 158 |
1 files changed, 78 insertions, 80 deletions
diff --git a/static_pool.go b/static_pool.go index 4ecbdd41..3af933c3 100755 --- a/static_pool.go +++ b/static_pool.go @@ -6,9 +6,8 @@ import ( "os/exec" "sync" + "github.com/spiral/roadrunner/v2/errors" "github.com/spiral/roadrunner/v2/util" - - "github.com/pkg/errors" ) // StopRequest can be sent by worker to indicate that restart is required. @@ -34,9 +33,6 @@ type StaticPool struct { // manages worker states and TTLs ww *workerWatcher - - // supervises memory and TTL of workers - // sp *supervisedPool } // NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker. @@ -74,9 +70,13 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Con return nil, err } - // todo: implement - // p.sp = newPoolWatcher(p, p.events, p.cfg.Supervisor) - // p.sp.Start() + // if supervised config not nil, guess, that pool wanted to be supervised + if cfg.Supervisor != nil { + sp := newPoolWatcher(p, p.events, p.cfg.Supervisor) + // start watcher timer + sp.Start() + return sp, nil + } return p, nil } @@ -101,9 +101,10 @@ func (p *StaticPool) RemoveWorker(ctx context.Context, wb WorkerBase) error { } func (p *StaticPool) Exec(rqs Payload) (Payload, error) { + const op = errors.Op("Exec") w, err := p.ww.GetFreeWorker(context.Background()) - if err != nil && errors.Is(err, ErrWatcherStopped) { - return EmptyPayload, ErrWatcherStopped + if err != nil && errors.Is(errors.ErrWatcherStopped, err) { + return EmptyPayload, errors.E(op, err) } else if err != nil { return EmptyPayload, err } @@ -167,76 +168,73 @@ func (p *StaticPool) Exec(rqs Payload) (Payload, error) { return rsp, nil } -// Exec one task with given payload and context, returns result or error. -// func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) { -// // todo: why TODO passed here? -// getWorkerCtx, cancel := context.WithTimeout(context.TODO(), p.cfg.AllocateTimeout) -// defer cancel() -// w, err := p.ww.GetFreeWorker(getWorkerCtx) -// if err != nil && errors.Is(err, ErrWatcherStopped) { -// return EmptyPayload, ErrWatcherStopped -// } else if err != nil { -// return EmptyPayload, err -// } -// -// sw := w.(SyncWorker) -// -// // todo: implement worker destroy -// //execCtx context.Context -// //if p.cfg.Supervisor.ExecTTL != 0 { -// // var cancel2 context.CancelFunc -// // execCtx, cancel2 = context.WithTimeout(context.TODO(), p.cfg.Supervisor.ExecTTL) -// // defer cancel2() -// //} else { -// // execCtx = ctx -// //} -// -// rsp, err := sw.Exec(rqs) -// if err != nil { -// errJ := p.checkMaxJobs(ctx, w) -// if errJ != nil { -// // todo: worker was not destroyed -// return EmptyPayload, fmt.Errorf("%v, %v", err, errJ) -// } -// -// // soft job errors are allowed -// if _, jobError := err.(JobError); jobError { -// p.ww.PushWorker(w) -// return EmptyPayload, err -// } -// -// sw.State().Set(StateInvalid) -// errS := w.Stop(ctx) -// if errS != nil { -// return EmptyPayload, fmt.Errorf("%v, %v", err, errS) -// } -// -// return EmptyPayload, err -// } -// -// // worker want's to be terminated -// if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest { -// w.State().Set(StateInvalid) -// err = w.Stop(ctx) -// if err != nil { -// return EmptyPayload, err -// } -// return p.ExecWithContext(ctx, rqs) -// } -// -// if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { -// err = p.ww.AllocateNew(ctx) -// if err != nil { -// return EmptyPayload, err -// } -// } else { -// p.muw.Lock() -// p.ww.PushWorker(w) -// p.muw.Unlock() -// } -// -// return rsp, nil -// } +func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) { + const op = errors.Op("Exec") + w, err := p.ww.GetFreeWorker(context.Background()) + if err != nil && errors.Is(errors.ErrWatcherStopped, err) { + return EmptyPayload, errors.E(op, err) + } else if err != nil { + return EmptyPayload, err + } + + sw := w.(SyncWorker) + + rsp, err := sw.ExecWithContext(ctx, rqs) + if err != nil { + // soft job errors are allowed + if _, jobError := err.(JobError); jobError { + if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { + err := p.ww.AllocateNew(bCtx) + if err != nil { + p.events.Push(PoolEvent{Event: EventPoolError, Payload: err}) + } + + w.State().Set(StateInvalid) + err = w.Stop(bCtx) + if err != nil { + p.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err}) + } + } else { + p.ww.PushWorker(w) + } + + return EmptyPayload, err + } + + sw.State().Set(StateInvalid) + p.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w}) + errS := w.Stop(bCtx) + + if errS != nil { + return EmptyPayload, fmt.Errorf("%v, %v", err, errS) + } + + return EmptyPayload, err + } + + // worker want's to be terminated + if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest { + w.State().Set(StateInvalid) + err = w.Stop(bCtx) + if err != nil { + p.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err}) + } + + return p.Exec(rqs) + } + + if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { + err = p.ww.AllocateNew(bCtx) + if err != nil { + return EmptyPayload, err + } + } else { + p.muw.Lock() + p.ww.PushWorker(w) + p.muw.Unlock() + } + return rsp, nil +} // Destroy all underlying stack (but let them to complete the task). func (p *StaticPool) Destroy(ctx context.Context) { |