Diffstat (limited to 'static_pool.go')
-rwxr-xr-x | static_pool.go | 230 |
1 file changed, 140 insertions, 90 deletions
diff --git a/static_pool.go b/static_pool.go
index f64a2c9a..c1dacd8d 100755
--- a/static_pool.go
+++ b/static_pool.go
@@ -2,7 +2,6 @@ package roadrunner
 
 import (
 	"context"
-	"fmt"
 	"os/exec"
 
 	"github.com/spiral/errors"
@@ -14,9 +13,23 @@ const StopRequest = "{\"stop\":true}"
 
 var bCtx = context.Background()
 
+// Allocator is responsible for worker allocation in the pool
+type Allocator func() (WorkerBase, error)
+
+// ErrorEncoder encodes an error or makes a decision based on the error type
+type ErrorEncoder func(err error, w WorkerBase) (Payload, error)
+
+// Before is a set of functions that execute BEFORE Exec
+type Before func(req Payload) Payload
+
+// After is a set of functions that execute AFTER Exec
+type After func(req Payload, resp Payload) Payload
+
+type PoolOptions func(p *StaticPool)
+
 // StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack.
 type StaticPool struct {
-	cfg Config
+	cfg PoolConfig
 
 	// worker command creator
 	cmd func() *exec.Cmd
@@ -25,14 +38,22 @@ type StaticPool struct {
 	factory Factory
 
 	// distributes the events
-	events *util.EventHandler
+	events util.EventsHandler
 
 	// manages worker states and TTLs
-	ww *workerWatcher
+	ww WorkerWatcher
+
+	// allocate new worker
+	allocator Allocator
+
+	errEncoder ErrorEncoder
+	before     []Before
+	after      []After
 }
 
 // NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
-func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Config) (Pool, error) {
+func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg PoolConfig, options ...PoolOptions) (Pool, error) {
+	const op = errors.Op("NewPool")
 	cfg.InitDefaults()
 
 	if cfg.Debug {
@@ -44,21 +65,13 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Con
 		cfg:     cfg,
 		cmd:     cmd,
 		factory: factory,
-		events:  &util.EventHandler{},
+		events:  util.NewEventsHandler(),
+		after:   make([]After, 0, 0),
+		before:  make([]Before, 0, 0),
 	}
 
-	p.ww = newWorkerWatcher(func(args ...interface{}) (WorkerBase, error) {
-		w, err := p.factory.SpawnWorkerWithContext(ctx, p.cmd())
-		if err != nil {
-			return nil, err
-		}
-
-		sw, err := NewSyncWorker(w)
-		if err != nil {
-			return nil, err
-		}
-		return sw, nil
-	}, p.cfg.NumWorkers, p.events)
+	p.allocator = newPoolAllocator(factory, cmd)
+	p.ww = newWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events)
 
 	workers, err := p.allocateWorkers(ctx, p.cfg.NumWorkers)
 	if err != nil {
@@ -71,6 +84,13 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Con
 		return nil, err
 	}
 
+	p.errEncoder = defaultErrEncoder(p)
+
+	// add pool options
+	for i := 0; i < len(options); i++ {
+		options[i](p)
+	}
+
 	// if the supervisor config is not nil, assume the pool should be supervised
 	if cfg.Supervisor != nil {
 		sp := newPoolWatcher(p, p.events, p.cfg.Supervisor)
@@ -82,13 +102,25 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Con
 	return p, nil
 }
 
+func PoolBefore(before ...Before) PoolOptions {
+	return func(p *StaticPool) {
+		p.before = append(p.before, before...)
+	}
+}
+
+func PoolAfter(after ...After) PoolOptions {
+	return func(p *StaticPool) {
+		p.after = append(p.after, after...)
+	}
+}
+
 // AddListener connects event listener to the pool.
 func (sp *StaticPool) AddListener(listener util.EventListener) {
 	sp.events.AddListener(listener)
 }
 
-// Config returns associated pool configuration. Immutable.
-func (sp *StaticPool) GetConfig() Config {
+// GetConfig returns the associated pool configuration. Immutable.
+func (sp *StaticPool) GetConfig() PoolConfig {
 	return sp.cfg
 }
 
@@ -107,86 +139,54 @@ func (sp *StaticPool) Exec(p Payload) (Payload, error) {
 		return sp.execDebug(p)
 	}
 	w, err := sp.ww.GetFreeWorker(context.Background())
-	if err != nil && errors.Is(errors.ErrWatcherStopped, err) {
+	if err != nil {
 		return EmptyPayload, errors.E(op, err)
-	} else if err != nil {
-		return EmptyPayload, err
 	}
 
 	sw := w.(SyncWorker)
 
-	rsp, err := sw.Exec(p)
-	if err != nil {
-		// soft job errors are allowed
-		if errors.Is(errors.Exec, err) {
-			if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
-				err = sp.ww.AllocateNew(bCtx)
-				if err != nil {
-					sp.events.Push(PoolEvent{Event: EventPoolError, Payload: err})
-				}
-
-				w.State().Set(StateInvalid)
-				err = w.Stop(bCtx)
-				if err != nil {
-					sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
-				}
-			} else {
-				sp.ww.PushWorker(w)
-			}
-
-			return EmptyPayload, err
-		}
-
-		sw.State().Set(StateInvalid)
-		sp.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w})
-		errS := w.Stop(bCtx)
-
-		if errS != nil {
-			return EmptyPayload, fmt.Errorf("%v, %v", err, errS)
+	if len(sp.before) > 0 {
+		for i := 0; i < len(sp.before); i++ {
+			p = sp.before[i](p)
 		}
+	}
 
-		return EmptyPayload, err
+	rsp, err := sw.Exec(p)
+	if err != nil {
+		return sp.errEncoder(err, sw)
 	}
 
 	// worker wants to be terminated
 	if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
-		w.State().Set(StateInvalid)
-		err = w.Stop(bCtx)
+		sw.State().Set(StateInvalid)
+		err = sw.Stop(bCtx)
 		if err != nil {
-			sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
+			sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: sw, Payload: errors.E(op, err)})
 		}
 
 		return sp.Exec(p)
 	}
 
-	if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+	if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs {
 		err = sp.ww.AllocateNew(bCtx)
 		if err != nil {
-			return EmptyPayload, err
+			return EmptyPayload, errors.E(op, err)
 		}
 	} else {
-		sp.ww.PushWorker(w)
+		sp.ww.PushWorker(sw)
 	}
 
-	return rsp, nil
-}
-func (sp *StaticPool) execDebug(p Payload) (Payload, error) {
-	sw, err := sp.ww.allocator()
-	if err != nil {
-		return EmptyPayload, err
-	}
-
-	r, err := sw.(SyncWorker).Exec(p)
-
-	if stopErr := sw.Stop(context.Background()); stopErr != nil {
-		sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: sw, Payload: err})
+	if len(sp.after) > 0 {
+		for i := 0; i < len(sp.after); i++ {
+			rsp = sp.after[i](p, rsp)
+		}
 	}
 
-	return r, err
+	return rsp, nil
 }
 
 func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
-	const op = errors.Op("Exec")
+	const op = errors.Op("Exec with context")
 	w, err := sp.ww.GetFreeWorker(context.Background())
 	if err != nil {
 		return EmptyPayload, errors.E(op, err)
@@ -194,8 +194,54 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload
 
 	sw := w.(SyncWorker)
 
+	if len(sp.before) > 0 {
+		for i := 0; i < len(sp.before); i++ {
+			rqs = sp.before[i](rqs)
+		}
+	}
+
 	rsp, err := sw.ExecWithContext(ctx, rqs)
 	if err != nil {
+		return sp.errEncoder(err, sw)
+	}
+
+	// worker wants to be terminated
+	if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
+		sw.State().Set(StateInvalid)
+		err = sw.Stop(bCtx)
+		if err != nil {
+			sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: sw, Payload: errors.E(op, err)})
+		}
+
+		return sp.Exec(rqs)
+	}
+
+	if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs {
+		err = sp.ww.AllocateNew(bCtx)
+		if err != nil {
+			return EmptyPayload, errors.E(op, err)
+		}
+	} else {
+		sp.ww.PushWorker(sw)
+	}
+
+	if len(sp.after) > 0 {
+		for i := 0; i < len(sp.after); i++ {
+			rsp = sp.after[i](rqs, rsp)
+		}
+	}
+
+	return rsp, nil
+}
+
+// Destroy all underlying stack (but let them complete the task).
+func (sp *StaticPool) Destroy(ctx context.Context) {
+	sp.ww.Destroy(ctx)
+}
+
+func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
+	return func(err error, w WorkerBase) (Payload, error) {
+		const op = errors.Op("error encoder")
 		// soft job errors are allowed
 		if errors.Is(errors.Exec, err) {
 			if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
 				err = sp.ww.AllocateNew(bCtx)
 				if err != nil {
 					sp.events.Push(PoolEvent{Event: EventPoolError, Payload: err})
 				}
@@ -216,7 +262,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload
 				return EmptyPayload, errors.E(op, err)
 			}
 
-		sw.State().Set(StateInvalid)
+		w.State().Set(StateInvalid)
 		sp.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w})
 		errS := w.Stop(bCtx)
 
 		if errS != nil {
@@ -226,32 +272,36 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload
 
 		return EmptyPayload, errors.E(op, err)
 	}
+}
 
-	// worker wants to be terminated
-	if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
-		w.State().Set(StateInvalid)
-		err = w.Stop(bCtx)
+func newPoolAllocator(factory Factory, cmd func() *exec.Cmd) Allocator {
+	return func() (WorkerBase, error) {
+		w, err := factory.SpawnWorkerWithContext(bCtx, cmd())
 		if err != nil {
-			sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: errors.E(op, err)})
+			return nil, err
 		}
 
-		return sp.Exec(rqs)
-	}
-
-	if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
-		err = sp.ww.AllocateNew(bCtx)
+		sw, err := NewSyncWorker(w)
 		if err != nil {
-			return EmptyPayload, errors.E(op, err)
+			return nil, err
 		}
-	} else {
-		sp.ww.PushWorker(w)
+		return sw, nil
 	}
-
-	return rsp, nil
 }
 
-// Destroy all underlying stack (but let them to complete the task).
-func (sp *StaticPool) Destroy(ctx context.Context) {
-	sp.ww.Destroy(ctx)
+func (sp *StaticPool) execDebug(p Payload) (Payload, error) {
+	sw, err := sp.allocator()
+	if err != nil {
+		return EmptyPayload, err
+	}
+
+	r, err := sw.(SyncWorker).Exec(p)
+
+	if stopErr := sw.Stop(context.Background()); stopErr != nil {
+		sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: sw, Payload: err})
+	}
+
+	return r, err
 }
 
 // allocate required number of stack
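
For context, a minimal sketch of how a caller might use the functional options introduced by this commit. The NewPool signature, PoolBefore/PoolAfter, and the Before/After hook shapes come from the diff above; the import path and alias, NewPipeFactory, the worker.php script, the PoolConfig fields used, and the assumption that the returned Pool exposes Exec and Destroy are illustrative guesses, not part of the change.

package main

import (
	"context"
	"os/exec"

	rr "github.com/spiral/roadrunner" // assumed import path for this package
)

func main() {
	ctx := context.Background()

	// Worker command; the PHP script path is illustrative.
	cmd := func() *exec.Cmd { return exec.Command("php", "worker.php") }

	// Before hook: may inspect or rewrite the request payload prior to Exec.
	tagReq := func(req rr.Payload) rr.Payload {
		req.Context = []byte(`{"traced":true}`) // hypothetical request tagging
		return req
	}

	// After hook: receives the original request and the response produced so far.
	passResp := func(req rr.Payload, resp rr.Payload) rr.Payload {
		return resp
	}

	p, err := rr.NewPool(
		ctx,
		cmd,
		rr.NewPipeFactory(), // assumed pipe-based Factory from the same package
		rr.PoolConfig{NumWorkers: 4},
		rr.PoolBefore(tagReq),
		rr.PoolAfter(passResp),
	)
	if err != nil {
		panic(err)
	}
	defer p.Destroy(ctx)

	resp, err := p.Exec(rr.Payload{Body: []byte("ping")})
	if err != nil {
		panic(err)
	}
	_ = resp
}

Per the loops in Exec and ExecWithContext, hooks run in registration order: each Before receives the payload as rewritten by the previous one, and each After receives the original request alongside the accumulated response.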