summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2019-05-02 18:21:36 +0300
committerWolfy-J <[email protected]>2019-05-02 18:21:36 +0300
commiteb8c64941cbcd30ff79b6147efd5fef42eccb648 (patch)
treebde0ceb7e7236850cfe999da7c3ffecf62b58d00
parent34abca68708ed881c3360ee749d794b0000a3aec (diff)
miiiinor performance optimizations
-rw-r--r--error_buffer.go6
-rw-r--r--server.go3
-rw-r--r--state.go3
-rw-r--r--static_pool.go44
-rw-r--r--worker.go15
5 files changed, 24 insertions, 47 deletions
diff --git a/error_buffer.go b/error_buffer.go
index fec789a9..becd8295 100644
--- a/error_buffer.go
+++ b/error_buffer.go
@@ -73,9 +73,8 @@ func newErrBuffer() *errBuffer {
// Listen attaches error stream even listener.
func (eb *errBuffer) Listen(l func(event int, ctx interface{})) {
eb.mu.Lock()
- defer eb.mu.Unlock()
-
eb.lsn = l
+ eb.mu.Unlock()
}
// Len returns the number of buf of the unread portion of the errBuffer;
@@ -92,10 +91,9 @@ func (eb *errBuffer) Len() int {
// needed. The return value n is the length of p; err is always nil.
func (eb *errBuffer) Write(p []byte) (int, error) {
eb.mu.Lock()
- defer eb.mu.Unlock()
-
eb.buf = append(eb.buf, p...)
eb.update <- nil
+ eb.mu.Unlock()
return len(p), nil
}
diff --git a/server.go b/server.go
index 26f58172..2672a792 100644
--- a/server.go
+++ b/server.go
@@ -203,9 +203,8 @@ func (s *Server) poolListener(event int, ctx interface{}) {
// throw invokes event handler if any.
func (s *Server) throw(event int, ctx interface{}) {
s.mul.Lock()
- defer s.mul.Unlock()
-
if s.lsn != nil {
s.lsn(event, ctx)
}
+ s.mul.Unlock()
}
diff --git a/state.go b/state.go
index d3ac2a2f..4d8b1eaa 100644
--- a/state.go
+++ b/state.go
@@ -26,9 +26,6 @@ const (
// StateWorking - working on given payload.
StateWorking
- // StateDestroying - worker has been marked as being destroyed.
- StateDestroying
-
// StateStreaming - indicates that worker is streaming the data at the moment.
StateStreaming
diff --git a/static_pool.go b/static_pool.go
index c88548ed..bd30afbd 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -42,8 +42,7 @@ type StaticPool struct {
workers []*Worker
// invalid declares set of workers to be removed from the pool.
- mur sync.Mutex
- remove map[*Worker]error
+ remove sync.Map
// pool is being destroyed
inDestroy int32
@@ -65,7 +64,6 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, er
cmd: cmd,
factory: factory,
workers: make([]*Worker, 0, cfg.NumWorkers),
- remove: make(map[*Worker]error),
free: make(chan *Worker, cfg.NumWorkers),
destroy: make(chan interface{}),
}
@@ -118,9 +116,7 @@ func (p *StaticPool) Workers() (workers []*Worker) {
// Remove forces pool to destroy specific worker.
func (p *StaticPool) Remove(w *Worker, err error) {
- p.mur.Lock()
- p.remove[w] = err
- p.mur.Unlock()
+ p.remove.Store(w, err)
}
// Exec one task with given payload and context, returns result or error.
@@ -193,12 +189,12 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) {
continue
}
- if remove, err := p.removeWorker(w); remove {
- i++
- atomic.AddInt64(&p.numDead, 1)
-
- w.markDestroying()
+ if err, remove := p.remove.Load(w); remove {
go p.destroyWorker(w, err)
+
+ // get next worker
+ i++
+ continue
}
return w, nil
@@ -220,12 +216,12 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) {
continue
}
- if remove, err := p.removeWorker(w); remove {
- i++
- atomic.AddInt64(&p.numDead, 1)
-
- w.markDestroying()
+ if err, remove := p.remove.Load(w); remove {
go p.destroyWorker(w, err)
+
+ // get next worker
+ i++
+ continue
}
return w, nil
@@ -244,7 +240,7 @@ func (p *StaticPool) release(w *Worker) {
return
}
- if remove, err := p.removeWorker(w); remove {
+ if err, remove := p.remove.Load(w); remove {
go p.destroyWorker(w, err)
return
}
@@ -305,9 +301,7 @@ func (p *StaticPool) watchWorker(w *Worker) {
for i, wc := range p.workers {
if wc == w {
p.workers = append(p.workers[:i], p.workers[i+1:]...)
- p.mur.Lock()
- delete(p.remove, w)
- p.mur.Unlock()
+ p.remove.Delete(w)
break
}
}
@@ -337,13 +331,6 @@ func (p *StaticPool) watchWorker(w *Worker) {
}
}
-func (p *StaticPool) removeWorker(w *Worker) (removed bool, err error) {
- p.mur.Lock()
- err, removed = p.remove[w]
- p.mur.Unlock()
- return
-}
-
func (p *StaticPool) destroyed() bool {
return atomic.LoadInt32(&p.inDestroy) != 0
}
@@ -351,9 +338,8 @@ func (p *StaticPool) destroyed() bool {
// throw invokes event handler if any.
func (p *StaticPool) throw(event int, ctx interface{}) {
p.mul.Lock()
- defer p.mul.Unlock()
-
if p.lsn != nil {
p.lsn(event, ctx)
}
+ p.mul.Unlock()
}
diff --git a/worker.go b/worker.go
index 04b58e49..7221c2de 100644
--- a/worker.go
+++ b/worker.go
@@ -164,38 +164,35 @@ func (w *Worker) Kill() error {
// errors. Method might return JobError indicating issue with payload.
func (w *Worker) Exec(rqs *Payload) (rsp *Payload, err error) {
w.mu.Lock()
- defer w.mu.Unlock()
if rqs == nil {
+ w.mu.Unlock()
return nil, fmt.Errorf("payload can not be empty")
}
if w.state.Value() != StateReady {
+ w.mu.Unlock()
return nil, fmt.Errorf("worker is not ready (%s)", w.state.String())
}
w.state.set(StateWorking)
- defer w.state.registerExec()
rsp, err = w.execPayload(rqs)
if err != nil {
if _, ok := err.(JobError); !ok {
w.state.set(StateErrored)
+ w.state.registerExec()
+ w.mu.Unlock()
return nil, err
}
}
- // todo: attach when payload is complete
- // todo: new status
-
w.state.set(StateReady)
+ w.state.registerExec()
+ w.mu.Unlock()
return rsp, err
}
-func (w *Worker) markDestroying() {
- w.state.set(StateDestroying)
-}
-
func (w *Worker) start() error {
if err := w.cmd.Start(); err != nil {
close(w.waitDone)