summaryrefslogtreecommitdiff
path: root/pkg/pool/static_pool.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/pool/static_pool.go')
-rwxr-xr-xpkg/pool/static_pool.go73
1 files changed, 32 insertions, 41 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index d1b726c1..bb416b29 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -6,13 +6,11 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/interfaces/events"
- "github.com/spiral/roadrunner/v2/interfaces/pool"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
- eventsPkg "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
- syncWorker "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/pkg/transport"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
workerWatcher "github.com/spiral/roadrunner/v2/pkg/worker_watcher"
)
@@ -20,7 +18,7 @@ import (
const StopRequest = "{\"stop\":true}"
// ErrorEncoder encode error or make a decision based on the error type
-type ErrorEncoder func(err error, w worker.BaseProcess) (payload.Payload, error)
+type ErrorEncoder func(err error, w worker.SyncWorker) (payload.Payload, error)
type Options func(p *StaticPool)
@@ -34,7 +32,7 @@ type StaticPool struct {
cmd Command
// creates and connects to stack
- factory worker.Factory
+ factory transport.Factory
// distributes the events
events events.Handler
@@ -43,7 +41,7 @@ type StaticPool struct {
listeners []events.Listener
// manages worker states and TTLs
- ww worker.Watcher
+ ww workerWatcher.Watcher
// allocate new worker
allocator worker.Allocator
@@ -53,7 +51,7 @@ type StaticPool struct {
}
// Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
-func Initialize(ctx context.Context, cmd Command, factory worker.Factory, cfg Config, options ...Options) (pool.Pool, error) {
+func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg Config, options ...Options) (Pool, error) {
const op = errors.Op("static_pool_initialize")
if factory == nil {
return nil, errors.E(op, errors.Str("no factory initialized"))
@@ -69,7 +67,7 @@ func Initialize(ctx context.Context, cmd Command, factory worker.Factory, cfg Co
cfg: cfg,
cmd: cmd,
factory: factory,
- events: eventsPkg.NewEventsHandler(),
+ events: events.NewEventsHandler(),
}
// add pool options
@@ -78,7 +76,7 @@ func Initialize(ctx context.Context, cmd Command, factory worker.Factory, cfg Co
}
p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd)
- p.ww = workerWatcher.NewWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events)
+ p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events)
workers, err := p.allocateWorkers(p.cfg.NumWorkers)
if err != nil {
@@ -124,11 +122,11 @@ func (sp *StaticPool) GetConfig() interface{} {
}
// Workers returns worker list associated with the pool.
-func (sp *StaticPool) Workers() (workers []worker.BaseProcess) {
+func (sp *StaticPool) Workers() (workers []*worker.SyncWorkerImpl) {
return sp.ww.WorkersList()
}
-func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error {
+func (sp *StaticPool) RemoveWorker(wb worker.SyncWorker) error {
return sp.ww.RemoveWorker(wb)
}
@@ -153,12 +151,12 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
// worker want's to be terminated
// TODO careful with string(rsp.Context)
if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest {
- sp.stopWorker(&w)
+ sp.stopWorker(w)
return sp.Exec(p)
}
- err = sp.checkMaxJobs(&w)
+ err = sp.checkMaxJobs(w)
if err != nil {
return payload.Payload{}, errors.E(op, err)
}
@@ -183,11 +181,11 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (p
// worker want's to be terminated
if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest {
- sp.stopWorker(&w)
+ sp.stopWorker(w)
return sp.ExecWithContext(ctx, p)
}
- err = sp.checkMaxJobs(&w)
+ err = sp.checkMaxJobs(w)
if err != nil {
return payload.Payload{}, errors.E(op, err)
}
@@ -195,30 +193,30 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (p
return rsp, nil
}
-func (sp *StaticPool) stopWorker(w *worker.SyncWorker) {
+func (sp *StaticPool) stopWorker(w worker.SyncWorker) {
const op = errors.Op("static_pool_stop_worker")
- (*w).State().Set(internal.StateInvalid)
- err := (*w).Stop()
+ w.State().Set(internal.StateInvalid)
+ err := w.Stop()
if err != nil {
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: *w, Payload: errors.E(op, err)})
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
}
}
// checkMaxJobs check for worker number of executions and kill workers if that number more than sp.cfg.MaxJobs
-func (sp *StaticPool) checkMaxJobs(w *worker.SyncWorker) error {
+func (sp *StaticPool) checkMaxJobs(w worker.SyncWorker) error {
const op = errors.Op("static_pool_check_max_jobs")
- if sp.cfg.MaxJobs != 0 && (*w).State().NumExecs() >= sp.cfg.MaxJobs {
+ if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
err := sp.ww.AllocateNew()
if err != nil {
return errors.E(op, err)
}
} else {
- sp.ww.PushWorker(*w)
+ sp.ww.PushWorker(w)
}
return nil
}
-func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.SyncWorker, error) {
+func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (*worker.SyncWorkerImpl, error) {
// GetFreeWorker function consumes context with timeout
w, err := sp.ww.GetFreeWorker(ctxGetFree)
if err != nil {
@@ -230,7 +228,7 @@ func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worke
// else if err not nil - return error
return nil, errors.E(op, err)
}
- return w.(worker.SyncWorker), nil
+ return w, nil
}
// Destroy all underlying stack (but let them to complete the task).
@@ -239,7 +237,7 @@ func (sp *StaticPool) Destroy(ctx context.Context) {
}
func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
- return func(err error, w worker.BaseProcess) (payload.Payload, error) {
+ return func(err error, w worker.SyncWorker) (payload.Payload, error) {
const op = errors.Op("error encoder")
// just push event if on any stage was timeout error
if errors.Is(errors.ExecTTL, err) {
@@ -277,8 +275,8 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
}
}
-func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory worker.Factory, cmd func() *exec.Cmd) worker.Allocator {
- return func() (worker.BaseProcess, error) {
+func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory transport.Factory, cmd func() *exec.Cmd) worker.Allocator {
+ return func() (*worker.SyncWorkerImpl, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
w, err := factory.SpawnWorkerWithTimeout(ctx, cmd(), sp.listeners...)
@@ -286,10 +284,7 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio
return nil, err
}
- sw, err := syncWorker.From(w)
- if err != nil {
- return nil, err
- }
+ sw := worker.From(w)
sp.events.Push(events.PoolEvent{
Event: events.EventWorkerConstruct,
@@ -305,7 +300,7 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
return payload.Payload{}, err
}
- r, err := sw.(worker.SyncWorker).Exec(p)
+ r, err := sw.Exec(p)
if stopErr := sw.Stop(); stopErr != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err})
@@ -315,9 +310,9 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
}
// allocate required number of stack
-func (sp *StaticPool) allocateWorkers(numWorkers int64) ([]worker.BaseProcess, error) {
+func (sp *StaticPool) allocateWorkers(numWorkers int64) ([]worker.SyncWorker, error) {
const op = errors.Op("allocate workers")
- var workers []worker.BaseProcess
+ var workers []worker.SyncWorker
// constant number of stack simplify logic
for i := int64(0); i < numWorkers; i++ {
@@ -326,11 +321,7 @@ func (sp *StaticPool) allocateWorkers(numWorkers int64) ([]worker.BaseProcess, e
return nil, errors.E(op, errors.WorkerAllocate, err)
}
- sw, err := syncWorker.From(w)
- if err != nil {
- return nil, errors.E(op, err)
- }
- workers = append(workers, sw)
+ workers = append(workers, w)
}
return workers, nil
}