summaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-03 14:54:06 +0300
committerValery Piashchynski <[email protected]>2021-06-03 14:54:06 +0300
commit62bbde7936109d18bf1f727974719804dad4c105 (patch)
tree54fb8493840837294bbe84ba5e1d7663ed027cad /pkg
parent9c01e7ab1548e1416598b702d63866fa6dc5707b (diff)
- Do not write an error into the responseWriter if this is internal
error - Handle SoftJob error Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg')
-rwxr-xr-xpkg/pool/static_pool.go14
-rwxr-xr-xpkg/worker/sync_worker.go14
-rwxr-xr-xpkg/worker/worker.go7
-rw-r--r--pkg/worker_handler/handler.go30
-rw-r--r--pkg/worker_handler/request.go6
-rw-r--r--pkg/worker_handler/response.go6
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go2
7 files changed, 51 insertions, 28 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index b5d97b8b..ab025fa1 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -46,8 +46,8 @@ type StaticPool struct {
// allocate new worker
allocator worker.Allocator
- // err_encoder is the default Exec error encoder
- err_encoder ErrorEncoder //nolint:stylecheck
+ // errEncoder is the default Exec error encoder
+ errEncoder ErrorEncoder
}
// Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
@@ -92,7 +92,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
return nil, errors.E(op, err)
}
- p.err_encoder = defaultErrEncoder(p)
+ p.errEncoder = defaultErrEncoder(p)
// if supervised config not nil, guess, that pool wanted to be supervised
if cfg.Supervisor != nil {
@@ -149,7 +149,7 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
rsp, err := w.(worker.SyncWorker).Exec(p)
if err != nil {
- return sp.err_encoder(err, w)
+ return sp.errEncoder(err, w)
}
// worker want's to be terminated
@@ -183,7 +183,7 @@ func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (paylo
rsp, err := w.(worker.SyncWorker).ExecWithTTL(ctx, p)
if err != nil {
- return sp.err_encoder(err, w)
+ return sp.errEncoder(err, w)
}
// worker want's to be terminated
@@ -264,6 +264,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
}
} else {
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err})
sp.ww.Push(w)
}
}
@@ -271,9 +272,8 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
w.State().Set(worker.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
errS := w.Stop()
-
if errS != nil {
- return payload.Payload{}, errors.E(op, errors.Errorf("%v, %v", err, errS))
+ return payload.Payload{}, errors.E(op, err, errS)
}
return payload.Payload{}, errors.E(op, err)
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index 7a1f3131..3d049ba7 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -44,11 +44,11 @@ func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) {
rsp, err := tw.execPayload(p)
if err != nil {
// just to be more verbose
- if errors.Is(errors.SoftJob, err) == false { //nolint:gosimple
+ if !errors.Is(errors.SoftJob, err) {
tw.process.State().Set(StateErrored)
tw.process.State().RegisterExec()
}
- return payload.Payload{}, err
+ return payload.Payload{}, errors.E(op, err)
}
tw.process.State().Set(StateReady)
@@ -152,21 +152,21 @@ func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error
err := tw.Relay().Send(fr)
if err != nil {
- return payload.Payload{}, err
+ return payload.Payload{}, errors.E(op, errors.Network, err)
}
frameR := frame.NewFrame()
err = tw.process.Relay().Receive(frameR)
if err != nil {
- return payload.Payload{}, errors.E(op, err)
+ return payload.Payload{}, errors.E(op, errors.Network, err)
}
if frameR == nil {
- return payload.Payload{}, errors.E(op, errors.Str("nil fr received"))
+ return payload.Payload{}, errors.E(op, errors.Network, errors.Str("nil fr received"))
}
if !frameR.VerifyCRC() {
- return payload.Payload{}, errors.E(op, errors.Str("failed to verify CRC"))
+ return payload.Payload{}, errors.E(op, errors.Network, errors.Str("failed to verify CRC"))
}
flags := frameR.ReadFlags()
@@ -177,7 +177,7 @@ func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error
options := frameR.ReadOptions()
if len(options) != 1 {
- return payload.Payload{}, errors.E(op, errors.Str("options length should be equal 1 (body offset)"))
+ return payload.Payload{}, errors.E(op, errors.Decode, errors.Str("options length should be equal 1 (body offset)"))
}
pl := payload.Payload{}
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index 69c438b0..2044d0e7 100755
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -181,12 +181,13 @@ func (w *Process) closeRelay() error {
// Stop sends soft termination command to the Process and waits for process completion.
func (w *Process) Stop() error {
- var err error
+ const op = errors.Op("process_stop")
w.state.Set(StateStopping)
- err = multierr.Append(err, internal.SendControl(w.relay, &internal.StopCommand{Stop: true}))
+ err := internal.SendControl(w.relay, &internal.StopCommand{Stop: true})
if err != nil {
w.state.Set(StateKilling)
- return multierr.Append(err, w.cmd.Process.Signal(os.Kill))
+ _ = w.cmd.Process.Signal(os.Kill)
+ return errors.E(op, errors.Network, err)
}
w.state.Set(StateStopped)
return nil
diff --git a/pkg/worker_handler/handler.go b/pkg/worker_handler/handler.go
index e0d1aae0..672c5838 100644
--- a/pkg/worker_handler/handler.go
+++ b/pkg/worker_handler/handler.go
@@ -100,14 +100,14 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
size, err := strconv.ParseInt(length, 10, 64)
if err != nil {
// if got an error while parsing -> assign 500 code to the writer and return
- http.Error(w, errors.E(op, err).Error(), 500)
+ http.Error(w, "", 500)
h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, errors.Str("error while parsing value from the `content-length` header")), start: start, elapsed: time.Since(start)})
return
}
if size > int64(h.maxRequestSize) {
h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, errors.Str("request body max size is exceeded")), start: start, elapsed: time.Since(start)})
- http.Error(w, errors.E(op, errors.Str("request body max size is exceeded")).Error(), 500)
+ http.Error(w, errors.E(op, errors.Str("request body max size is exceeded")).Error(), http.StatusBadRequest)
return
}
}
@@ -135,21 +135,21 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
p, err := req.Payload()
if err != nil {
- http.Error(w, errors.E(op, err).Error(), 500)
+ h.handleError(w, r, start, err)
h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)})
return
}
rsp, err := h.pool.Exec(p)
if err != nil {
- http.Error(w, errors.E(op, err).Error(), 500)
+ h.handleError(w, r, start, err)
h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)})
return
}
resp, err := NewResponse(rsp)
if err != nil {
- http.Error(w, errors.E(op, err).Error(), resp.Status)
+ h.handleError(w, r, start, err)
h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)})
return
}
@@ -162,6 +162,26 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}
+// handleError will handle internal RR errors and return 500
+func (h *Handler) handleError(w http.ResponseWriter, r *http.Request, start time.Time, err error) {
+ const op = errors.Op("handle_error")
+ // internal error types, user should not see them
+ if errors.Is(errors.SoftJob, err) ||
+ errors.Is(errors.WatcherStopped, err) ||
+ errors.Is(errors.WorkerAllocate, err) ||
+ errors.Is(errors.NoFreeWorkers, err) ||
+ errors.Is(errors.ExecTTL, err) ||
+ errors.Is(errors.IdleTTL, err) ||
+ errors.Is(errors.TTL, err) ||
+ errors.Is(errors.Encode, err) ||
+ errors.Is(errors.Decode, err) ||
+ errors.Is(errors.Network, err) {
+ // write an internal server error
+ w.WriteHeader(http.StatusInternalServerError)
+ h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)})
+ }
+}
+
// handleResponse triggers response event.
func (h *Handler) handleResponse(req *Request, resp *Response, start time.Time) {
h.sendEvent(ResponseEvent{Request: req, Response: resp, start: start, elapsed: time.Since(start)})
diff --git a/pkg/worker_handler/request.go b/pkg/worker_handler/request.go
index 75ee8381..44c466bb 100644
--- a/pkg/worker_handler/request.go
+++ b/pkg/worker_handler/request.go
@@ -9,6 +9,7 @@ import (
"strings"
j "github.com/json-iterator/go"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/plugins/http/attributes"
"github.com/spiral/roadrunner/v2/plugins/http/config"
@@ -138,16 +139,17 @@ func (r *Request) Close(log logger.Logger) {
// Payload request marshaled RoadRunner payload based on PSR7 data. values encode method is JSON. Make sure to open
// files prior to calling this method.
func (r *Request) Payload() (payload.Payload, error) {
+ const op = errors.Op("marshal_payload")
p := payload.Payload{}
var err error
if p.Context, err = json.Marshal(r); err != nil {
- return payload.Payload{}, err
+ return payload.Payload{}, errors.E(op, errors.Encode, err)
}
if r.Parsed {
if p.Body, err = json.Marshal(r.body); err != nil {
- return payload.Payload{}, err
+ return payload.Payload{}, errors.E(op, errors.Encode, err)
}
} else if r.body != nil {
p.Body = r.body.([]byte)
diff --git a/pkg/worker_handler/response.go b/pkg/worker_handler/response.go
index 1763d304..cbf22794 100644
--- a/pkg/worker_handler/response.go
+++ b/pkg/worker_handler/response.go
@@ -4,8 +4,8 @@ import (
"io"
"net/http"
"strings"
- "sync"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/payload"
)
@@ -19,14 +19,14 @@ type Response struct {
// associated Body payload.
Body interface{}
- sync.Mutex
}
// NewResponse creates new response based on given pool payload.
func NewResponse(p payload.Payload) (*Response, error) {
+ const op = errors.Op("http_response")
r := &Response{Body: p.Body}
if err := json.Unmarshal(p.Context, r); err != nil {
- return nil, err
+ return nil, errors.E(op, errors.Decode, err)
}
return r, nil
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 557563ac..108756fc 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -182,7 +182,7 @@ func (ww *workerWatcher) Push(w worker.BaseProcess) {
}
// Destroy all underlying container (but let them to complete the task)
-func (ww *workerWatcher) Destroy(ctx context.Context) {
+func (ww *workerWatcher) Destroy(_ context.Context) {
// destroy container, we don't use ww mutex here, since we should be able to push worker
ww.Lock()
// do not release new workers