summaryrefslogtreecommitdiff
path: root/static_pool.go
diff options
context:
space:
mode:
Diffstat (limited to 'static_pool.go')
-rwxr-xr-xstatic_pool.go211
1 files changed, 117 insertions, 94 deletions
diff --git a/static_pool.go b/static_pool.go
index 0c2352ad..4ecbdd41 100755
--- a/static_pool.go
+++ b/static_pool.go
@@ -6,20 +6,19 @@ import (
"os/exec"
"sync"
+ "github.com/spiral/roadrunner/v2/util"
+
"github.com/pkg/errors"
)
-const (
- // StopRequest can be sent by worker to indicate that restart is required.
- StopRequest = "{\"stop\":true}"
-)
+// StopRequest can be sent by worker to indicate that restart is required.
+const StopRequest = "{\"stop\":true}"
var bCtx = context.Background()
// StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack.
type StaticPool struct {
- // pool behaviour
- cfg *Config
+ cfg Config
// worker command creator
cmd func() *exec.Cmd
@@ -27,30 +26,31 @@ type StaticPool struct {
// creates and connects to stack
factory Factory
+ // distributes the events
+ events *util.EventHandler
+
// protects state of worker list, does not affect allocation
muw sync.RWMutex
- ww *WorkersWatcher
+ // manages worker states and TTLs
+ ww *workerWatcher
- events chan PoolEvent
-}
-type PoolEvent struct {
- Payload interface{}
+ // supervises memory and TTL of workers
+ // sp *supervisedPool
}
// NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
-// TODO why cfg is passed by pointer?
-func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg *Config) (Pool, error) {
+func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Config) (Pool, error) {
cfg.InitDefaults()
p := &StaticPool{
cfg: cfg,
cmd: cmd,
factory: factory,
- events: make(chan PoolEvent),
+ events: &util.EventHandler{},
}
- p.ww = NewWorkerWatcher(func(args ...interface{}) (WorkerBase, error) {
+ p.ww = newWorkerWatcher(func(args ...interface{}) (WorkerBase, error) {
w, err := p.factory.SpawnWorkerWithContext(ctx, p.cmd())
if err != nil {
return nil, err
@@ -74,12 +74,21 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg *Co
return nil, err
}
+ // todo: implement
+ // p.sp = newPoolWatcher(p, p.events, p.cfg.Supervisor)
+ // p.sp.Start()
+
return p, nil
}
+// AddListener connects event listener to the pool.
+func (p *StaticPool) AddListener(listener util.EventListener) {
+ p.events.AddListener(listener)
+}
+
// Config returns associated pool configuration. Immutable.
-func (p *StaticPool) Config() Config {
- return *p.cfg
+func (p *StaticPool) GetConfig() Config {
+ return p.cfg
}
// Workers returns worker list associated with the pool.
@@ -103,18 +112,30 @@ func (p *StaticPool) Exec(rqs Payload) (Payload, error) {
rsp, err := sw.Exec(rqs)
if err != nil {
- errJ := p.checkMaxJobs(bCtx, w)
- if errJ != nil {
- return EmptyPayload, fmt.Errorf("%v, %v", err, errJ)
- }
// soft job errors are allowed
- if _, jobError := err.(TaskError); jobError {
- p.ww.PushWorker(w)
+ if _, jobError := err.(JobError); jobError {
+ if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
+ err := p.ww.AllocateNew(bCtx)
+ if err != nil {
+ p.events.Push(PoolEvent{Event: EventPoolError, Payload: err})
+ }
+
+ w.State().Set(StateInvalid)
+ err = w.Stop(bCtx)
+ if err != nil {
+ p.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
+ }
+ } else {
+ p.ww.PushWorker(w)
+ }
+
return EmptyPayload, err
}
sw.State().Set(StateInvalid)
+ p.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w})
errS := w.Stop(bCtx)
+
if errS != nil {
return EmptyPayload, fmt.Errorf("%v, %v", err, errS)
}
@@ -127,9 +148,10 @@ func (p *StaticPool) Exec(rqs Payload) (Payload, error) {
w.State().Set(StateInvalid)
err = w.Stop(bCtx)
if err != nil {
- return EmptyPayload, err
+ p.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
}
- return p.ExecWithContext(bCtx, rqs)
+
+ return p.Exec(rqs)
}
if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
@@ -146,81 +168,81 @@ func (p *StaticPool) Exec(rqs Payload) (Payload, error) {
}
// Exec one task with given payload and context, returns result or error.
-func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
- // todo: why TODO passed here?
- getWorkerCtx, cancel := context.WithTimeout(context.TODO(), p.cfg.AllocateTimeout)
- defer cancel()
- w, err := p.ww.GetFreeWorker(getWorkerCtx)
- if err != nil && errors.Is(err, ErrWatcherStopped) {
- return EmptyPayload, ErrWatcherStopped
- } else if err != nil {
- return EmptyPayload, err
- }
-
- sw := w.(SyncWorker)
-
- var execCtx context.Context
- if p.cfg.ExecTTL != 0 {
- var cancel2 context.CancelFunc
- execCtx, cancel2 = context.WithTimeout(context.TODO(), p.cfg.ExecTTL)
- defer cancel2()
- } else {
- execCtx = ctx
- }
-
- rsp, err := sw.ExecWithContext(execCtx, rqs)
- if err != nil {
- errJ := p.checkMaxJobs(ctx, w)
- if errJ != nil {
- return EmptyPayload, fmt.Errorf("%v, %v", err, errJ)
- }
- // soft job errors are allowed
- if _, jobError := err.(TaskError); jobError {
- p.ww.PushWorker(w)
- return EmptyPayload, err
- }
-
- sw.State().Set(StateInvalid)
- errS := w.Stop(ctx)
- if errS != nil {
- return EmptyPayload, fmt.Errorf("%v, %v", err, errS)
- }
-
- return EmptyPayload, err
- }
-
- // worker want's to be terminated
- if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
- w.State().Set(StateInvalid)
- err = w.Stop(ctx)
- if err != nil {
- return EmptyPayload, err
- }
- return p.ExecWithContext(ctx, rqs)
- }
-
- if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
- err = p.ww.AllocateNew(ctx)
- if err != nil {
- return EmptyPayload, err
- }
- } else {
- p.muw.Lock()
- p.ww.PushWorker(w)
- p.muw.Unlock()
- }
- return rsp, nil
-}
+// func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
+// // todo: why TODO passed here?
+// getWorkerCtx, cancel := context.WithTimeout(context.TODO(), p.cfg.AllocateTimeout)
+// defer cancel()
+// w, err := p.ww.GetFreeWorker(getWorkerCtx)
+// if err != nil && errors.Is(err, ErrWatcherStopped) {
+// return EmptyPayload, ErrWatcherStopped
+// } else if err != nil {
+// return EmptyPayload, err
+// }
+//
+// sw := w.(SyncWorker)
+//
+// // todo: implement worker destroy
+// //execCtx context.Context
+// //if p.cfg.Supervisor.ExecTTL != 0 {
+// // var cancel2 context.CancelFunc
+// // execCtx, cancel2 = context.WithTimeout(context.TODO(), p.cfg.Supervisor.ExecTTL)
+// // defer cancel2()
+// //} else {
+// // execCtx = ctx
+// //}
+//
+// rsp, err := sw.Exec(rqs)
+// if err != nil {
+// errJ := p.checkMaxJobs(ctx, w)
+// if errJ != nil {
+// // todo: worker was not destroyed
+// return EmptyPayload, fmt.Errorf("%v, %v", err, errJ)
+// }
+//
+// // soft job errors are allowed
+// if _, jobError := err.(JobError); jobError {
+// p.ww.PushWorker(w)
+// return EmptyPayload, err
+// }
+//
+// sw.State().Set(StateInvalid)
+// errS := w.Stop(ctx)
+// if errS != nil {
+// return EmptyPayload, fmt.Errorf("%v, %v", err, errS)
+// }
+//
+// return EmptyPayload, err
+// }
+//
+// // worker want's to be terminated
+// if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
+// w.State().Set(StateInvalid)
+// err = w.Stop(ctx)
+// if err != nil {
+// return EmptyPayload, err
+// }
+// return p.ExecWithContext(ctx, rqs)
+// }
+//
+// if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
+// err = p.ww.AllocateNew(ctx)
+// if err != nil {
+// return EmptyPayload, err
+// }
+// } else {
+// p.muw.Lock()
+// p.ww.PushWorker(w)
+// p.muw.Unlock()
+// }
+//
+// return rsp, nil
+// }
// Destroy all underlying stack (but let them to complete the task).
func (p *StaticPool) Destroy(ctx context.Context) {
p.ww.Destroy(ctx)
}
-func (p *StaticPool) Events() chan PoolEvent {
- return p.events
-}
-
// allocate required number of stack
func (p *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]WorkerBase, error) {
var workers []WorkerBase
@@ -243,6 +265,7 @@ func (p *StaticPool) checkMaxJobs(ctx context.Context, w WorkerBase) error {
if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
err := p.ww.AllocateNew(ctx)
if err != nil {
+ p.events.Push(PoolEvent{Event: EventPoolError, Payload: err})
return err
}
}