summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2019-05-02 18:51:27 +0300
committerWolfy-J <[email protected]>2019-05-02 18:51:27 +0300
commitbb95a9fbd6cc7e8a2d8204417e965dc8e1282128 (patch)
tree46288a4337701a1ee6c7b8734e31cf531cf558fa
parentb3e7bbccdd7636b6ce7d90cf4f295e498feb719c (diff)
base watcher implementation
-rw-r--r--error_buffer.go4
-rw-r--r--state.go5
-rw-r--r--static_pool.go13
-rw-r--r--watcher.go50
4 files changed, 54 insertions, 18 deletions
diff --git a/error_buffer.go b/error_buffer.go
index becd8295..0eaf03b6 100644
--- a/error_buffer.go
+++ b/error_buffer.go
@@ -87,8 +87,8 @@ func (eb *errBuffer) Len() int {
return len(eb.buf)
}
-// Write appends the contents of p to the errBuffer, growing the errBuffer as
-// needed. The return value n is the length of p; err is always nil.
+// Write appends the contents of pool to the errBuffer, growing the errBuffer as
+// needed. The return value n is the length of pool; err is always nil.
func (eb *errBuffer) Write(p []byte) (int, error) {
eb.mu.Lock()
eb.buf = append(eb.buf, p...)
diff --git a/state.go b/state.go
index 4d8b1eaa..8a065637 100644
--- a/state.go
+++ b/state.go
@@ -26,9 +26,6 @@ const (
// StateWorking - working on given payload.
StateWorking
- // StateStreaming - indicates that worker is streaming the data at the moment.
- StateStreaming
-
// StateStopping - process is being softly stopped.
StateStopping
@@ -57,8 +54,6 @@ func (s *state) String() string {
return "ready"
case StateWorking:
return "working"
- case StateStreaming:
- return "streaming"
case StateStopped:
return "stopped"
case StateErrored:
diff --git a/static_pool.go b/static_pool.go
index bd30afbd..615ab00a 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -116,7 +116,20 @@ func (p *StaticPool) Workers() (workers []*Worker) {
// Remove forces pool to destroy specific worker.
func (p *StaticPool) Remove(w *Worker, err error) {
+ if w.State().Value() != StateReady && w.State().Value() != StateWorking {
+ // unable to remove inactive worker
+ return
+ }
+
p.remove.Store(w, err)
+
+ // cleanup workers which were scheduled for deletion after stop has been started
+ p.remove.Range(func(key, value interface{}) bool {
+ if key.(*Worker).State().Value() == StateStopped || key.(*Worker).State().Value() == StateErrored {
+ p.remove.Delete(key)
+ }
+ return true
+ })
}
// Exec one task with given payload and context, returns result or error.
diff --git a/watcher.go b/watcher.go
index 05b8a659..3b654c34 100644
--- a/watcher.go
+++ b/watcher.go
@@ -1,22 +1,50 @@
package roadrunner
import (
- "sync"
+ "log"
"time"
)
-// Watcher watches for workers.
-type Watcher interface {
- // Keep must return true and nil if worker is OK to continue working,
- // must return false and optional error to force worker destruction.
- Keep(p Pool, w *Worker) (keep bool, err error)
-}
-
// disconnect??
-type LazyWatcher struct {
+type Watcher struct {
// defines how often
interval time.Duration
+ pool Pool
+
+ stop chan interface{}
+}
+
+// NewWatcher creates new pool watcher.
+func NewWatcher(p Pool, i time.Duration) *Watcher {
+ w := &Watcher{
+ interval: i,
+ pool: p,
+ stop: make(chan interface{}),
+ }
+
+ go func() {
+ ticker := time.NewTicker(w.interval)
+
+ for {
+ select {
+ case <-ticker.C:
+ w.update()
+ case <-w.stop:
+ return
+ }
+ }
+ }()
+
+ return w
+}
+
+func (w *Watcher) Stop() {
+ close(w.stop)
+}
+
+func (w *Watcher) update() {
+ for _, w := range w.pool.Workers() {
+ log.Println(w)
- mu sync.Mutex
- p Pool
+ }
}