-rw-r--r--  error_buffer.go  |  6
-rw-r--r--  server.go        |  3
-rw-r--r--  state.go         |  3
-rw-r--r--  static_pool.go   | 44
-rw-r--r--  worker.go        | 15
5 files changed, 24 insertions, 47 deletions
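
Across error_buffer.go, server.go, static_pool.go and worker.go, the diff below replaces defer ... Unlock() with an explicit Unlock on every exit path. A minimal sketch of that pattern, assuming a simplified notifier type (hypothetical, not part of this repository) standing in for Server/StaticPool:

package main

import (
	"fmt"
	"sync"
)

// notifier is a hypothetical stand-in for Server/StaticPool: a mutex
// protecting a single event listener callback.
type notifier struct {
	mul sync.Mutex
	lsn func(event int, ctx interface{})
}

// throw mirrors the rewritten throw(): lock, invoke the listener if any,
// then release the mutex explicitly instead of via defer.
func (n *notifier) throw(event int, ctx interface{}) {
	n.mul.Lock()
	if n.lsn != nil {
		n.lsn(event, ctx)
	}
	n.mul.Unlock()
}

func main() {
	n := &notifier{lsn: func(event int, ctx interface{}) {
		fmt.Println("event:", event, ctx)
	}}
	n.throw(1, "worker restarted")
}

The trade-off is the usual one: the deferred call is dropped from the hot path, but every return branch must now unlock by hand, which is why Worker.Exec in the diff unlocks before each early return.
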
diff --git a/error_buffer.go b/error_buffer.go
index fec789a9..becd8295 100644
--- a/error_buffer.go
+++ b/error_buffer.go
@@ -73,9 +73,8 @@ func newErrBuffer() *errBuffer {
 // Listen attaches error stream even listener.
 func (eb *errBuffer) Listen(l func(event int, ctx interface{})) {
 	eb.mu.Lock()
-	defer eb.mu.Unlock()
-
 	eb.lsn = l
+	eb.mu.Unlock()
 }
 
 // Len returns the number of buf of the unread portion of the errBuffer;
@@ -92,10 +91,9 @@ func (eb *errBuffer) Len() int {
 // needed. The return value n is the length of p; err is always nil.
 func (eb *errBuffer) Write(p []byte) (int, error) {
 	eb.mu.Lock()
-	defer eb.mu.Unlock()
-
 	eb.buf = append(eb.buf, p...)
 	eb.update <- nil
+	eb.mu.Unlock()
 
 	return len(p), nil
 }
diff --git a/server.go b/server.go
--- a/server.go
+++ b/server.go
@@ -203,9 +203,8 @@ func (s *Server) poolListener(event int, ctx interface{}) {
 // throw invokes event handler if any.
 func (s *Server) throw(event int, ctx interface{}) {
 	s.mul.Lock()
-	defer s.mul.Unlock()
-
 	if s.lsn != nil {
 		s.lsn(event, ctx)
 	}
+	s.mul.Unlock()
 }
diff --git a/state.go b/state.go
--- a/state.go
+++ b/state.go
@@ -26,9 +26,6 @@ const (
 	// StateWorking - working on given payload.
 	StateWorking
 
-	// StateDestroying - worker has been marked as being destroyed.
-	StateDestroying
-
 	// StateStreaming - indicates that worker is streaming the data at the moment.
 	StateStreaming
 
diff --git a/static_pool.go b/static_pool.go
index c88548ed..bd30afbd 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -42,8 +42,7 @@ type StaticPool struct {
 	workers []*Worker
 
 	// invalid declares set of workers to be removed from the pool.
-	mur    sync.Mutex
-	remove map[*Worker]error
+	remove sync.Map
 
 	// pool is being destroyed
 	inDestroy int32
@@ -65,7 +64,6 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, er
 		cmd:     cmd,
 		factory: factory,
 		workers: make([]*Worker, 0, cfg.NumWorkers),
-		remove:  make(map[*Worker]error),
 		free:    make(chan *Worker, cfg.NumWorkers),
 		destroy: make(chan interface{}),
 	}
@@ -118,9 +116,7 @@ func (p *StaticPool) Workers() (workers []*Worker) {
 
 // Remove forces pool to destroy specific worker.
 func (p *StaticPool) Remove(w *Worker, err error) {
-	p.mur.Lock()
-	p.remove[w] = err
-	p.mur.Unlock()
+	p.remove.Store(w, err)
 }
 
 // Exec one task with given payload and context, returns result or error.
@@ -193,12 +189,12 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) {
 			continue
 		}
 
-		if remove, err := p.removeWorker(w); remove {
-			i++
-			atomic.AddInt64(&p.numDead, 1)
-
-			w.markDestroying()
+		if err, remove := p.remove.Load(w); remove {
 			go p.destroyWorker(w, err)
+
+			// get next worker
+			i++
+			continue
 		}
 
 		return w, nil
@@ -220,12 +216,12 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) {
 			continue
 		}
 
-		if remove, err := p.removeWorker(w); remove {
-			i++
-			atomic.AddInt64(&p.numDead, 1)
-
-			w.markDestroying()
+		if err, remove := p.remove.Load(w); remove {
 			go p.destroyWorker(w, err)
+
+			// get next worker
+			i++
+			continue
 		}
 
 		return w, nil
@@ -244,7 +240,7 @@ func (p *StaticPool) release(w *Worker) {
 		return
 	}
 
-	if remove, err := p.removeWorker(w); remove {
+	if err, remove := p.remove.Load(w); remove {
 		go p.destroyWorker(w, err)
 		return
 	}
@@ -305,9 +301,7 @@ func (p *StaticPool) watchWorker(w *Worker) {
 	for i, wc := range p.workers {
 		if wc == w {
 			p.workers = append(p.workers[:i], p.workers[i+1:]...)
-			p.mur.Lock()
-			delete(p.remove, w)
-			p.mur.Unlock()
+			p.remove.Delete(w)
 			break
 		}
 	}
@@ -337,13 +331,6 @@ func (p *StaticPool) watchWorker(w *Worker) {
 	}
 }
 
-func (p *StaticPool) removeWorker(w *Worker) (removed bool, err error) {
-	p.mur.Lock()
-	err, removed = p.remove[w]
-	p.mur.Unlock()
-	return
-}
-
 func (p *StaticPool) destroyed() bool {
 	return atomic.LoadInt32(&p.inDestroy) != 0
 }
@@ -351,9 +338,8 @@ func (p *StaticPool) destroyed() bool {
 // throw invokes event handler if any.
 func (p *StaticPool) throw(event int, ctx interface{}) {
 	p.mul.Lock()
-	defer p.mul.Unlock()
-
 	if p.lsn != nil {
 		p.lsn(event, ctx)
 	}
+	p.mul.Unlock()
 }
diff --git a/worker.go b/worker.go
--- a/worker.go
+++ b/worker.go
@@ -164,38 +164,35 @@ func (w *Worker) Kill() error {
 // errors. Method might return JobError indicating issue with payload.
 func (w *Worker) Exec(rqs *Payload) (rsp *Payload, err error) {
 	w.mu.Lock()
-	defer w.mu.Unlock()
 
 	if rqs == nil {
+		w.mu.Unlock()
 		return nil, fmt.Errorf("payload can not be empty")
 	}
 
 	if w.state.Value() != StateReady {
+		w.mu.Unlock()
 		return nil, fmt.Errorf("worker is not ready (%s)", w.state.String())
 	}
 
 	w.state.set(StateWorking)
-	defer w.state.registerExec()
 
 	rsp, err = w.execPayload(rqs)
 	if err != nil {
 		if _, ok := err.(JobError); !ok {
 			w.state.set(StateErrored)
+			w.state.registerExec()
+			w.mu.Unlock()
 			return nil, err
 		}
 	}
 
-	// todo: attach when payload is complete
-	// todo: new status
-
 	w.state.set(StateReady)
+	w.state.registerExec()
+	w.mu.Unlock()
 	return rsp, err
 }
 
-func (w *Worker) markDestroying() {
-	w.state.set(StateDestroying)
-}
-
 func (w *Worker) start() error {
 	if err := w.cmd.Start(); err != nil {
 		close(w.waitDone)
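
In static_pool.go the mur sync.Mutex plus remove map[*Worker]error pair becomes a single remove sync.Map, so marking, checking and clearing a worker's removal entry no longer needs explicit locking. A minimal sketch of that pattern, using a hypothetical removalSet wrapper (the names Worker, mark and check are illustrative, not the pool's real API):

package main

import (
	"errors"
	"fmt"
	"sync"
)

// Worker is a bare stand-in for the pool's worker type.
type Worker struct{ pid int }

// removalSet mirrors what the pool's `remove sync.Map` does: keys are
// worker pointers, values are the errors explaining why they must go.
type removalSet struct{ m sync.Map }

// mark replaces the old mutex-guarded `remove[w] = err` write in Remove.
func (r *removalSet) mark(w *Worker, err error) {
	r.m.Store(w, err)
}

// check replaces the removed removeWorker helper: it reports whether the
// worker is scheduled for removal and returns the associated error.
func (r *removalSet) check(w *Worker) (error, bool) {
	if v, ok := r.m.Load(w); ok {
		return v.(error), true
	}
	return nil, false
}

func main() {
	set := &removalSet{}
	w := &Worker{pid: 1}

	set.mark(w, errors.New("worker exceeded max memory"))

	if err, remove := set.check(w); remove {
		fmt.Printf("destroying worker %d: %v\n", w.pid, err)
		set.m.Delete(w) // mirrors p.remove.Delete(w) in watchWorker
	}
}

Note that sync.Map.Load returns an interface{} value, so code that needs the concrete error back (as the sketch's check does) has to type-assert it.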