diff options
Diffstat (limited to 'static_pool.go')
-rwxr-xr-x | static_pool.go | 249 |
1 files changed, 131 insertions, 118 deletions
diff --git a/static_pool.go b/static_pool.go index 4ecbdd41..be7ad6e3 100755 --- a/static_pool.go +++ b/static_pool.go @@ -4,11 +4,9 @@ import ( "context" "fmt" "os/exec" - "sync" + "github.com/spiral/roadrunner/v2/errors" "github.com/spiral/roadrunner/v2/util" - - "github.com/pkg/errors" ) // StopRequest can be sent by worker to indicate that restart is required. @@ -29,20 +27,19 @@ type StaticPool struct { // distributes the events events *util.EventHandler - // protects state of worker list, does not affect allocation - muw sync.RWMutex - // manages worker states and TTLs ww *workerWatcher - - // supervises memory and TTL of workers - // sp *supervisedPool } // NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker. func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Config) (Pool, error) { cfg.InitDefaults() + if cfg.Debug { + cfg.NumWorkers = 0 + cfg.MaxJobs = 1 + } + p := &StaticPool{ cfg: cfg, cmd: cmd, @@ -74,66 +71,74 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Con return nil, err } - // todo: implement - // p.sp = newPoolWatcher(p, p.events, p.cfg.Supervisor) - // p.sp.Start() + // if supervised config not nil, guess, that pool wanted to be supervised + if cfg.Supervisor != nil { + sp := newPoolWatcher(p, p.events, p.cfg.Supervisor) + // start watcher timer + sp.Start() + return sp, nil + } return p, nil } // AddListener connects event listener to the pool. -func (p *StaticPool) AddListener(listener util.EventListener) { - p.events.AddListener(listener) +func (sp *StaticPool) AddListener(listener util.EventListener) { + sp.events.AddListener(listener) } // Config returns associated pool configuration. Immutable. -func (p *StaticPool) GetConfig() Config { - return p.cfg +func (sp *StaticPool) GetConfig() Config { + return sp.cfg } // Workers returns worker list associated with the pool. -func (p *StaticPool) Workers() (workers []WorkerBase) { - return p.ww.WorkersList() +func (sp *StaticPool) Workers() (workers []WorkerBase) { + return sp.ww.WorkersList() } -func (p *StaticPool) RemoveWorker(ctx context.Context, wb WorkerBase) error { - return p.ww.RemoveWorker(ctx, wb) +func (sp *StaticPool) RemoveWorker(ctx context.Context, wb WorkerBase) error { + return sp.ww.RemoveWorker(ctx, wb) } -func (p *StaticPool) Exec(rqs Payload) (Payload, error) { - w, err := p.ww.GetFreeWorker(context.Background()) - if err != nil && errors.Is(err, ErrWatcherStopped) { - return EmptyPayload, ErrWatcherStopped +func (sp *StaticPool) Exec(p Payload) (Payload, error) { + const op = errors.Op("Exec") + if sp.cfg.Debug { + return sp.execDebug(p) + } + w, err := sp.ww.GetFreeWorker(context.Background()) + if err != nil && errors.Is(errors.ErrWatcherStopped, err) { + return EmptyPayload, errors.E(op, err) } else if err != nil { return EmptyPayload, err } sw := w.(SyncWorker) - rsp, err := sw.Exec(rqs) + rsp, err := sw.Exec(p) if err != nil { // soft job errors are allowed - if _, jobError := err.(JobError); jobError { - if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { - err := p.ww.AllocateNew(bCtx) + if _, jobError := err.(ExecError); jobError { + if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { + err := sp.ww.AllocateNew(bCtx) if err != nil { - p.events.Push(PoolEvent{Event: EventPoolError, Payload: err}) + sp.events.Push(PoolEvent{Event: EventPoolError, Payload: err}) } w.State().Set(StateInvalid) err = w.Stop(bCtx) if err != nil { - p.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err}) + sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err}) } } else { - p.ww.PushWorker(w) + sp.ww.PushWorker(w) } return EmptyPayload, err } sw.State().Set(StateInvalid) - p.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w}) + sp.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w}) errS := w.Stop(bCtx) if errS != nil { @@ -148,109 +153,117 @@ func (p *StaticPool) Exec(rqs Payload) (Payload, error) { w.State().Set(StateInvalid) err = w.Stop(bCtx) if err != nil { - p.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err}) + sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err}) } - return p.Exec(rqs) + return sp.Exec(p) } - if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { - err = p.ww.AllocateNew(bCtx) + if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { + err = sp.ww.AllocateNew(bCtx) if err != nil { return EmptyPayload, err } } else { - p.muw.Lock() - p.ww.PushWorker(w) - p.muw.Unlock() + sp.ww.PushWorker(w) } return rsp, nil } -// Exec one task with given payload and context, returns result or error. -// func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) { -// // todo: why TODO passed here? -// getWorkerCtx, cancel := context.WithTimeout(context.TODO(), p.cfg.AllocateTimeout) -// defer cancel() -// w, err := p.ww.GetFreeWorker(getWorkerCtx) -// if err != nil && errors.Is(err, ErrWatcherStopped) { -// return EmptyPayload, ErrWatcherStopped -// } else if err != nil { -// return EmptyPayload, err -// } -// -// sw := w.(SyncWorker) -// -// // todo: implement worker destroy -// //execCtx context.Context -// //if p.cfg.Supervisor.ExecTTL != 0 { -// // var cancel2 context.CancelFunc -// // execCtx, cancel2 = context.WithTimeout(context.TODO(), p.cfg.Supervisor.ExecTTL) -// // defer cancel2() -// //} else { -// // execCtx = ctx -// //} -// -// rsp, err := sw.Exec(rqs) -// if err != nil { -// errJ := p.checkMaxJobs(ctx, w) -// if errJ != nil { -// // todo: worker was not destroyed -// return EmptyPayload, fmt.Errorf("%v, %v", err, errJ) -// } -// -// // soft job errors are allowed -// if _, jobError := err.(JobError); jobError { -// p.ww.PushWorker(w) -// return EmptyPayload, err -// } -// -// sw.State().Set(StateInvalid) -// errS := w.Stop(ctx) -// if errS != nil { -// return EmptyPayload, fmt.Errorf("%v, %v", err, errS) -// } -// -// return EmptyPayload, err -// } -// -// // worker want's to be terminated -// if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest { -// w.State().Set(StateInvalid) -// err = w.Stop(ctx) -// if err != nil { -// return EmptyPayload, err -// } -// return p.ExecWithContext(ctx, rqs) -// } -// -// if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { -// err = p.ww.AllocateNew(ctx) -// if err != nil { -// return EmptyPayload, err -// } -// } else { -// p.muw.Lock() -// p.ww.PushWorker(w) -// p.muw.Unlock() -// } -// -// return rsp, nil -// } +func (sp *StaticPool) execDebug(p Payload) (Payload, error) { + sw, err := sp.ww.allocator() + if err != nil { + return EmptyPayload, err + } + + r, err := sw.(SyncWorker).Exec(p) + + if stopErr := sw.Stop(context.Background()); stopErr != nil { + sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: sw, Payload: err}) + } + + return r, err +} + +func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) { + const op = errors.Op("Exec") + w, err := sp.ww.GetFreeWorker(context.Background()) + if err != nil && errors.Is(errors.ErrWatcherStopped, err) { + return EmptyPayload, errors.E(op, err) + } else if err != nil { + return EmptyPayload, err + } + + sw := w.(SyncWorker) + + rsp, err := sw.ExecWithContext(ctx, rqs) + if err != nil { + // soft job errors are allowed + if _, jobError := err.(ExecError); jobError { + if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { + err := sp.ww.AllocateNew(bCtx) + if err != nil { + sp.events.Push(PoolEvent{Event: EventPoolError, Payload: err}) + } + + w.State().Set(StateInvalid) + err = w.Stop(bCtx) + if err != nil { + sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err}) + } + } else { + sp.ww.PushWorker(w) + } + + return EmptyPayload, err + } + + sw.State().Set(StateInvalid) + sp.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w}) + errS := w.Stop(bCtx) + + if errS != nil { + return EmptyPayload, fmt.Errorf("%v, %v", err, errS) + } + + return EmptyPayload, err + } + + // worker want's to be terminated + if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest { + w.State().Set(StateInvalid) + err = w.Stop(bCtx) + if err != nil { + sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err}) + } + + return sp.Exec(rqs) + } + + if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { + err = sp.ww.AllocateNew(bCtx) + if err != nil { + return EmptyPayload, err + } + } else { + sp.ww.PushWorker(w) + } + return rsp, nil +} // Destroy all underlying stack (but let them to complete the task). -func (p *StaticPool) Destroy(ctx context.Context) { - p.ww.Destroy(ctx) +func (sp *StaticPool) Destroy(ctx context.Context) { + sp.ww.Destroy(ctx) } // allocate required number of stack -func (p *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]WorkerBase, error) { +func (sp *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]WorkerBase, error) { var workers []WorkerBase // constant number of stack simplify logic for i := int64(0); i < numWorkers; i++ { - ctx, cancel := context.WithTimeout(ctx, p.cfg.AllocateTimeout) - w, err := p.factory.SpawnWorkerWithContext(ctx, p.cmd()) + ctx, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout) + w, err := sp.factory.SpawnWorkerWithContext(ctx, sp.cmd()) if err != nil { cancel() return nil, err @@ -261,11 +274,11 @@ func (p *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]W return workers, nil } -func (p *StaticPool) checkMaxJobs(ctx context.Context, w WorkerBase) error { - if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { - err := p.ww.AllocateNew(ctx) +func (sp *StaticPool) checkMaxJobs(ctx context.Context, w WorkerBase) error { + if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { + err := sp.ww.AllocateNew(ctx) if err != nil { - p.events.Push(PoolEvent{Event: EventPoolError, Payload: err}) + sp.events.Push(PoolEvent{Event: EventPoolError, Payload: err}) return err } } |