diff options
author | Valery Piashchynski <[email protected]> | 2020-12-17 03:16:55 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-12-17 03:16:55 +0300 |
commit | 40cfd9f6b44dfe987bfbf010bf67b32abdc64208 (patch) | |
tree | 10e3c3cd0805619ac30533078eb7d2585877a1b3 /pkg | |
parent | 9d5fe4f6a98b30fd73be8259f84fa595ac994a71 (diff) |
Now better
Diffstat (limited to 'pkg')
-rwxr-xr-x | pkg/pipe/pipe_factory_test.go | 4 | ||||
-rwxr-xr-x | pkg/pool/static_pool.go | 19 | ||||
-rwxr-xr-x | pkg/pool/static_pool_test.go | 11 | ||||
-rwxr-xr-x | pkg/pool/supervisor_pool.go | 19 | ||||
-rwxr-xr-x | pkg/worker/sync_worker.go | 3 | ||||
-rwxr-xr-x | pkg/worker/worker.go | 9 | ||||
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 18 |
7 files changed, 43 insertions, 40 deletions
diff --git a/pkg/pipe/pipe_factory_test.go b/pkg/pipe/pipe_factory_test.go index 8250d226..99212ff8 100755 --- a/pkg/pipe/pipe_factory_test.go +++ b/pkg/pipe/pipe_factory_test.go @@ -9,7 +9,7 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/worker" + "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/internal" workerImpl "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/stretchr/testify/assert" @@ -413,7 +413,7 @@ func Test_Broken(t *testing.T) { data := "" mu := &sync.Mutex{} w.AddListener(func(event interface{}) { - if wev, ok := event.(worker.Event); ok { + if wev, ok := event.(events.WorkerEvent); ok { mu.Lock() data = string(wev.Payload.([]byte)) mu.Unlock() diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index 220ea8e9..691290b2 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -5,6 +5,7 @@ import ( "os/exec" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/interfaces/pool" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" @@ -40,7 +41,7 @@ type StaticPool struct { factory worker.Factory // distributes the events - events worker.EventsHandler + events events.Handler // manages worker states and TTLs ww worker.Watcher @@ -120,7 +121,7 @@ func ExecAfter(after ...After) Options { } // AddListener connects event listener to the pool. -func (sp *StaticPool) AddListener(listener worker.EventListener) { +func (sp *StaticPool) AddListener(listener events.EventListener) { sp.events.AddListener(listener) } @@ -169,7 +170,7 @@ func (sp *StaticPool) Exec(p internal.Payload) (internal.Payload, error) { sw.State().Set(internal.StateInvalid) err = sw.Stop(bCtx) if err != nil { - sp.events.Push(worker.Event{Event: worker.EventWorkerError, Worker: sw, Payload: errors.E(op, err)}) + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: errors.E(op, err)}) } return sp.Exec(p) @@ -221,7 +222,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs internal.Payload) sw.State().Set(internal.StateInvalid) err = sw.Stop(bCtx) if err != nil { - sp.events.Push(worker.Event{Event: worker.EventWorkerError, Worker: sw, Payload: errors.E(op, err)}) + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: errors.E(op, err)}) } return sp.Exec(rqs) @@ -252,7 +253,7 @@ func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worke if err != nil { // if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout if errors.Is(errors.NoFreeWorkers, err) { - sp.events.Push(pool.Event{Event: pool.EventNoFreeWorkers, Payload: errors.E(op, err)}) + sp.events.Push(events.PoolEvent{Event: events.EventNoFreeWorkers, Payload: errors.E(op, err)}) return nil, errors.E(op, err) } // else if err not nil - return error @@ -274,13 +275,13 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { err = sp.ww.AllocateNew() if err != nil { - sp.events.Push(pool.Event{Event: pool.EventWorkerConstruct, Payload: errors.E(op, err)}) + sp.events.Push(events.PoolEvent{Event: events.EventWorkerConstruct, Payload: errors.E(op, err)}) } w.State().Set(internal.StateInvalid) err = w.Stop(bCtx) if err != nil { - sp.events.Push(worker.Event{Event: worker.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) } } else { sp.ww.PushWorker(w) @@ -290,7 +291,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { } w.State().Set(internal.StateInvalid) - sp.events.Push(pool.Event{Event: pool.EventWorkerDestruct, Payload: w}) + sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) errS := w.Stop(bCtx) if errS != nil { @@ -325,7 +326,7 @@ func (sp *StaticPool) execDebug(p internal.Payload) (internal.Payload, error) { r, err := sw.(worker.SyncWorker).Exec(p) if stopErr := sw.Stop(context.Background()); stopErr != nil { - sp.events.Push(worker.Event{Event: worker.EventWorkerError, Worker: sw, Payload: err}) + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err}) } return r, err diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go index 8b13c7c9..0794b8e6 100755 --- a/pkg/pool/static_pool_test.go +++ b/pkg/pool/static_pool_test.go @@ -12,8 +12,7 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/pool" - "github.com/spiral/roadrunner/v2/interfaces/worker" + "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/pipe" "github.com/stretchr/testify/assert" @@ -179,8 +178,8 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { block := make(chan struct{}) p.AddListener(func(event interface{}) { - if wev, ok := event.(worker.Event); ok { - if wev.Event == worker.EventWorkerLog { + if wev, ok := event.(events.WorkerEvent); ok { + if wev.Event == events.EventWorkerLog { e := string(wev.Payload.([]byte)) if strings.ContainsAny(e, "undefined_function()") { block <- struct{}{} @@ -227,8 +226,8 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) p.AddListener(func(event interface{}) { - if pe, ok := event.(pool.Event); ok { - if pe.Event == pool.EventWorkerConstruct { + if pe, ok := event.(events.PoolEvent); ok { + if pe.Event == events.EventWorkerConstruct { wg.Done() } } diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go index 0a2d16f7..6d1f0c58 100755 --- a/pkg/pool/supervisor_pool.go +++ b/pkg/pool/supervisor_pool.go @@ -7,6 +7,7 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/interfaces/pool" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" @@ -22,13 +23,13 @@ type Supervised interface { type supervised struct { cfg *SupervisorConfig - events worker.EventsHandler + events events.Handler pool pool.Pool stopCh chan struct{} mu *sync.RWMutex } -func newPoolWatcher(pool pool.Pool, events worker.EventsHandler, cfg *SupervisorConfig) Supervised { +func newPoolWatcher(pool pool.Pool, events events.Handler, cfg *SupervisorConfig) Supervised { sp := &supervised{ cfg: cfg, events: events, @@ -91,7 +92,7 @@ func (sp *supervised) Exec(p internal.Payload) (internal.Payload, error) { return rsp, nil } -func (sp *supervised) AddListener(listener worker.EventListener) { +func (sp *supervised) AddListener(listener events.EventListener) { sp.pool.AddListener(listener) } @@ -156,20 +157,20 @@ func (sp *supervised) control() { if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sp.cfg.TTL) { err = sp.pool.RemoveWorker(workers[i]) if err != nil { - sp.events.Push(pool.Event{Event: pool.EventSupervisorError, Payload: errors.E(op, err)}) + sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)}) return } - sp.events.Push(pool.Event{Event: pool.EventTTL, Payload: workers[i]}) + sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]}) continue } if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB { err = sp.pool.RemoveWorker(workers[i]) if err != nil { - sp.events.Push(pool.Event{Event: pool.EventSupervisorError, Payload: errors.E(op, err)}) + sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)}) return } - sp.events.Push(pool.Event{Event: pool.EventMaxMemory, Payload: workers[i]}) + sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]}) continue } @@ -197,10 +198,10 @@ func (sp *supervised) control() { if sp.cfg.IdleTTL-uint64(res) <= 0 { err = sp.pool.RemoveWorker(workers[i]) if err != nil { - sp.events.Push(pool.Event{Event: pool.EventSupervisorError, Payload: errors.E(op, err)}) + sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)}) return } - sp.events.Push(pool.Event{Event: pool.EventIdleTTL, Payload: workers[i]}) + sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]}) } } } diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index 0fcde2c3..1eb1396e 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -6,6 +6,7 @@ import ( "time" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" "go.uber.org/multierr" @@ -193,7 +194,7 @@ func (tw *syncWorker) Created() time.Time { return tw.w.Created() } -func (tw *syncWorker) AddListener(listener worker.EventListener) { +func (tw *syncWorker) AddListener(listener events.EventListener) { tw.w.AddListener(listener) } diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index d2b4374b..998ed592 100755 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -13,6 +13,7 @@ import ( "time" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/util" @@ -43,7 +44,7 @@ type Process struct { created time.Time // updates parent supervisor or pool about Process events - events worker.EventsHandler + events events.Handler // state holds information about current Process state, // number of Process executions, buf status change time. @@ -119,7 +120,7 @@ func (w *Process) Created() time.Time { } // AddListener registers new worker event listener. -func (w *Process) AddListener(listener worker.EventListener) { +func (w *Process) AddListener(listener events.EventListener) { w.events.AddListener(listener) } @@ -279,7 +280,7 @@ func (w *Process) watch() { buf := w.get() // read the last data n, _ := w.rd.Read(*buf) - w.events.Push(worker.Event{Event: worker.EventWorkerLog, Worker: w, Payload: (*buf)[:n]}) + w.events.Push(events.WorkerEvent{Event: events.EventWorkerLog, Worker: w, Payload: (*buf)[:n]}) w.mu.Lock() // write new message w.stderr.Write((*buf)[:n]) @@ -290,7 +291,7 @@ func (w *Process) watch() { // read the max 10kb of stderr per one read buf := w.get() n, _ := w.rd.Read(*buf) - w.events.Push(worker.Event{Event: worker.EventWorkerLog, Worker: w, Payload: (*buf)[:n]}) + w.events.Push(events.WorkerEvent{Event: events.EventWorkerLog, Worker: w, Payload: (*buf)[:n]}) w.mu.Lock() // write new message w.stderr.Write((*buf)[:n]) diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index 530ce5d6..8a71ff8a 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -7,7 +7,7 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/pool" + "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" syncWorker "github.com/spiral/roadrunner/v2/pkg/worker" @@ -140,7 +140,7 @@ func (stack *Stack) Destroy(ctx context.Context) { } // workerCreateFunc can be nil, but in that case, dead stack will not be replaced -func NewWorkerWatcher(allocator worker.Allocator, numWorkers int64, events worker.EventsHandler) worker.Watcher { +func NewWorkerWatcher(allocator worker.Allocator, numWorkers int64, events events.Handler) worker.Watcher { ww := &workerWatcher{ stack: NewWorkersStack(), allocator: allocator, @@ -158,7 +158,7 @@ type workerWatcher struct { allocator worker.Allocator initialNumWorkers int64 actualNumWorkers int64 - events worker.EventsHandler + events events.Handler } func (ww *workerWatcher) AddToWatch(workers []worker.BaseProcess) error { @@ -229,8 +229,8 @@ func (ww *workerWatcher) AllocateNew() error { ww.stack.mutex.Unlock() ww.PushWorker(sw) - ww.events.Push(pool.Event{ - Event: pool.EventWorkerConstruct, + ww.events.Push(events.PoolEvent{ + Event: events.EventWorkerConstruct, Payload: sw, }) @@ -279,8 +279,8 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { const op = errors.Op("process wait") err := w.Wait() if err != nil { - ww.events.Push(worker.Event{ - Event: worker.EventWorkerError, + ww.events.Push(events.WorkerEvent{ + Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err), }) @@ -294,8 +294,8 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { _ = ww.stack.FindAndRemoveByPid(w.Pid()) err = ww.AllocateNew() if err != nil { - ww.events.Push(pool.Event{ - Event: pool.EventPoolError, + ww.events.Push(events.PoolEvent{ + Event: events.EventPoolError, Payload: errors.E(op, err), }) } |