summaryrefslogtreecommitdiff
path: root/static_pool.go
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2018-05-29 13:03:34 +0300
committerWolfy-J <[email protected]>2018-05-29 13:03:34 +0300
commitb8bc792b263a3891e125757a35cc563bb85f1a0b (patch)
treef7a9e6f2568220491a26f6544e4acf0ed62012bb /static_pool.go
parent50f820833eeef8518b3b978b33c6f20391225162 (diff)
nested observers
Diffstat (limited to 'static_pool.go')
-rw-r--r--static_pool.go25
1 files changed, 12 insertions, 13 deletions
diff --git a/static_pool.go b/static_pool.go
index c4895bf0..4e75cddc 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -22,7 +22,7 @@ type StaticPool struct {
cmd func() *exec.Cmd
// observer is optional callback to handle worker create/destruct/error events.
- observer func(event int, w *Worker, ctx interface{})
+ observer func(event int, ctx interface{})
// creates and connects to workers
factory Factory
@@ -43,7 +43,7 @@ type StaticPool struct {
// NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, error) {
if err := cfg.Valid(); err != nil {
- return nil, errors.Wrap(err, "config error")
+ return nil, errors.Wrap(err, "cfg error")
}
p := &StaticPool{
@@ -70,8 +70,8 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, er
return p, nil
}
-// Report attaches pool event watcher.
-func (p *StaticPool) Report(o func(event int, w *Worker, ctx interface{})) {
+// Observe attaches pool event watcher.
+func (p *StaticPool) Observe(o func(event int, ctx interface{})) {
p.observer = o
}
@@ -139,7 +139,6 @@ func (p *StaticPool) Destroy() {
wg.Add(1)
go func(w *Worker) {
defer wg.Done()
-
p.destroyWorker(w)
}(w)
}
@@ -171,11 +170,11 @@ func (p *StaticPool) replaceWorker(w *Worker, caused interface{}) {
go p.destroyWorker(w)
if nw, err := p.createWorker(); err != nil {
- p.throw(EventError, w, err)
+ p.throw(EventError, WorkerError{Worker: w, Caused: err})
if len(p.Workers()) == 0 {
// possible situation when major error causes all PHP scripts to die (for example dead DB)
- p.throw(EventError, nil, fmt.Errorf("all workers dead"))
+ p.throw(EventError, PoolError("all workers are dead"))
}
} else {
p.free <- nw
@@ -184,7 +183,7 @@ func (p *StaticPool) replaceWorker(w *Worker, caused interface{}) {
// destroyWorker destroys workers and removes it from the pool.
func (p *StaticPool) destroyWorker(w *Worker) {
- p.throw(EventDestruct, w, nil)
+ p.throw(EventDestruct, w)
// detaching
p.muw.Lock()
@@ -204,7 +203,7 @@ func (p *StaticPool) destroyWorker(w *Worker) {
case <-time.NewTimer(p.cfg.DestroyTimeout).C:
// failed to stop process
if err := w.Kill(); err != nil {
- p.throw(EventError, w, err)
+ p.throw(EventError, WorkerError{Worker: w, Caused: err})
}
}
}
@@ -217,11 +216,11 @@ func (p *StaticPool) createWorker() (*Worker, error) {
return nil, err
}
- p.throw(EventCreated, w, nil)
+ p.throw(EventCreated, w)
go func(w *Worker) {
if err := w.Wait(); err != nil {
- p.throw(EventError, w, err)
+ p.throw(EventError, WorkerError{Worker: w, Caused: err})
}
}(w)
@@ -234,8 +233,8 @@ func (p *StaticPool) createWorker() (*Worker, error) {
}
// throw invokes event handler if any.
-func (p *StaticPool) throw(event int, w *Worker, ctx interface{}) {
+func (p *StaticPool) throw(event int, ctx interface{}) {
if p.observer != nil {
- p.observer(event, w, ctx)
+ p.observer(event, ctx)
}
}