Diffstat (limited to 'static_pool.go')
-rwxr-xr-x (was -rw-r--r--) | static_pool.go | 504
1 file changed, 241 insertions(+), 263 deletions(-)
diff --git a/static_pool.go b/static_pool.go
index c4b6f42d..b626a499 100644..100755
--- a/static_pool.go
+++ b/static_pool.go
@@ -1,370 +1,348 @@
 package roadrunner
 import (
-	"fmt"
+	"context"
 	"os/exec"
-	"sync"
-	"sync/atomic"
-	"time"
-	"github.com/pkg/errors"
+	"github.com/spiral/errors"
+	"github.com/spiral/roadrunner/v2/util"
 )
-const (
-	// StopRequest can be sent by worker to indicate that restart is required.
-	StopRequest = "{\"stop\":true}"
-)
+// StopRequest can be sent by worker to indicate that restart is required.
+const StopRequest = "{\"stop\":true}"
-// StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of workers.
-type StaticPool struct {
-	// pool behaviour
-	cfg Config
+var bCtx = context.Background()
-	// worker command creator
-	cmd func() *exec.Cmd
+// Allocator is responsible for worker allocation in the pool
+type Allocator func() (WorkerBase, error)
-	// creates and connects to workers
-	factory Factory
+// ErrorEncoder encodes an error or makes a decision based on the error type
+type ErrorEncoder func(err error, w WorkerBase) (Payload, error)
-	// active task executions
-	tmu sync.Mutex
-	tasks sync.WaitGroup
+// Before is a set of functions executed BEFORE Exec
+type Before func(req Payload) Payload
-	// workers circular allocation buf
-	free chan *Worker
+// After is a set of functions executed AFTER Exec
+type After func(req Payload, resp Payload) Payload
-	// number of workers expected to be dead in a buf.
-	numDead int64
+type PoolOptions func(p *StaticPool)
-	// protects state of worker list, does not affect allocation
-	muw sync.RWMutex
+// StaticPool controls worker creation, destruction and task routing. Pool uses a fixed amount of stack.
+type StaticPool struct {
+	cfg PoolConfig
-	// all registered workers
-	workers []*Worker
+	// worker command creator
+	cmd func() *exec.Cmd
+
+	// creates and connects to stack
+	factory Factory
-	// invalid declares set of workers to be removed from the pool.
-	remove sync.Map
+	// distributes the events
+	events util.EventsHandler
-	// pool is being destroyed
-	inDestroy int32
-	destroy chan interface{}
+	// manages worker states and TTLs
+	ww WorkerWatcher
-	// lsn is optional callback to handle worker create/destruct/error events.
-	mul sync.Mutex
-	lsn func(event int, ctx interface{})
+	// allocates a new worker
+	allocator Allocator
+
+	errEncoder ErrorEncoder
+	before []Before
+	after []After
 }
 // NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
-func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, error) {
-	if err := cfg.Valid(); err != nil {
-		return nil, errors.Wrap(err, "config")
+func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg PoolConfig, options ...PoolOptions) (Pool, error) {
+	const op = errors.Op("NewPool")
+	if factory == nil {
+		return nil, errors.E(op, errors.Str("no factory initialized"))
+	}
+	cfg.InitDefaults()
+
+	if cfg.Debug {
+		cfg.NumWorkers = 0
+		cfg.MaxJobs = 1
 	}
 	p := &StaticPool{
 		cfg: cfg,
 		cmd: cmd,
 		factory: factory,
-		workers: make([]*Worker, 0, cfg.NumWorkers),
-		free: make(chan *Worker, cfg.NumWorkers),
-		destroy: make(chan interface{}),
+		events: util.NewEventsHandler(),
+		after: make([]After, 0, 0),
+		before: make([]Before, 0, 0),
 	}
-	// constant number of workers simplify logic
-	for i := int64(0); i < p.cfg.NumWorkers; i++ {
-		// to test if worker ready
-		w, err := p.createWorker()
-		if err != nil {
-			p.Destroy()
-			return nil, err
-		}
+	p.allocator = newPoolAllocator(factory, cmd)
+	p.ww = newWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events)
-		p.free <- w
+	workers, err := p.allocateWorkers(ctx, p.cfg.NumWorkers)
+	if err != nil {
+		return nil, errors.E(op, err)
 	}
-	return p, nil
-}
+	// put stack in the pool
+	err = p.ww.AddToWatch(workers)
+	if err != nil {
+		return nil, errors.E(op, err)
+	}
-// Listen attaches pool event controller.
-func (p *StaticPool) Listen(l func(event int, ctx interface{})) {
-	p.mul.Lock()
-	defer p.mul.Unlock()
+	p.errEncoder = defaultErrEncoder(p)
-	p.lsn = l
+	// add pool options
+	for i := 0; i < len(options); i++ {
+		options[i](p)
+	}
-	p.muw.Lock()
-	for _, w := range p.workers {
-		w.err.Listen(p.lsn)
+	// if the supervisor config is not nil, assume the pool should be supervised
+	if cfg.Supervisor != nil {
+		sp := newPoolWatcher(p, p.events, p.cfg.Supervisor)
+		// start watcher timer
+		sp.Start()
+		return sp, nil
 	}
-	p.muw.Unlock()
-}
-// Config returns associated pool configuration. Immutable.
-func (p *StaticPool) Config() Config {
-	return p.cfg
+	return p, nil
 }
-// Workers returns worker list associated with the pool.
-func (p *StaticPool) Workers() (workers []*Worker) {
-	p.muw.RLock()
-	defer p.muw.RUnlock()
-
-	workers = append(workers, p.workers...)
-
-	return workers
+func PoolBefore(before ...Before) PoolOptions {
+	return func(p *StaticPool) {
+		p.before = append(p.before, before...)
+	}
 }
-// Remove forces pool to remove specific worker.
-func (p *StaticPool) Remove(w *Worker, err error) bool {
-	if w.State().Value() != StateReady && w.State().Value() != StateWorking {
-		// unable to remove inactive worker
-		return false
+func PoolAfter(after ...After) PoolOptions {
+	return func(p *StaticPool) {
+		p.after = append(p.after, after...)
 	}
+}
-	if _, ok := p.remove.Load(w); ok {
-		return false
-	}
+// AddListener connects event listener to the pool.
+func (sp *StaticPool) AddListener(listener util.EventListener) {
+	sp.events.AddListener(listener)
+}
-	p.remove.Store(w, err)
-	return true
+// GetConfig returns associated pool configuration. Immutable.
+func (sp *StaticPool) GetConfig() PoolConfig {
+	return sp.cfg
 }
-// Exec one task with given payload and context, returns result or error.
-func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) {
-	p.tmu.Lock()
-	p.tasks.Add(1)
-	p.tmu.Unlock()
+// Workers returns worker list associated with the pool.
+func (sp *StaticPool) Workers() (workers []WorkerBase) {
+	return sp.ww.WorkersList()
+}
-	defer p.tasks.Done()
+func (sp *StaticPool) RemoveWorker(wb WorkerBase) error {
+	return sp.ww.RemoveWorker(wb)
+}
-	w, err := p.allocateWorker()
+func (sp *StaticPool) Exec(p Payload) (Payload, error) {
+	const op = errors.Op("exec")
+	if sp.cfg.Debug {
+		return sp.execDebug(p)
+	}
+	ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout)
+	defer cancel()
+	w, err := sp.getWorker(ctxGetFree, op)
 	if err != nil {
-		return nil, errors.Wrap(err, "unable to allocate worker")
+		return EmptyPayload, errors.E(op, err)
 	}
-	rsp, err = w.Exec(rqs)
+	sw := w.(SyncWorker)
-	if err != nil {
-		// soft job errors are allowed
-		if _, jobError := err.(JobError); jobError {
-			p.release(w)
-			return nil, err
+	if len(sp.before) > 0 {
+		for i := 0; i < len(sp.before); i++ {
+			p = sp.before[i](p)
 		}
+	}
-		p.discardWorker(w, err)
-		return nil, err
+	rsp, err := sw.Exec(p)
+	if err != nil {
+		return sp.errEncoder(err, sw)
 	}
 	// worker wants to be terminated
 	if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
-		p.discardWorker(w, err)
-		return p.Exec(rqs)
+		sw.State().Set(StateInvalid)
+		err = sw.Stop(bCtx)
+		if err != nil {
+			sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: sw, Payload: errors.E(op, err)})
+		}
+
+		return sp.Exec(p)
 	}
-	p.release(w)
-	return rsp, nil
-}
+	if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs {
+		err = sp.ww.AllocateNew()
+		if err != nil {
+			return EmptyPayload, errors.E(op, err)
+		}
+	} else {
+		sp.ww.PushWorker(sw)
+	}
-// Destroy all underlying workers (but let them to complete the task).
-func (p *StaticPool) Destroy() {
-	atomic.AddInt32(&p.inDestroy, 1)
-
-	p.tmu.Lock()
-	p.tasks.Wait()
-	close(p.destroy)
-	p.tmu.Unlock()
-
-	var wg sync.WaitGroup
-	for _, w := range p.Workers() {
-		wg.Add(1)
-		w.markInvalid()
-		go func(w *Worker) {
-			defer wg.Done()
-			p.destroyWorker(w, nil)
-		}(w)
+	if len(sp.after) > 0 {
+		for i := 0; i < len(sp.after); i++ {
+			rsp = sp.after[i](p, rsp)
+		}
 	}
-	wg.Wait()
+	return rsp, nil
 }
-// finds free worker in a given time interval. Skips dead workers.
-func (p *StaticPool) allocateWorker() (w *Worker, err error) {
-	// TODO loop counts upward, but its variable is bounded downward.
-	for i := atomic.LoadInt64(&p.numDead); i >= 0; i++ {
-		// this loop is required to skip issues with dead workers still being in a ring
-		// (we know how many workers).
-		select {
-		case w = <-p.free:
-			if w.State().Value() != StateReady {
-				// found expected dead worker
-				atomic.AddInt64(&p.numDead, ^int64(0))
-				continue
-			}
-
-			if err, remove := p.remove.Load(w); remove {
-				p.discardWorker(w, err)
+func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
+	const op = errors.Op("exec with context")
+	ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout)
+	defer cancel()
+	w, err := sp.getWorker(ctxGetFree, op)
+	if err != nil {
+		return EmptyPayload, errors.E(op, err)
+	}
-				// get next worker
-				i++
-				continue
-			}
+	sw := w.(SyncWorker)
-			return w, nil
-		case <-p.destroy:
-			return nil, fmt.Errorf("pool has been stopped")
-		default:
-			// enable timeout handler
+	// apply all before functions
+	if len(sp.before) > 0 {
+		for i := 0; i < len(sp.before); i++ {
+			rqs = sp.before[i](rqs)
 		}
+	}
-		timeout := time.NewTimer(p.cfg.AllocateTimeout)
-		select {
-		case <-timeout.C:
-			return nil, fmt.Errorf("worker timeout (%s)", p.cfg.AllocateTimeout)
-		case w = <-p.free:
-			timeout.Stop()
-
-			if w.State().Value() != StateReady {
-				atomic.AddInt64(&p.numDead, ^int64(0))
-				continue
-			}
-
-			if err, remove := p.remove.Load(w); remove {
-				p.discardWorker(w, err)
-
-				// get next worker
-				i++
-				continue
-			}
-
-			return w, nil
-		case <-p.destroy:
-			timeout.Stop()
+	rsp, err := sw.ExecWithContext(ctx, rqs)
+	if err != nil {
+		return sp.errEncoder(err, sw)
+	}
-			return nil, fmt.Errorf("pool has been stopped")
+	// worker wants to be terminated
+	if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
+		sw.State().Set(StateInvalid)
+		err = sw.Stop(bCtx)
+		if err != nil {
+			sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: sw, Payload: errors.E(op, err)})
 		}
-	}
-	return nil, fmt.Errorf("all workers are dead (%v)", p.cfg.NumWorkers)
-}
+		return sp.Exec(rqs)
+	}
-// release releases or replaces the worker.
-func (p *StaticPool) release(w *Worker) {
-	if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
-		p.discardWorker(w, p.cfg.MaxJobs)
-		return
+	if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs {
+		err = sp.ww.AllocateNew()
+		if err != nil {
+			return EmptyPayload, errors.E(op, err)
+		}
+	} else {
+		sp.ww.PushWorker(sw)
 	}
-	if err, remove := p.remove.Load(w); remove {
-		p.discardWorker(w, err)
-		return
+	// apply all after functions
+	if len(sp.after) > 0 {
+		for i := 0; i < len(sp.after); i++ {
+			rsp = sp.after[i](rqs, rsp)
+		}
 	}
-	p.free <- w
+	return rsp, nil
 }
-// creates new worker using associated factory. automatically
-// adds worker to the worker list (background)
-func (p *StaticPool) createWorker() (*Worker, error) {
-	w, err := p.factory.SpawnWorker(p.cmd())
+func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (WorkerBase, error) {
+	// GetFreeWorker function consumes context with timeout
+	w, err := sp.ww.GetFreeWorker(ctxGetFree)
 	if err != nil {
-		return nil, err
-	}
-
-	p.mul.Lock()
-	if p.lsn != nil {
-		w.err.Listen(p.lsn)
+		// if the error is of kind NoFreeWorkers, it means we can't get a worker from the stack during the allocate timeout
+		if errors.Is(errors.NoFreeWorkers, err) {
+			sp.events.Push(PoolEvent{Event: EventNoFreeWorkers, Payload: errors.E(op, err)})
+			return nil, errors.E(op, err)
+		}
+		// otherwise, return the error as is
+		return nil, errors.E(op, err)
 	}
-	p.mul.Unlock()
-
-	p.throw(EventWorkerConstruct, w)
-
-	p.muw.Lock()
-	p.workers = append(p.workers, w)
-	p.muw.Unlock()
-
-	go p.watchWorker(w)
 	return w, nil
 }
-// gentry remove worker
-func (p *StaticPool) discardWorker(w *Worker, caused interface{}) {
-	w.markInvalid()
-	go p.destroyWorker(w, caused)
+// Destroy all underlying stack (but let them complete the task).
+func (sp *StaticPool) Destroy(ctx context.Context) {
+	sp.ww.Destroy(ctx)
 }
-// destroyWorker destroys workers and removes it from the pool.
-// TODO caused unused
-func (p *StaticPool) destroyWorker(w *Worker, caused interface{}) {
-	go func() {
-		err := w.Stop()
-		if err != nil {
-			p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err})
+func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
+	return func(err error, w WorkerBase) (Payload, error) {
+		const op = errors.Op("error encoder")
+		// soft job errors are allowed
+		if errors.Is(errors.ErrSoftJob, err) {
+			if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+				err = sp.ww.AllocateNew()
+				if err != nil {
+					sp.events.Push(PoolEvent{Event: EventWorkerConstruct, Payload: errors.E(op, err)})
+				}
+
+				w.State().Set(StateInvalid)
+				err = w.Stop(bCtx)
+				if err != nil {
+					sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: errors.E(op, err)})
+				}
+			} else {
+				sp.ww.PushWorker(w)
+			}
+
+			return EmptyPayload, errors.E(op, err)
 		}
-	}()
-	select {
-	case <-w.waitDone:
-		// worker is dead
-		p.throw(EventWorkerDestruct, w)
+		w.State().Set(StateInvalid)
+		sp.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w})
+		errS := w.Stop(bCtx)
-	case <-time.NewTimer(p.cfg.DestroyTimeout).C:
-		// failed to stop process in given time
-		if err := w.Kill(); err != nil {
-			p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err})
+		if errS != nil {
+			return EmptyPayload, errors.E(op, errors.Errorf("%v, %v", err, errS))
 		}
-		p.throw(EventWorkerKill, w)
+		return EmptyPayload, errors.E(op, err)
 	}
 }
-// watchWorker watches worker state and replaces it if worker fails.
-func (p *StaticPool) watchWorker(w *Worker) {
-	err := w.Wait()
-	p.throw(EventWorkerDead, w)
-
-	// detaching
-	p.muw.Lock()
-	for i, wc := range p.workers {
-		if wc == w {
-			p.workers = append(p.workers[:i], p.workers[i+1:]...)
-			p.remove.Delete(w)
-			break
+func newPoolAllocator(factory Factory, cmd func() *exec.Cmd) Allocator {
+	return func() (WorkerBase, error) {
+		w, err := factory.SpawnWorkerWithContext(bCtx, cmd())
+		if err != nil {
+			return nil, err
+		}
+
+		sw, err := NewSyncWorker(w)
+		if err != nil {
+			return nil, err
 		}
+		return sw, nil
 	}
-	p.muw.Unlock()
-
-	// registering a dead worker
-	atomic.AddInt64(&p.numDead, 1)
+}
-	// worker have died unexpectedly, pool should attempt to replace it with alive version safely
+func (sp *StaticPool) execDebug(p Payload) (Payload, error) {
+	sw, err := sp.allocator()
 	if err != nil {
-		p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err})
+		return EmptyPayload, err
 	}
-	if !p.destroyed() {
-		nw, err := p.createWorker()
-		if err == nil {
-			p.free <- nw
-			return
-		}
+	r, err := sw.(SyncWorker).Exec(p)
-		// possible situation when major error causes all PHP scripts to die (for example dead DB)
-		if len(p.Workers()) == 0 {
-			p.throw(EventPoolError, err)
-		} else {
-			p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err})
-		}
+	if stopErr := sw.Stop(context.Background()); stopErr != nil {
+		sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: sw, Payload: err})
 	}
-}
-func (p *StaticPool) destroyed() bool {
-	return atomic.LoadInt32(&p.inDestroy) != 0
+	return r, err
 }
-// throw invokes event handler if any.
-func (p *StaticPool) throw(event int, ctx interface{}) {
-	p.mul.Lock()
-	if p.lsn != nil {
-		p.lsn(event, ctx)
+// allocate the required number of workers
+func (sp *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]WorkerBase, error) {
+	const op = errors.Op("allocate workers")
+	var workers []WorkerBase
+
+	// a constant number of workers simplifies the logic
+	for i := int64(0); i < numWorkers; i++ {
+		ctx, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout)
+		w, err := sp.factory.SpawnWorkerWithContext(ctx, sp.cmd())
+		if err != nil {
+			cancel()
+			return nil, errors.E(op, errors.WorkerAllocate, err)
+		}
+		workers = append(workers, w)
+		cancel()
 	}
-	p.mul.Unlock()
+	return workers, nil
 }
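
Usage note (not part of the commit): a minimal sketch of how the API introduced above might be driven. The NewPool signature, the PoolBefore option, Exec, Destroy and the Payload Body/Context fields come from this diff; NewPipeFactory, the worker command, the exact PoolConfig field set beyond NumWorkers/AllocateTimeout, and the assumption that the returned Pool interface exposes Exec and Destroy as StaticPool implements them are not shown here.

package main

import (
	"context"
	"os/exec"
	"time"

	rr "github.com/spiral/roadrunner/v2"
)

func main() {
	ctx := context.Background()

	// worker command creator, same shape as the cmd parameter of NewPool above
	cmd := func() *exec.Cmd { return exec.Command("php", "worker.php") }

	// NewPipeFactory is an assumption; NumWorkers and AllocateTimeout are the
	// PoolConfig fields referenced by the pool code in this diff.
	p, err := rr.NewPool(ctx, cmd, rr.NewPipeFactory(), rr.PoolConfig{
		NumWorkers:      4,
		AllocateTimeout: time.Second,
	},
		// PoolBefore hook from this diff: runs on every request payload before Exec
		rr.PoolBefore(func(req rr.Payload) rr.Payload { return req }),
	)
	if err != nil {
		panic(err)
	}
	defer p.Destroy(ctx)

	// Payload carries Context and Body byte slices, as used by Exec above
	rsp, err := p.Exec(rr.Payload{Body: []byte("hello")})
	if err != nil {
		panic(err)
	}
	_ = rsp
}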