summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--pool.go42
-rw-r--r--pool_test.go8
-rw-r--r--state.go2
3 files changed, 39 insertions, 13 deletions
diff --git a/pool.go b/pool.go
index c2226420..deb24fd7 100644
--- a/pool.go
+++ b/pool.go
@@ -3,7 +3,6 @@ package roadrunner
import (
"fmt"
"github.com/pkg/errors"
- "log"
"os/exec"
"sync"
"time"
@@ -14,8 +13,22 @@ const (
StopRequest = "{\"stop\":true}"
)
+const (
+ // EventCreated thrown when new worker is spawned.
+ EventCreated = iota
+
+ // EventDestruct thrown before worker destruction.
+ EventDestruct
+
+ // EventError thrown any worker related even happen (error passed as context)
+ EventError
+)
+
// Pool controls worker creation, destruction and task routing.
type Pool struct {
+ // EventHandler is optional callback to handle worker create/destruct/error events.
+ EventHandler func(event int, w *Worker, ctx interface{})
+
// pool behaviour
cfg Config
@@ -163,14 +176,17 @@ func (p *Pool) allocateWorker() (w *Worker, err error) {
func (p *Pool) replaceWorker(w *Worker, caused interface{}) {
go p.destroyWorker(w)
- nw, _ := p.createWorker()
-
- // if unable to create, retry ? or report error
- p.free <- nw
+ if nw, err := p.createWorker(); err != nil {
+ p.throw(EventError, w, err)
+ } else {
+ p.free <- nw
+ }
}
// destroy and remove worker from the pool.
func (p *Pool) destroyWorker(w *Worker) {
+ p.throw(EventDestruct, w, nil)
+
// detaching
p.muw.Lock()
for i, wc := range p.workers {
@@ -189,7 +205,7 @@ func (p *Pool) destroyWorker(w *Worker) {
case <-time.NewTimer(p.cfg.DestroyTimeout).C:
// failed to stop process
if err := w.Kill(); err != nil {
- // todo: handle error
+ p.throw(EventError, w, err)
}
}
}
@@ -202,12 +218,11 @@ func (p *Pool) createWorker() (*Worker, error) {
return nil, err
}
+ p.throw(EventCreated, w, nil)
+
go func(w *Worker) {
if err := w.Wait(); err != nil {
- // todo: register error
-
- log.Println(err)
-
+ p.throw(EventError, w, err)
}
}(w)
@@ -218,3 +233,10 @@ func (p *Pool) createWorker() (*Worker, error) {
return w, nil
}
+
+// throw invokes event handler if any.
+func (p *Pool) throw(event int, w *Worker, ctx interface{}) {
+ if p.EventHandler != nil {
+ p.EventHandler(event, w, ctx)
+ }
+}
diff --git a/pool_test.go b/pool_test.go
index 8b936059..db38cbc4 100644
--- a/pool_test.go
+++ b/pool_test.go
@@ -149,13 +149,17 @@ func Test_Pool_Broken_Replace(t *testing.T) {
assert.NotNil(t, p)
assert.NoError(t, err)
+
+ p.EventHandler = func(e int, w *Worker, ctx interface{}) {
+ if err, ok := ctx.(error); ok {
+ assert.Contains(t, err.Error(), "undefined_function()")
+ }
+ }
res, err := p.Exec(&Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res)
-
- // todo: handle error in even log
}
func Test_Pool_AllocateTimeout(t *testing.T) {
diff --git a/state.go b/state.go
index 2918f396..6e49a397 100644
--- a/state.go
+++ b/state.go
@@ -9,7 +9,7 @@ import (
// State represents worker status and updated time.
type State interface {
// Value returns state value
- Value() int64 //todo: change to state value
+ Value() int64
// NumExecs shows how many times worker was invoked
NumExecs() uint64