diff options
author | Valery Piashchynski <[email protected]> | 2020-11-27 00:35:15 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-11-27 00:35:15 +0300 |
commit | fa9a7e319b5ac6d98fd18d1e4986de35fde254e5 (patch) | |
tree | 559908f3a491a15bb4926f79dbfde350ec7d4c40 /sync_worker.go | |
parent | 46ae5dcc22d971b0f909bce23ec8fdef26811ed6 (diff) |
Add new pool event: EventNoFreeWorkers which indicates than RR can't get
worker from the stack during the allowed allocate timeout.
Diffstat (limited to 'sync_worker.go')
-rwxr-xr-x | sync_worker.go | 14 |
1 files changed, 7 insertions, 7 deletions
diff --git a/sync_worker.go b/sync_worker.go index cd0f934e..7e4d21cc 100755 --- a/sync_worker.go +++ b/sync_worker.go @@ -2,7 +2,6 @@ package roadrunner import ( "context" - "fmt" "time" "github.com/spiral/errors" @@ -36,12 +35,13 @@ func NewSyncWorker(w WorkerBase) (SyncWorker, error) { // Exec payload without TTL timeout. func (tw *syncWorker) Exec(p Payload) (Payload, error) { + const op = errors.Op("sync worker Exec") if len(p.Body) == 0 && len(p.Context) == 0 { - return EmptyPayload, fmt.Errorf("payload can not be empty") + return EmptyPayload, errors.E(op, errors.Str("payload can not be empty")) } if tw.w.State().Value() != StateReady { - return EmptyPayload, fmt.Errorf("WorkerProcess is not ready (%s)", tw.w.State().String()) + return EmptyPayload, errors.E(op, errors.Errorf("WorkerProcess is not ready (%s)", tw.w.State().String())) } // set last used time @@ -51,7 +51,7 @@ func (tw *syncWorker) Exec(p Payload) (Payload, error) { rsp, err := tw.execPayload(p) if err != nil { // just to be more verbose - if errors.Is(errors.Exec, err) == false { + if errors.Is(errors.ErrSoftJob, err) == false { tw.w.State().Set(StateErrored) tw.w.State().RegisterExec() } @@ -97,7 +97,7 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p Payload) (Payload, rsp, err := tw.execPayload(p) if err != nil { // just to be more verbose - if errors.Is(errors.Exec, err) == false { + if errors.Is(errors.ErrSoftJob, err) == false { tw.w.State().Set(StateErrored) tw.w.State().RegisterExec() } @@ -152,11 +152,11 @@ func (tw *syncWorker) execPayload(p Payload) (Payload, error) { } if !pr.HasFlag(goridge.PayloadControl) { - return EmptyPayload, fmt.Errorf("malformed WorkerProcess response") + return EmptyPayload, errors.E(op, errors.Str("malformed WorkerProcess response")) } if pr.HasFlag(goridge.PayloadError) { - return EmptyPayload, errors.E(op, errors.Exec, errors.Str(string(rsp.Context))) + return EmptyPayload, errors.E(op, errors.ErrSoftJob, errors.Str(string(rsp.Context))) } // add streaming support :) |