summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-11-09 14:51:34 +0300
committerValery Piashchynski <[email protected]>2020-11-09 14:51:34 +0300
commit83c14cbad2d7d403b08efbb3cf900df9b52b4938 (patch)
treeb084a2ca99eb7523232f477678f8aa2a82cd5812
parentb7b533dbe13d2c1a8e78c0e33a4a388c56884440 (diff)
Add spiral errors
-rwxr-xr-xprocess_state_test.go1
-rwxr-xr-xstatic_pool.go26
-rwxr-xr-xstatic_pool_test.go20
-rwxr-xr-xsupervisor_pool.go16
-rwxr-xr-xsync_worker.go4
-rwxr-xr-xsync_worker_test.go2
-rwxr-xr-xworker_watcher.go15
7 files changed, 47 insertions, 37 deletions
diff --git a/process_state_test.go b/process_state_test.go
deleted file mode 100755
index 3f283dce..00000000
--- a/process_state_test.go
+++ /dev/null
@@ -1 +0,0 @@
-package roadrunner
diff --git a/static_pool.go b/static_pool.go
index ee81fd39..17ec605e 100755
--- a/static_pool.go
+++ b/static_pool.go
@@ -189,10 +189,8 @@ func (sp *StaticPool) execDebug(p Payload) (Payload, error) {
func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
const op = errors.Op("Exec")
w, err := sp.ww.GetFreeWorker(context.Background())
- if err != nil && errors.Is(errors.ErrWatcherStopped, err) {
+ if err != nil {
return EmptyPayload, errors.E(op, err)
- } else if err != nil {
- return EmptyPayload, err
}
sw := w.(SyncWorker)
@@ -204,19 +202,19 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
err = sp.ww.AllocateNew(bCtx)
if err != nil {
- sp.events.Push(PoolEvent{Event: EventPoolError, Payload: err})
+ sp.events.Push(PoolEvent{Event: EventPoolError, Payload: errors.E(op, err)})
}
w.State().Set(StateInvalid)
err = w.Stop(bCtx)
if err != nil {
- sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
+ sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: errors.E(op, err)})
}
} else {
sp.ww.PushWorker(w)
}
- return EmptyPayload, err
+ return EmptyPayload, errors.E(op, err)
}
sw.State().Set(StateInvalid)
@@ -224,10 +222,10 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload
errS := w.Stop(bCtx)
if errS != nil {
- return EmptyPayload, fmt.Errorf("%v, %v", err, errS)
+ return EmptyPayload, errors.E(op, errors.Errorf("%v, %v", err, errS))
}
- return EmptyPayload, err
+ return EmptyPayload, errors.E(op, err)
}
// worker want's to be terminated
@@ -235,7 +233,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload
w.State().Set(StateInvalid)
err = w.Stop(bCtx)
if err != nil {
- sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
+ sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: errors.E(op, err)})
}
return sp.Exec(rqs)
@@ -244,7 +242,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
err = sp.ww.AllocateNew(bCtx)
if err != nil {
- return EmptyPayload, err
+ return EmptyPayload, errors.E(op, err)
}
} else {
sp.ww.PushWorker(w)
@@ -259,6 +257,7 @@ func (sp *StaticPool) Destroy(ctx context.Context) {
// allocate required number of stack
func (sp *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]WorkerBase, error) {
+ const op = errors.Op("allocate workers")
var workers []WorkerBase
// constant number of stack simplify logic
@@ -267,20 +266,21 @@ func (sp *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]
w, err := sp.factory.SpawnWorkerWithContext(ctx, sp.cmd())
if err != nil {
cancel()
- return nil, err
+ return nil, errors.E(op, err)
}
- cancel()
workers = append(workers, w)
+ cancel()
}
return workers, nil
}
func (sp *StaticPool) checkMaxJobs(ctx context.Context, w WorkerBase) error {
+ const op = errors.Op("check max jobs")
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
err := sp.ww.AllocateNew(ctx)
if err != nil {
sp.events.Push(PoolEvent{Event: EventPoolError, Payload: err})
- return err
+ return errors.E(op, err)
}
}
return nil
diff --git a/static_pool_test.go b/static_pool_test.go
index 309449ab..8f8a6f56 100755
--- a/static_pool_test.go
+++ b/static_pool_test.go
@@ -157,7 +157,7 @@ func Test_StaticPool_JobError(t *testing.T) {
t.Fatal("error should be of type errors.Exec")
}
- assert.Contains(t, err.Error(), "exec_payload: Exec: hello")
+ assert.Contains(t, err.Error(), "exec payload: Exec: hello")
}
func Test_StaticPool_Broken_Replace(t *testing.T) {
@@ -174,22 +174,24 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
- p.AddListener(func(event interface{}) {
- if pev, ok := event.(PoolEvent); ok {
- sw := pev.Payload.(SyncWorker)
- sw.AddListener(func(event interface{}) {
- if wev, ok := event.(WorkerEvent); ok {
+ workers := p.Workers()
+ for i := 0; i < len(workers); i++ {
+ workers[i].AddListener(func(event interface{}) {
+ if wev, ok := event.(WorkerEvent); ok {
+ if wev.Event == EventWorkerLog {
assert.Contains(t, string(wev.Payload.([]byte)), "undefined_function()")
wg.Done()
return
}
- })
- }
- })
+ }
+ })
+ }
+
res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Context)
assert.Nil(t, res.Body)
+
wg.Wait()
p.Destroy(ctx)
diff --git a/supervisor_pool.go b/supervisor_pool.go
index 92d03e77..e23abdd1 100755
--- a/supervisor_pool.go
+++ b/supervisor_pool.go
@@ -54,7 +54,7 @@ func (sp *supervisedPool) ExecWithContext(ctx context.Context, rqs Payload) (Pay
res, err := sp.pool.ExecWithContext(ctx, rqs)
if err != nil {
c <- ttlExec{
- err: err,
+ err: errors.E(op, err),
p: EmptyPayload,
}
}
@@ -80,7 +80,12 @@ func (sp *supervisedPool) ExecWithContext(ctx context.Context, rqs Payload) (Pay
}
func (sp *supervisedPool) Exec(p Payload) (Payload, error) {
- return sp.pool.Exec(p)
+ const op = errors.Op("supervised exec")
+ rsp, err := sp.pool.Exec(p)
+ if err != nil {
+ return EmptyPayload, errors.E(op, err)
+ }
+ return rsp, nil
}
func (sp *supervisedPool) AddListener(listener util.EventListener) {
@@ -130,6 +135,7 @@ func (sp *supervisedPool) Stop() {
func (sp *supervisedPool) control() {
now := time.Now()
ctx := context.TODO()
+ const op = errors.Op("supervised pool control tick")
// THIS IS A COPY OF WORKERS
workers := sp.pool.Workers()
@@ -148,7 +154,7 @@ func (sp *supervisedPool) control() {
if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sp.cfg.TTL) {
err = sp.pool.RemoveWorker(ctx, workers[i])
if err != nil {
- sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err})
+ sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: errors.E(op, err)})
return
} else {
sp.events.Push(PoolEvent{Event: EventTTL, Payload: workers[i]})
@@ -160,7 +166,7 @@ func (sp *supervisedPool) control() {
if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB {
err = sp.pool.RemoveWorker(ctx, workers[i])
if err != nil {
- sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err})
+ sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: errors.E(op, err)})
return
} else {
sp.events.Push(PoolEvent{Event: EventMaxMemory, Payload: workers[i]})
@@ -193,7 +199,7 @@ func (sp *supervisedPool) control() {
if sp.cfg.IdleTTL-uint64(res) <= 0 {
err = sp.pool.RemoveWorker(ctx, workers[i])
if err != nil {
- sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err})
+ sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: errors.E(op, err)})
return
} else {
sp.events.Push(PoolEvent{Event: EventIdleTTL, Payload: workers[i]})
diff --git a/sync_worker.go b/sync_worker.go
index 56953fe6..a9c53553 100755
--- a/sync_worker.go
+++ b/sync_worker.go
@@ -133,7 +133,7 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p Payload) (Payload,
}
func (tw *syncWorker) execPayload(p Payload) (Payload, error) {
- const op = errors.Op("exec_payload")
+ const op = errors.Op("exec payload")
// two things; todo: merge
if err := sendControl(tw.w.Relay(), p.Context); err != nil {
return EmptyPayload, errors.E(op, err, "header error")
@@ -156,7 +156,7 @@ func (tw *syncWorker) execPayload(p Payload) (Payload, error) {
}
if pr.HasFlag(goridge.PayloadError) {
- return EmptyPayload, errors.E(op, errors.Exec, errors.Str(string(rsp.Context))) //ExecError(rsp.Context)
+ return EmptyPayload, errors.E(op, errors.Exec, errors.Str(string(rsp.Context)))
}
// add streaming support :)
diff --git a/sync_worker_test.go b/sync_worker_test.go
index f93b1356..add0a066 100755
--- a/sync_worker_test.go
+++ b/sync_worker_test.go
@@ -210,7 +210,7 @@ func Test_Error(t *testing.T) {
if errors.Is(errors.Exec, err) == false {
t.Fatal("error should be of type errors.Exec")
}
- assert.Contains(t, err.Error(), "exec_payload: Exec: hello")
+ assert.Contains(t, err.Error(), "exec payload: Exec: hello")
}
func Test_NumExecs(t *testing.T) {
diff --git a/worker_watcher.go b/worker_watcher.go
index 36b3e029..3a89554d 100755
--- a/worker_watcher.go
+++ b/worker_watcher.go
@@ -158,7 +158,7 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error)
ww.ReduceWorkersCount()
return w, nil
case <-tout.C:
- return nil, errors.Str("no free stack")
+ return nil, errors.E(op, errors.Str("no free workers in the stack"))
}
}
}
@@ -169,9 +169,10 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error)
func (ww *workerWatcher) AllocateNew(ctx context.Context) error {
ww.stack.mutex.Lock()
+ const op = errors.Op("allocate new worker")
sw, err := ww.allocator()
if err != nil {
- return err
+ return errors.E(op, err)
}
ww.addToWatch(sw)
@@ -188,6 +189,7 @@ func (ww *workerWatcher) AllocateNew(ctx context.Context) error {
func (ww *workerWatcher) RemoveWorker(ctx context.Context, wb WorkerBase) error {
ww.stack.mutex.Lock()
+ const op = errors.Op("remove worker")
defer ww.stack.mutex.Unlock()
pid := wb.Pid()
for i := 0; i < len(ww.stack.workers); i++ {
@@ -200,7 +202,7 @@ func (ww *workerWatcher) RemoveWorker(ctx context.Context, wb WorkerBase) error
wb.State().Set(StateInvalid)
err := wb.Kill()
if err != nil {
- return err
+ return errors.E(op, err)
}
break
}
@@ -274,12 +276,13 @@ func (ww *workerWatcher) WorkersList() []WorkerBase {
}
func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) {
+ const op = errors.Op("process wait")
err := w.Wait(ctx)
if err != nil {
ww.events.Push(WorkerEvent{
Event: EventWorkerError,
Worker: w,
- Payload: err,
+ Payload: errors.E(op, err),
})
}
@@ -301,7 +304,7 @@ func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) {
if err != nil {
ww.events.Push(PoolEvent{
Event: EventPoolError,
- Payload: err,
+ Payload: errors.E(op, err),
})
}
@@ -316,7 +319,7 @@ func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) {
if err != nil {
ww.events.Push(PoolEvent{
Event: EventPoolError,
- Payload: err,
+ Payload: errors.E(op, err),
})
return
}