diff options
-rw-r--r-- | pool.go | 42 | ||||
-rw-r--r-- | pool_test.go | 8 | ||||
-rw-r--r-- | state.go | 2 |
3 files changed, 39 insertions, 13 deletions
@@ -3,7 +3,6 @@ package roadrunner import ( "fmt" "github.com/pkg/errors" - "log" "os/exec" "sync" "time" @@ -14,8 +13,22 @@ const ( StopRequest = "{\"stop\":true}" ) +const ( + // EventCreated thrown when new worker is spawned. + EventCreated = iota + + // EventDestruct thrown before worker destruction. + EventDestruct + + // EventError thrown any worker related even happen (error passed as context) + EventError +) + // Pool controls worker creation, destruction and task routing. type Pool struct { + // EventHandler is optional callback to handle worker create/destruct/error events. + EventHandler func(event int, w *Worker, ctx interface{}) + // pool behaviour cfg Config @@ -163,14 +176,17 @@ func (p *Pool) allocateWorker() (w *Worker, err error) { func (p *Pool) replaceWorker(w *Worker, caused interface{}) { go p.destroyWorker(w) - nw, _ := p.createWorker() - - // if unable to create, retry ? or report error - p.free <- nw + if nw, err := p.createWorker(); err != nil { + p.throw(EventError, w, err) + } else { + p.free <- nw + } } // destroy and remove worker from the pool. func (p *Pool) destroyWorker(w *Worker) { + p.throw(EventDestruct, w, nil) + // detaching p.muw.Lock() for i, wc := range p.workers { @@ -189,7 +205,7 @@ func (p *Pool) destroyWorker(w *Worker) { case <-time.NewTimer(p.cfg.DestroyTimeout).C: // failed to stop process if err := w.Kill(); err != nil { - // todo: handle error + p.throw(EventError, w, err) } } } @@ -202,12 +218,11 @@ func (p *Pool) createWorker() (*Worker, error) { return nil, err } + p.throw(EventCreated, w, nil) + go func(w *Worker) { if err := w.Wait(); err != nil { - // todo: register error - - log.Println(err) - + p.throw(EventError, w, err) } }(w) @@ -218,3 +233,10 @@ func (p *Pool) createWorker() (*Worker, error) { return w, nil } + +// throw invokes event handler if any. +func (p *Pool) throw(event int, w *Worker, ctx interface{}) { + if p.EventHandler != nil { + p.EventHandler(event, w, ctx) + } +} diff --git a/pool_test.go b/pool_test.go index 8b936059..db38cbc4 100644 --- a/pool_test.go +++ b/pool_test.go @@ -149,13 +149,17 @@ func Test_Pool_Broken_Replace(t *testing.T) { assert.NotNil(t, p) assert.NoError(t, err) + + p.EventHandler = func(e int, w *Worker, ctx interface{}) { + if err, ok := ctx.(error); ok { + assert.Contains(t, err.Error(), "undefined_function()") + } + } res, err := p.Exec(&Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res) - - // todo: handle error in even log } func Test_Pool_AllocateTimeout(t *testing.T) { @@ -9,7 +9,7 @@ import ( // State represents worker status and updated time. type State interface { // Value returns state value - Value() int64 //todo: change to state value + Value() int64 // NumExecs shows how many times worker was invoked NumExecs() uint64 |