summaryrefslogtreecommitdiff
path: root/static_pool.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-10-27 15:16:55 +0300
committerValery Piashchynski <[email protected]>2020-10-27 15:16:55 +0300
commitd199ef71b9644afbbba064c317cd0991be1c2443 (patch)
treef777eb90f10ca0e7dbc46227fc76c61f02111946 /static_pool.go
parent91cf918b30938129609323ded53e190385e019a6 (diff)
Supervised pool
Diffstat (limited to 'static_pool.go')
-rwxr-xr-xstatic_pool.go158
1 files changed, 78 insertions, 80 deletions
diff --git a/static_pool.go b/static_pool.go
index 4ecbdd41..3af933c3 100755
--- a/static_pool.go
+++ b/static_pool.go
@@ -6,9 +6,8 @@ import (
"os/exec"
"sync"
+ "github.com/spiral/roadrunner/v2/errors"
"github.com/spiral/roadrunner/v2/util"
-
- "github.com/pkg/errors"
)
// StopRequest can be sent by worker to indicate that restart is required.
@@ -34,9 +33,6 @@ type StaticPool struct {
// manages worker states and TTLs
ww *workerWatcher
-
- // supervises memory and TTL of workers
- // sp *supervisedPool
}
// NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
@@ -74,9 +70,13 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Con
return nil, err
}
- // todo: implement
- // p.sp = newPoolWatcher(p, p.events, p.cfg.Supervisor)
- // p.sp.Start()
+ // if supervised config not nil, guess, that pool wanted to be supervised
+ if cfg.Supervisor != nil {
+ sp := newPoolWatcher(p, p.events, p.cfg.Supervisor)
+ // start watcher timer
+ sp.Start()
+ return sp, nil
+ }
return p, nil
}
@@ -101,9 +101,10 @@ func (p *StaticPool) RemoveWorker(ctx context.Context, wb WorkerBase) error {
}
func (p *StaticPool) Exec(rqs Payload) (Payload, error) {
+ const op = errors.Op("Exec")
w, err := p.ww.GetFreeWorker(context.Background())
- if err != nil && errors.Is(err, ErrWatcherStopped) {
- return EmptyPayload, ErrWatcherStopped
+ if err != nil && errors.Is(errors.ErrWatcherStopped, err) {
+ return EmptyPayload, errors.E(op, err)
} else if err != nil {
return EmptyPayload, err
}
@@ -167,76 +168,73 @@ func (p *StaticPool) Exec(rqs Payload) (Payload, error) {
return rsp, nil
}
-// Exec one task with given payload and context, returns result or error.
-// func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
-// // todo: why TODO passed here?
-// getWorkerCtx, cancel := context.WithTimeout(context.TODO(), p.cfg.AllocateTimeout)
-// defer cancel()
-// w, err := p.ww.GetFreeWorker(getWorkerCtx)
-// if err != nil && errors.Is(err, ErrWatcherStopped) {
-// return EmptyPayload, ErrWatcherStopped
-// } else if err != nil {
-// return EmptyPayload, err
-// }
-//
-// sw := w.(SyncWorker)
-//
-// // todo: implement worker destroy
-// //execCtx context.Context
-// //if p.cfg.Supervisor.ExecTTL != 0 {
-// // var cancel2 context.CancelFunc
-// // execCtx, cancel2 = context.WithTimeout(context.TODO(), p.cfg.Supervisor.ExecTTL)
-// // defer cancel2()
-// //} else {
-// // execCtx = ctx
-// //}
-//
-// rsp, err := sw.Exec(rqs)
-// if err != nil {
-// errJ := p.checkMaxJobs(ctx, w)
-// if errJ != nil {
-// // todo: worker was not destroyed
-// return EmptyPayload, fmt.Errorf("%v, %v", err, errJ)
-// }
-//
-// // soft job errors are allowed
-// if _, jobError := err.(JobError); jobError {
-// p.ww.PushWorker(w)
-// return EmptyPayload, err
-// }
-//
-// sw.State().Set(StateInvalid)
-// errS := w.Stop(ctx)
-// if errS != nil {
-// return EmptyPayload, fmt.Errorf("%v, %v", err, errS)
-// }
-//
-// return EmptyPayload, err
-// }
-//
-// // worker want's to be terminated
-// if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
-// w.State().Set(StateInvalid)
-// err = w.Stop(ctx)
-// if err != nil {
-// return EmptyPayload, err
-// }
-// return p.ExecWithContext(ctx, rqs)
-// }
-//
-// if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
-// err = p.ww.AllocateNew(ctx)
-// if err != nil {
-// return EmptyPayload, err
-// }
-// } else {
-// p.muw.Lock()
-// p.ww.PushWorker(w)
-// p.muw.Unlock()
-// }
-//
-// return rsp, nil
-// }
+func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
+ const op = errors.Op("Exec")
+ w, err := p.ww.GetFreeWorker(context.Background())
+ if err != nil && errors.Is(errors.ErrWatcherStopped, err) {
+ return EmptyPayload, errors.E(op, err)
+ } else if err != nil {
+ return EmptyPayload, err
+ }
+
+ sw := w.(SyncWorker)
+
+ rsp, err := sw.ExecWithContext(ctx, rqs)
+ if err != nil {
+ // soft job errors are allowed
+ if _, jobError := err.(JobError); jobError {
+ if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
+ err := p.ww.AllocateNew(bCtx)
+ if err != nil {
+ p.events.Push(PoolEvent{Event: EventPoolError, Payload: err})
+ }
+
+ w.State().Set(StateInvalid)
+ err = w.Stop(bCtx)
+ if err != nil {
+ p.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
+ }
+ } else {
+ p.ww.PushWorker(w)
+ }
+
+ return EmptyPayload, err
+ }
+
+ sw.State().Set(StateInvalid)
+ p.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w})
+ errS := w.Stop(bCtx)
+
+ if errS != nil {
+ return EmptyPayload, fmt.Errorf("%v, %v", err, errS)
+ }
+
+ return EmptyPayload, err
+ }
+
+ // worker want's to be terminated
+ if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
+ w.State().Set(StateInvalid)
+ err = w.Stop(bCtx)
+ if err != nil {
+ p.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
+ }
+
+ return p.Exec(rqs)
+ }
+
+ if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
+ err = p.ww.AllocateNew(bCtx)
+ if err != nil {
+ return EmptyPayload, err
+ }
+ } else {
+ p.muw.Lock()
+ p.ww.PushWorker(w)
+ p.muw.Unlock()
+ }
+ return rsp, nil
+}
// Destroy all underlying stack (but let them to complete the task).
func (p *StaticPool) Destroy(ctx context.Context) {