diff options
author | Valery Piashchynski <[email protected]> | 2021-06-03 14:54:06 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-03 14:54:06 +0300 |
commit | 62bbde7936109d18bf1f727974719804dad4c105 (patch) | |
tree | 54fb8493840837294bbe84ba5e1d7663ed027cad /pkg | |
parent | 9c01e7ab1548e1416598b702d63866fa6dc5707b (diff) |
- Do not write an error into the responseWriter if this is internal
error
- Handle SoftJob error
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg')
-rwxr-xr-x | pkg/pool/static_pool.go | 14 | ||||
-rwxr-xr-x | pkg/worker/sync_worker.go | 14 | ||||
-rwxr-xr-x | pkg/worker/worker.go | 7 | ||||
-rw-r--r-- | pkg/worker_handler/handler.go | 30 | ||||
-rw-r--r-- | pkg/worker_handler/request.go | 6 | ||||
-rw-r--r-- | pkg/worker_handler/response.go | 6 | ||||
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 2 |
7 files changed, 51 insertions, 28 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index b5d97b8b..ab025fa1 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -46,8 +46,8 @@ type StaticPool struct { // allocate new worker allocator worker.Allocator - // err_encoder is the default Exec error encoder - err_encoder ErrorEncoder //nolint:stylecheck + // errEncoder is the default Exec error encoder + errEncoder ErrorEncoder } // Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker. @@ -92,7 +92,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg return nil, errors.E(op, err) } - p.err_encoder = defaultErrEncoder(p) + p.errEncoder = defaultErrEncoder(p) // if supervised config not nil, guess, that pool wanted to be supervised if cfg.Supervisor != nil { @@ -149,7 +149,7 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) { rsp, err := w.(worker.SyncWorker).Exec(p) if err != nil { - return sp.err_encoder(err, w) + return sp.errEncoder(err, w) } // worker want's to be terminated @@ -183,7 +183,7 @@ func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (paylo rsp, err := w.(worker.SyncWorker).ExecWithTTL(ctx, p) if err != nil { - return sp.err_encoder(err, w) + return sp.errEncoder(err, w) } // worker want's to be terminated @@ -264,6 +264,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) } } else { + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err}) sp.ww.Push(w) } } @@ -271,9 +272,8 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { w.State().Set(worker.StateInvalid) sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) errS := w.Stop() - if errS != nil { - return payload.Payload{}, errors.E(op, errors.Errorf("%v, %v", err, errS)) + return payload.Payload{}, errors.E(op, err, errS) } return payload.Payload{}, errors.E(op, err) diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index 7a1f3131..3d049ba7 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -44,11 +44,11 @@ func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) { rsp, err := tw.execPayload(p) if err != nil { // just to be more verbose - if errors.Is(errors.SoftJob, err) == false { //nolint:gosimple + if !errors.Is(errors.SoftJob, err) { tw.process.State().Set(StateErrored) tw.process.State().RegisterExec() } - return payload.Payload{}, err + return payload.Payload{}, errors.E(op, err) } tw.process.State().Set(StateReady) @@ -152,21 +152,21 @@ func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error err := tw.Relay().Send(fr) if err != nil { - return payload.Payload{}, err + return payload.Payload{}, errors.E(op, errors.Network, err) } frameR := frame.NewFrame() err = tw.process.Relay().Receive(frameR) if err != nil { - return payload.Payload{}, errors.E(op, err) + return payload.Payload{}, errors.E(op, errors.Network, err) } if frameR == nil { - return payload.Payload{}, errors.E(op, errors.Str("nil fr received")) + return payload.Payload{}, errors.E(op, errors.Network, errors.Str("nil fr received")) } if !frameR.VerifyCRC() { - return payload.Payload{}, errors.E(op, errors.Str("failed to verify CRC")) + return payload.Payload{}, errors.E(op, errors.Network, errors.Str("failed to verify CRC")) } flags := frameR.ReadFlags() @@ -177,7 +177,7 @@ func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error options := frameR.ReadOptions() if len(options) != 1 { - return payload.Payload{}, errors.E(op, errors.Str("options length should be equal 1 (body offset)")) + return payload.Payload{}, errors.E(op, errors.Decode, errors.Str("options length should be equal 1 (body offset)")) } pl := payload.Payload{} diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 69c438b0..2044d0e7 100755 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -181,12 +181,13 @@ func (w *Process) closeRelay() error { // Stop sends soft termination command to the Process and waits for process completion. func (w *Process) Stop() error { - var err error + const op = errors.Op("process_stop") w.state.Set(StateStopping) - err = multierr.Append(err, internal.SendControl(w.relay, &internal.StopCommand{Stop: true})) + err := internal.SendControl(w.relay, &internal.StopCommand{Stop: true}) if err != nil { w.state.Set(StateKilling) - return multierr.Append(err, w.cmd.Process.Signal(os.Kill)) + _ = w.cmd.Process.Signal(os.Kill) + return errors.E(op, errors.Network, err) } w.state.Set(StateStopped) return nil diff --git a/pkg/worker_handler/handler.go b/pkg/worker_handler/handler.go index e0d1aae0..672c5838 100644 --- a/pkg/worker_handler/handler.go +++ b/pkg/worker_handler/handler.go @@ -100,14 +100,14 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { size, err := strconv.ParseInt(length, 10, 64) if err != nil { // if got an error while parsing -> assign 500 code to the writer and return - http.Error(w, errors.E(op, err).Error(), 500) + http.Error(w, "", 500) h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, errors.Str("error while parsing value from the `content-length` header")), start: start, elapsed: time.Since(start)}) return } if size > int64(h.maxRequestSize) { h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, errors.Str("request body max size is exceeded")), start: start, elapsed: time.Since(start)}) - http.Error(w, errors.E(op, errors.Str("request body max size is exceeded")).Error(), 500) + http.Error(w, errors.E(op, errors.Str("request body max size is exceeded")).Error(), http.StatusBadRequest) return } } @@ -135,21 +135,21 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { p, err := req.Payload() if err != nil { - http.Error(w, errors.E(op, err).Error(), 500) + h.handleError(w, r, start, err) h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)}) return } rsp, err := h.pool.Exec(p) if err != nil { - http.Error(w, errors.E(op, err).Error(), 500) + h.handleError(w, r, start, err) h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)}) return } resp, err := NewResponse(rsp) if err != nil { - http.Error(w, errors.E(op, err).Error(), resp.Status) + h.handleError(w, r, start, err) h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)}) return } @@ -162,6 +162,26 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } +// handleError will handle internal RR errors and return 500 +func (h *Handler) handleError(w http.ResponseWriter, r *http.Request, start time.Time, err error) { + const op = errors.Op("handle_error") + // internal error types, user should not see them + if errors.Is(errors.SoftJob, err) || + errors.Is(errors.WatcherStopped, err) || + errors.Is(errors.WorkerAllocate, err) || + errors.Is(errors.NoFreeWorkers, err) || + errors.Is(errors.ExecTTL, err) || + errors.Is(errors.IdleTTL, err) || + errors.Is(errors.TTL, err) || + errors.Is(errors.Encode, err) || + errors.Is(errors.Decode, err) || + errors.Is(errors.Network, err) { + // write an internal server error + w.WriteHeader(http.StatusInternalServerError) + h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)}) + } +} + // handleResponse triggers response event. func (h *Handler) handleResponse(req *Request, resp *Response, start time.Time) { h.sendEvent(ResponseEvent{Request: req, Response: resp, start: start, elapsed: time.Since(start)}) diff --git a/pkg/worker_handler/request.go b/pkg/worker_handler/request.go index 75ee8381..44c466bb 100644 --- a/pkg/worker_handler/request.go +++ b/pkg/worker_handler/request.go @@ -9,6 +9,7 @@ import ( "strings" j "github.com/json-iterator/go" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/plugins/http/attributes" "github.com/spiral/roadrunner/v2/plugins/http/config" @@ -138,16 +139,17 @@ func (r *Request) Close(log logger.Logger) { // Payload request marshaled RoadRunner payload based on PSR7 data. values encode method is JSON. Make sure to open // files prior to calling this method. func (r *Request) Payload() (payload.Payload, error) { + const op = errors.Op("marshal_payload") p := payload.Payload{} var err error if p.Context, err = json.Marshal(r); err != nil { - return payload.Payload{}, err + return payload.Payload{}, errors.E(op, errors.Encode, err) } if r.Parsed { if p.Body, err = json.Marshal(r.body); err != nil { - return payload.Payload{}, err + return payload.Payload{}, errors.E(op, errors.Encode, err) } } else if r.body != nil { p.Body = r.body.([]byte) diff --git a/pkg/worker_handler/response.go b/pkg/worker_handler/response.go index 1763d304..cbf22794 100644 --- a/pkg/worker_handler/response.go +++ b/pkg/worker_handler/response.go @@ -4,8 +4,8 @@ import ( "io" "net/http" "strings" - "sync" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/payload" ) @@ -19,14 +19,14 @@ type Response struct { // associated Body payload. Body interface{} - sync.Mutex } // NewResponse creates new response based on given pool payload. func NewResponse(p payload.Payload) (*Response, error) { + const op = errors.Op("http_response") r := &Response{Body: p.Body} if err := json.Unmarshal(p.Context, r); err != nil { - return nil, err + return nil, errors.E(op, errors.Decode, err) } return r, nil diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index 557563ac..108756fc 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -182,7 +182,7 @@ func (ww *workerWatcher) Push(w worker.BaseProcess) { } // Destroy all underlying container (but let them to complete the task) -func (ww *workerWatcher) Destroy(ctx context.Context) { +func (ww *workerWatcher) Destroy(_ context.Context) { // destroy container, we don't use ww mutex here, since we should be able to push worker ww.Lock() // do not release new workers |