diff options
author | Wolfy-J <[email protected]> | 2018-05-29 13:03:34 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2018-05-29 13:03:34 +0300 |
commit | b8bc792b263a3891e125757a35cc563bb85f1a0b (patch) | |
tree | f7a9e6f2568220491a26f6544e4acf0ed62012bb /static_pool.go | |
parent | 50f820833eeef8518b3b978b33c6f20391225162 (diff) |
nested observers
Diffstat (limited to 'static_pool.go')
-rw-r--r-- | static_pool.go | 25 |
1 files changed, 12 insertions, 13 deletions
diff --git a/static_pool.go b/static_pool.go index c4895bf0..4e75cddc 100644 --- a/static_pool.go +++ b/static_pool.go @@ -22,7 +22,7 @@ type StaticPool struct { cmd func() *exec.Cmd // observer is optional callback to handle worker create/destruct/error events. - observer func(event int, w *Worker, ctx interface{}) + observer func(event int, ctx interface{}) // creates and connects to workers factory Factory @@ -43,7 +43,7 @@ type StaticPool struct { // NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker. func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, error) { if err := cfg.Valid(); err != nil { - return nil, errors.Wrap(err, "config error") + return nil, errors.Wrap(err, "cfg error") } p := &StaticPool{ @@ -70,8 +70,8 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, er return p, nil } -// Report attaches pool event watcher. -func (p *StaticPool) Report(o func(event int, w *Worker, ctx interface{})) { +// Observe attaches pool event watcher. +func (p *StaticPool) Observe(o func(event int, ctx interface{})) { p.observer = o } @@ -139,7 +139,6 @@ func (p *StaticPool) Destroy() { wg.Add(1) go func(w *Worker) { defer wg.Done() - p.destroyWorker(w) }(w) } @@ -171,11 +170,11 @@ func (p *StaticPool) replaceWorker(w *Worker, caused interface{}) { go p.destroyWorker(w) if nw, err := p.createWorker(); err != nil { - p.throw(EventError, w, err) + p.throw(EventError, WorkerError{Worker: w, Caused: err}) if len(p.Workers()) == 0 { // possible situation when major error causes all PHP scripts to die (for example dead DB) - p.throw(EventError, nil, fmt.Errorf("all workers dead")) + p.throw(EventError, PoolError("all workers are dead")) } } else { p.free <- nw @@ -184,7 +183,7 @@ func (p *StaticPool) replaceWorker(w *Worker, caused interface{}) { // destroyWorker destroys workers and removes it from the pool. func (p *StaticPool) destroyWorker(w *Worker) { - p.throw(EventDestruct, w, nil) + p.throw(EventDestruct, w) // detaching p.muw.Lock() @@ -204,7 +203,7 @@ func (p *StaticPool) destroyWorker(w *Worker) { case <-time.NewTimer(p.cfg.DestroyTimeout).C: // failed to stop process if err := w.Kill(); err != nil { - p.throw(EventError, w, err) + p.throw(EventError, WorkerError{Worker: w, Caused: err}) } } } @@ -217,11 +216,11 @@ func (p *StaticPool) createWorker() (*Worker, error) { return nil, err } - p.throw(EventCreated, w, nil) + p.throw(EventCreated, w) go func(w *Worker) { if err := w.Wait(); err != nil { - p.throw(EventError, w, err) + p.throw(EventError, WorkerError{Worker: w, Caused: err}) } }(w) @@ -234,8 +233,8 @@ func (p *StaticPool) createWorker() (*Worker, error) { } // throw invokes event handler if any. -func (p *StaticPool) throw(event int, w *Worker, ctx interface{}) { +func (p *StaticPool) throw(event int, ctx interface{}) { if p.observer != nil { - p.observer(event, w, ctx) + p.observer(event, ctx) } } |