summaryrefslogtreecommitdiff
path: root/pkg/pool/static_pool.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-12-21 14:24:45 +0300
committerGitHub <[email protected]>2020-12-21 14:24:45 +0300
commit8543980775e5f8b12e5e200a0764052cdb4350a5 (patch)
treec1c6dff8e6bd81bcf51d608c5ed935702911ae81 /pkg/pool/static_pool.go
parentfd6e9cc403fc0c3857dcf29768429a374bd85636 (diff)
parent7b32b6b93576ec72b4b7fdf2068e655f869e9cf8 (diff)
Merge pull request #453 from spiral/plugin/redis
Plugin/redis
Diffstat (limited to 'pkg/pool/static_pool.go')
-rwxr-xr-xpkg/pool/static_pool.go55
1 files changed, 28 insertions, 27 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 9cf79fd4..2a06b255 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -3,6 +3,7 @@ package pool
import (
"context"
"os/exec"
+ "time"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/interfaces/events"
@@ -18,8 +19,6 @@ import (
// StopRequest can be sent by worker to indicate that restart is required.
const StopRequest = "{\"stop\":true}"
-var bCtx = context.Background()
-
// ErrorEncoder encode error or make a decision based on the error type
type ErrorEncoder func(err error, w worker.BaseProcess) (payload.Payload, error)
@@ -77,10 +76,10 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory worker.Factory,
before: make([]Before, 0, 0),
}
- p.allocator = newPoolAllocator(factory, cmd)
+ p.allocator = newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd)
p.ww = workerWatcher.NewWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events)
- workers, err := p.allocateWorkers(ctx, p.cfg.NumWorkers)
+ workers, err := p.allocateWorkers(p.cfg.NumWorkers)
if err != nil {
return nil, errors.E(op, err)
}
@@ -169,7 +168,7 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
// TODO careful with string(rsp.Context)
if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest {
sw.State().Set(internal.StateInvalid)
- err = sw.Stop(bCtx)
+ err = sw.Stop()
if err != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: errors.E(op, err)})
}
@@ -204,8 +203,6 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs payload.Payload)
return payload.Payload{}, errors.E(op, err)
}
- sw := w.(worker.SyncWorker)
-
// apply all before function
if len(sp.before) > 0 {
for i := 0; i < len(sp.before); i++ {
@@ -213,29 +210,29 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs payload.Payload)
}
}
- rsp, err := sw.ExecWithContext(ctx, rqs)
+ rsp, err := w.ExecWithTimeout(ctx, rqs)
if err != nil {
- return sp.errEncoder(err, sw)
+ return sp.errEncoder(err, w)
}
// worker want's to be terminated
if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
- sw.State().Set(internal.StateInvalid)
- err = sw.Stop(bCtx)
+ w.State().Set(internal.StateInvalid)
+ err = w.Stop()
if err != nil {
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: errors.E(op, err)})
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
}
return sp.Exec(rqs)
}
- if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs {
+ if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
err = sp.ww.AllocateNew()
if err != nil {
return payload.Payload{}, errors.E(op, err)
}
} else {
- sp.ww.PushWorker(sw)
+ sp.ww.PushWorker(w)
}
// apply all after functions
@@ -248,7 +245,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs payload.Payload)
return rsp, nil
}
-func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) {
+func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.SyncWorker, error) {
// GetFreeWorker function consumes context with timeout
w, err := sp.ww.GetFreeWorker(ctxGetFree)
if err != nil {
@@ -260,7 +257,7 @@ func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worke
// else if err not nil - return error
return nil, errors.E(op, err)
}
- return w, nil
+ return w.(worker.SyncWorker), nil
}
// Destroy all underlying stack (but let them to complete the task).
@@ -280,7 +277,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
}
w.State().Set(internal.StateInvalid)
- err = w.Stop(bCtx)
+ err = w.Stop()
if err != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
}
@@ -293,7 +290,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
w.State().Set(internal.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
- errS := w.Stop(bCtx)
+ errS := w.Stop()
if errS != nil {
return payload.Payload{}, errors.E(op, errors.Errorf("%v, %v", err, errS))
@@ -303,9 +300,11 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
}
}
-func newPoolAllocator(factory worker.Factory, cmd func() *exec.Cmd) worker.Allocator {
+func newPoolAllocator(ctx context.Context, timeout time.Duration, factory worker.Factory, cmd func() *exec.Cmd) worker.Allocator {
return func() (worker.BaseProcess, error) {
- w, err := factory.SpawnWorkerWithContext(bCtx, cmd())
+ ctx, cancel := context.WithTimeout(ctx, timeout)
+ defer cancel()
+ w, err := factory.SpawnWorkerWithTimeout(ctx, cmd())
if err != nil {
return nil, err
}
@@ -326,7 +325,7 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
r, err := sw.(worker.SyncWorker).Exec(p)
- if stopErr := sw.Stop(context.Background()); stopErr != nil {
+ if stopErr := sw.Stop(); stopErr != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err})
}
@@ -334,20 +333,22 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
}
// allocate required number of stack
-func (sp *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]worker.BaseProcess, error) {
+func (sp *StaticPool) allocateWorkers(numWorkers int64) ([]worker.BaseProcess, error) {
const op = errors.Op("allocate workers")
var workers []worker.BaseProcess
// constant number of stack simplify logic
for i := int64(0); i < numWorkers; i++ {
- ctx, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout)
- w, err := sp.factory.SpawnWorkerWithContext(ctx, sp.cmd())
+ w, err := sp.allocator()
if err != nil {
- cancel()
return nil, errors.E(op, errors.WorkerAllocate, err)
}
- workers = append(workers, w)
- cancel()
+
+ sw, err := syncWorker.From(w)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+ workers = append(workers, sw)
}
return workers, nil
}