diff options
author | Valery Piashchynski <[email protected]> | 2020-11-10 14:45:59 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-11-10 14:45:59 +0300 |
commit | 7eb675a031d751787b31bd6894c936e86b190ebf (patch) | |
tree | 4fd570a325fccfc1e7ed3fa51b3e78c2f4172076 | |
parent | 49225cd9b0796ba381a767dfebd5b3c1dbbac69e (diff) |
Pool options, allocator
-rwxr-xr-x | static_pool.go | 229 | ||||
-rwxr-xr-x | static_pool_test.go | 2 | ||||
-rwxr-xr-x | supervisor_pool.go | 4 | ||||
-rwxr-xr-x | util/events.go | 10 | ||||
-rwxr-xr-x | worker_watcher.go | 6 |
5 files changed, 158 insertions, 93 deletions
diff --git a/static_pool.go b/static_pool.go index f64a2c9a..2d23f518 100755 --- a/static_pool.go +++ b/static_pool.go @@ -2,7 +2,6 @@ package roadrunner import ( "context" - "fmt" "os/exec" "github.com/spiral/errors" @@ -14,6 +13,20 @@ const StopRequest = "{\"stop\":true}" var bCtx = context.Background() +// Allocator is responsible for worker allocation in the pool +type Allocator func() (WorkerBase, error) + +// ErrorEncoder encode error or make a decision based on the error type +type ErrorEncoder func(err error, w WorkerBase) (Payload, error) + +// PoolBefore is set of functions that executes BEFORE Exec +type Before func(req Payload) Payload + +// PoolAfter is set of functions that executes AFTER Exec +type After func(req Payload, resp Payload) Payload + +type PoolOptions func(p *StaticPool) + // StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack. type StaticPool struct { cfg Config @@ -25,14 +38,22 @@ type StaticPool struct { factory Factory // distributes the events - events *util.EventHandler + events util.EventsHandler // manages worker states and TTLs - ww *workerWatcher + ww WorkerWatcher + + // allocate new worker + allocator Allocator + + errEncoder ErrorEncoder + before []Before + after []After } // NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker. -func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Config) (Pool, error) { +func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Config, options ...PoolOptions) (Pool, error) { + const op = errors.Op("NewPool") cfg.InitDefaults() if cfg.Debug { @@ -44,21 +65,18 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Con cfg: cfg, cmd: cmd, factory: factory, - events: &util.EventHandler{}, + events: util.NewEventsHandler(), + after: make([]After, 0, 0), + before: make([]Before, 0, 0), } - p.ww = newWorkerWatcher(func(args ...interface{}) (WorkerBase, error) { - w, err := p.factory.SpawnWorkerWithContext(ctx, p.cmd()) - if err != nil { - return nil, err - } + var err error + p.allocator, err = newPoolAllocator(factory, cmd) + if err != nil { + return nil, errors.E(op, err) + } - sw, err := NewSyncWorker(w) - if err != nil { - return nil, err - } - return sw, nil - }, p.cfg.NumWorkers, p.events) + p.ww = newWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events) workers, err := p.allocateWorkers(ctx, p.cfg.NumWorkers) if err != nil { @@ -71,6 +89,13 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Con return nil, err } + p.errEncoder = defaultErrEncoder(p) + + // add pool options + for i := 0; i < len(options); i++ { + options[i](p) + } + // if supervised config not nil, guess, that pool wanted to be supervised if cfg.Supervisor != nil { sp := newPoolWatcher(p, p.events, p.cfg.Supervisor) @@ -82,6 +107,18 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Con return p, nil } +func PoolBefore(before ...Before) PoolOptions { + return func(p *StaticPool) { + p.before = append(p.before, before...) + } +} + +func PoolAfter(after ...After) PoolOptions { + return func(p *StaticPool) { + p.after = append(p.after, after...) + } +} + // AddListener connects event listener to the pool. func (sp *StaticPool) AddListener(listener util.EventListener) { sp.events.AddListener(listener) @@ -107,86 +144,54 @@ func (sp *StaticPool) Exec(p Payload) (Payload, error) { return sp.execDebug(p) } w, err := sp.ww.GetFreeWorker(context.Background()) - if err != nil && errors.Is(errors.ErrWatcherStopped, err) { + if err != nil { return EmptyPayload, errors.E(op, err) - } else if err != nil { - return EmptyPayload, err } sw := w.(SyncWorker) - rsp, err := sw.Exec(p) - if err != nil { - // soft job errors are allowed - if errors.Is(errors.Exec, err) { - if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { - err = sp.ww.AllocateNew(bCtx) - if err != nil { - sp.events.Push(PoolEvent{Event: EventPoolError, Payload: err}) - } - - w.State().Set(StateInvalid) - err = w.Stop(bCtx) - if err != nil { - sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err}) - } - } else { - sp.ww.PushWorker(w) - } - - return EmptyPayload, err - } - - sw.State().Set(StateInvalid) - sp.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w}) - errS := w.Stop(bCtx) - - if errS != nil { - return EmptyPayload, fmt.Errorf("%v, %v", err, errS) + if len(sp.before) > 0 { + for i := 0; i < len(sp.before); i++ { + p = sp.before[i](p) } + } - return EmptyPayload, err + rsp, err := sw.Exec(p) + if err != nil { + return sp.errEncoder(err, sw) } // worker want's to be terminated if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest { - w.State().Set(StateInvalid) - err = w.Stop(bCtx) + sw.State().Set(StateInvalid) + err = sw.Stop(bCtx) if err != nil { - sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err}) + sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: sw, Payload: errors.E(op, err)}) } return sp.Exec(p) } - if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { + if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs { err = sp.ww.AllocateNew(bCtx) if err != nil { - return EmptyPayload, err + return EmptyPayload, errors.E(op, err) } } else { - sp.ww.PushWorker(w) + sp.ww.PushWorker(sw) } - return rsp, nil -} -func (sp *StaticPool) execDebug(p Payload) (Payload, error) { - sw, err := sp.ww.allocator() - if err != nil { - return EmptyPayload, err - } - - r, err := sw.(SyncWorker).Exec(p) - - if stopErr := sw.Stop(context.Background()); stopErr != nil { - sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: sw, Payload: err}) + if len(sp.after) > 0 { + for i := 0; i < len(sp.after); i++ { + rsp = sp.after[i](p, rsp) + } } - return r, err + return rsp, nil } func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) { - const op = errors.Op("Exec") + const op = errors.Op("Exec with context") w, err := sp.ww.GetFreeWorker(context.Background()) if err != nil { return EmptyPayload, errors.E(op, err) @@ -194,8 +199,54 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload sw := w.(SyncWorker) + if len(sp.before) > 0 { + for i := 0; i < len(sp.before); i++ { + rqs = sp.before[i](rqs) + } + } + rsp, err := sw.ExecWithContext(ctx, rqs) if err != nil { + return sp.errEncoder(err, sw) + } + + // worker want's to be terminated + if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest { + sw.State().Set(StateInvalid) + err = sw.Stop(bCtx) + if err != nil { + sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: sw, Payload: errors.E(op, err)}) + } + + return sp.Exec(rqs) + } + + if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs { + err = sp.ww.AllocateNew(bCtx) + if err != nil { + return EmptyPayload, errors.E(op, err) + } + } else { + sp.ww.PushWorker(sw) + } + + if len(sp.after) > 0 { + for i := 0; i < len(sp.after); i++ { + rsp = sp.after[i](rqs, rsp) + } + } + + return rsp, nil +} + +// Destroy all underlying stack (but let them to complete the task). +func (sp *StaticPool) Destroy(ctx context.Context) { + sp.ww.Destroy(ctx) +} + +func defaultErrEncoder(sp *StaticPool) ErrorEncoder { + return func(err error, w WorkerBase) (Payload, error) { + const op = errors.Op("error encoder") // soft job errors are allowed if errors.Is(errors.Exec, err) { if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { @@ -216,7 +267,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload return EmptyPayload, errors.E(op, err) } - sw.State().Set(StateInvalid) + w.State().Set(StateInvalid) sp.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w}) errS := w.Stop(bCtx) @@ -226,32 +277,36 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload return EmptyPayload, errors.E(op, err) } +} - // worker want's to be terminated - if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest { - w.State().Set(StateInvalid) - err = w.Stop(bCtx) +func newPoolAllocator(factory Factory, cmd func() *exec.Cmd) (Allocator, error) { + return func() (WorkerBase, error) { + w, err := factory.SpawnWorkerWithContext(bCtx, cmd()) if err != nil { - sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: errors.E(op, err)}) + return nil, err } - return sp.Exec(rqs) - } - - if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { - err = sp.ww.AllocateNew(bCtx) + sw, err := NewSyncWorker(w) if err != nil { - return EmptyPayload, errors.E(op, err) + return nil, err } - } else { - sp.ww.PushWorker(w) - } - return rsp, nil + return sw, nil + }, nil } -// Destroy all underlying stack (but let them to complete the task). -func (sp *StaticPool) Destroy(ctx context.Context) { - sp.ww.Destroy(ctx) +func (sp *StaticPool) execDebug(p Payload) (Payload, error) { + sw, err := sp.allocator() + if err != nil { + return EmptyPayload, err + } + + r, err := sw.(SyncWorker).Exec(p) + + if stopErr := sw.Stop(context.Background()); stopErr != nil { + sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: sw, Payload: err}) + } + + return r, err } // allocate required number of stack diff --git a/static_pool_test.go b/static_pool_test.go index 8f8a6f56..d661c34d 100755 --- a/static_pool_test.go +++ b/static_pool_test.go @@ -157,7 +157,7 @@ func Test_StaticPool_JobError(t *testing.T) { t.Fatal("error should be of type errors.Exec") } - assert.Contains(t, err.Error(), "exec payload: Exec: hello") + assert.Contains(t, err.Error(), "hello") } func Test_StaticPool_Broken_Replace(t *testing.T) { diff --git a/supervisor_pool.go b/supervisor_pool.go index e23abdd1..43c36ae4 100755 --- a/supervisor_pool.go +++ b/supervisor_pool.go @@ -19,13 +19,13 @@ type SupervisedPool interface { type supervisedPool struct { cfg *SupervisorConfig - events *util.EventHandler + events util.EventsHandler pool Pool stopCh chan struct{} mu *sync.RWMutex } -func newPoolWatcher(pool Pool, events *util.EventHandler, cfg *SupervisorConfig) SupervisedPool { +func newPoolWatcher(pool Pool, events util.EventsHandler, cfg *SupervisorConfig) SupervisedPool { sp := &supervisedPool{ cfg: cfg, events: events, diff --git a/util/events.go b/util/events.go index 9e12c4f7..21ebc29b 100755 --- a/util/events.go +++ b/util/events.go @@ -1,5 +1,11 @@ package util +type EventsHandler interface { + NumListeners() int + AddListener(listener EventListener) + Push(e interface{}) +} + // Event listener listens for the events produced by worker, worker pool or other servce. type EventListener func(event interface{}) @@ -8,6 +14,10 @@ type EventHandler struct { listeners []EventListener } +func NewEventsHandler() EventsHandler { + return &EventHandler{listeners: make([]EventListener, 0, 2)} +} + // NumListeners returns number of event listeners. func (eb *EventHandler) NumListeners() int { return len(eb.listeners) diff --git a/worker_watcher.go b/worker_watcher.go index 3a89554d..84be44f2 100755 --- a/worker_watcher.go +++ b/worker_watcher.go @@ -84,7 +84,7 @@ type WorkerWatcher interface { } // workerCreateFunc can be nil, but in that case, dead stack will not be replaced -func newWorkerWatcher(allocator func(args ...interface{}) (WorkerBase, error), numWorkers int64, events *util.EventHandler) *workerWatcher { +func newWorkerWatcher(allocator Allocator, numWorkers int64, events util.EventsHandler) WorkerWatcher { ww := &workerWatcher{ stack: NewWorkersStack(), allocator: allocator, @@ -99,10 +99,10 @@ func newWorkerWatcher(allocator func(args ...interface{}) (WorkerBase, error), n type workerWatcher struct { mutex sync.RWMutex stack *Stack - allocator func(args ...interface{}) (WorkerBase, error) + allocator Allocator initialNumWorkers int64 actualNumWorkers int64 - events *util.EventHandler + events util.EventsHandler } func (ww *workerWatcher) AddToWatch(ctx context.Context, workers []WorkerBase) error { |