diff options
author | Valery Piashchynski <[email protected]> | 2020-11-09 14:51:34 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-11-09 14:51:34 +0300 |
commit | 83c14cbad2d7d403b08efbb3cf900df9b52b4938 (patch) | |
tree | b084a2ca99eb7523232f477678f8aa2a82cd5812 | |
parent | b7b533dbe13d2c1a8e78c0e33a4a388c56884440 (diff) |
Add spiral errors
-rwxr-xr-x | process_state_test.go | 1 | ||||
-rwxr-xr-x | static_pool.go | 26 | ||||
-rwxr-xr-x | static_pool_test.go | 20 | ||||
-rwxr-xr-x | supervisor_pool.go | 16 | ||||
-rwxr-xr-x | sync_worker.go | 4 | ||||
-rwxr-xr-x | sync_worker_test.go | 2 | ||||
-rwxr-xr-x | worker_watcher.go | 15 |
7 files changed, 47 insertions, 37 deletions
diff --git a/process_state_test.go b/process_state_test.go deleted file mode 100755 index 3f283dce..00000000 --- a/process_state_test.go +++ /dev/null @@ -1 +0,0 @@ -package roadrunner diff --git a/static_pool.go b/static_pool.go index ee81fd39..17ec605e 100755 --- a/static_pool.go +++ b/static_pool.go @@ -189,10 +189,8 @@ func (sp *StaticPool) execDebug(p Payload) (Payload, error) { func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) { const op = errors.Op("Exec") w, err := sp.ww.GetFreeWorker(context.Background()) - if err != nil && errors.Is(errors.ErrWatcherStopped, err) { + if err != nil { return EmptyPayload, errors.E(op, err) - } else if err != nil { - return EmptyPayload, err } sw := w.(SyncWorker) @@ -204,19 +202,19 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { err = sp.ww.AllocateNew(bCtx) if err != nil { - sp.events.Push(PoolEvent{Event: EventPoolError, Payload: err}) + sp.events.Push(PoolEvent{Event: EventPoolError, Payload: errors.E(op, err)}) } w.State().Set(StateInvalid) err = w.Stop(bCtx) if err != nil { - sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err}) + sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: errors.E(op, err)}) } } else { sp.ww.PushWorker(w) } - return EmptyPayload, err + return EmptyPayload, errors.E(op, err) } sw.State().Set(StateInvalid) @@ -224,10 +222,10 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload errS := w.Stop(bCtx) if errS != nil { - return EmptyPayload, fmt.Errorf("%v, %v", err, errS) + return EmptyPayload, errors.E(op, errors.Errorf("%v, %v", err, errS)) } - return EmptyPayload, err + return EmptyPayload, errors.E(op, err) } // worker want's to be terminated @@ -235,7 +233,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload w.State().Set(StateInvalid) err = w.Stop(bCtx) if err != nil { - sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err}) + sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: errors.E(op, err)}) } return sp.Exec(rqs) @@ -244,7 +242,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { err = sp.ww.AllocateNew(bCtx) if err != nil { - return EmptyPayload, err + return EmptyPayload, errors.E(op, err) } } else { sp.ww.PushWorker(w) @@ -259,6 +257,7 @@ func (sp *StaticPool) Destroy(ctx context.Context) { // allocate required number of stack func (sp *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]WorkerBase, error) { + const op = errors.Op("allocate workers") var workers []WorkerBase // constant number of stack simplify logic @@ -267,20 +266,21 @@ func (sp *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([] w, err := sp.factory.SpawnWorkerWithContext(ctx, sp.cmd()) if err != nil { cancel() - return nil, err + return nil, errors.E(op, err) } - cancel() workers = append(workers, w) + cancel() } return workers, nil } func (sp *StaticPool) checkMaxJobs(ctx context.Context, w WorkerBase) error { + const op = errors.Op("check max jobs") if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { err := sp.ww.AllocateNew(ctx) if err != nil { sp.events.Push(PoolEvent{Event: EventPoolError, Payload: err}) - return err + return errors.E(op, err) } } return nil diff --git a/static_pool_test.go b/static_pool_test.go index 309449ab..8f8a6f56 100755 --- a/static_pool_test.go +++ b/static_pool_test.go @@ -157,7 +157,7 @@ func Test_StaticPool_JobError(t *testing.T) { t.Fatal("error should be of type errors.Exec") } - assert.Contains(t, err.Error(), "exec_payload: Exec: hello") + assert.Contains(t, err.Error(), "exec payload: Exec: hello") } func Test_StaticPool_Broken_Replace(t *testing.T) { @@ -174,22 +174,24 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) - p.AddListener(func(event interface{}) { - if pev, ok := event.(PoolEvent); ok { - sw := pev.Payload.(SyncWorker) - sw.AddListener(func(event interface{}) { - if wev, ok := event.(WorkerEvent); ok { + workers := p.Workers() + for i := 0; i < len(workers); i++ { + workers[i].AddListener(func(event interface{}) { + if wev, ok := event.(WorkerEvent); ok { + if wev.Event == EventWorkerLog { assert.Contains(t, string(wev.Payload.([]byte)), "undefined_function()") wg.Done() return } - }) - } - }) + } + }) + } + res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Context) assert.Nil(t, res.Body) + wg.Wait() p.Destroy(ctx) diff --git a/supervisor_pool.go b/supervisor_pool.go index 92d03e77..e23abdd1 100755 --- a/supervisor_pool.go +++ b/supervisor_pool.go @@ -54,7 +54,7 @@ func (sp *supervisedPool) ExecWithContext(ctx context.Context, rqs Payload) (Pay res, err := sp.pool.ExecWithContext(ctx, rqs) if err != nil { c <- ttlExec{ - err: err, + err: errors.E(op, err), p: EmptyPayload, } } @@ -80,7 +80,12 @@ func (sp *supervisedPool) ExecWithContext(ctx context.Context, rqs Payload) (Pay } func (sp *supervisedPool) Exec(p Payload) (Payload, error) { - return sp.pool.Exec(p) + const op = errors.Op("supervised exec") + rsp, err := sp.pool.Exec(p) + if err != nil { + return EmptyPayload, errors.E(op, err) + } + return rsp, nil } func (sp *supervisedPool) AddListener(listener util.EventListener) { @@ -130,6 +135,7 @@ func (sp *supervisedPool) Stop() { func (sp *supervisedPool) control() { now := time.Now() ctx := context.TODO() + const op = errors.Op("supervised pool control tick") // THIS IS A COPY OF WORKERS workers := sp.pool.Workers() @@ -148,7 +154,7 @@ func (sp *supervisedPool) control() { if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sp.cfg.TTL) { err = sp.pool.RemoveWorker(ctx, workers[i]) if err != nil { - sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err}) + sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: errors.E(op, err)}) return } else { sp.events.Push(PoolEvent{Event: EventTTL, Payload: workers[i]}) @@ -160,7 +166,7 @@ func (sp *supervisedPool) control() { if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB { err = sp.pool.RemoveWorker(ctx, workers[i]) if err != nil { - sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err}) + sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: errors.E(op, err)}) return } else { sp.events.Push(PoolEvent{Event: EventMaxMemory, Payload: workers[i]}) @@ -193,7 +199,7 @@ func (sp *supervisedPool) control() { if sp.cfg.IdleTTL-uint64(res) <= 0 { err = sp.pool.RemoveWorker(ctx, workers[i]) if err != nil { - sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err}) + sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: errors.E(op, err)}) return } else { sp.events.Push(PoolEvent{Event: EventIdleTTL, Payload: workers[i]}) diff --git a/sync_worker.go b/sync_worker.go index 56953fe6..a9c53553 100755 --- a/sync_worker.go +++ b/sync_worker.go @@ -133,7 +133,7 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p Payload) (Payload, } func (tw *syncWorker) execPayload(p Payload) (Payload, error) { - const op = errors.Op("exec_payload") + const op = errors.Op("exec payload") // two things; todo: merge if err := sendControl(tw.w.Relay(), p.Context); err != nil { return EmptyPayload, errors.E(op, err, "header error") @@ -156,7 +156,7 @@ func (tw *syncWorker) execPayload(p Payload) (Payload, error) { } if pr.HasFlag(goridge.PayloadError) { - return EmptyPayload, errors.E(op, errors.Exec, errors.Str(string(rsp.Context))) //ExecError(rsp.Context) + return EmptyPayload, errors.E(op, errors.Exec, errors.Str(string(rsp.Context))) } // add streaming support :) diff --git a/sync_worker_test.go b/sync_worker_test.go index f93b1356..add0a066 100755 --- a/sync_worker_test.go +++ b/sync_worker_test.go @@ -210,7 +210,7 @@ func Test_Error(t *testing.T) { if errors.Is(errors.Exec, err) == false { t.Fatal("error should be of type errors.Exec") } - assert.Contains(t, err.Error(), "exec_payload: Exec: hello") + assert.Contains(t, err.Error(), "exec payload: Exec: hello") } func Test_NumExecs(t *testing.T) { diff --git a/worker_watcher.go b/worker_watcher.go index 36b3e029..3a89554d 100755 --- a/worker_watcher.go +++ b/worker_watcher.go @@ -158,7 +158,7 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) ww.ReduceWorkersCount() return w, nil case <-tout.C: - return nil, errors.Str("no free stack") + return nil, errors.E(op, errors.Str("no free workers in the stack")) } } } @@ -169,9 +169,10 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) func (ww *workerWatcher) AllocateNew(ctx context.Context) error { ww.stack.mutex.Lock() + const op = errors.Op("allocate new worker") sw, err := ww.allocator() if err != nil { - return err + return errors.E(op, err) } ww.addToWatch(sw) @@ -188,6 +189,7 @@ func (ww *workerWatcher) AllocateNew(ctx context.Context) error { func (ww *workerWatcher) RemoveWorker(ctx context.Context, wb WorkerBase) error { ww.stack.mutex.Lock() + const op = errors.Op("remove worker") defer ww.stack.mutex.Unlock() pid := wb.Pid() for i := 0; i < len(ww.stack.workers); i++ { @@ -200,7 +202,7 @@ func (ww *workerWatcher) RemoveWorker(ctx context.Context, wb WorkerBase) error wb.State().Set(StateInvalid) err := wb.Kill() if err != nil { - return err + return errors.E(op, err) } break } @@ -274,12 +276,13 @@ func (ww *workerWatcher) WorkersList() []WorkerBase { } func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) { + const op = errors.Op("process wait") err := w.Wait(ctx) if err != nil { ww.events.Push(WorkerEvent{ Event: EventWorkerError, Worker: w, - Payload: err, + Payload: errors.E(op, err), }) } @@ -301,7 +304,7 @@ func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) { if err != nil { ww.events.Push(PoolEvent{ Event: EventPoolError, - Payload: err, + Payload: errors.E(op, err), }) } @@ -316,7 +319,7 @@ func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) { if err != nil { ww.events.Push(PoolEvent{ Event: EventPoolError, - Payload: err, + Payload: errors.E(op, err), }) return } |