diff options
-rw-r--r-- | pool.go | 6 | ||||
-rw-r--r-- | state.go | 3 | ||||
-rw-r--r-- | static_pool.go | 8 | ||||
-rw-r--r-- | static_pool_test.go | 2 |
4 files changed, 14 insertions, 5 deletions
@@ -13,9 +13,15 @@ const ( // Pool managed set of inner worker processes. type Pool interface { + // Report attaches pool event watcher. + Watch(o func(event int, w *Worker, ctx interface{})) + // Exec one task with given payload and context, returns result or error. Exec(rqs *Payload) (rsp *Payload, err error) + // Workers returns worker list associated with the pool. + Workers() (workers []*Worker) + // Destroy all underlying workers (but let them to complete the task). Destroy() } @@ -1,6 +1,7 @@ package roadrunner import ( + "fmt" "sync" "sync/atomic" "time" @@ -8,6 +9,8 @@ import ( // State represents worker status and updated time. type State interface { + fmt.Stringer + // Value returns state value Value() int64 diff --git a/static_pool.go b/static_pool.go index b0f50c6f..c4895bf0 100644 --- a/static_pool.go +++ b/static_pool.go @@ -70,8 +70,8 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, er return p, nil } -// Observe attaches pool event watcher. -func (p *StaticPool) Observe(o func(event int, w *Worker, ctx interface{})) { +// Report attaches pool event watcher. +func (p *StaticPool) Report(o func(event int, w *Worker, ctx interface{})) { p.observer = o } @@ -166,7 +166,7 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) { } } -// replaces dead or expired worker with new instance +// replaceWorker replaces dead or expired worker with new instance. func (p *StaticPool) replaceWorker(w *Worker, caused interface{}) { go p.destroyWorker(w) @@ -182,7 +182,7 @@ func (p *StaticPool) replaceWorker(w *Worker, caused interface{}) { } } -// destroy and remove worker from the pool. +// destroyWorker destroys workers and removes it from the pool. func (p *StaticPool) destroyWorker(w *Worker) { p.throw(EventDestruct, w, nil) diff --git a/static_pool_test.go b/static_pool_test.go index 6e6aa4be..b4069b98 100644 --- a/static_pool_test.go +++ b/static_pool_test.go @@ -150,7 +150,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { assert.NotNil(t, p) assert.NoError(t, err) - p.Observe(func(e int, w *Worker, ctx interface{}) { + p.Report(func(e int, w *Worker, ctx interface{}) { if err, ok := ctx.(error); ok { assert.Contains(t, err.Error(), "undefined_function()") } |