diff options
author | Wolfy-J <[email protected]> | 2017-12-26 19:14:53 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2017-12-26 19:14:53 +0300 |
commit | e229d83dea4bbe9d0cfe6569c8fbe239690aafb9 (patch) | |
tree | 2d4887ffdb167d660b705415f0617458490d0b9f /pool.go |
init
Diffstat (limited to 'pool.go')
-rw-r--r-- | pool.go | 189 |
1 files changed, 189 insertions, 0 deletions
diff --git a/pool.go b/pool.go new file mode 100644 index 00000000..50f14e4e --- /dev/null +++ b/pool.go @@ -0,0 +1,189 @@ +package roadrunner + +import ( + "fmt" + "os/exec" + "sync" + "sync/atomic" + "time" +) + +const ( + // ContextTerminate must be sent by worker in control payload if worker want to die. + ContextTerminate = "TERMINATE" +) + +// Pool controls worker creation, destruction and task routing. +type Pool struct { + cfg Config // pool behaviour + cmd func() *exec.Cmd // worker command creator + factory Factory // creates and connects to workers + numWorkers uint64 // current number of tasks workers + tasks sync.WaitGroup // counts all tasks executions + mua sync.Mutex // protects worker allocation + muw sync.RWMutex // protects state of worker list + workers []*Worker // all registered workers + free chan *Worker // freed workers +} + +// NewPool creates new worker pool and task multiplexer. Pool will initiate with one worker. +func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*Pool, error) { + p := &Pool{ + cfg: cfg, + cmd: cmd, + factory: factory, + workers: make([]*Worker, 0, cfg.MaxWorkers), + free: make(chan *Worker, cfg.MaxWorkers), + } + + // to test if worker ready + w, err := p.createWorker() + if err != nil { + return nil, err + } + + p.free <- w + return p, nil +} + +// Execute one task with given payload and context, returns result and context or error. Must not be used once pool is +// being destroyed. +func (p *Pool) Execute(payload []byte, ctx interface{}) (resp []byte, rCtx []byte, err error) { + p.tasks.Add(1) + defer p.tasks.Done() + + w, err := p.allocateWorker() + if err != nil { + return nil, nil, err + } + + if resp, rCtx, err = w.Execute(payload, ctx); err != nil { + if !p.cfg.DestroyOnError { + if err, jobError := err.(JobError); jobError { + p.free <- w + return nil, nil, err + } + } + + // worker level error + p.destroyWorker(w) + + return nil, nil, err + } + + // controlled destruction + if len(resp) == 0 && string(rCtx) == ContextTerminate { + p.destroyWorker(w) + go func() { + //immediate refill + if w, err := p.createWorker(); err != nil { + p.free <- w + } + }() + + return p.Execute(payload, ctx) + } + + if p.cfg.MaxExecutions != 0 && atomic.LoadUint64(&w.NumExecutions) > p.cfg.MaxExecutions { + p.destroyWorker(w) + } else { + p.free <- w + } + + return resp, rCtx, nil +} + +// Config returns associated pool configuration. +func (p *Pool) Config() Config { + return p.cfg +} + +// Workers returns workers associated with the pool. +func (p *Pool) Workers() (workers []*Worker) { + p.muw.RLock() + defer p.muw.RUnlock() + + for _, w := range p.workers { + workers = append(workers, w) + } + + return workers +} + +// Close all underlying workers (but let them to complete the task). +func (p *Pool) Close() { + p.tasks.Wait() + + var wg sync.WaitGroup + for _, w := range p.Workers() { + wg.Add(1) + go func(w *Worker) { + defer wg.Done() + p.destroyWorker(w) + }(w) + } + + wg.Wait() +} + +// finds free worker in a given time interval or creates new if allowed. +func (p *Pool) allocateWorker() (*Worker, error) { + p.mua.Lock() + defer p.mua.Unlock() + + select { + case w := <-p.free: + // we already have free worker + return w, nil + default: + if p.numWorkers < p.cfg.MaxWorkers { + return p.createWorker() + } + + timeout := time.NewTimer(p.cfg.AllocateTimeout) + select { + case <-timeout.C: + return nil, fmt.Errorf("unable to allocate worker, timeout (%s)", p.cfg.AllocateTimeout) + case w := <-p.free: + timeout.Stop() + return w, nil + } + } +} + +// destroy and remove worker from the pool. +func (p *Pool) destroyWorker(w *Worker) { + atomic.AddUint64(&p.numWorkers, ^uint64(0)) + + go func() { + w.Stop() + + p.muw.Lock() + defer p.muw.Unlock() + + for i, wc := range p.workers { + if wc == w { + p.workers = p.workers[:i+1] + break + } + } + }() +} + +// creates new worker (must be called in a locked state). +func (p *Pool) createWorker() (*Worker, error) { + w, err := p.factory.NewWorker(p.cmd()) + if err != nil { + return nil, err + } + + atomic.AddUint64(&p.numWorkers, 1) + + go func() { + p.muw.Lock() + defer p.muw.Unlock() + p.workers = append(p.workers, w) + }() + + return w, nil +} |