summaryrefslogtreecommitdiff
path: root/pkg/pool
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-12-21 10:27:10 +0300
committerValery Piashchynski <[email protected]>2020-12-21 10:27:10 +0300
commit2f71f79ac704ed95dad961677b6e602e38641b5d (patch)
tree7452bbedd1444079757a848ad07089bc6093561f /pkg/pool
parent3d4c75aadd9ffd0d46728f48f685de2e1bfc44bb (diff)
Remove unused contex from interfaces. Update pool allocator.
Diffstat (limited to 'pkg/pool')
-rwxr-xr-xpkg/pool/static_pool.go49
1 files changed, 22 insertions, 27 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index e5a5a7e8..2a06b255 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -3,6 +3,7 @@ package pool
import (
"context"
"os/exec"
+ "time"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/interfaces/events"
@@ -18,8 +19,6 @@ import (
// StopRequest can be sent by worker to indicate that restart is required.
const StopRequest = "{\"stop\":true}"
-var bCtx = context.Background()
-
// ErrorEncoder encode error or make a decision based on the error type
type ErrorEncoder func(err error, w worker.BaseProcess) (payload.Payload, error)
@@ -77,10 +76,10 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory worker.Factory,
before: make([]Before, 0, 0),
}
- p.allocator = newPoolAllocator(factory, cmd)
+ p.allocator = newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd)
p.ww = workerWatcher.NewWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events)
- workers, err := p.allocateWorkers(ctx, p.cfg.NumWorkers)
+ workers, err := p.allocateWorkers(p.cfg.NumWorkers)
if err != nil {
return nil, errors.E(op, err)
}
@@ -169,7 +168,7 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
// TODO careful with string(rsp.Context)
if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest {
sw.State().Set(internal.StateInvalid)
- err = sw.Stop(bCtx)
+ err = sw.Stop()
if err != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: errors.E(op, err)})
}
@@ -204,8 +203,6 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs payload.Payload)
return payload.Payload{}, errors.E(op, err)
}
- sw := w.(worker.SyncWorker)
-
// apply all before function
if len(sp.before) > 0 {
for i := 0; i < len(sp.before); i++ {
@@ -213,29 +210,29 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs payload.Payload)
}
}
- rsp, err := sw.ExecWithContext(ctx, rqs)
+ rsp, err := w.ExecWithTimeout(ctx, rqs)
if err != nil {
- return sp.errEncoder(err, sw)
+ return sp.errEncoder(err, w)
}
// worker want's to be terminated
if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
- sw.State().Set(internal.StateInvalid)
- err = sw.Stop(bCtx)
+ w.State().Set(internal.StateInvalid)
+ err = w.Stop()
if err != nil {
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: errors.E(op, err)})
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
}
return sp.Exec(rqs)
}
- if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs {
+ if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
err = sp.ww.AllocateNew()
if err != nil {
return payload.Payload{}, errors.E(op, err)
}
} else {
- sp.ww.PushWorker(sw)
+ sp.ww.PushWorker(w)
}
// apply all after functions
@@ -248,7 +245,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs payload.Payload)
return rsp, nil
}
-func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) {
+func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.SyncWorker, error) {
// GetFreeWorker function consumes context with timeout
w, err := sp.ww.GetFreeWorker(ctxGetFree)
if err != nil {
@@ -260,7 +257,7 @@ func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worke
// else if err not nil - return error
return nil, errors.E(op, err)
}
- return w, nil
+ return w.(worker.SyncWorker), nil
}
// Destroy all underlying stack (but let them to complete the task).
@@ -280,7 +277,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
}
w.State().Set(internal.StateInvalid)
- err = w.Stop(bCtx)
+ err = w.Stop()
if err != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
}
@@ -293,7 +290,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
w.State().Set(internal.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
- errS := w.Stop(bCtx)
+ errS := w.Stop()
if errS != nil {
return payload.Payload{}, errors.E(op, errors.Errorf("%v, %v", err, errS))
@@ -303,9 +300,11 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
}
}
-func newPoolAllocator(factory worker.Factory, cmd func() *exec.Cmd) worker.Allocator {
+func newPoolAllocator(ctx context.Context, timeout time.Duration, factory worker.Factory, cmd func() *exec.Cmd) worker.Allocator {
return func() (worker.BaseProcess, error) {
- w, err := factory.SpawnWorkerWithContext(bCtx, cmd())
+ ctx, cancel := context.WithTimeout(ctx, timeout)
+ defer cancel()
+ w, err := factory.SpawnWorkerWithTimeout(ctx, cmd())
if err != nil {
return nil, err
}
@@ -326,7 +325,7 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
r, err := sw.(worker.SyncWorker).Exec(p)
- if stopErr := sw.Stop(context.Background()); stopErr != nil {
+ if stopErr := sw.Stop(); stopErr != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err})
}
@@ -334,26 +333,22 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
}
// allocate required number of stack
-func (sp *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]worker.BaseProcess, error) {
+func (sp *StaticPool) allocateWorkers(numWorkers int64) ([]worker.BaseProcess, error) {
const op = errors.Op("allocate workers")
var workers []worker.BaseProcess
// constant number of stack simplify logic
for i := int64(0); i < numWorkers; i++ {
- ctx, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout)
- w, err := sp.factory.SpawnWorkerWithContext(ctx, sp.cmd())
+ w, err := sp.allocator()
if err != nil {
- cancel()
return nil, errors.E(op, errors.WorkerAllocate, err)
}
sw, err := syncWorker.From(w)
if err != nil {
- cancel()
return nil, errors.E(op, err)
}
workers = append(workers, sw)
- cancel()
}
return workers, nil
}