diff options
Diffstat (limited to 'static_pool.go')
-rwxr-xr-x | static_pool.go | 211 |
1 files changed, 117 insertions, 94 deletions
diff --git a/static_pool.go b/static_pool.go index 0c2352ad..4ecbdd41 100755 --- a/static_pool.go +++ b/static_pool.go @@ -6,20 +6,19 @@ import ( "os/exec" "sync" + "github.com/spiral/roadrunner/v2/util" + "github.com/pkg/errors" ) -const ( - // StopRequest can be sent by worker to indicate that restart is required. - StopRequest = "{\"stop\":true}" -) +// StopRequest can be sent by worker to indicate that restart is required. +const StopRequest = "{\"stop\":true}" var bCtx = context.Background() // StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack. type StaticPool struct { - // pool behaviour - cfg *Config + cfg Config // worker command creator cmd func() *exec.Cmd @@ -27,30 +26,31 @@ type StaticPool struct { // creates and connects to stack factory Factory + // distributes the events + events *util.EventHandler + // protects state of worker list, does not affect allocation muw sync.RWMutex - ww *WorkersWatcher + // manages worker states and TTLs + ww *workerWatcher - events chan PoolEvent -} -type PoolEvent struct { - Payload interface{} + // supervises memory and TTL of workers + // sp *supervisedPool } // NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker. -// TODO why cfg is passed by pointer? -func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg *Config) (Pool, error) { +func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Config) (Pool, error) { cfg.InitDefaults() p := &StaticPool{ cfg: cfg, cmd: cmd, factory: factory, - events: make(chan PoolEvent), + events: &util.EventHandler{}, } - p.ww = NewWorkerWatcher(func(args ...interface{}) (WorkerBase, error) { + p.ww = newWorkerWatcher(func(args ...interface{}) (WorkerBase, error) { w, err := p.factory.SpawnWorkerWithContext(ctx, p.cmd()) if err != nil { return nil, err @@ -74,12 +74,21 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg *Co return nil, err } + // todo: implement + // p.sp = newPoolWatcher(p, p.events, p.cfg.Supervisor) + // p.sp.Start() + return p, nil } +// AddListener connects event listener to the pool. +func (p *StaticPool) AddListener(listener util.EventListener) { + p.events.AddListener(listener) +} + // Config returns associated pool configuration. Immutable. -func (p *StaticPool) Config() Config { - return *p.cfg +func (p *StaticPool) GetConfig() Config { + return p.cfg } // Workers returns worker list associated with the pool. @@ -103,18 +112,30 @@ func (p *StaticPool) Exec(rqs Payload) (Payload, error) { rsp, err := sw.Exec(rqs) if err != nil { - errJ := p.checkMaxJobs(bCtx, w) - if errJ != nil { - return EmptyPayload, fmt.Errorf("%v, %v", err, errJ) - } // soft job errors are allowed - if _, jobError := err.(TaskError); jobError { - p.ww.PushWorker(w) + if _, jobError := err.(JobError); jobError { + if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { + err := p.ww.AllocateNew(bCtx) + if err != nil { + p.events.Push(PoolEvent{Event: EventPoolError, Payload: err}) + } + + w.State().Set(StateInvalid) + err = w.Stop(bCtx) + if err != nil { + p.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err}) + } + } else { + p.ww.PushWorker(w) + } + return EmptyPayload, err } sw.State().Set(StateInvalid) + p.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w}) errS := w.Stop(bCtx) + if errS != nil { return EmptyPayload, fmt.Errorf("%v, %v", err, errS) } @@ -127,9 +148,10 @@ func (p *StaticPool) Exec(rqs Payload) (Payload, error) { w.State().Set(StateInvalid) err = w.Stop(bCtx) if err != nil { - return EmptyPayload, err + p.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err}) } - return p.ExecWithContext(bCtx, rqs) + + return p.Exec(rqs) } if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { @@ -146,81 +168,81 @@ func (p *StaticPool) Exec(rqs Payload) (Payload, error) { } // Exec one task with given payload and context, returns result or error. -func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) { - // todo: why TODO passed here? - getWorkerCtx, cancel := context.WithTimeout(context.TODO(), p.cfg.AllocateTimeout) - defer cancel() - w, err := p.ww.GetFreeWorker(getWorkerCtx) - if err != nil && errors.Is(err, ErrWatcherStopped) { - return EmptyPayload, ErrWatcherStopped - } else if err != nil { - return EmptyPayload, err - } - - sw := w.(SyncWorker) - - var execCtx context.Context - if p.cfg.ExecTTL != 0 { - var cancel2 context.CancelFunc - execCtx, cancel2 = context.WithTimeout(context.TODO(), p.cfg.ExecTTL) - defer cancel2() - } else { - execCtx = ctx - } - - rsp, err := sw.ExecWithContext(execCtx, rqs) - if err != nil { - errJ := p.checkMaxJobs(ctx, w) - if errJ != nil { - return EmptyPayload, fmt.Errorf("%v, %v", err, errJ) - } - // soft job errors are allowed - if _, jobError := err.(TaskError); jobError { - p.ww.PushWorker(w) - return EmptyPayload, err - } - - sw.State().Set(StateInvalid) - errS := w.Stop(ctx) - if errS != nil { - return EmptyPayload, fmt.Errorf("%v, %v", err, errS) - } - - return EmptyPayload, err - } - - // worker want's to be terminated - if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest { - w.State().Set(StateInvalid) - err = w.Stop(ctx) - if err != nil { - return EmptyPayload, err - } - return p.ExecWithContext(ctx, rqs) - } - - if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { - err = p.ww.AllocateNew(ctx) - if err != nil { - return EmptyPayload, err - } - } else { - p.muw.Lock() - p.ww.PushWorker(w) - p.muw.Unlock() - } - return rsp, nil -} +// func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) { +// // todo: why TODO passed here? +// getWorkerCtx, cancel := context.WithTimeout(context.TODO(), p.cfg.AllocateTimeout) +// defer cancel() +// w, err := p.ww.GetFreeWorker(getWorkerCtx) +// if err != nil && errors.Is(err, ErrWatcherStopped) { +// return EmptyPayload, ErrWatcherStopped +// } else if err != nil { +// return EmptyPayload, err +// } +// +// sw := w.(SyncWorker) +// +// // todo: implement worker destroy +// //execCtx context.Context +// //if p.cfg.Supervisor.ExecTTL != 0 { +// // var cancel2 context.CancelFunc +// // execCtx, cancel2 = context.WithTimeout(context.TODO(), p.cfg.Supervisor.ExecTTL) +// // defer cancel2() +// //} else { +// // execCtx = ctx +// //} +// +// rsp, err := sw.Exec(rqs) +// if err != nil { +// errJ := p.checkMaxJobs(ctx, w) +// if errJ != nil { +// // todo: worker was not destroyed +// return EmptyPayload, fmt.Errorf("%v, %v", err, errJ) +// } +// +// // soft job errors are allowed +// if _, jobError := err.(JobError); jobError { +// p.ww.PushWorker(w) +// return EmptyPayload, err +// } +// +// sw.State().Set(StateInvalid) +// errS := w.Stop(ctx) +// if errS != nil { +// return EmptyPayload, fmt.Errorf("%v, %v", err, errS) +// } +// +// return EmptyPayload, err +// } +// +// // worker want's to be terminated +// if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest { +// w.State().Set(StateInvalid) +// err = w.Stop(ctx) +// if err != nil { +// return EmptyPayload, err +// } +// return p.ExecWithContext(ctx, rqs) +// } +// +// if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { +// err = p.ww.AllocateNew(ctx) +// if err != nil { +// return EmptyPayload, err +// } +// } else { +// p.muw.Lock() +// p.ww.PushWorker(w) +// p.muw.Unlock() +// } +// +// return rsp, nil +// } // Destroy all underlying stack (but let them to complete the task). func (p *StaticPool) Destroy(ctx context.Context) { p.ww.Destroy(ctx) } -func (p *StaticPool) Events() chan PoolEvent { - return p.events -} - // allocate required number of stack func (p *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]WorkerBase, error) { var workers []WorkerBase @@ -243,6 +265,7 @@ func (p *StaticPool) checkMaxJobs(ctx context.Context, w WorkerBase) error { if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { err := p.ww.AllocateNew(ctx) if err != nil { + p.events.Push(PoolEvent{Event: EventPoolError, Payload: err}) return err } } |