summaryrefslogtreecommitdiff
path: root/static_pool.go
diff options
context:
space:
mode:
Diffstat (limited to 'static_pool.go')
-rw-r--r--static_pool.go378
1 files changed, 101 insertions, 277 deletions
diff --git a/static_pool.go b/static_pool.go
index c7cc6517..1444e95a 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -1,11 +1,10 @@
package roadrunner
import (
+ "context"
"fmt"
"os/exec"
"sync"
- "sync/atomic"
- "time"
"github.com/pkg/errors"
)
@@ -15,47 +14,32 @@ const (
StopRequest = "{\"stop\":true}"
)
-// StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of workers.
+// StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack.
type StaticPool struct {
// pool behaviour
- cfg Config
+ cfg *Config
// worker command creator
cmd func() *exec.Cmd
- // creates and connects to workers
+ // creates and connects to stack
factory Factory
- // active task executions
- tmu sync.Mutex
- tasks sync.WaitGroup
-
- // workers circular allocation buf
- free chan *Worker
-
- // number of workers expected to be dead in a buf.
- numDead int64
-
// protects state of worker list, does not affect allocation
muw sync.RWMutex
- // all registered workers
- workers []*Worker
+ ww *WorkersWatcher
- // invalid declares set of workers to be removed from the pool.
- remove sync.Map
-
- // pool is being destroyed
- inDestroy int32
- destroy chan interface{}
-
- // lsn is optional callback to handle worker create/destruct/error events.
- mul sync.Mutex
- lsn func(event int, ctx interface{})
+ events chan PoolEvent
+}
+type PoolEvent struct {
+ Payload interface{}
}
// NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
-func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, error) {
+// supervisor Supervisor, todo: think about it
+// stack func() (WorkerBase, error),
+func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg *Config) (Pool, error) {
if err := cfg.Valid(); err != nil {
return nil, errors.Wrap(err, "config")
}
@@ -64,305 +48,145 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, er
cfg: cfg,
cmd: cmd,
factory: factory,
- workers: make([]*Worker, 0, cfg.NumWorkers),
- free: make(chan *Worker, cfg.NumWorkers),
- destroy: make(chan interface{}),
+ events: make(chan PoolEvent),
}
- // constant number of workers simplify logic
- for i := int64(0); i < p.cfg.NumWorkers; i++ {
- // to test if worker ready
- w, err := p.createWorker()
+ p.ww = NewWorkerWatcher(func(args ...interface{}) (*SyncWorker, error) {
+ w, err := p.factory.SpawnWorkerWithContext(ctx, p.cmd())
if err != nil {
- p.Destroy()
return nil, err
}
- p.free <- w
- }
-
- return p, nil
-}
-
-// Listen attaches pool event controller.
-func (p *StaticPool) Listen(l func(event int, ctx interface{})) {
- p.mul.Lock()
- defer p.mul.Unlock()
+ sw, err := NewSyncWorker(w)
+ if err != nil {
+ return nil, err
+ }
+ return &sw, nil
+ }, p.cfg.NumWorkers, p.events)
- p.lsn = l
+ workers, err := p.allocateWorkers(ctx, p.cfg.NumWorkers)
+ if err != nil {
+ return nil, err
+ }
- p.muw.Lock()
- for _, w := range p.workers {
- w.err.Listen(p.lsn)
+ // put stack in the pool
+ err = p.ww.AddToWatch(ctx, workers)
+ if err != nil {
+ return nil, err
}
- p.muw.Unlock()
+
+ return p, nil
}
// Config returns associated pool configuration. Immutable.
func (p *StaticPool) Config() Config {
- return p.cfg
+ return *p.cfg
}
// Workers returns worker list associated with the pool.
-func (p *StaticPool) Workers() (workers []*Worker) {
+func (p *StaticPool) Workers(ctx context.Context) (workers []WorkerBase) {
p.muw.RLock()
defer p.muw.RUnlock()
-
- workers = append(workers, p.workers...)
-
- return workers
+ return p.ww.WorkersList(ctx)
}
-// Remove forces pool to remove specific worker.
-func (p *StaticPool) Remove(w *Worker, err error) bool {
- if w.State().Value() != StateReady && w.State().Value() != StateWorking {
- // unable to remove inactive worker
- return false
- }
-
- if _, ok := p.remove.Load(w); ok {
- return false
- }
-
- p.remove.Store(w, err)
- return true
+func (p *StaticPool) RemoveWorker(ctx context.Context, wb WorkerBase) error {
+ return p.ww.RemoveWorker(ctx, wb)
}
// Exec one task with given payload and context, returns result or error.
-func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) {
- p.tmu.Lock()
- p.tasks.Add(1)
- p.tmu.Unlock()
-
- defer p.tasks.Done()
-
- w, err := p.allocateWorker()
- if err != nil {
- return nil, errors.Wrap(err, "unable to allocate worker")
+func (p *StaticPool) Exec(ctx context.Context, rqs Payload) (Payload, error) {
+ getWorkerCtx, cancel := context.WithTimeout(context.TODO(), p.cfg.AllocateTimeout)
+ defer cancel()
+ w, err := p.ww.GetFreeWorker(getWorkerCtx)
+ if err != nil && errors.Is(err, ErrWatcherStopped) {
+ return EmptyPayload, ErrWatcherStopped
+ } else if err != nil {
+ return EmptyPayload, err
}
- rsp, err = w.Exec(rqs)
+ sw := w.(SyncWorker)
+
+ execCtx, cancel2 := context.WithTimeout(context.TODO(), p.cfg.ExecTTL)
+ defer cancel2()
+ rsp, err := sw.Exec(execCtx, rqs)
if err != nil {
+ errJ := p.checkMaxJobs(ctx, w)
+ if errJ != nil {
+ return EmptyPayload, fmt.Errorf("%v, %v", err, errJ)
+ }
// soft job errors are allowed
- if _, jobError := err.(JobError); jobError {
- p.release(w)
- return nil, err
+ if _, jobError := err.(TaskError); jobError {
+ p.ww.PushWorker(w)
+ return EmptyPayload, err
}
- p.discardWorker(w, err)
- return nil, err
+ sw.State().Set(StateInvalid)
+ errS := w.Stop(ctx)
+ if errS != nil {
+ return EmptyPayload, fmt.Errorf("%v, %v", err, errS)
+ }
+
+ return EmptyPayload, err
}
// worker want's to be terminated
if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
- p.discardWorker(w, err)
- return p.Exec(rqs)
- }
-
- p.release(w)
- return rsp, nil
-}
-
-// Destroy all underlying workers (but let them to complete the task).
-func (p *StaticPool) Destroy() {
- atomic.AddInt32(&p.inDestroy, 1)
-
- p.tmu.Lock()
- p.tasks.Wait()
- close(p.destroy)
- p.tmu.Unlock()
-
- var wg sync.WaitGroup
- for _, w := range p.Workers() {
- wg.Add(1)
- w.markInvalid()
- go func(w *Worker) {
- defer wg.Done()
- p.destroyWorker(w, nil)
- }(w)
- }
-
- wg.Wait()
-}
-
-// finds free worker in a given time interval. Skips dead workers.
-func (p *StaticPool) allocateWorker() (w *Worker, err error) {
- for i := atomic.LoadInt64(&p.numDead); i >= 0; i++ {
- // this loop is required to skip issues with dead workers still being in a ring
- // (we know how many workers).
- select {
- case w = <-p.free:
- if w.State().Value() != StateReady {
- // found expected dead worker
- atomic.AddInt64(&p.numDead, ^int64(0))
- continue
- }
-
- if err, remove := p.remove.Load(w); remove {
- p.discardWorker(w, err)
-
- // get next worker
- i++
- continue
- }
-
- return w, nil
- case <-p.destroy:
- return nil, fmt.Errorf("pool has been stopped")
- default:
- // enable timeout handler
- }
-
- timeout := time.NewTimer(p.cfg.AllocateTimeout)
- select {
- case <-timeout.C:
- return nil, fmt.Errorf("worker timeout (%s)", p.cfg.AllocateTimeout)
- case w = <-p.free:
- timeout.Stop()
-
- if w.State().Value() != StateReady {
- atomic.AddInt64(&p.numDead, ^int64(0))
- continue
- }
-
- if err, remove := p.remove.Load(w); remove {
- p.discardWorker(w, err)
-
- // get next worker
- i++
- continue
- }
-
- return w, nil
- case <-p.destroy:
- timeout.Stop()
-
- return nil, fmt.Errorf("pool has been stopped")
+ w.State().Set(StateInvalid)
+ err = w.Stop(ctx)
+ if err != nil {
+ panic(err)
}
+ return p.Exec(ctx, rqs)
}
- return nil, fmt.Errorf("all workers are dead (%v)", p.cfg.NumWorkers)
-}
-
-// release releases or replaces the worker.
-func (p *StaticPool) release(w *Worker) {
if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
- p.discardWorker(w, p.cfg.MaxJobs)
- return
- }
-
- if err, remove := p.remove.Load(w); remove {
- p.discardWorker(w, err)
- return
+ err = p.ww.AllocateNew(ctx)
+ if err != nil {
+ return EmptyPayload, err
+ }
+ } else {
+ p.muw.Lock()
+ p.ww.PushWorker(w)
+ p.muw.Unlock()
}
-
- p.free <- w
+ return rsp, nil
}
-// creates new worker using associated factory. automatically
-// adds worker to the worker list (background)
-func (p *StaticPool) createWorker() (*Worker, error) {
- w, err := p.factory.SpawnWorker(p.cmd())
- if err != nil {
- return nil, err
- }
-
- p.mul.Lock()
- if p.lsn != nil {
- w.err.Listen(p.lsn)
- }
- p.mul.Unlock()
-
- p.throw(EventWorkerConstruct, w)
-
- p.muw.Lock()
- p.workers = append(p.workers, w)
- p.muw.Unlock()
-
- go p.watchWorker(w)
- return w, nil
+// Destroy all underlying stack (but let them to complete the task).
+func (p *StaticPool) Destroy(ctx context.Context) {
+ p.ww.Destroy(ctx)
}
-// gentry remove worker
-func (p *StaticPool) discardWorker(w *Worker, caused interface{}) {
- w.markInvalid()
- go p.destroyWorker(w, caused)
+func (p *StaticPool) Events() chan PoolEvent {
+ return p.events
}
-// destroyWorker destroys workers and removes it from the pool.
-func (p *StaticPool) destroyWorker(w *Worker, caused interface{}) {
- go func() {
- err := w.Stop()
- if err != nil {
- p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err})
- }
- }()
-
- select {
- case <-w.waitDone:
- // worker is dead
- p.throw(EventWorkerDestruct, w)
+// allocate required number of stack
+func (p *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]WorkerBase, error) {
+ var workers []WorkerBase
- case <-time.NewTimer(p.cfg.DestroyTimeout).C:
- // failed to stop process in given time
- if err := w.Kill(); err != nil {
- p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err})
+ // constant number of stack simplify logic
+ for i := int64(0); i < numWorkers; i++ {
+ ctx, cancel := context.WithTimeout(ctx, p.cfg.AllocateTimeout)
+ w, err := p.factory.SpawnWorkerWithContext(ctx, p.cmd())
+ if err != nil {
+ cancel()
+ return nil, err
}
-
- p.throw(EventWorkerKill, w)
+ cancel()
+ workers = append(workers, w)
}
+ return workers, nil
}
-// watchWorker watches worker state and replaces it if worker fails.
-func (p *StaticPool) watchWorker(w *Worker) {
- err := w.Wait()
- p.throw(EventWorkerDead, w)
-
- // detaching
- p.muw.Lock()
- for i, wc := range p.workers {
- if wc == w {
- p.workers = append(p.workers[:i], p.workers[i+1:]...)
- p.remove.Delete(w)
- break
- }
- }
- p.muw.Unlock()
-
- // registering a dead worker
- atomic.AddInt64(&p.numDead, 1)
-
- // worker have died unexpectedly, pool should attempt to replace it with alive version safely
- if err != nil {
- p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err})
- }
-
- if !p.destroyed() {
- nw, err := p.createWorker()
- if err == nil {
- p.free <- nw
- return
- }
-
- // possible situation when major error causes all PHP scripts to die (for example dead DB)
- if len(p.Workers()) == 0 {
- p.throw(EventPoolError, err)
- } else {
- p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err})
+func (p *StaticPool) checkMaxJobs(ctx context.Context, w WorkerBase) error {
+ if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
+ err := p.ww.AllocateNew(ctx)
+ if err != nil {
+ return err
}
}
-}
-
-func (p *StaticPool) destroyed() bool {
- return atomic.LoadInt32(&p.inDestroy) != 0
-}
-
-// throw invokes event handler if any.
-func (p *StaticPool) throw(event int, ctx interface{}) {
- p.mul.Lock()
- if p.lsn != nil {
- p.lsn(event, ctx)
- }
- p.mul.Unlock()
+ return nil
}