Diffstat (limited to 'static_pool.go')
-rw-r--r-- | static_pool.go | 378
1 file changed, 101 insertions, 277 deletions
diff --git a/static_pool.go b/static_pool.go
index c7cc6517..1444e95a 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -1,11 +1,10 @@
 package roadrunner
 
 import (
+	"context"
 	"fmt"
 	"os/exec"
 	"sync"
-	"sync/atomic"
-	"time"
 
 	"github.com/pkg/errors"
 )
@@ -15,47 +14,32 @@ const (
 	StopRequest = "{\"stop\":true}"
 )
 
-// StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of workers.
+// StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack.
 type StaticPool struct {
 	// pool behaviour
-	cfg Config
+	cfg *Config
 
 	// worker command creator
 	cmd func() *exec.Cmd
 
-	// creates and connects to workers
+	// creates and connects to stack
 	factory Factory
 
-	// active task executions
-	tmu   sync.Mutex
-	tasks sync.WaitGroup
-
-	// workers circular allocation buf
-	free chan *Worker
-
-	// number of workers expected to be dead in a buf.
-	numDead int64
-
 	// protects state of worker list, does not affect allocation
 	muw sync.RWMutex
 
-	// all registered workers
-	workers []*Worker
+	ww *WorkersWatcher
 
-	// invalid declares set of workers to be removed from the pool.
-	remove sync.Map
-
-	// pool is being destroyed
-	inDestroy int32
-	destroy   chan interface{}
-
-	// lsn is optional callback to handle worker create/destruct/error events.
-	mul sync.Mutex
-	lsn func(event int, ctx interface{})
+	events chan PoolEvent
+}
+
+type PoolEvent struct {
+	Payload interface{}
 }
 
 // NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
-func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, error) {
+// supervisor Supervisor, todo: think about it
+// stack func() (WorkerBase, error),
+func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg *Config) (Pool, error) {
 	if err := cfg.Valid(); err != nil {
 		return nil, errors.Wrap(err, "config")
 	}
@@ -64,305 +48,145 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, er
 		cfg:     cfg,
 		cmd:     cmd,
 		factory: factory,
-		workers: make([]*Worker, 0, cfg.NumWorkers),
-		free:    make(chan *Worker, cfg.NumWorkers),
-		destroy: make(chan interface{}),
+		events:  make(chan PoolEvent),
 	}
 
-	// constant number of workers simplify logic
-	for i := int64(0); i < p.cfg.NumWorkers; i++ {
-		// to test if worker ready
-		w, err := p.createWorker()
+	p.ww = NewWorkerWatcher(func(args ...interface{}) (*SyncWorker, error) {
+		w, err := p.factory.SpawnWorkerWithContext(ctx, p.cmd())
 		if err != nil {
-			p.Destroy()
 			return nil, err
 		}
-
-		p.free <- w
-	}
-
-	return p, nil
-}
-
-// Listen attaches pool event controller.
-func (p *StaticPool) Listen(l func(event int, ctx interface{})) {
-	p.mul.Lock()
-	defer p.mul.Unlock()
+		sw, err := NewSyncWorker(w)
+		if err != nil {
+			return nil, err
+		}
+		return &sw, nil
+	}, p.cfg.NumWorkers, p.events)
 
-	p.lsn = l
+	workers, err := p.allocateWorkers(ctx, p.cfg.NumWorkers)
+	if err != nil {
+		return nil, err
+	}
 
-	p.muw.Lock()
-	for _, w := range p.workers {
-		w.err.Listen(p.lsn)
+	// put stack in the pool
+	err = p.ww.AddToWatch(ctx, workers)
+	if err != nil {
+		return nil, err
 	}
-	p.muw.Unlock()
+
+	return p, nil
 }
 
 // Config returns associated pool configuration. Immutable.
 func (p *StaticPool) Config() Config {
-	return p.cfg
+	return *p.cfg
 }
 
 // Workers returns worker list associated with the pool.
-func (p *StaticPool) Workers() (workers []*Worker) {
+func (p *StaticPool) Workers(ctx context.Context) (workers []WorkerBase) {
 	p.muw.RLock()
 	defer p.muw.RUnlock()
-
-	workers = append(workers, p.workers...)
-
-	return workers
+	return p.ww.WorkersList(ctx)
 }
 
-// Remove forces pool to remove specific worker.
-func (p *StaticPool) Remove(w *Worker, err error) bool {
-	if w.State().Value() != StateReady && w.State().Value() != StateWorking {
-		// unable to remove inactive worker
-		return false
-	}
-
-	if _, ok := p.remove.Load(w); ok {
-		return false
-	}
-
-	p.remove.Store(w, err)
-	return true
+func (p *StaticPool) RemoveWorker(ctx context.Context, wb WorkerBase) error {
+	return p.ww.RemoveWorker(ctx, wb)
 }
 
 // Exec one task with given payload and context, returns result or error.
-func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) {
-	p.tmu.Lock()
-	p.tasks.Add(1)
-	p.tmu.Unlock()
-
-	defer p.tasks.Done()
-
-	w, err := p.allocateWorker()
-	if err != nil {
-		return nil, errors.Wrap(err, "unable to allocate worker")
+func (p *StaticPool) Exec(ctx context.Context, rqs Payload) (Payload, error) {
+	getWorkerCtx, cancel := context.WithTimeout(context.TODO(), p.cfg.AllocateTimeout)
+	defer cancel()
+	w, err := p.ww.GetFreeWorker(getWorkerCtx)
+	if err != nil && errors.Is(err, ErrWatcherStopped) {
+		return EmptyPayload, ErrWatcherStopped
+	} else if err != nil {
+		return EmptyPayload, err
 	}
 
-	rsp, err = w.Exec(rqs)
+	sw := w.(SyncWorker)
+
+	execCtx, cancel2 := context.WithTimeout(context.TODO(), p.cfg.ExecTTL)
+	defer cancel2()
+	rsp, err := sw.Exec(execCtx, rqs)
 	if err != nil {
+		errJ := p.checkMaxJobs(ctx, w)
+		if errJ != nil {
+			return EmptyPayload, fmt.Errorf("%v, %v", err, errJ)
+		}
 		// soft job errors are allowed
-		if _, jobError := err.(JobError); jobError {
-			p.release(w)
-			return nil, err
+		if _, jobError := err.(TaskError); jobError {
+			p.ww.PushWorker(w)
+			return EmptyPayload, err
 		}
 
-		p.discardWorker(w, err)
-		return nil, err
+		sw.State().Set(StateInvalid)
+		errS := w.Stop(ctx)
+		if errS != nil {
+			return EmptyPayload, fmt.Errorf("%v, %v", err, errS)
+		}
+
+		return EmptyPayload, err
 	}
 
 	// worker want's to be terminated
 	if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
-		p.discardWorker(w, err)
-		return p.Exec(rqs)
-	}
-
-	p.release(w)
-	return rsp, nil
-}
-
-// Destroy all underlying workers (but let them to complete the task).
-func (p *StaticPool) Destroy() {
-	atomic.AddInt32(&p.inDestroy, 1)
-
-	p.tmu.Lock()
-	p.tasks.Wait()
-	close(p.destroy)
-	p.tmu.Unlock()
-
-	var wg sync.WaitGroup
-	for _, w := range p.Workers() {
-		wg.Add(1)
-		w.markInvalid()
-		go func(w *Worker) {
-			defer wg.Done()
-			p.destroyWorker(w, nil)
-		}(w)
-	}
-
-	wg.Wait()
-}
-
-// finds free worker in a given time interval. Skips dead workers.
-func (p *StaticPool) allocateWorker() (w *Worker, err error) {
-	for i := atomic.LoadInt64(&p.numDead); i >= 0; i++ {
-		// this loop is required to skip issues with dead workers still being in a ring
-		// (we know how many workers).
-		select {
-		case w = <-p.free:
-			if w.State().Value() != StateReady {
-				// found expected dead worker
-				atomic.AddInt64(&p.numDead, ^int64(0))
-				continue
-			}
-
-			if err, remove := p.remove.Load(w); remove {
-				p.discardWorker(w, err)
-
-				// get next worker
-				i++
-				continue
-			}
-
-			return w, nil
-		case <-p.destroy:
-			return nil, fmt.Errorf("pool has been stopped")
-		default:
-			// enable timeout handler
-		}
-
-		timeout := time.NewTimer(p.cfg.AllocateTimeout)
-		select {
-		case <-timeout.C:
-			return nil, fmt.Errorf("worker timeout (%s)", p.cfg.AllocateTimeout)
-		case w = <-p.free:
-			timeout.Stop()
-
-			if w.State().Value() != StateReady {
-				atomic.AddInt64(&p.numDead, ^int64(0))
-				continue
-			}
-
-			if err, remove := p.remove.Load(w); remove {
-				p.discardWorker(w, err)
-
-				// get next worker
-				i++
-				continue
-			}
-
-			return w, nil
-		case <-p.destroy:
-			timeout.Stop()
-
-			return nil, fmt.Errorf("pool has been stopped")
+		w.State().Set(StateInvalid)
+		err = w.Stop(ctx)
+		if err != nil {
+			panic(err)
 		}
+		return p.Exec(ctx, rqs)
 	}
 
-	return nil, fmt.Errorf("all workers are dead (%v)", p.cfg.NumWorkers)
-}
-
-// release releases or replaces the worker.
-func (p *StaticPool) release(w *Worker) {
 	if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
-		p.discardWorker(w, p.cfg.MaxJobs)
-		return
-	}
-
-	if err, remove := p.remove.Load(w); remove {
-		p.discardWorker(w, err)
-		return
+		err = p.ww.AllocateNew(ctx)
+		if err != nil {
+			return EmptyPayload, err
+		}
+	} else {
+		p.muw.Lock()
+		p.ww.PushWorker(w)
+		p.muw.Unlock()
 	}
-
-	p.free <- w
+	return rsp, nil
}
 
-// creates new worker using associated factory. automatically
-// adds worker to the worker list (background)
-func (p *StaticPool) createWorker() (*Worker, error) {
-	w, err := p.factory.SpawnWorker(p.cmd())
-	if err != nil {
-		return nil, err
-	}
-
-	p.mul.Lock()
-	if p.lsn != nil {
-		w.err.Listen(p.lsn)
-	}
-	p.mul.Unlock()
-
-	p.throw(EventWorkerConstruct, w)
-
-	p.muw.Lock()
-	p.workers = append(p.workers, w)
-	p.muw.Unlock()
-
-	go p.watchWorker(w)
-	return w, nil
+// Destroy all underlying stack (but let them to complete the task).
+func (p *StaticPool) Destroy(ctx context.Context) {
+	p.ww.Destroy(ctx)
 }
 
-// gentry remove worker
-func (p *StaticPool) discardWorker(w *Worker, caused interface{}) {
-	w.markInvalid()
-	go p.destroyWorker(w, caused)
+func (p *StaticPool) Events() chan PoolEvent {
+	return p.events
 }
 
-// destroyWorker destroys workers and removes it from the pool.
-func (p *StaticPool) destroyWorker(w *Worker, caused interface{}) {
-	go func() {
-		err := w.Stop()
-		if err != nil {
-			p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err})
-		}
-	}()
-
-	select {
-	case <-w.waitDone:
-		// worker is dead
-		p.throw(EventWorkerDestruct, w)
+// allocate required number of stack
+func (p *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]WorkerBase, error) {
+	var workers []WorkerBase
 
-	case <-time.NewTimer(p.cfg.DestroyTimeout).C:
-		// failed to stop process in given time
-		if err := w.Kill(); err != nil {
-			p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err})
+	// constant number of stack simplify logic
+	for i := int64(0); i < numWorkers; i++ {
+		ctx, cancel := context.WithTimeout(ctx, p.cfg.AllocateTimeout)
+		w, err := p.factory.SpawnWorkerWithContext(ctx, p.cmd())
+		if err != nil {
+			cancel()
+			return nil, err
 		}
-
-		p.throw(EventWorkerKill, w)
+		cancel()
+		workers = append(workers, w)
 	}
+	return workers, nil
 }
 
-// watchWorker watches worker state and replaces it if worker fails.
-func (p *StaticPool) watchWorker(w *Worker) {
-	err := w.Wait()
-	p.throw(EventWorkerDead, w)
-
-	// detaching
-	p.muw.Lock()
-	for i, wc := range p.workers {
-		if wc == w {
-			p.workers = append(p.workers[:i], p.workers[i+1:]...)
-			p.remove.Delete(w)
-			break
-		}
-	}
-	p.muw.Unlock()
-
-	// registering a dead worker
-	atomic.AddInt64(&p.numDead, 1)
-
-	// worker have died unexpectedly, pool should attempt to replace it with alive version safely
-	if err != nil {
-		p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err})
-	}
-
-	if !p.destroyed() {
-		nw, err := p.createWorker()
-		if err == nil {
-			p.free <- nw
-			return
-		}
-
-		// possible situation when major error causes all PHP scripts to die (for example dead DB)
-		if len(p.Workers()) == 0 {
-			p.throw(EventPoolError, err)
-		} else {
-			p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err})
+func (p *StaticPool) checkMaxJobs(ctx context.Context, w WorkerBase) error {
+	if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
+		err := p.ww.AllocateNew(ctx)
+		if err != nil {
+			return err
 		}
 	}
-}
-
-func (p *StaticPool) destroyed() bool {
-	return atomic.LoadInt32(&p.inDestroy) != 0
-}
-
-// throw invokes event handler if any.
-func (p *StaticPool) throw(event int, ctx interface{}) {
-	p.mul.Lock()
-	if p.lsn != nil {
-		p.lsn(event, ctx)
-	}
-	p.mul.Unlock()
+	return nil
 }
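The net effect of this diff is that the pool's public surface moves from the listener-based API (`Listen`, `Remove`, `Destroy()`) to a context-driven one (`NewPool(ctx, ...)`, `Exec(ctx, rqs)`, `RemoveWorker(ctx, wb)`, `Destroy(ctx)`), with worker lifecycle delegated to the WorkersWatcher and events exposed as a channel. Below is a minimal caller sketch against the new signatures; the import path, the `php worker.php` command, the concrete `Config` values, and the assumption that the returned `Pool` interface exposes `Exec` and `Destroy` exactly as StaticPool implements them here are illustrative, not taken from this diff.

```go
package poolexample

import (
	"context"
	"log"
	"os/exec"
	"time"

	rr "github.com/spiral/roadrunner" // import path assumed
)

// runOnce wires the refactored, context-based API together: build the pool,
// push one payload through it, and tear it down. The caller supplies a
// concrete Factory (pipes, sockets, ...) constructed outside this sketch.
func runOnce(ctx context.Context, factory rr.Factory) error {
	// The fields below are the ones the new pool reads in this diff:
	// NumWorkers, AllocateTimeout, ExecTTL (MaxJobs is optional).
	cfg := &rr.Config{
		NumWorkers:      4,
		AllocateTimeout: time.Second,
		ExecTTL:         10 * time.Second,
	}

	// NewPool now takes a context and a *Config and returns the Pool interface.
	p, err := rr.NewPool(ctx, func() *exec.Cmd {
		return exec.Command("php", "worker.php") // hypothetical worker command
	}, factory, cfg)
	if err != nil {
		return err
	}
	defer p.Destroy(ctx)

	// Exec now takes a context and a Payload value instead of *Payload.
	rsp, err := p.Exec(ctx, rr.Payload{Body: []byte("hello")})
	if err != nil {
		return err
	}

	log.Println(string(rsp.Body))
	return nil
}
```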