diff options
Diffstat (limited to 'pkg/pool/static_pool.go')
-rwxr-xr-x | pkg/pool/static_pool.go | 19 |
1 files changed, 10 insertions, 9 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index 220ea8e9..691290b2 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -5,6 +5,7 @@ import ( "os/exec" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/interfaces/pool" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" @@ -40,7 +41,7 @@ type StaticPool struct { factory worker.Factory // distributes the events - events worker.EventsHandler + events events.Handler // manages worker states and TTLs ww worker.Watcher @@ -120,7 +121,7 @@ func ExecAfter(after ...After) Options { } // AddListener connects event listener to the pool. -func (sp *StaticPool) AddListener(listener worker.EventListener) { +func (sp *StaticPool) AddListener(listener events.EventListener) { sp.events.AddListener(listener) } @@ -169,7 +170,7 @@ func (sp *StaticPool) Exec(p internal.Payload) (internal.Payload, error) { sw.State().Set(internal.StateInvalid) err = sw.Stop(bCtx) if err != nil { - sp.events.Push(worker.Event{Event: worker.EventWorkerError, Worker: sw, Payload: errors.E(op, err)}) + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: errors.E(op, err)}) } return sp.Exec(p) @@ -221,7 +222,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs internal.Payload) sw.State().Set(internal.StateInvalid) err = sw.Stop(bCtx) if err != nil { - sp.events.Push(worker.Event{Event: worker.EventWorkerError, Worker: sw, Payload: errors.E(op, err)}) + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: errors.E(op, err)}) } return sp.Exec(rqs) @@ -252,7 +253,7 @@ func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worke if err != nil { // if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout if errors.Is(errors.NoFreeWorkers, err) { - sp.events.Push(pool.Event{Event: pool.EventNoFreeWorkers, Payload: errors.E(op, err)}) + sp.events.Push(events.PoolEvent{Event: events.EventNoFreeWorkers, Payload: errors.E(op, err)}) return nil, errors.E(op, err) } // else if err not nil - return error @@ -274,13 +275,13 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { err = sp.ww.AllocateNew() if err != nil { - sp.events.Push(pool.Event{Event: pool.EventWorkerConstruct, Payload: errors.E(op, err)}) + sp.events.Push(events.PoolEvent{Event: events.EventWorkerConstruct, Payload: errors.E(op, err)}) } w.State().Set(internal.StateInvalid) err = w.Stop(bCtx) if err != nil { - sp.events.Push(worker.Event{Event: worker.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) } } else { sp.ww.PushWorker(w) @@ -290,7 +291,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { } w.State().Set(internal.StateInvalid) - sp.events.Push(pool.Event{Event: pool.EventWorkerDestruct, Payload: w}) + sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) errS := w.Stop(bCtx) if errS != nil { @@ -325,7 +326,7 @@ func (sp *StaticPool) execDebug(p internal.Payload) (internal.Payload, error) { r, err := sw.(worker.SyncWorker).Exec(p) if stopErr := sw.Stop(context.Background()); stopErr != nil { - sp.events.Push(worker.Event{Event: worker.EventWorkerError, Worker: sw, Payload: err}) + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err}) } return r, err |