diff options
Diffstat (limited to 'pkg/pool/static_pool.go')
-rwxr-xr-x | pkg/pool/static_pool.go | 35 |
1 files changed, 18 insertions, 17 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index 6cc42143..9cf79fd4 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -9,7 +9,8 @@ import ( "github.com/spiral/roadrunner/v2/interfaces/pool" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" - events2 "github.com/spiral/roadrunner/v2/pkg/events" + eventsPkg "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/payload" syncWorker "github.com/spiral/roadrunner/v2/pkg/worker" workerWatcher "github.com/spiral/roadrunner/v2/pkg/worker_watcher" ) @@ -20,13 +21,13 @@ const StopRequest = "{\"stop\":true}" var bCtx = context.Background() // ErrorEncoder encode error or make a decision based on the error type -type ErrorEncoder func(err error, w worker.BaseProcess) (internal.Payload, error) +type ErrorEncoder func(err error, w worker.BaseProcess) (payload.Payload, error) // Before is set of functions that executes BEFORE Exec -type Before func(req internal.Payload) internal.Payload +type Before func(req payload.Payload) payload.Payload // After is set of functions that executes AFTER Exec -type After func(req internal.Payload, resp internal.Payload) internal.Payload +type After func(req payload.Payload, resp payload.Payload) payload.Payload type Options func(p *StaticPool) @@ -71,7 +72,7 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory worker.Factory, cfg: cfg, cmd: cmd, factory: factory, - events: events2.NewEventsHandler(), + events: eventsPkg.NewEventsHandler(), after: make([]After, 0, 0), before: make([]Before, 0, 0), } @@ -139,7 +140,7 @@ func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error { return sp.ww.RemoveWorker(wb) } -func (sp *StaticPool) Exec(p internal.Payload) (internal.Payload, error) { +func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) { const op = errors.Op("exec") if sp.cfg.Debug { return sp.execDebug(p) @@ -148,7 +149,7 @@ func (sp *StaticPool) Exec(p internal.Payload) (internal.Payload, error) { defer cancel() w, err := sp.getWorker(ctxGetFree, op) if err != nil { - return internal.Payload{}, errors.E(op, err) + return payload.Payload{}, errors.E(op, err) } sw := w.(worker.SyncWorker) @@ -179,7 +180,7 @@ func (sp *StaticPool) Exec(p internal.Payload) (internal.Payload, error) { if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs { err = sp.ww.AllocateNew() if err != nil { - return internal.Payload{}, errors.E(op, err) + return payload.Payload{}, errors.E(op, err) } } else { sp.ww.PushWorker(sw) @@ -194,13 +195,13 @@ func (sp *StaticPool) Exec(p internal.Payload) (internal.Payload, error) { return rsp, nil } -func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs internal.Payload) (internal.Payload, error) { +func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) { const op = errors.Op("exec with context") ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout) defer cancel() w, err := sp.getWorker(ctxGetFree, op) if err != nil { - return internal.Payload{}, errors.E(op, err) + return payload.Payload{}, errors.E(op, err) } sw := w.(worker.SyncWorker) @@ -231,7 +232,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs internal.Payload) if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs { err = sp.ww.AllocateNew() if err != nil { - return internal.Payload{}, errors.E(op, err) + return payload.Payload{}, errors.E(op, err) } } else { sp.ww.PushWorker(sw) @@ -268,7 +269,7 @@ func (sp *StaticPool) Destroy(ctx context.Context) { } func defaultErrEncoder(sp *StaticPool) ErrorEncoder { - return func(err error, w worker.BaseProcess) (internal.Payload, error) { + return func(err error, w worker.BaseProcess) (payload.Payload, error) { const op = errors.Op("error encoder") // soft job errors are allowed if errors.Is(errors.ErrSoftJob, err) { @@ -287,7 +288,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { sp.ww.PushWorker(w) } - return internal.Payload{}, errors.E(op, err) + return payload.Payload{}, errors.E(op, err) } w.State().Set(internal.StateInvalid) @@ -295,10 +296,10 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { errS := w.Stop(bCtx) if errS != nil { - return internal.Payload{}, errors.E(op, errors.Errorf("%v, %v", err, errS)) + return payload.Payload{}, errors.E(op, errors.Errorf("%v, %v", err, errS)) } - return internal.Payload{}, errors.E(op, err) + return payload.Payload{}, errors.E(op, err) } } @@ -317,10 +318,10 @@ func newPoolAllocator(factory worker.Factory, cmd func() *exec.Cmd) worker.Alloc } } -func (sp *StaticPool) execDebug(p internal.Payload) (internal.Payload, error) { +func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) { sw, err := sp.allocator() if err != nil { - return internal.Payload{}, err + return payload.Payload{}, err } r, err := sw.(worker.SyncWorker).Exec(p) |