diff options
-rw-r--r-- | cmd/util/table.go | 2 | ||||
-rw-r--r-- | state.go | 5 | ||||
-rw-r--r-- | static_pool.go | 20 | ||||
-rw-r--r-- | worker.go | 4 |
4 files changed, 25 insertions, 6 deletions
diff --git a/cmd/util/table.go b/cmd/util/table.go index 565c0679..4fa05ae2 100644 --- a/cmd/util/table.go +++ b/cmd/util/table.go @@ -40,6 +40,8 @@ func renderStatus(status string) string { return Sprintf("<cyan>ready</reset>") case "working": return Sprintf("<green>working</reset>") + case "disabled": + return Sprintf("<yellow>disabled</reset>") case "stopped": return Sprintf("<red>stopped</reset>") case "errored": @@ -26,6 +26,9 @@ const ( // StateWorking - working on given payload. StateWorking + // StateDisabled - indicates that worker is being disabled and will be removed. + StateDisabled + // StateStopping - process is being softly stopped. StateStopping @@ -54,6 +57,8 @@ func (s *state) String() string { return "ready" case StateWorking: return "working" + case StateDisabled: + return "disabled" case StateStopped: return "stopped" case StateErrored: diff --git a/static_pool.go b/static_pool.go index 1409fc3f..fa4ac027 100644 --- a/static_pool.go +++ b/static_pool.go @@ -42,6 +42,7 @@ type StaticPool struct { workers []*Worker // invalid declares set of workers to be removed from the pool. + mur sync.Mutex remove sync.Map // pool is being destroyed @@ -146,13 +147,13 @@ func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) { return nil, err } - go p.destroyWorker(w, err) + p.discardWorker(w, err) return nil, err } // worker want's to be terminated if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest { - go p.destroyWorker(w, err) + p.discardWorker(w, err) return p.Exec(rqs) } @@ -172,6 +173,7 @@ func (p *StaticPool) Destroy() { var wg sync.WaitGroup for _, w := range p.Workers() { wg.Add(1) + w.markDisabled() go func(w *Worker) { defer wg.Done() p.destroyWorker(w, nil) @@ -195,7 +197,7 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) { } if err, remove := p.remove.Load(w); remove { - go p.destroyWorker(w, err) + p.discardWorker(w, err) // get next worker i++ @@ -222,7 +224,7 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) { } if err, remove := p.remove.Load(w); remove { - go p.destroyWorker(w, err) + p.discardWorker(w, err) // get next worker i++ @@ -243,12 +245,12 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) { // release releases or replaces the worker. func (p *StaticPool) release(w *Worker) { if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { - go p.destroyWorker(w, p.cfg.MaxJobs) + p.discardWorker(w, p.cfg.MaxJobs) return } if err, remove := p.remove.Load(w); remove { - go p.destroyWorker(w, err) + p.discardWorker(w, err) return } @@ -279,6 +281,12 @@ func (p *StaticPool) createWorker() (*Worker, error) { return w, nil } +// gentry remove worker +func (p *StaticPool) discardWorker(w *Worker, caused interface{}) { + w.markDisabled() + go p.destroyWorker(w, caused) +} + // destroyWorker destroys workers and removes it from the pool. func (p *StaticPool) destroyWorker(w *Worker, caused interface{}) { go w.Stop() @@ -193,6 +193,10 @@ func (w *Worker) Exec(rqs *Payload) (rsp *Payload, err error) { return rsp, err } +func (w *Worker) markDisabled() { + w.state.set(StateDisabled) +} + func (w *Worker) start() error { if err := w.cmd.Start(); err != nil { close(w.waitDone) |