summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2019-05-02 17:30:43 +0300
committerWolfy-J <[email protected]>2019-05-02 17:30:43 +0300
commit8fc464213c471ad79be88d9926d8f45b66a23ec0 (patch)
tree4072167f6ca3a2d4bd9ed56ff9c017c8ed79e16a
parenta482d49122d92dd1cd82881ed221c55e5210e536 (diff)
removing workers in runtime
-rw-r--r--pool.go7
-rw-r--r--static_pool.go61
-rw-r--r--watcher.go14
3 files changed, 54 insertions, 28 deletions
diff --git a/pool.go b/pool.go
index 8372b4a6..2e59e154 100644
--- a/pool.go
+++ b/pool.go
@@ -22,10 +22,6 @@ const (
// Pool managed set of inner worker processes.
type Pool interface {
- // Watch enables worker watching (to destroy expired workers or workers which experience
- // memory leaks).
- Watch(w Watcher)
-
// Listen all caused events to attached watcher.
Listen(l func(event int, ctx interface{}))
@@ -35,6 +31,9 @@ type Pool interface {
// Workers returns worker list associated with the pool.
Workers() (workers []*Worker)
+ // Remove forces pool to destroy specific worker.
+ Remove(w *Worker, err error)
+
// Destroy all underlying workers (but let them to complete the task).
Destroy()
}
diff --git a/static_pool.go b/static_pool.go
index 69edfd35..54e0caac 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -41,14 +41,17 @@ type StaticPool struct {
// all registered workers
workers []*Worker
+ // invalid declares set of workers to be removed from the pool.
+ mur sync.Mutex
+ remove map[*Worker]error
+
// pool is being destroyed
inDestroy int32
destroy chan interface{}
// lsn is optional callback to handle worker create/destruct/error events.
- mul sync.Mutex
- watcher Watcher
- lsn func(event int, ctx interface{})
+ mul sync.Mutex
+ lsn func(event int, ctx interface{})
}
// NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
@@ -62,6 +65,7 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, er
cmd: cmd,
factory: factory,
workers: make([]*Worker, 0, cfg.NumWorkers),
+ remove: make(map[*Worker]error),
free: make(chan *Worker, cfg.NumWorkers),
destroy: make(chan interface{}),
}
@@ -81,15 +85,6 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, er
return p, nil
}
-// Watch enables worker watching (to destroy expired workers or workers which experience
-// memory leaks).
-func (p *StaticPool) Watch(w Watcher) {
- p.mul.Lock()
- defer p.mul.Unlock()
-
- p.watcher = w
-}
-
// Listen attaches pool event watcher.
func (p *StaticPool) Listen(l func(event int, ctx interface{})) {
p.mul.Lock()
@@ -121,6 +116,13 @@ func (p *StaticPool) Workers() (workers []*Worker) {
return workers
}
+// Remove forces pool to destroy specific worker.
+func (p *StaticPool) Remove(w *Worker, err error) {
+ p.mur.Lock()
+ p.remove[w] = err
+ p.mur.Unlock()
+}
+
// Exec one task with given payload and context, returns result or error.
func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) {
p.tmu.Lock()
@@ -191,12 +193,10 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) {
continue
}
- if p.watcher != nil {
- if keep, err := p.watcher.Keep(p, w); !keep {
- i++
- w.markDestroying()
- go p.destroyWorker(w, err)
- }
+ if remove, err := p.removeWorker(w); remove {
+ i++
+ w.markDestroying()
+ go p.destroyWorker(w, err)
}
return w, nil
@@ -218,12 +218,10 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) {
continue
}
- if p.watcher != nil {
- if keep, err := p.watcher.Keep(p, w); !keep {
- i++
- w.markDestroying()
- go p.destroyWorker(w, err)
- }
+ if remove, err := p.removeWorker(w); remove {
+ i++
+ w.markDestroying()
+ go p.destroyWorker(w, err)
}
return w, nil
@@ -242,6 +240,11 @@ func (p *StaticPool) release(w *Worker) {
return
}
+ if remove, err := p.removeWorker(w); remove {
+ go p.destroyWorker(w, err)
+ return
+ }
+
p.free <- w
}
@@ -298,6 +301,9 @@ func (p *StaticPool) watchWorker(w *Worker) {
for i, wc := range p.workers {
if wc == w {
p.workers = append(p.workers[:i], p.workers[i+1:]...)
+ p.mur.Lock()
+ delete(p.remove, w)
+ p.mur.Unlock()
break
}
}
@@ -327,6 +333,13 @@ func (p *StaticPool) watchWorker(w *Worker) {
}
}
+func (p *StaticPool) removeWorker(w *Worker) (removed bool, err error) {
+ p.mur.Lock()
+ err, removed = p.remove[w]
+ p.mur.Unlock()
+ return
+}
+
func (p *StaticPool) destroyed() bool {
return atomic.LoadInt32(&p.inDestroy) != 0
}
diff --git a/watcher.go b/watcher.go
index bbc7b9dc..05b8a659 100644
--- a/watcher.go
+++ b/watcher.go
@@ -1,8 +1,22 @@
package roadrunner
+import (
+ "sync"
+ "time"
+)
+
// Watcher watches for workers.
type Watcher interface {
// Keep must return true and nil if worker is OK to continue working,
// must return false and optional error to force worker destruction.
Keep(p Pool, w *Worker) (keep bool, err error)
}
+
+// disconnect??
+type LazyWatcher struct {
+ // defines how often
+ interval time.Duration
+
+ mu sync.Mutex
+ p Pool
+}