summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2018-04-15 12:48:53 +0300
committerGitHub <[email protected]>2018-04-15 12:48:53 +0300
commit16fdc1e896278bf64550d508063d4cdf77409941 (patch)
tree76db075971f2fe683ed3bf643bbf097b47178a5a
parentca63ec7aec2d7ba5d107d54091489612e4b12dc5 (diff)
parente2c1c66d7a35a9c60a4f05126691242068ebb589 (diff)
Merge pull request #6 from spiral/develop
Develop
-rw-r--r--pool.go6
-rw-r--r--state.go3
-rw-r--r--static_pool.go8
-rw-r--r--static_pool_test.go2
4 files changed, 14 insertions, 5 deletions
diff --git a/pool.go b/pool.go
index 1a134a6a..d22830a6 100644
--- a/pool.go
+++ b/pool.go
@@ -13,9 +13,15 @@ const (
// Pool managed set of inner worker processes.
type Pool interface {
+ // Report attaches pool event watcher.
+ Watch(o func(event int, w *Worker, ctx interface{}))
+
// Exec one task with given payload and context, returns result or error.
Exec(rqs *Payload) (rsp *Payload, err error)
+ // Workers returns worker list associated with the pool.
+ Workers() (workers []*Worker)
+
// Destroy all underlying workers (but let them to complete the task).
Destroy()
}
diff --git a/state.go b/state.go
index 6e49a397..d1068ab3 100644
--- a/state.go
+++ b/state.go
@@ -1,6 +1,7 @@
package roadrunner
import (
+ "fmt"
"sync"
"sync/atomic"
"time"
@@ -8,6 +9,8 @@ import (
// State represents worker status and updated time.
type State interface {
+ fmt.Stringer
+
// Value returns state value
Value() int64
diff --git a/static_pool.go b/static_pool.go
index b0f50c6f..c4895bf0 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -70,8 +70,8 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, er
return p, nil
}
-// Observe attaches pool event watcher.
-func (p *StaticPool) Observe(o func(event int, w *Worker, ctx interface{})) {
+// Report attaches pool event watcher.
+func (p *StaticPool) Report(o func(event int, w *Worker, ctx interface{})) {
p.observer = o
}
@@ -166,7 +166,7 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) {
}
}
-// replaces dead or expired worker with new instance
+// replaceWorker replaces dead or expired worker with new instance.
func (p *StaticPool) replaceWorker(w *Worker, caused interface{}) {
go p.destroyWorker(w)
@@ -182,7 +182,7 @@ func (p *StaticPool) replaceWorker(w *Worker, caused interface{}) {
}
}
-// destroy and remove worker from the pool.
+// destroyWorker destroys workers and removes it from the pool.
func (p *StaticPool) destroyWorker(w *Worker) {
p.throw(EventDestruct, w, nil)
diff --git a/static_pool_test.go b/static_pool_test.go
index 6e6aa4be..b4069b98 100644
--- a/static_pool_test.go
+++ b/static_pool_test.go
@@ -150,7 +150,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
assert.NotNil(t, p)
assert.NoError(t, err)
- p.Observe(func(e int, w *Worker, ctx interface{}) {
+ p.Report(func(e int, w *Worker, ctx interface{}) {
if err, ok := ctx.(error); ok {
assert.Contains(t, err.Error(), "undefined_function()")
}