summaryrefslogtreecommitdiff
path: root/pkg/pool/static_pool.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/pool/static_pool.go')
-rwxr-xr-xpkg/pool/static_pool.go47
1 files changed, 21 insertions, 26 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 837fd183..9cf79fd4 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -9,7 +9,8 @@ import (
"github.com/spiral/roadrunner/v2/interfaces/pool"
"github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
- eventsHandler "github.com/spiral/roadrunner/v2/pkg/events"
+ eventsPkg "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
syncWorker "github.com/spiral/roadrunner/v2/pkg/worker"
workerWatcher "github.com/spiral/roadrunner/v2/pkg/worker_watcher"
)
@@ -20,13 +21,13 @@ const StopRequest = "{\"stop\":true}"
var bCtx = context.Background()
// ErrorEncoder encode error or make a decision based on the error type
-type ErrorEncoder func(err error, w worker.BaseProcess) (internal.Payload, error)
+type ErrorEncoder func(err error, w worker.BaseProcess) (payload.Payload, error)
// Before is set of functions that executes BEFORE Exec
-type Before func(req internal.Payload) internal.Payload
+type Before func(req payload.Payload) payload.Payload
// After is set of functions that executes AFTER Exec
-type After func(req internal.Payload, resp internal.Payload) internal.Payload
+type After func(req payload.Payload, resp payload.Payload) payload.Payload
type Options func(p *StaticPool)
@@ -71,7 +72,7 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory worker.Factory,
cfg: cfg,
cmd: cmd,
factory: factory,
- events: eventsHandler.NewEventsHandler(),
+ events: eventsPkg.NewEventsHandler(),
after: make([]After, 0, 0),
before: make([]Before, 0, 0),
}
@@ -79,7 +80,7 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory worker.Factory,
p.allocator = newPoolAllocator(factory, cmd)
p.ww = workerWatcher.NewWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events)
- workers, err := p.allocateSyncWorkers(ctx, p.cfg.NumWorkers)
+ workers, err := p.allocateWorkers(ctx, p.cfg.NumWorkers)
if err != nil {
return nil, errors.E(op, err)
}
@@ -139,7 +140,7 @@ func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error {
return sp.ww.RemoveWorker(wb)
}
-func (sp *StaticPool) Exec(p internal.Payload) (internal.Payload, error) {
+func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
const op = errors.Op("exec")
if sp.cfg.Debug {
return sp.execDebug(p)
@@ -148,7 +149,7 @@ func (sp *StaticPool) Exec(p internal.Payload) (internal.Payload, error) {
defer cancel()
w, err := sp.getWorker(ctxGetFree, op)
if err != nil {
- return internal.Payload{}, errors.E(op, err)
+ return payload.Payload{}, errors.E(op, err)
}
sw := w.(worker.SyncWorker)
@@ -179,7 +180,7 @@ func (sp *StaticPool) Exec(p internal.Payload) (internal.Payload, error) {
if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs {
err = sp.ww.AllocateNew()
if err != nil {
- return internal.Payload{}, errors.E(op, err)
+ return payload.Payload{}, errors.E(op, err)
}
} else {
sp.ww.PushWorker(sw)
@@ -194,13 +195,13 @@ func (sp *StaticPool) Exec(p internal.Payload) (internal.Payload, error) {
return rsp, nil
}
-func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs internal.Payload) (internal.Payload, error) {
+func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) {
const op = errors.Op("exec with context")
ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout)
defer cancel()
w, err := sp.getWorker(ctxGetFree, op)
if err != nil {
- return internal.Payload{}, errors.E(op, err)
+ return payload.Payload{}, errors.E(op, err)
}
sw := w.(worker.SyncWorker)
@@ -231,7 +232,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs internal.Payload)
if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs {
err = sp.ww.AllocateNew()
if err != nil {
- return internal.Payload{}, errors.E(op, err)
+ return payload.Payload{}, errors.E(op, err)
}
} else {
sp.ww.PushWorker(sw)
@@ -268,7 +269,7 @@ func (sp *StaticPool) Destroy(ctx context.Context) {
}
func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
- return func(err error, w worker.BaseProcess) (internal.Payload, error) {
+ return func(err error, w worker.BaseProcess) (payload.Payload, error) {
const op = errors.Op("error encoder")
// soft job errors are allowed
if errors.Is(errors.ErrSoftJob, err) {
@@ -287,7 +288,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
sp.ww.PushWorker(w)
}
- return internal.Payload{}, errors.E(op, err)
+ return payload.Payload{}, errors.E(op, err)
}
w.State().Set(internal.StateInvalid)
@@ -295,10 +296,10 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
errS := w.Stop(bCtx)
if errS != nil {
- return internal.Payload{}, errors.E(op, errors.Errorf("%v, %v", err, errS))
+ return payload.Payload{}, errors.E(op, errors.Errorf("%v, %v", err, errS))
}
- return internal.Payload{}, errors.E(op, err)
+ return payload.Payload{}, errors.E(op, err)
}
}
@@ -317,10 +318,10 @@ func newPoolAllocator(factory worker.Factory, cmd func() *exec.Cmd) worker.Alloc
}
}
-func (sp *StaticPool) execDebug(p internal.Payload) (internal.Payload, error) {
+func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
sw, err := sp.allocator()
if err != nil {
- return internal.Payload{}, err
+ return payload.Payload{}, err
}
r, err := sw.(worker.SyncWorker).Exec(p)
@@ -333,12 +334,11 @@ func (sp *StaticPool) execDebug(p internal.Payload) (internal.Payload, error) {
}
// allocate required number of stack
-func (sp *StaticPool) allocateSyncWorkers(ctx context.Context, numWorkers int64) ([]worker.BaseProcess, error) {
+func (sp *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]worker.BaseProcess, error) {
const op = errors.Op("allocate workers")
var workers []worker.BaseProcess
// constant number of stack simplify logic
- // TODO do not allocate context on every loop cycle??
for i := int64(0); i < numWorkers; i++ {
ctx, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout)
w, err := sp.factory.SpawnWorkerWithContext(ctx, sp.cmd())
@@ -346,12 +346,7 @@ func (sp *StaticPool) allocateSyncWorkers(ctx context.Context, numWorkers int64)
cancel()
return nil, errors.E(op, errors.WorkerAllocate, err)
}
- sw, err := syncWorker.From(w)
- if err != nil {
- cancel()
- return nil, errors.E(op, err)
- }
- workers = append(workers, sw)
+ workers = append(workers, w)
cancel()
}
return workers, nil