summaryrefslogtreecommitdiff
path: root/pkg/pool/static_pool.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/pool/static_pool.go')
-rwxr-xr-xpkg/pool/static_pool.go19
1 files changed, 10 insertions, 9 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 220ea8e9..691290b2 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -5,6 +5,7 @@ import (
"os/exec"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/interfaces/pool"
"github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
@@ -40,7 +41,7 @@ type StaticPool struct {
factory worker.Factory
// distributes the events
- events worker.EventsHandler
+ events events.Handler
// manages worker states and TTLs
ww worker.Watcher
@@ -120,7 +121,7 @@ func ExecAfter(after ...After) Options {
}
// AddListener connects event listener to the pool.
-func (sp *StaticPool) AddListener(listener worker.EventListener) {
+func (sp *StaticPool) AddListener(listener events.EventListener) {
sp.events.AddListener(listener)
}
@@ -169,7 +170,7 @@ func (sp *StaticPool) Exec(p internal.Payload) (internal.Payload, error) {
sw.State().Set(internal.StateInvalid)
err = sw.Stop(bCtx)
if err != nil {
- sp.events.Push(worker.Event{Event: worker.EventWorkerError, Worker: sw, Payload: errors.E(op, err)})
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: errors.E(op, err)})
}
return sp.Exec(p)
@@ -221,7 +222,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs internal.Payload)
sw.State().Set(internal.StateInvalid)
err = sw.Stop(bCtx)
if err != nil {
- sp.events.Push(worker.Event{Event: worker.EventWorkerError, Worker: sw, Payload: errors.E(op, err)})
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: errors.E(op, err)})
}
return sp.Exec(rqs)
@@ -252,7 +253,7 @@ func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worke
if err != nil {
// if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout
if errors.Is(errors.NoFreeWorkers, err) {
- sp.events.Push(pool.Event{Event: pool.EventNoFreeWorkers, Payload: errors.E(op, err)})
+ sp.events.Push(events.PoolEvent{Event: events.EventNoFreeWorkers, Payload: errors.E(op, err)})
return nil, errors.E(op, err)
}
// else if err not nil - return error
@@ -274,13 +275,13 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
err = sp.ww.AllocateNew()
if err != nil {
- sp.events.Push(pool.Event{Event: pool.EventWorkerConstruct, Payload: errors.E(op, err)})
+ sp.events.Push(events.PoolEvent{Event: events.EventWorkerConstruct, Payload: errors.E(op, err)})
}
w.State().Set(internal.StateInvalid)
err = w.Stop(bCtx)
if err != nil {
- sp.events.Push(worker.Event{Event: worker.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
}
} else {
sp.ww.PushWorker(w)
@@ -290,7 +291,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
}
w.State().Set(internal.StateInvalid)
- sp.events.Push(pool.Event{Event: pool.EventWorkerDestruct, Payload: w})
+ sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
errS := w.Stop(bCtx)
if errS != nil {
@@ -325,7 +326,7 @@ func (sp *StaticPool) execDebug(p internal.Payload) (internal.Payload, error) {
r, err := sw.(worker.SyncWorker).Exec(p)
if stopErr := sw.Stop(context.Background()); stopErr != nil {
- sp.events.Push(worker.Event{Event: worker.EventWorkerError, Worker: sw, Payload: err})
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err})
}
return r, err