path: root/pool.go
author     Wolfy-J <[email protected]>  2018-01-23 19:51:15 -0500
committer  Wolfy-J <[email protected]>  2018-01-23 19:51:15 -0500
commit     78a42de837928cf7d10a1ae04d7e82e56d66e1e2 (patch)
tree       8882b9a051bcc9c42328df583c0bb8c39a89591e /pool.go
parent     fa4bd78d9f7c5f74e8445374370927c742fc4e78 (diff)
API update
Diffstat (limited to 'pool.go')
-rw-r--r--  pool.go  231
1 file changed, 131 insertions, 100 deletions
diff --git a/pool.go b/pool.go
index 50f14e4e..75708175 100644
--- a/pool.go
+++ b/pool.go
@@ -2,103 +2,89 @@ package roadrunner
import (
"fmt"
+ "log"
"os/exec"
"sync"
- "sync/atomic"
"time"
+ "github.com/pkg/errors"
)
const (
- // ContextTerminate must be sent by worker in control payload if worker want to die.
- ContextTerminate = "TERMINATE"
+ // Control header sent by the worker to request termination.
+ TerminateRequest = "{\"terminate\": true}"
)
// Pool controls worker creation, destruction and task routing.
type Pool struct {
- cfg Config // pool behaviour
- cmd func() *exec.Cmd // worker command creator
- factory Factory // creates and connects to workers
- numWorkers uint64 // current number of tasks workers
- tasks sync.WaitGroup // counts all tasks executions
- mua sync.Mutex // protects worker allocation
- muw sync.RWMutex // protects state of worker list
- workers []*Worker // all registered workers
- free chan *Worker // freed workers
+ // pool behaviour
+ cfg Config
+
+ // worker command creator
+ cmd func() *exec.Cmd
+
+ // creates and connects to workers
+ factory Factory
+
+ // active task executions
+ tasks sync.WaitGroup
+
+ // workers circular allocation buffer
+ free chan *Worker
+
+ // protects state of worker list, does not affect allocation
+ muw sync.RWMutex
+
+ // all registered workers
+ workers []*Worker
}
// NewPool creates new worker pool and task multiplexer. Pool will initiate with one worker.
func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*Pool, error) {
+ if err := cfg.Valid(); err != nil {
+ return nil, errors.Wrap(err, "config error")
+ }
+
p := &Pool{
cfg: cfg,
cmd: cmd,
factory: factory,
- workers: make([]*Worker, 0, cfg.MaxWorkers),
- free: make(chan *Worker, cfg.MaxWorkers),
+ workers: make([]*Worker, 0, cfg.NumWorkers),
+ free: make(chan *Worker, cfg.NumWorkers),
}
- // to test if worker ready
- w, err := p.createWorker()
- if err != nil {
- return nil, err
- }
+ //todo: watch for errors from workers!!!
- p.free <- w
- return p, nil
-}
+ // a constant number of workers simplifies the logic
+ for i := uint64(0); i < p.cfg.NumWorkers; i++ {
+ // to test if the worker is ready
+ w, err := p.createWorker()
-// Execute one task with given payload and context, returns result and context or error. Must not be used once pool is
-// being destroyed.
-func (p *Pool) Execute(payload []byte, ctx interface{}) (resp []byte, rCtx []byte, err error) {
- p.tasks.Add(1)
- defer p.tasks.Done()
+ if err != nil {
+ p.Destroy()
- w, err := p.allocateWorker()
- if err != nil {
- return nil, nil, err
- }
-
- if resp, rCtx, err = w.Execute(payload, ctx); err != nil {
- if !p.cfg.DestroyOnError {
- if err, jobError := err.(JobError); jobError {
- p.free <- w
- return nil, nil, err
- }
+ return nil, err
}
- // worker level error
- p.destroyWorker(w)
-
- return nil, nil, err
- }
-
- // controlled destruction
- if len(resp) == 0 && string(rCtx) == ContextTerminate {
- p.destroyWorker(w)
- go func() {
- //immediate refill
- if w, err := p.createWorker(); err != nil {
- p.free <- w
+ // worker watcher
+ go func(w *Worker) {
+ if err := w.Wait(); err != nil {
+ // todo: register error
+ log.Println(err)
}
- }()
-
- return p.Execute(payload, ctx)
- }
+ }(w)
- if p.cfg.MaxExecutions != 0 && atomic.LoadUint64(&w.NumExecutions) > p.cfg.MaxExecutions {
- p.destroyWorker(w)
- } else {
p.free <- w
}
- return resp, rCtx, nil
+ return p, nil
}
-// Config returns associated pool configuration.
+// Config returns associated pool configuration. Immutable.
func (p *Pool) Config() Config {
return p.cfg
}
-// Workers returns workers associated with the pool.
+// Workers returns worker list associated with the pool.
func (p *Pool) Workers() (workers []*Worker) {
p.muw.RLock()
defer p.muw.RUnlock()
@@ -110,8 +96,46 @@ func (p *Pool) Workers() (workers []*Worker) {
return workers
}
-// Close all underlying workers (but let them to complete the task).
-func (p *Pool) Close() {
+// Exec executes one task with the given payload and context, returning the result or an error.
+func (p *Pool) Exec(rqs *Payload) (rsp *Payload, err error) {
+ p.tasks.Add(1)
+ defer p.tasks.Done()
+
+ w, err := p.allocateWorker()
+ if err != nil {
+ return nil, errors.Wrap(err, "unable to allocate worker")
+ }
+
+ rsp, err = w.Exec(rqs)
+
+ if err != nil {
+ // soft job errors are allowed
+ if _, jobError := err.(JobError); jobError {
+ p.free <- w
+ return nil, err
+ }
+
+ go p.replaceWorker(w, err)
+ return nil, err
+ }
+
+ // worker wants to be terminated
+ if rsp.Body == nil && rsp.Head != nil && string(rsp.Head) == TerminateRequest {
+ go p.replaceWorker(w, err)
+ return p.Exec(rqs)
+ }
+
+ if p.cfg.MaxExecutions != 0 && w.State().NumExecs() >= p.cfg.MaxExecutions {
+ go p.replaceWorker(w, p.cfg.MaxExecutions)
+ } else {
+ p.free <- w
+ }
+
+ return rsp, nil
+}
+
+// Destroy all underlying workers (but let them complete the current task).
+func (p *Pool) Destroy() {
p.tasks.Wait()
var wg sync.WaitGroup
@@ -119,6 +143,7 @@ func (p *Pool) Close() {
wg.Add(1)
go func(w *Worker) {
defer wg.Done()
+
p.destroyWorker(w)
}(w)
}
@@ -127,63 +152,69 @@ func (p *Pool) Close() {
}
// finds free worker in a given time interval or creates new if allowed.
-func (p *Pool) allocateWorker() (*Worker, error) {
- p.mua.Lock()
- defer p.mua.Unlock()
-
+func (p *Pool) allocateWorker() (w *Worker, err error) {
select {
- case w := <-p.free:
- // we already have free worker
+ case w = <-p.free:
return w, nil
default:
- if p.numWorkers < p.cfg.MaxWorkers {
- return p.createWorker()
- }
+ // no free worker right away, fall through to the timeout handler
+ }
- timeout := time.NewTimer(p.cfg.AllocateTimeout)
- select {
- case <-timeout.C:
- return nil, fmt.Errorf("unable to allocate worker, timeout (%s)", p.cfg.AllocateTimeout)
- case w := <-p.free:
- timeout.Stop()
- return w, nil
- }
+ timeout := time.NewTimer(p.cfg.AllocateTimeout)
+ select {
+ case <-timeout.C:
+ return nil, fmt.Errorf("worker timeout (%s)", p.cfg.AllocateTimeout)
+ case w := <-p.free:
+ timeout.Stop()
+ return w, nil
}
}
+// replaces dead or expired worker with new instance
+func (p *Pool) replaceWorker(w *Worker, caused interface{}) {
+ go p.destroyWorker(w)
+
+ nw, _ := p.createWorker()
+ p.free <- nw
+}
+
// destroy and remove worker from the pool.
func (p *Pool) destroyWorker(w *Worker) {
- atomic.AddUint64(&p.numWorkers, ^uint64(0))
-
- go func() {
- w.Stop()
+ // detaching
+ p.muw.Lock()
+ for i, wc := range p.workers {
+ if wc == w {
+ p.workers = p.workers[:i+1]
+ break
+ }
+ }
+ p.muw.Unlock()
- p.muw.Lock()
- defer p.muw.Unlock()
+ go w.Stop()
- for i, wc := range p.workers {
- if wc == w {
- p.workers = p.workers[:i+1]
- break
- }
+ select {
+ case <-w.waitDone:
+ // worker is dead
+ case <-time.NewTimer(time.Second * 10).C:
+ // failed to stop process
+ if err := w.Kill(); err != nil {
+ //todo: can't kill or already killed?
}
- }()
+ }
}
-// creates new worker (must be called in a locked state).
+// creates new worker using the associated factory and registers
+// it in the worker list.
func (p *Pool) createWorker() (*Worker, error) {
- w, err := p.factory.NewWorker(p.cmd())
+ w, err := p.factory.SpawnWorker(p.cmd())
if err != nil {
return nil, err
}
- atomic.AddUint64(&p.numWorkers, 1)
+ p.muw.Lock()
+ defer p.muw.Unlock()
- go func() {
- p.muw.Lock()
- defer p.muw.Unlock()
- p.workers = append(p.workers, w)
- }()
+ p.workers = append(p.workers, w)
return w, nil
}
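
For context, a minimal usage sketch of the pool API as it looks after this change: NewPool now spawns cfg.NumWorkers workers up front, Exec routes a single *Payload, and Destroy replaces Close. The import path, the NewPipeFactory constructor, the Payload literal, and the exact Config validation rules are assumptions drawn from the surrounding roadrunner code, not guaranteed by this diff.

package main

import (
	"fmt"
	"log"
	"os/exec"
	"time"

	rr "github.com/spiral/roadrunner" // assumed import path
)

func main() {
	// worker command creator, as expected by NewPool
	cmd := func() *exec.Cmd {
		return exec.Command("php", "worker.php")
	}

	// NewPipeFactory is assumed to exist at this commit; any Factory works
	p, err := rr.NewPool(cmd, rr.NewPipeFactory(), rr.Config{
		NumWorkers:      4, // Config fields referenced by this diff
		AllocateTimeout: time.Second * 10,
		MaxExecutions:   1000,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer p.Destroy()

	// Payload carries Body and Head, the fields read by Exec above
	rsp, err := p.Exec(&rr.Payload{Body: []byte("hello")})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(rsp.Body))
}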