summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xstatic_pool.go229
-rwxr-xr-xstatic_pool_test.go2
-rwxr-xr-xsupervisor_pool.go4
-rwxr-xr-xutil/events.go10
-rwxr-xr-xworker_watcher.go6
5 files changed, 158 insertions, 93 deletions
diff --git a/static_pool.go b/static_pool.go
index f64a2c9a..2d23f518 100755
--- a/static_pool.go
+++ b/static_pool.go
@@ -2,7 +2,6 @@ package roadrunner
import (
"context"
- "fmt"
"os/exec"
"github.com/spiral/errors"
@@ -14,6 +13,20 @@ const StopRequest = "{\"stop\":true}"
var bCtx = context.Background()
+// Allocator is responsible for worker allocation in the pool
+type Allocator func() (WorkerBase, error)
+
+// ErrorEncoder encode error or make a decision based on the error type
+type ErrorEncoder func(err error, w WorkerBase) (Payload, error)
+
+// PoolBefore is set of functions that executes BEFORE Exec
+type Before func(req Payload) Payload
+
+// PoolAfter is set of functions that executes AFTER Exec
+type After func(req Payload, resp Payload) Payload
+
+type PoolOptions func(p *StaticPool)
+
// StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack.
type StaticPool struct {
cfg Config
@@ -25,14 +38,22 @@ type StaticPool struct {
factory Factory
// distributes the events
- events *util.EventHandler
+ events util.EventsHandler
// manages worker states and TTLs
- ww *workerWatcher
+ ww WorkerWatcher
+
+ // allocate new worker
+ allocator Allocator
+
+ errEncoder ErrorEncoder
+ before []Before
+ after []After
}
// NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
-func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Config) (Pool, error) {
+func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Config, options ...PoolOptions) (Pool, error) {
+ const op = errors.Op("NewPool")
cfg.InitDefaults()
if cfg.Debug {
@@ -44,21 +65,18 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Con
cfg: cfg,
cmd: cmd,
factory: factory,
- events: &util.EventHandler{},
+ events: util.NewEventsHandler(),
+ after: make([]After, 0, 0),
+ before: make([]Before, 0, 0),
}
- p.ww = newWorkerWatcher(func(args ...interface{}) (WorkerBase, error) {
- w, err := p.factory.SpawnWorkerWithContext(ctx, p.cmd())
- if err != nil {
- return nil, err
- }
+ var err error
+ p.allocator, err = newPoolAllocator(factory, cmd)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
- sw, err := NewSyncWorker(w)
- if err != nil {
- return nil, err
- }
- return sw, nil
- }, p.cfg.NumWorkers, p.events)
+ p.ww = newWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events)
workers, err := p.allocateWorkers(ctx, p.cfg.NumWorkers)
if err != nil {
@@ -71,6 +89,13 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Con
return nil, err
}
+ p.errEncoder = defaultErrEncoder(p)
+
+ // add pool options
+ for i := 0; i < len(options); i++ {
+ options[i](p)
+ }
+
// if supervised config not nil, guess, that pool wanted to be supervised
if cfg.Supervisor != nil {
sp := newPoolWatcher(p, p.events, p.cfg.Supervisor)
@@ -82,6 +107,18 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Con
return p, nil
}
+func PoolBefore(before ...Before) PoolOptions {
+ return func(p *StaticPool) {
+ p.before = append(p.before, before...)
+ }
+}
+
+func PoolAfter(after ...After) PoolOptions {
+ return func(p *StaticPool) {
+ p.after = append(p.after, after...)
+ }
+}
+
// AddListener connects event listener to the pool.
func (sp *StaticPool) AddListener(listener util.EventListener) {
sp.events.AddListener(listener)
@@ -107,86 +144,54 @@ func (sp *StaticPool) Exec(p Payload) (Payload, error) {
return sp.execDebug(p)
}
w, err := sp.ww.GetFreeWorker(context.Background())
- if err != nil && errors.Is(errors.ErrWatcherStopped, err) {
+ if err != nil {
return EmptyPayload, errors.E(op, err)
- } else if err != nil {
- return EmptyPayload, err
}
sw := w.(SyncWorker)
- rsp, err := sw.Exec(p)
- if err != nil {
- // soft job errors are allowed
- if errors.Is(errors.Exec, err) {
- if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
- err = sp.ww.AllocateNew(bCtx)
- if err != nil {
- sp.events.Push(PoolEvent{Event: EventPoolError, Payload: err})
- }
-
- w.State().Set(StateInvalid)
- err = w.Stop(bCtx)
- if err != nil {
- sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
- }
- } else {
- sp.ww.PushWorker(w)
- }
-
- return EmptyPayload, err
- }
-
- sw.State().Set(StateInvalid)
- sp.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w})
- errS := w.Stop(bCtx)
-
- if errS != nil {
- return EmptyPayload, fmt.Errorf("%v, %v", err, errS)
+ if len(sp.before) > 0 {
+ for i := 0; i < len(sp.before); i++ {
+ p = sp.before[i](p)
}
+ }
- return EmptyPayload, err
+ rsp, err := sw.Exec(p)
+ if err != nil {
+ return sp.errEncoder(err, sw)
}
// worker want's to be terminated
if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
- w.State().Set(StateInvalid)
- err = w.Stop(bCtx)
+ sw.State().Set(StateInvalid)
+ err = sw.Stop(bCtx)
if err != nil {
- sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
+ sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: sw, Payload: errors.E(op, err)})
}
return sp.Exec(p)
}
- if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+ if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs {
err = sp.ww.AllocateNew(bCtx)
if err != nil {
- return EmptyPayload, err
+ return EmptyPayload, errors.E(op, err)
}
} else {
- sp.ww.PushWorker(w)
+ sp.ww.PushWorker(sw)
}
- return rsp, nil
-}
-func (sp *StaticPool) execDebug(p Payload) (Payload, error) {
- sw, err := sp.ww.allocator()
- if err != nil {
- return EmptyPayload, err
- }
-
- r, err := sw.(SyncWorker).Exec(p)
-
- if stopErr := sw.Stop(context.Background()); stopErr != nil {
- sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: sw, Payload: err})
+ if len(sp.after) > 0 {
+ for i := 0; i < len(sp.after); i++ {
+ rsp = sp.after[i](p, rsp)
+ }
}
- return r, err
+ return rsp, nil
}
func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
- const op = errors.Op("Exec")
+ const op = errors.Op("Exec with context")
w, err := sp.ww.GetFreeWorker(context.Background())
if err != nil {
return EmptyPayload, errors.E(op, err)
@@ -194,8 +199,54 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload
sw := w.(SyncWorker)
+ if len(sp.before) > 0 {
+ for i := 0; i < len(sp.before); i++ {
+ rqs = sp.before[i](rqs)
+ }
+ }
+
rsp, err := sw.ExecWithContext(ctx, rqs)
if err != nil {
+ return sp.errEncoder(err, sw)
+ }
+
+ // worker want's to be terminated
+ if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
+ sw.State().Set(StateInvalid)
+ err = sw.Stop(bCtx)
+ if err != nil {
+ sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: sw, Payload: errors.E(op, err)})
+ }
+
+ return sp.Exec(rqs)
+ }
+
+ if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs {
+ err = sp.ww.AllocateNew(bCtx)
+ if err != nil {
+ return EmptyPayload, errors.E(op, err)
+ }
+ } else {
+ sp.ww.PushWorker(sw)
+ }
+
+ if len(sp.after) > 0 {
+ for i := 0; i < len(sp.after); i++ {
+ rsp = sp.after[i](rqs, rsp)
+ }
+ }
+
+ return rsp, nil
+}
+
+// Destroy all underlying stack (but let them to complete the task).
+func (sp *StaticPool) Destroy(ctx context.Context) {
+ sp.ww.Destroy(ctx)
+}
+
+func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
+ return func(err error, w WorkerBase) (Payload, error) {
+ const op = errors.Op("error encoder")
// soft job errors are allowed
if errors.Is(errors.Exec, err) {
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
@@ -216,7 +267,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload
return EmptyPayload, errors.E(op, err)
}
- sw.State().Set(StateInvalid)
+ w.State().Set(StateInvalid)
sp.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w})
errS := w.Stop(bCtx)
@@ -226,32 +277,36 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload
return EmptyPayload, errors.E(op, err)
}
+}
- // worker want's to be terminated
- if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
- w.State().Set(StateInvalid)
- err = w.Stop(bCtx)
+func newPoolAllocator(factory Factory, cmd func() *exec.Cmd) (Allocator, error) {
+ return func() (WorkerBase, error) {
+ w, err := factory.SpawnWorkerWithContext(bCtx, cmd())
if err != nil {
- sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: errors.E(op, err)})
+ return nil, err
}
- return sp.Exec(rqs)
- }
-
- if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
- err = sp.ww.AllocateNew(bCtx)
+ sw, err := NewSyncWorker(w)
if err != nil {
- return EmptyPayload, errors.E(op, err)
+ return nil, err
}
- } else {
- sp.ww.PushWorker(w)
- }
- return rsp, nil
+ return sw, nil
+ }, nil
}
-// Destroy all underlying stack (but let them to complete the task).
-func (sp *StaticPool) Destroy(ctx context.Context) {
- sp.ww.Destroy(ctx)
+func (sp *StaticPool) execDebug(p Payload) (Payload, error) {
+ sw, err := sp.allocator()
+ if err != nil {
+ return EmptyPayload, err
+ }
+
+ r, err := sw.(SyncWorker).Exec(p)
+
+ if stopErr := sw.Stop(context.Background()); stopErr != nil {
+ sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: sw, Payload: err})
+ }
+
+ return r, err
}
// allocate required number of stack
diff --git a/static_pool_test.go b/static_pool_test.go
index 8f8a6f56..d661c34d 100755
--- a/static_pool_test.go
+++ b/static_pool_test.go
@@ -157,7 +157,7 @@ func Test_StaticPool_JobError(t *testing.T) {
t.Fatal("error should be of type errors.Exec")
}
- assert.Contains(t, err.Error(), "exec payload: Exec: hello")
+ assert.Contains(t, err.Error(), "hello")
}
func Test_StaticPool_Broken_Replace(t *testing.T) {
diff --git a/supervisor_pool.go b/supervisor_pool.go
index e23abdd1..43c36ae4 100755
--- a/supervisor_pool.go
+++ b/supervisor_pool.go
@@ -19,13 +19,13 @@ type SupervisedPool interface {
type supervisedPool struct {
cfg *SupervisorConfig
- events *util.EventHandler
+ events util.EventsHandler
pool Pool
stopCh chan struct{}
mu *sync.RWMutex
}
-func newPoolWatcher(pool Pool, events *util.EventHandler, cfg *SupervisorConfig) SupervisedPool {
+func newPoolWatcher(pool Pool, events util.EventsHandler, cfg *SupervisorConfig) SupervisedPool {
sp := &supervisedPool{
cfg: cfg,
events: events,
diff --git a/util/events.go b/util/events.go
index 9e12c4f7..21ebc29b 100755
--- a/util/events.go
+++ b/util/events.go
@@ -1,5 +1,11 @@
package util
+type EventsHandler interface {
+ NumListeners() int
+ AddListener(listener EventListener)
+ Push(e interface{})
+}
+
// Event listener listens for the events produced by worker, worker pool or other servce.
type EventListener func(event interface{})
@@ -8,6 +14,10 @@ type EventHandler struct {
listeners []EventListener
}
+func NewEventsHandler() EventsHandler {
+ return &EventHandler{listeners: make([]EventListener, 0, 2)}
+}
+
// NumListeners returns number of event listeners.
func (eb *EventHandler) NumListeners() int {
return len(eb.listeners)
diff --git a/worker_watcher.go b/worker_watcher.go
index 3a89554d..84be44f2 100755
--- a/worker_watcher.go
+++ b/worker_watcher.go
@@ -84,7 +84,7 @@ type WorkerWatcher interface {
}
// workerCreateFunc can be nil, but in that case, dead stack will not be replaced
-func newWorkerWatcher(allocator func(args ...interface{}) (WorkerBase, error), numWorkers int64, events *util.EventHandler) *workerWatcher {
+func newWorkerWatcher(allocator Allocator, numWorkers int64, events util.EventsHandler) WorkerWatcher {
ww := &workerWatcher{
stack: NewWorkersStack(),
allocator: allocator,
@@ -99,10 +99,10 @@ func newWorkerWatcher(allocator func(args ...interface{}) (WorkerBase, error), n
type workerWatcher struct {
mutex sync.RWMutex
stack *Stack
- allocator func(args ...interface{}) (WorkerBase, error)
+ allocator Allocator
initialNumWorkers int64
actualNumWorkers int64
- events *util.EventHandler
+ events util.EventsHandler
}
func (ww *workerWatcher) AddToWatch(ctx context.Context, workers []WorkerBase) error {