summaryrefslogtreecommitdiff
path: root/static_pool.go
diff options
context:
space:
mode:
Diffstat (limited to 'static_pool.go')
-rw-r--r--static_pool.go154
1 files changed, 71 insertions, 83 deletions
diff --git a/static_pool.go b/static_pool.go
index b043b022..a972b04a 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -6,6 +6,7 @@ import (
"os/exec"
"sync"
"time"
+ "sync/atomic"
)
const (
@@ -21,8 +22,8 @@ type StaticPool struct {
// worker command creator
cmd func() *exec.Cmd
- // listener is optional callback to handle worker create/destruct/error events.
- listener func(event int, ctx interface{})
+ // lsn is optional callback to handle worker create/destruct/error events.
+ lsn func(event int, ctx interface{})
// creates and connects to workers
factory Factory
@@ -38,6 +39,9 @@ type StaticPool struct {
// all registered workers
workers []*Worker
+
+ // pool is being destroying
+ inDestroy int32
}
// NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
@@ -71,7 +75,7 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, er
// AddListener attaches pool event watcher.
func (p *StaticPool) Listen(l func(event int, ctx interface{})) {
- p.listener = l
+ p.lsn = l
}
// Config returns associated pool configuration. Immutable.
@@ -110,13 +114,13 @@ func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) {
return nil, err
}
- go p.replaceWorker(w, err)
+ go p.destroyWorker(w, err)
return nil, err
}
// worker want's to be terminated
if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
- go p.replaceWorker(w, err)
+ go p.destroyWorker(w, err)
return p.Exec(rqs)
}
@@ -126,14 +130,18 @@ func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) {
// Destroy all underlying workers (but let them to complete the task).
func (p *StaticPool) Destroy() {
+ atomic.AddInt32(&p.inDestroy, 1)
+ defer atomic.AddInt32(&p.inDestroy, -1)
+
p.tasks.Wait()
var wg sync.WaitGroup
for _, w := range p.Workers() {
wg.Add(1)
+ go w.Stop()
go func(w *Worker) {
defer wg.Done()
- p.destroyWorker(w)
+ p.destroyWorker(w, nil)
}(w)
}
@@ -143,33 +151,19 @@ func (p *StaticPool) Destroy() {
// finds free worker in a given time interval or creates new if allowed.
func (p *StaticPool) allocateWorker() (w *Worker, err error) {
// this loop is required to skip issues with dead workers still being in a ring.
- for i := int64(0); i < p.cfg.NumWorkers; i++ {
- select {
- case w = <-p.free:
- if w.state.Value() == StateReady {
- return w, nil
- } else {
- go p.replaceWorker(w, w.state.Value())
- continue
- }
- default:
- // enable timeout handler
- }
+ select {
+ case w = <-p.free:
+ return w, nil
+ default:
+ // enable timeout handler
+ }
- timeout := time.NewTimer(p.cfg.AllocateTimeout)
- select {
- case <-timeout.C:
- return nil, fmt.Errorf("worker timeout (%s)", p.cfg.AllocateTimeout)
- case w := <-p.free:
- timeout.Stop()
-
- if w.state.Value() == StateReady {
- return w, nil
- } else {
- go p.replaceWorker(w, w.state.Value())
- continue
- }
- }
+ timeout := time.NewTimer(p.cfg.AllocateTimeout)
+ select {
+ case <-timeout.C:
+ return nil, fmt.Errorf("worker timeout (%s)", p.cfg.AllocateTimeout)
+ case w = <-p.free:
+ timeout.Stop()
}
return w, nil
@@ -178,32 +172,13 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) {
// release releases or replaces the worker.
func (p *StaticPool) release(w *Worker) {
if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
- go p.replaceWorker(w, p.cfg.MaxJobs)
+ go p.destroyWorker(w, p.cfg.MaxJobs)
return
}
p.free <- w
}
-// replaceWorker replaces dead or expired worker with new instance.
-func (p *StaticPool) replaceWorker(w *Worker, caused interface{}) error {
- go p.destroyWorker(w)
-
- if nw, err := p.createWorker(); err != nil {
- p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err})
-
- if len(p.Workers()) == 0 {
- // possible situation when major error causes all PHP scripts to die (for example dead DB)
- p.throw(EventPoolError, fmt.Errorf("all workers are dead"))
- }
- return err
- } else {
- p.free <- nw
- }
-
- return nil
-}
-
// creates new worker using associated factory. automatically
// adds worker to the worker list (background)
func (p *StaticPool) createWorker() (*Worker, error) {
@@ -214,42 +189,16 @@ func (p *StaticPool) createWorker() (*Worker, error) {
p.throw(EventWorkerConstruct, w)
- go func(w *Worker) {
- err := w.Wait()
-
- // worker have died unexpectedly, pool should attempt to replace it with alive version safely
- if w.state.Value() == StateErrored {
- if err != nil {
- p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err})
- }
-
- // attempting to replace worker
- if err := p.replaceWorker(w, err); err != nil {
- p.throw(EventPoolError, fmt.Errorf("unable to replace: %s", err))
- }
- }
- }(w)
-
p.muw.Lock()
- defer p.muw.Unlock()
-
p.workers = append(p.workers, w)
+ p.muw.Unlock()
+ go p.watchWorker(w)
return w, nil
}
// destroyWorker destroys workers and removes it from the pool.
-func (p *StaticPool) destroyWorker(w *Worker) {
- // detaching
- p.muw.Lock()
- for i, wc := range p.workers {
- if wc == w {
- p.workers = append(p.workers[:i], p.workers[i+1:]...)
- break
- }
- }
- p.muw.Unlock()
-
+func (p *StaticPool) destroyWorker(w *Worker, caused interface{}) {
go w.Stop()
select {
@@ -267,9 +216,48 @@ func (p *StaticPool) destroyWorker(w *Worker) {
}
}
+// watchWorker watches worker state and replaces it if worker fails.
+func (p *StaticPool) watchWorker(w *Worker) {
+ err := w.Wait()
+
+ // detaching
+ p.muw.Lock()
+ for i, wc := range p.workers {
+ if wc == w {
+ p.workers = append(p.workers[:i], p.workers[i+1:]...)
+ break
+ }
+ }
+ p.muw.Unlock()
+
+ // worker have died unexpectedly, pool should attempt to replace it with alive version safely
+ if err != nil {
+ p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err})
+ }
+
+ if !p.destroying() {
+ nw, err := p.createWorker()
+ if err == nil {
+ p.free <- nw
+ return
+ }
+
+ p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err})
+
+ // possible situation when major error causes all PHP scripts to die (for example dead DB)
+ if len(p.Workers()) == 0 {
+ p.throw(EventPoolError, fmt.Errorf("unable to replace (last worker): %s", err))
+ }
+ }
+}
+
+func (p *StaticPool) destroying() bool {
+ return atomic.LoadInt32(&p.inDestroy) != 0
+}
+
// throw invokes event handler if any.
func (p *StaticPool) throw(event int, ctx interface{}) {
- if p.listener != nil {
- p.listener(event, ctx)
+ if p.lsn != nil {
+ p.lsn(event, ctx)
}
}