Diffstat (limited to 'static_pool.go')
-rwxr-xr-x (was -rw-r--r--)  static_pool.go | 504 ++++++++---------
 1 file changed, 241 insertions(+), 263 deletions(-)
diff --git a/static_pool.go b/static_pool.go
index c4b6f42d..b626a499 100644..100755
--- a/static_pool.go
+++ b/static_pool.go
@@ -1,370 +1,348 @@
package roadrunner
import (
- "fmt"
+ "context"
"os/exec"
- "sync"
- "sync/atomic"
- "time"
- "github.com/pkg/errors"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/util"
)
-const (
- // StopRequest can be sent by worker to indicate that restart is required.
- StopRequest = "{\"stop\":true}"
-)
+// StopRequest can be sent by a worker to indicate that a restart is required.
+const StopRequest = "{\"stop\":true}"
-// StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of workers.
-type StaticPool struct {
- // pool behaviour
- cfg Config
+var bCtx = context.Background()
- // worker command creator
- cmd func() *exec.Cmd
+// Allocator is responsible for worker allocation in the pool
+type Allocator func() (WorkerBase, error)
- // creates and connects to workers
- factory Factory
+// ErrorEncoder encodes an error or makes a decision based on the error type
+type ErrorEncoder func(err error, w WorkerBase) (Payload, error)
- // active task executions
- tmu sync.Mutex
- tasks sync.WaitGroup
+// Before is a function that the pool applies to the request payload BEFORE Exec
+type Before func(req Payload) Payload
- // workers circular allocation buf
- free chan *Worker
+// After is a function that the pool applies to the response payload AFTER Exec
+type After func(req Payload, resp Payload) Payload
- // number of workers expected to be dead in a buf.
- numDead int64
+type PoolOptions func(p *StaticPool)
- // protects state of worker list, does not affect allocation
- muw sync.RWMutex
+// StaticPool controls worker creation, destruction and task routing. Pool uses a fixed number of workers.
+type StaticPool struct {
+ cfg PoolConfig
- // all registered workers
- workers []*Worker
+ // worker command creator
+ cmd func() *exec.Cmd
+
+ // creates and connects to workers
+ factory Factory
- // invalid declares set of workers to be removed from the pool.
- remove sync.Map
+ // distributes the events
+ events util.EventsHandler
- // pool is being destroyed
- inDestroy int32
- destroy chan interface{}
+ // manages worker states and TTLs
+ ww WorkerWatcher
- // lsn is optional callback to handle worker create/destruct/error events.
- mul sync.Mutex
- lsn func(event int, ctx interface{})
+ // allocates a new worker
+ allocator Allocator
+
+ errEncoder ErrorEncoder
+ before []Before
+ after []After
}
// NewPool creates a new worker pool and task multiplexer. StaticPool will allocate cfg.NumWorkers workers on start.
-func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, error) {
- if err := cfg.Valid(); err != nil {
- return nil, errors.Wrap(err, "config")
+func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg PoolConfig, options ...PoolOptions) (Pool, error) {
+ const op = errors.Op("NewPool")
+ if factory == nil {
+ return nil, errors.E(op, errors.Str("no factory initialized"))
+ }
+ cfg.InitDefaults()
+
+ if cfg.Debug {
+ cfg.NumWorkers = 0
+ cfg.MaxJobs = 1
}
p := &StaticPool{
cfg: cfg,
cmd: cmd,
factory: factory,
- workers: make([]*Worker, 0, cfg.NumWorkers),
- free: make(chan *Worker, cfg.NumWorkers),
- destroy: make(chan interface{}),
+ events: util.NewEventsHandler(),
+ after: make([]After, 0),
+ before: make([]Before, 0),
}
- // constant number of workers simplify logic
- for i := int64(0); i < p.cfg.NumWorkers; i++ {
- // to test if worker ready
- w, err := p.createWorker()
- if err != nil {
- p.Destroy()
- return nil, err
- }
+ p.allocator = newPoolAllocator(factory, cmd)
+ p.ww = newWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events)
- p.free <- w
+ workers, err := p.allocateWorkers(ctx, p.cfg.NumWorkers)
+ if err != nil {
+ return nil, errors.E(op, err)
}
- return p, nil
-}
+ // hand the allocated workers to the watcher
+ err = p.ww.AddToWatch(workers)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
-// Listen attaches pool event controller.
-func (p *StaticPool) Listen(l func(event int, ctx interface{})) {
- p.mul.Lock()
- defer p.mul.Unlock()
+ p.errEncoder = defaultErrEncoder(p)
- p.lsn = l
+ // add pool options
+ for i := 0; i < len(options); i++ {
+ options[i](p)
+ }
- p.muw.Lock()
- for _, w := range p.workers {
- w.err.Listen(p.lsn)
+ // if the supervisor config is not nil, assume the pool is meant to be supervised
+ if cfg.Supervisor != nil {
+ sp := newPoolWatcher(p, p.events, p.cfg.Supervisor)
+ // start watcher timer
+ sp.Start()
+ return sp, nil
}
- p.muw.Unlock()
-}
-// Config returns associated pool configuration. Immutable.
-func (p *StaticPool) Config() Config {
- return p.cfg
+ return p, nil
}
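For orientation, building the reworked pool end to end might look like the following sketch. NewPipeFactory is an assumption here (any Factory implementation would do), only config fields actually referenced in this patch are set, and the usual context/exec/time imports are implied:

    cmd := func() *exec.Cmd { return exec.Command("php", "worker.php") }

    // NewPipeFactory is an assumed Factory constructor from elsewhere in the package.
    p, err := NewPool(context.Background(), cmd, NewPipeFactory(), PoolConfig{
        NumWorkers:      4,
        AllocateTimeout: time.Second * 5,
    })
    if err != nil {
        panic(err)
    }
    // Destroy is assumed to be part of the returned Pool interface.
    defer p.Destroy(context.Background())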
-// Workers returns worker list associated with the pool.
-func (p *StaticPool) Workers() (workers []*Worker) {
- p.muw.RLock()
- defer p.muw.RUnlock()
-
- workers = append(workers, p.workers...)
-
- return workers
+func PoolBefore(before ...Before) PoolOptions {
+ return func(p *StaticPool) {
+ p.before = append(p.before, before...)
+ }
}
-// Remove forces pool to remove specific worker.
-func (p *StaticPool) Remove(w *Worker, err error) bool {
- if w.State().Value() != StateReady && w.State().Value() != StateWorking {
- // unable to remove inactive worker
- return false
+func PoolAfter(after ...After) PoolOptions {
+ return func(p *StaticPool) {
+ p.after = append(p.after, after...)
}
+}
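A hypothetical use of the two option constructors above; the hooks here only illustrate the shape and pass the payloads through unchanged:

    trace := func(req Payload) Payload {
        // inspect or rewrite the request before a worker sees it
        return req
    }
    stamp := func(req Payload, resp Payload) Payload {
        // inspect or rewrite the response before the caller sees it
        return resp
    }

    p, err := NewPool(ctx, cmd, factory, cfg, PoolBefore(trace), PoolAfter(stamp))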
- if _, ok := p.remove.Load(w); ok {
- return false
- }
+// AddListener connects event listener to the pool.
+func (sp *StaticPool) AddListener(listener util.EventListener) {
+ sp.events.AddListener(listener)
+}
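Assuming util.EventListener is a func(event interface{}) (its definition is not part of this patch), a logging listener could be attached like so:

    p.AddListener(func(event interface{}) {
        switch e := event.(type) {
        case PoolEvent:
            log.Printf("pool event %v: %v", e.Event, e.Payload)
        case WorkerEvent:
            log.Printf("worker event %v: %v", e.Event, e.Payload)
        }
    })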
- p.remove.Store(w, err)
- return true
+// GetConfig returns the associated pool configuration. Immutable.
+func (sp *StaticPool) GetConfig() PoolConfig {
+ return sp.cfg
}
-// Exec one task with given payload and context, returns result or error.
-func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) {
- p.tmu.Lock()
- p.tasks.Add(1)
- p.tmu.Unlock()
+// Workers returns worker list associated with the pool.
+func (sp *StaticPool) Workers() (workers []WorkerBase) {
+ return sp.ww.WorkersList()
+}
- defer p.tasks.Done()
+func (sp *StaticPool) RemoveWorker(wb WorkerBase) error {
+ return sp.ww.RemoveWorker(wb)
+}
- w, err := p.allocateWorker()
+func (sp *StaticPool) Exec(p Payload) (Payload, error) {
+ const op = errors.Op("exec")
+ if sp.cfg.Debug {
+ return sp.execDebug(p)
+ }
+ ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout)
+ defer cancel()
+ w, err := sp.getWorker(ctxGetFree, op)
if err != nil {
- return nil, errors.Wrap(err, "unable to allocate worker")
+ return EmptyPayload, errors.E(op, err)
}
- rsp, err = w.Exec(rqs)
+ sw := w.(SyncWorker)
- if err != nil {
- // soft job errors are allowed
- if _, jobError := err.(JobError); jobError {
- p.release(w)
- return nil, err
+ if len(sp.before) > 0 {
+ for i := 0; i < len(sp.before); i++ {
+ p = sp.before[i](p)
}
+ }
- p.discardWorker(w, err)
- return nil, err
+ rsp, err := sw.Exec(p)
+ if err != nil {
+ return sp.errEncoder(err, sw)
}
// worker wants to be terminated
if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
- p.discardWorker(w, err)
- return p.Exec(rqs)
+ sw.State().Set(StateInvalid)
+ err = sw.Stop(bCtx)
+ if err != nil {
+ sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: sw, Payload: errors.E(op, err)})
+ }
+
+ return sp.Exec(p)
}
- p.release(w)
- return rsp, nil
-}
+ if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs {
+ err = sp.ww.AllocateNew()
+ if err != nil {
+ return EmptyPayload, errors.E(op, err)
+ }
+ } else {
+ sp.ww.PushWorker(sw)
+ }
-// Destroy all underlying workers (but let them to complete the task).
-func (p *StaticPool) Destroy() {
- atomic.AddInt32(&p.inDestroy, 1)
-
- p.tmu.Lock()
- p.tasks.Wait()
- close(p.destroy)
- p.tmu.Unlock()
-
- var wg sync.WaitGroup
- for _, w := range p.Workers() {
- wg.Add(1)
- w.markInvalid()
- go func(w *Worker) {
- defer wg.Done()
- p.destroyWorker(w, nil)
- }(w)
+ if len(sp.after) > 0 {
+ for i := 0; i < len(sp.after); i++ {
+ rsp = sp.after[i](p, rsp)
+ }
}
- wg.Wait()
+ return rsp, nil
}
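Driving the synchronous path is then a single call per task; per the checks above, Context carries control data (such as StopRequest) while Body carries the job itself:

    rsp, err := p.Exec(Payload{Body: []byte(`{"job":"ping"}`)})
    if err != nil {
        // both soft job errors and hard worker failures surface here
        log.Println("exec failed:", err)
        return
    }
    log.Println("worker replied:", string(rsp.Body))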
-// finds free worker in a given time interval. Skips dead workers.
-func (p *StaticPool) allocateWorker() (w *Worker, err error) {
- // TODO loop counts upward, but its variable is bounded downward.
- for i := atomic.LoadInt64(&p.numDead); i >= 0; i++ {
- // this loop is required to skip issues with dead workers still being in a ring
- // (we know how many workers).
- select {
- case w = <-p.free:
- if w.State().Value() != StateReady {
- // found expected dead worker
- atomic.AddInt64(&p.numDead, ^int64(0))
- continue
- }
-
- if err, remove := p.remove.Load(w); remove {
- p.discardWorker(w, err)
+func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
+ const op = errors.Op("exec with context")
+ ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout)
+ defer cancel()
+ w, err := sp.getWorker(ctxGetFree, op)
+ if err != nil {
+ return EmptyPayload, errors.E(op, err)
+ }
- // get next worker
- i++
- continue
- }
+ sw := w.(SyncWorker)
- return w, nil
- case <-p.destroy:
- return nil, fmt.Errorf("pool has been stopped")
- default:
- // enable timeout handler
+ // apply all before functions
+ if len(sp.before) > 0 {
+ for i := 0; i < len(sp.before); i++ {
+ rqs = sp.before[i](rqs)
}
+ }
- timeout := time.NewTimer(p.cfg.AllocateTimeout)
- select {
- case <-timeout.C:
- return nil, fmt.Errorf("worker timeout (%s)", p.cfg.AllocateTimeout)
- case w = <-p.free:
- timeout.Stop()
-
- if w.State().Value() != StateReady {
- atomic.AddInt64(&p.numDead, ^int64(0))
- continue
- }
-
- if err, remove := p.remove.Load(w); remove {
- p.discardWorker(w, err)
-
- // get next worker
- i++
- continue
- }
-
- return w, nil
- case <-p.destroy:
- timeout.Stop()
+ rsp, err := sw.ExecWithContext(ctx, rqs)
+ if err != nil {
+ return sp.errEncoder(err, sw)
+ }
- return nil, fmt.Errorf("pool has been stopped")
+ // worker wants to be terminated
+ if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
+ sw.State().Set(StateInvalid)
+ err = sw.Stop(bCtx)
+ if err != nil {
+ sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: sw, Payload: errors.E(op, err)})
}
- }
- return nil, fmt.Errorf("all workers are dead (%v)", p.cfg.NumWorkers)
-}
+ return sp.Exec(rqs)
+ }
-// release releases or replaces the worker.
-func (p *StaticPool) release(w *Worker) {
- if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
- p.discardWorker(w, p.cfg.MaxJobs)
- return
+ if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs {
+ err = sp.ww.AllocateNew()
+ if err != nil {
+ return EmptyPayload, errors.E(op, err)
+ }
+ } else {
+ sp.ww.PushWorker(sw)
}
- if err, remove := p.remove.Load(w); remove {
- p.discardWorker(w, err)
- return
+ // apply all after functions
+ if len(sp.after) > 0 {
+ for i := 0; i < len(sp.after); i++ {
+ rsp = sp.after[i](rqs, rsp)
+ }
}
- p.free <- w
+ return rsp, nil
}
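ExecWithContext layers a per-request deadline on top of the pool's allocate timeout, so a slow job can be bounded independently of worker allocation; a minimal sketch:

    reqCtx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()

    rsp, err := p.ExecWithContext(reqCtx, Payload{Body: body})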
-// creates new worker using associated factory. automatically
-// adds worker to the worker list (background)
-func (p *StaticPool) createWorker() (*Worker, error) {
- w, err := p.factory.SpawnWorker(p.cmd())
+func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (WorkerBase, error) {
+ // GetFreeWorker consumes a context bounded by the allocate timeout
+ w, err := sp.ww.GetFreeWorker(ctxGetFree)
if err != nil {
- return nil, err
- }
-
- p.mul.Lock()
- if p.lsn != nil {
- w.err.Listen(p.lsn)
+ // the NoFreeWorkers kind means no worker became free within the allocate timeout
+ if errors.Is(errors.NoFreeWorkers, err) {
+ sp.events.Push(PoolEvent{Event: EventNoFreeWorkers, Payload: errors.E(op, err)})
+ return nil, errors.E(op, err)
+ }
+ // any other error is returned as is
+ return nil, errors.E(op, err)
}
- p.mul.Unlock()
-
- p.throw(EventWorkerConstruct, w)
-
- p.muw.Lock()
- p.workers = append(p.workers, w)
- p.muw.Unlock()
-
- go p.watchWorker(w)
return w, nil
}
-// gentry remove worker
-func (p *StaticPool) discardWorker(w *Worker, caused interface{}) {
- w.markInvalid()
- go p.destroyWorker(w, caused)
+// Destroy all underlying workers (but let them complete the current task).
+func (sp *StaticPool) Destroy(ctx context.Context) {
+ sp.ww.Destroy(ctx)
}
-// destroyWorker destroys workers and removes it from the pool.
-// TODO caused unused
-func (p *StaticPool) destroyWorker(w *Worker, caused interface{}) {
- go func() {
- err := w.Stop()
- if err != nil {
- p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err})
+func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
+ return func(err error, w WorkerBase) (Payload, error) {
+ const op = errors.Op("error encoder")
+ // soft job errors are allowed
+ if errors.Is(errors.ErrSoftJob, err) {
+ if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+ err = sp.ww.AllocateNew()
+ if err != nil {
+ sp.events.Push(PoolEvent{Event: EventWorkerConstruct, Payload: errors.E(op, err)})
+ }
+
+ w.State().Set(StateInvalid)
+ err = w.Stop(bCtx)
+ if err != nil {
+ sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: errors.E(op, err)})
+ }
+ } else {
+ sp.ww.PushWorker(w)
+ }
+
+ return EmptyPayload, errors.E(op, err)
}
- }()
- select {
- case <-w.waitDone:
- // worker is dead
- p.throw(EventWorkerDestruct, w)
+ w.State().Set(StateInvalid)
+ sp.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w})
+ errS := w.Stop(bCtx)
- case <-time.NewTimer(p.cfg.DestroyTimeout).C:
- // failed to stop process in given time
- if err := w.Kill(); err != nil {
- p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err})
+ if errS != nil {
+ return EmptyPayload, errors.E(op, errors.Errorf("%v, %v", err, errS))
}
- p.throw(EventWorkerKill, w)
+ return EmptyPayload, errors.E(op, err)
}
}
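Callers can reuse the same kind check the encoder applies to tell a recoverable script error from a dead worker; this assumes spiral/errors exposes Is(kind, err) as used throughout this file:

    rsp, err := p.Exec(req)
    if err != nil {
        if errors.Is(errors.ErrSoftJob, err) {
            // the job failed, but the pool kept (or transparently replaced) the worker
            log.Println("job error:", err)
        } else {
            // transport or worker failure: the worker was invalidated and stopped
            log.Println("fatal error:", err)
        }
    }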
-// watchWorker watches worker state and replaces it if worker fails.
-func (p *StaticPool) watchWorker(w *Worker) {
- err := w.Wait()
- p.throw(EventWorkerDead, w)
-
- // detaching
- p.muw.Lock()
- for i, wc := range p.workers {
- if wc == w {
- p.workers = append(p.workers[:i], p.workers[i+1:]...)
- p.remove.Delete(w)
- break
+func newPoolAllocator(factory Factory, cmd func() *exec.Cmd) Allocator {
+ return func() (WorkerBase, error) {
+ w, err := factory.SpawnWorkerWithContext(bCtx, cmd())
+ if err != nil {
+ return nil, err
+ }
+
+ sw, err := NewSyncWorker(w)
+ if err != nil {
+ return nil, err
}
+ return sw, nil
}
- p.muw.Unlock()
-
- // registering a dead worker
- atomic.AddInt64(&p.numDead, 1)
+}
- // worker have died unexpectedly, pool should attempt to replace it with alive version safely
+func (sp *StaticPool) execDebug(p Payload) (Payload, error) {
+ sw, err := sp.allocator()
if err != nil {
- p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err})
+ return EmptyPayload, err
}
- if !p.destroyed() {
- nw, err := p.createWorker()
- if err == nil {
- p.free <- nw
- return
- }
+ r, err := sw.(SyncWorker).Exec(p)
- // possible situation when major error causes all PHP scripts to die (for example dead DB)
- if len(p.Workers()) == 0 {
- p.throw(EventPoolError, err)
- } else {
- p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err})
- }
+ if stopErr := sw.Stop(context.Background()); stopErr != nil {
+ sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: sw, Payload: stopErr})
}
-}
-func (p *StaticPool) destroyed() bool {
- return atomic.LoadInt32(&p.inDestroy) != 0
+ return r, err
}
-// throw invokes event handler if any.
-func (p *StaticPool) throw(event int, ctx interface{}) {
- p.mul.Lock()
- if p.lsn != nil {
- p.lsn(event, ctx)
+// allocateWorkers allocates the requested number of workers
+func (sp *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]WorkerBase, error) {
+ const op = errors.Op("allocate workers")
+ var workers []WorkerBase
+
+ // a constant number of workers simplifies the logic
+ for i := int64(0); i < numWorkers; i++ {
+ ctx, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout)
+ w, err := sp.factory.SpawnWorkerWithContext(ctx, sp.cmd())
+ if err != nil {
+ cancel()
+ return nil, errors.E(op, errors.WorkerAllocate, err)
+ }
+ workers = append(workers, w)
+ cancel()
}
- p.mul.Unlock()
+ return workers, nil
}
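Shutdown mirrors the per-allocation timeouts above: Destroy delegates to the worker watcher, and bounding the call with a context presumably caps how long the watcher waits for in-flight tasks; a sketch:

    shutdownCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
    defer cancel()
    p.Destroy(shutdownCtx)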