diff options
author | Wolfy-J <[email protected]> | 2019-05-02 18:51:27 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2019-05-02 18:51:27 +0300 |
commit | bb95a9fbd6cc7e8a2d8204417e965dc8e1282128 (patch) | |
tree | 46288a4337701a1ee6c7b8734e31cf531cf558fa | |
parent | b3e7bbccdd7636b6ce7d90cf4f295e498feb719c (diff) |
base watcher implementation
-rw-r--r-- | error_buffer.go | 4 | ||||
-rw-r--r-- | state.go | 5 | ||||
-rw-r--r-- | static_pool.go | 13 | ||||
-rw-r--r-- | watcher.go | 50 |
4 files changed, 54 insertions, 18 deletions
diff --git a/error_buffer.go b/error_buffer.go index becd8295..0eaf03b6 100644 --- a/error_buffer.go +++ b/error_buffer.go @@ -87,8 +87,8 @@ func (eb *errBuffer) Len() int { return len(eb.buf) } -// Write appends the contents of p to the errBuffer, growing the errBuffer as -// needed. The return value n is the length of p; err is always nil. +// Write appends the contents of pool to the errBuffer, growing the errBuffer as +// needed. The return value n is the length of pool; err is always nil. func (eb *errBuffer) Write(p []byte) (int, error) { eb.mu.Lock() eb.buf = append(eb.buf, p...) @@ -26,9 +26,6 @@ const ( // StateWorking - working on given payload. StateWorking - // StateStreaming - indicates that worker is streaming the data at the moment. - StateStreaming - // StateStopping - process is being softly stopped. StateStopping @@ -57,8 +54,6 @@ func (s *state) String() string { return "ready" case StateWorking: return "working" - case StateStreaming: - return "streaming" case StateStopped: return "stopped" case StateErrored: diff --git a/static_pool.go b/static_pool.go index bd30afbd..615ab00a 100644 --- a/static_pool.go +++ b/static_pool.go @@ -116,7 +116,20 @@ func (p *StaticPool) Workers() (workers []*Worker) { // Remove forces pool to destroy specific worker. func (p *StaticPool) Remove(w *Worker, err error) { + if w.State().Value() != StateReady && w.State().Value() != StateWorking { + // unable to remove inactive worker + return + } + p.remove.Store(w, err) + + // cleanup workers which were scheduled for deletion after stop has been started + p.remove.Range(func(key, value interface{}) bool { + if key.(*Worker).State().Value() == StateStopped || key.(*Worker).State().Value() == StateErrored { + p.remove.Delete(key) + } + return true + }) } // Exec one task with given payload and context, returns result or error. @@ -1,22 +1,50 @@ package roadrunner import ( - "sync" + "log" "time" ) -// Watcher watches for workers. -type Watcher interface { - // Keep must return true and nil if worker is OK to continue working, - // must return false and optional error to force worker destruction. - Keep(p Pool, w *Worker) (keep bool, err error) -} - // disconnect?? -type LazyWatcher struct { +type Watcher struct { // defines how often interval time.Duration + pool Pool + + stop chan interface{} +} + +// NewWatcher creates new pool watcher. +func NewWatcher(p Pool, i time.Duration) *Watcher { + w := &Watcher{ + interval: i, + pool: p, + stop: make(chan interface{}), + } + + go func() { + ticker := time.NewTicker(w.interval) + + for { + select { + case <-ticker.C: + w.update() + case <-w.stop: + return + } + } + }() + + return w +} + +func (w *Watcher) Stop() { + close(w.stop) +} + +func (w *Watcher) update() { + for _, w := range w.pool.Workers() { + log.Println(w) - mu sync.Mutex - p Pool + } } |