summaryrefslogtreecommitdiff
path: root/pkg/pool/static_pool.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-02-04 20:37:48 +0300
committerValery Piashchynski <[email protected]>2021-02-04 20:37:48 +0300
commitd629f08408a4478aaba90079a4e37ab69cfc12ef (patch)
tree2cb67bc5c9be295428239369e9d211f3888308fe /pkg/pool/static_pool.go
parentefacb852e279e6bbfc076c0faff391ff39815718 (diff)
pre-rc stabilization of the interfaces and internal code
Diffstat (limited to 'pkg/pool/static_pool.go')
-rwxr-xr-xpkg/pool/static_pool.go25
1 files changed, 12 insertions, 13 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 01b0574d..72c3d4df 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -8,7 +8,6 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/transport"
"github.com/spiral/roadrunner/v2/pkg/worker"
workerWatcher "github.com/spiral/roadrunner/v2/pkg/worker_watcher"
@@ -84,7 +83,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
}
// put stack in the pool
- err = p.ww.AddToWatch(workers)
+ err = p.ww.Watch(workers)
if err != nil {
return nil, errors.E(op, err)
}
@@ -123,11 +122,11 @@ func (sp *StaticPool) GetConfig() interface{} {
// Workers returns worker list associated with the pool.
func (sp *StaticPool) Workers() (workers []worker.SyncWorker) {
- return sp.ww.WorkersList()
+ return sp.ww.List()
}
func (sp *StaticPool) RemoveWorker(wb worker.SyncWorker) error {
- return sp.ww.RemoveWorker(wb)
+ return sp.ww.Remove(wb)
}
// Be careful, sync Exec with ExecWithContext
@@ -195,7 +194,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (p
func (sp *StaticPool) stopWorker(w worker.SyncWorker) {
const op = errors.Op("static_pool_stop_worker")
- w.State().Set(states.StateInvalid)
+ w.State().Set(worker.StateInvalid)
err := w.Stop()
if err != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
@@ -206,19 +205,19 @@ func (sp *StaticPool) stopWorker(w worker.SyncWorker) {
func (sp *StaticPool) checkMaxJobs(w worker.SyncWorker) error {
const op = errors.Op("static_pool_check_max_jobs")
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
- err := sp.ww.AllocateNew()
+ err := sp.ww.Allocate()
if err != nil {
return errors.E(op, err)
}
} else {
- sp.ww.PushWorker(w)
+ sp.ww.Push(w)
}
return nil
}
func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.SyncWorker, error) {
- // GetFreeWorker function consumes context with timeout
- w, err := sp.ww.GetFreeWorker(ctxGetFree)
+ // Get function consumes context with timeout
+ w, err := sp.ww.Get(ctxGetFree)
if err != nil {
// if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout
if errors.Is(errors.NoFreeWorkers, err) {
@@ -246,24 +245,24 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
// soft job errors are allowed
if errors.Is(errors.SoftJob, err) {
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
- err = sp.ww.AllocateNew()
+ err = sp.ww.Allocate()
if err != nil {
sp.events.Push(events.PoolEvent{Event: events.EventWorkerConstruct, Payload: errors.E(op, err)})
}
- w.State().Set(states.StateInvalid)
+ w.State().Set(worker.StateInvalid)
err = w.Stop()
if err != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
}
} else {
- sp.ww.PushWorker(w)
+ sp.ww.Push(w)
}
return payload.Payload{}, errors.E(op, err)
}
- w.State().Set(states.StateInvalid)
+ w.State().Set(worker.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
errS := w.Stop()