summaryrefslogtreecommitdiff
path: root/pool.go
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2018-01-28 18:57:25 +0300
committerWolfy-J <[email protected]>2018-01-28 18:57:25 +0300
commit71a040f5ee98d91e9f388b0d4509adf8d5589bb9 (patch)
tree0af48d1887b8cbba00277cfcb61ac9ebdd9661e8 /pool.go
parente065b650ad2d3b0c58fe73fa75c9f730a36eee86 (diff)
Context
Diffstat (limited to 'pool.go')
-rw-r--r--pool.go42
1 files changed, 32 insertions, 10 deletions
diff --git a/pool.go b/pool.go
index c2226420..deb24fd7 100644
--- a/pool.go
+++ b/pool.go
@@ -3,7 +3,6 @@ package roadrunner
import (
"fmt"
"github.com/pkg/errors"
- "log"
"os/exec"
"sync"
"time"
@@ -14,8 +13,22 @@ const (
StopRequest = "{\"stop\":true}"
)
+const (
+ // EventCreated thrown when new worker is spawned.
+ EventCreated = iota
+
+ // EventDestruct thrown before worker destruction.
+ EventDestruct
+
+ // EventError thrown any worker related even happen (error passed as context)
+ EventError
+)
+
// Pool controls worker creation, destruction and task routing.
type Pool struct {
+ // EventHandler is optional callback to handle worker create/destruct/error events.
+ EventHandler func(event int, w *Worker, ctx interface{})
+
// pool behaviour
cfg Config
@@ -163,14 +176,17 @@ func (p *Pool) allocateWorker() (w *Worker, err error) {
func (p *Pool) replaceWorker(w *Worker, caused interface{}) {
go p.destroyWorker(w)
- nw, _ := p.createWorker()
-
- // if unable to create, retry ? or report error
- p.free <- nw
+ if nw, err := p.createWorker(); err != nil {
+ p.throw(EventError, w, err)
+ } else {
+ p.free <- nw
+ }
}
// destroy and remove worker from the pool.
func (p *Pool) destroyWorker(w *Worker) {
+ p.throw(EventDestruct, w, nil)
+
// detaching
p.muw.Lock()
for i, wc := range p.workers {
@@ -189,7 +205,7 @@ func (p *Pool) destroyWorker(w *Worker) {
case <-time.NewTimer(p.cfg.DestroyTimeout).C:
// failed to stop process
if err := w.Kill(); err != nil {
- // todo: handle error
+ p.throw(EventError, w, err)
}
}
}
@@ -202,12 +218,11 @@ func (p *Pool) createWorker() (*Worker, error) {
return nil, err
}
+ p.throw(EventCreated, w, nil)
+
go func(w *Worker) {
if err := w.Wait(); err != nil {
- // todo: register error
-
- log.Println(err)
-
+ p.throw(EventError, w, err)
}
}(w)
@@ -218,3 +233,10 @@ func (p *Pool) createWorker() (*Worker, error) {
return w, nil
}
+
+// throw invokes event handler if any.
+func (p *Pool) throw(event int, w *Worker, ctx interface{}) {
+ if p.EventHandler != nil {
+ p.EventHandler(event, w, ctx)
+ }
+}