summaryrefslogtreecommitdiff
path: root/static_pool.go
diff options
context:
space:
mode:
Diffstat (limited to 'static_pool.go')
-rw-r--r--static_pool.go18
1 files changed, 11 insertions, 7 deletions
diff --git a/static_pool.go b/static_pool.go
index c4b6f42d..efd9125a 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -1,7 +1,6 @@
package roadrunner
import (
- "fmt"
"os/exec"
"sync"
"sync/atomic"
@@ -128,6 +127,8 @@ func (p *StaticPool) Remove(w *Worker, err error) bool {
return true
}
+var ErrAllocateWorker = errors.New("unable to allocate worker")
+
// Exec one task with given payload and context, returns result or error.
func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) {
p.tmu.Lock()
@@ -138,11 +139,10 @@ func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) {
w, err := p.allocateWorker()
if err != nil {
- return nil, errors.Wrap(err, "unable to allocate worker")
+ return nil, ErrAllocateWorker
}
rsp, err = w.Exec(rqs)
-
if err != nil {
// soft job errors are allowed
if _, jobError := err.(JobError); jobError {
@@ -186,6 +186,10 @@ func (p *StaticPool) Destroy() {
wg.Wait()
}
+var ErrPoolStopped = errors.New("pool has been stopped")
+var ErrWorkerAllocateTimeout = errors.New("worker allocate timeout")
+var ErrAllWorkersAreDead = errors.New("all workers are dead")
+
// finds free worker in a given time interval. Skips dead workers.
func (p *StaticPool) allocateWorker() (w *Worker, err error) {
// TODO loop counts upward, but its variable is bounded downward.
@@ -210,7 +214,7 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) {
return w, nil
case <-p.destroy:
- return nil, fmt.Errorf("pool has been stopped")
+ return nil, ErrPoolStopped
default:
// enable timeout handler
}
@@ -218,7 +222,7 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) {
timeout := time.NewTimer(p.cfg.AllocateTimeout)
select {
case <-timeout.C:
- return nil, fmt.Errorf("worker timeout (%s)", p.cfg.AllocateTimeout)
+ return nil, ErrWorkerAllocateTimeout
case w = <-p.free:
timeout.Stop()
@@ -239,11 +243,11 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) {
case <-p.destroy:
timeout.Stop()
- return nil, fmt.Errorf("pool has been stopped")
+ return nil, ErrPoolStopped
}
}
- return nil, fmt.Errorf("all workers are dead (%v)", p.cfg.NumWorkers)
+ return nil, ErrAllWorkersAreDead
}
// release releases or replaces the worker.