summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2019-05-03 13:37:39 +0300
committerWolfy-J <[email protected]>2019-05-03 13:37:39 +0300
commite52c29d081a14f0ec4efb01da11c69215ea40a6a (patch)
tree52a54ed292bfb5e27ef950e598181de0f70d759a
parent46519d2a383c9564007e1d2a6beb492988f8c6c9 (diff)
better worker removal mechanism
-rw-r--r--cmd/util/table.go2
-rw-r--r--state.go5
-rw-r--r--static_pool.go20
-rw-r--r--worker.go4
4 files changed, 25 insertions, 6 deletions
diff --git a/cmd/util/table.go b/cmd/util/table.go
index 565c0679..4fa05ae2 100644
--- a/cmd/util/table.go
+++ b/cmd/util/table.go
@@ -40,6 +40,8 @@ func renderStatus(status string) string {
return Sprintf("<cyan>ready</reset>")
case "working":
return Sprintf("<green>working</reset>")
+ case "disabled":
+ return Sprintf("<yellow>disabled</reset>")
case "stopped":
return Sprintf("<red>stopped</reset>")
case "errored":
diff --git a/state.go b/state.go
index 8a065637..a5bd4397 100644
--- a/state.go
+++ b/state.go
@@ -26,6 +26,9 @@ const (
// StateWorking - working on given payload.
StateWorking
+ // StateDisabled - indicates that worker is being disabled and will be removed.
+ StateDisabled
+
// StateStopping - process is being softly stopped.
StateStopping
@@ -54,6 +57,8 @@ func (s *state) String() string {
return "ready"
case StateWorking:
return "working"
+ case StateDisabled:
+ return "disabled"
case StateStopped:
return "stopped"
case StateErrored:
diff --git a/static_pool.go b/static_pool.go
index 1409fc3f..fa4ac027 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -42,6 +42,7 @@ type StaticPool struct {
workers []*Worker
// invalid declares set of workers to be removed from the pool.
+ mur sync.Mutex
remove sync.Map
// pool is being destroyed
@@ -146,13 +147,13 @@ func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) {
return nil, err
}
- go p.destroyWorker(w, err)
+ p.discardWorker(w, err)
return nil, err
}
// worker want's to be terminated
if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
- go p.destroyWorker(w, err)
+ p.discardWorker(w, err)
return p.Exec(rqs)
}
@@ -172,6 +173,7 @@ func (p *StaticPool) Destroy() {
var wg sync.WaitGroup
for _, w := range p.Workers() {
wg.Add(1)
+ w.markDisabled()
go func(w *Worker) {
defer wg.Done()
p.destroyWorker(w, nil)
@@ -195,7 +197,7 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) {
}
if err, remove := p.remove.Load(w); remove {
- go p.destroyWorker(w, err)
+ p.discardWorker(w, err)
// get next worker
i++
@@ -222,7 +224,7 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) {
}
if err, remove := p.remove.Load(w); remove {
- go p.destroyWorker(w, err)
+ p.discardWorker(w, err)
// get next worker
i++
@@ -243,12 +245,12 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) {
// release releases or replaces the worker.
func (p *StaticPool) release(w *Worker) {
if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
- go p.destroyWorker(w, p.cfg.MaxJobs)
+ p.discardWorker(w, p.cfg.MaxJobs)
return
}
if err, remove := p.remove.Load(w); remove {
- go p.destroyWorker(w, err)
+ p.discardWorker(w, err)
return
}
@@ -279,6 +281,12 @@ func (p *StaticPool) createWorker() (*Worker, error) {
return w, nil
}
+// gentry remove worker
+func (p *StaticPool) discardWorker(w *Worker, caused interface{}) {
+ w.markDisabled()
+ go p.destroyWorker(w, caused)
+}
+
// destroyWorker destroys workers and removes it from the pool.
func (p *StaticPool) destroyWorker(w *Worker, caused interface{}) {
go w.Stop()
diff --git a/worker.go b/worker.go
index d476b918..fb55eb57 100644
--- a/worker.go
+++ b/worker.go
@@ -193,6 +193,10 @@ func (w *Worker) Exec(rqs *Payload) (rsp *Payload, err error) {
return rsp, err
}
+func (w *Worker) markDisabled() {
+ w.state.set(StateDisabled)
+}
+
func (w *Worker) start() error {
if err := w.cmd.Start(); err != nil {
close(w.waitDone)