path: root/static_pool.go
Diffstat (limited to 'static_pool.go')
-rw-r--r--  static_pool.go  374
1 files changed, 0 insertions, 374 deletions
diff --git a/static_pool.go b/static_pool.go
deleted file mode 100644
index efd9125a..00000000
--- a/static_pool.go
+++ /dev/null
@@ -1,374 +0,0 @@
-package roadrunner
-
-import (
- "os/exec"
- "sync"
- "sync/atomic"
- "time"
-
- "github.com/pkg/errors"
-)
-
-const (
- // StopRequest can be sent by a worker to indicate that a restart is required.
- StopRequest = "{\"stop\":true}"
-)
-
-// StaticPool controls worker creation, destruction and task routing. The pool uses a fixed number of workers.
-type StaticPool struct {
- // pool behaviour
- cfg Config
-
- // worker command creator
- cmd func() *exec.Cmd
-
- // creates and connects to workers
- factory Factory
-
- // active task executions
- tmu sync.Mutex
- tasks sync.WaitGroup
-
- // circular allocation buffer of free workers
- free chan *Worker
-
- // number of workers expected to be dead in the buffer.
- numDead int64
-
- // protects state of worker list, does not affect allocation
- muw sync.RWMutex
-
- // all registered workers
- workers []*Worker
-
- // remove declares the set of workers to be removed from the pool.
- remove sync.Map
-
- // pool is being destroyed
- inDestroy int32
- destroy chan interface{}
-
- // lsn is an optional callback to handle worker create/destruct/error events.
- mul sync.Mutex
- lsn func(event int, ctx interface{})
-}
-
-// NewPool creates a new worker pool and task multiplexer. StaticPool is initiated with cfg.NumWorkers workers.
-func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, error) {
- if err := cfg.Valid(); err != nil {
- return nil, errors.Wrap(err, "config")
- }
-
- p := &StaticPool{
- cfg: cfg,
- cmd: cmd,
- factory: factory,
- workers: make([]*Worker, 0, cfg.NumWorkers),
- free: make(chan *Worker, cfg.NumWorkers),
- destroy: make(chan interface{}),
- }
-
- // a constant number of workers simplifies the logic
- for i := int64(0); i < p.cfg.NumWorkers; i++ {
- // spawn the worker to verify it is able to start
- w, err := p.createWorker()
- if err != nil {
- p.Destroy()
- return nil, err
- }
-
- p.free <- w
- }
-
- return p, nil
-}
-
-// Listen attaches the pool event listener.
-func (p *StaticPool) Listen(l func(event int, ctx interface{})) {
- p.mul.Lock()
- defer p.mul.Unlock()
-
- p.lsn = l
-
- p.muw.Lock()
- for _, w := range p.workers {
- w.err.Listen(p.lsn)
- }
- p.muw.Unlock()
-}
-
-// Config returns associated pool configuration. Immutable.
-func (p *StaticPool) Config() Config {
- return p.cfg
-}
-
-// Workers returns worker list associated with the pool.
-func (p *StaticPool) Workers() (workers []*Worker) {
- p.muw.RLock()
- defer p.muw.RUnlock()
-
- workers = append(workers, p.workers...)
-
- return workers
-}
-
-// Remove forces pool to remove specific worker.
-func (p *StaticPool) Remove(w *Worker, err error) bool {
- if w.State().Value() != StateReady && w.State().Value() != StateWorking {
- // unable to remove inactive worker
- return false
- }
-
- if _, ok := p.remove.Load(w); ok {
- return false
- }
-
- p.remove.Store(w, err)
- return true
-}
-
-var ErrAllocateWorker = errors.New("unable to allocate worker")
-
-// Exec executes one task with the given payload and context and returns the result or an error.
-func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) {
- p.tmu.Lock()
- p.tasks.Add(1)
- p.tmu.Unlock()
-
- defer p.tasks.Done()
-
- w, err := p.allocateWorker()
- if err != nil {
- return nil, ErrAllocateWorker
- }
-
- rsp, err = w.Exec(rqs)
- if err != nil {
- // soft job errors are allowed
- if _, jobError := err.(JobError); jobError {
- p.release(w)
- return nil, err
- }
-
- p.discardWorker(w, err)
- return nil, err
- }
-
- // worker wants to be terminated
- if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
- p.discardWorker(w, err)
- return p.Exec(rqs)
- }
-
- p.release(w)
- return rsp, nil
-}
-
-// Destroy destroys all underlying workers (but lets them complete their current task).
-func (p *StaticPool) Destroy() {
- atomic.AddInt32(&p.inDestroy, 1)
-
- p.tmu.Lock()
- p.tasks.Wait()
- close(p.destroy)
- p.tmu.Unlock()
-
- var wg sync.WaitGroup
- for _, w := range p.Workers() {
- wg.Add(1)
- w.markInvalid()
- go func(w *Worker) {
- defer wg.Done()
- p.destroyWorker(w, nil)
- }(w)
- }
-
- wg.Wait()
-}
-
-var ErrPoolStopped = errors.New("pool has been stopped")
-var ErrWorkerAllocateTimeout = errors.New("worker allocate timeout")
-var ErrAllWorkersAreDead = errors.New("all workers are dead")
-
-// allocateWorker finds a free worker within the allocate timeout, skipping dead workers.
-func (p *StaticPool) allocateWorker() (w *Worker, err error) {
- // count down from the number of workers expected to be dead, so the loop is bounded
- for i := atomic.LoadInt64(&p.numDead); i >= 0; i-- {
- // this loop is required to skip dead workers that may still be sitting in the ring
- // (we know how many of them to expect).
- select {
- case w = <-p.free:
- if w.State().Value() != StateReady {
- // found expected dead worker
- atomic.AddInt64(&p.numDead, ^int64(0))
- continue
- }
-
- if err, remove := p.remove.Load(w); remove {
- p.discardWorker(w, err)
-
- // get next worker
- i++
- continue
- }
-
- return w, nil
- case <-p.destroy:
- return nil, ErrPoolStopped
- default:
- // enable timeout handler
- }
-
- timeout := time.NewTimer(p.cfg.AllocateTimeout)
- select {
- case <-timeout.C:
- return nil, ErrWorkerAllocateTimeout
- case w = <-p.free:
- timeout.Stop()
-
- if w.State().Value() != StateReady {
- atomic.AddInt64(&p.numDead, ^int64(0))
- continue
- }
-
- if err, remove := p.remove.Load(w); remove {
- p.discardWorker(w, err)
-
- // get next worker
- i++
- continue
- }
-
- return w, nil
- case <-p.destroy:
- timeout.Stop()
-
- return nil, ErrPoolStopped
- }
- }
-
- return nil, ErrAllWorkersAreDead
-}
-
-// release releases or replaces the worker.
-func (p *StaticPool) release(w *Worker) {
- if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
- p.discardWorker(w, p.cfg.MaxJobs)
- return
- }
-
- if err, remove := p.remove.Load(w); remove {
- p.discardWorker(w, err)
- return
- }
-
- p.free <- w
-}
-
-// createWorker creates a new worker using the associated factory and automatically
-// adds it to the worker list (in the background).
-func (p *StaticPool) createWorker() (*Worker, error) {
- w, err := p.factory.SpawnWorker(p.cmd())
- if err != nil {
- return nil, err
- }
-
- p.mul.Lock()
- if p.lsn != nil {
- w.err.Listen(p.lsn)
- }
- p.mul.Unlock()
-
- p.throw(EventWorkerConstruct, w)
-
- p.muw.Lock()
- p.workers = append(p.workers, w)
- p.muw.Unlock()
-
- go p.watchWorker(w)
- return w, nil
-}
-
-// gently remove the worker: mark it invalid and destroy it in the background
-func (p *StaticPool) discardWorker(w *Worker, caused interface{}) {
- w.markInvalid()
- go p.destroyWorker(w, caused)
-}
-
-// destroyWorker destroys a worker and removes it from the pool; the caused value
-// is kept for symmetry with discardWorker but is currently unused.
-func (p *StaticPool) destroyWorker(w *Worker, caused interface{}) {
- go func() {
- err := w.Stop()
- if err != nil {
- p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err})
- }
- }()
-
- select {
- case <-w.waitDone:
- // worker is dead
- p.throw(EventWorkerDestruct, w)
-
- case <-time.NewTimer(p.cfg.DestroyTimeout).C:
- // failed to stop process in given time
- if err := w.Kill(); err != nil {
- p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err})
- }
-
- p.throw(EventWorkerKill, w)
- }
-}
-
-// watchWorker watches worker state and replaces it if worker fails.
-func (p *StaticPool) watchWorker(w *Worker) {
- err := w.Wait()
- p.throw(EventWorkerDead, w)
-
- // detaching
- p.muw.Lock()
- for i, wc := range p.workers {
- if wc == w {
- p.workers = append(p.workers[:i], p.workers[i+1:]...)
- p.remove.Delete(w)
- break
- }
- }
- p.muw.Unlock()
-
- // registering a dead worker
- atomic.AddInt64(&p.numDead, 1)
-
- // the worker has died unexpectedly; the pool should safely attempt to replace it with a live one
- if err != nil {
- p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err})
- }
-
- if !p.destroyed() {
- nw, err := p.createWorker()
- if err == nil {
- p.free <- nw
- return
- }
-
- // a major error can cause all PHP scripts to die (for example, a dead DB)
- if len(p.Workers()) == 0 {
- p.throw(EventPoolError, err)
- } else {
- p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err})
- }
- }
-}
-
-func (p *StaticPool) destroyed() bool {
- return atomic.LoadInt32(&p.inDestroy) != 0
-}
-
-// throw invokes event handler if any.
-func (p *StaticPool) throw(event int, ctx interface{}) {
- p.mul.Lock()
- if p.lsn != nil {
- p.lsn(event, ctx)
- }
- p.mul.Unlock()
-}
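
For context, a minimal usage sketch of the pool API removed by this commit is shown below. NewPool, Listen, Exec, Destroy, the Config field names and the Payload fields are taken from the file above; the import path github.com/spiral/roadrunner, the NewPipeFactory constructor and the worker.php command are assumptions for illustration, not confirmed by this diff.

package main

import (
	"fmt"
	"os/exec"
	"time"

	rr "github.com/spiral/roadrunner" // assumed import path for this package
)

func main() {
	// Field names match those referenced in static_pool.go
	// (NumWorkers, AllocateTimeout, DestroyTimeout, MaxJobs).
	cfg := rr.Config{
		NumWorkers:      4,
		AllocateTimeout: time.Minute,
		DestroyTimeout:  time.Minute,
	}

	// worker.php is a placeholder worker command; NewPipeFactory is assumed to be
	// the pipe-based Factory implementation from the same package.
	p, err := rr.NewPool(
		func() *exec.Cmd { return exec.Command("php", "worker.php") },
		rr.NewPipeFactory(),
		cfg,
	)
	if err != nil {
		panic(err)
	}
	defer p.Destroy()

	// Optional listener for worker construct/destruct/error events.
	p.Listen(func(event int, ctx interface{}) {
		fmt.Println("pool event:", event)
	})

	rsp, err := p.Exec(&rr.Payload{Body: []byte("hello")})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(rsp.Body))
}

A worker that replies with the StopRequest constant ({"stop":true}) in the response context is discarded and replaced, and the task is re-executed on a fresh worker, as implemented in Exec above.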