From 025b6c84ac0592fa0f1bc07efe9c62646c397bef Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Mon, 18 Oct 2021 15:33:20 +0300 Subject: Update error handling in the pipe/socket worker allocate methods Signed-off-by: Valery Piashchynski --- internal/protocol.go | 30 ++++++++++++++---------------- pool/static_pool.go | 28 ++++++++++++---------------- transport/pipe/pipe_factory.go | 36 ++++++++++++------------------------ transport/socket/socket_factory.go | 32 ++++++++------------------------ 4 files changed, 46 insertions(+), 80 deletions(-) diff --git a/internal/protocol.go b/internal/protocol.go index 73cb960e..e6e6e7ab 100755 --- a/internal/protocol.go +++ b/internal/protocol.go @@ -34,8 +34,6 @@ func putFrame(f *frame.Frame) { } func SendControl(rl relay.Relay, payload interface{}) error { - const op = errors.Op("send_control") - fr := getFrame() defer putFrame(fr) @@ -45,7 +43,7 @@ func SendControl(rl relay.Relay, payload interface{}) error { if data, ok := payload.([]byte); ok { // check if payload no more that 4Gb if uint32(len(data)) > ^uint32(0) { - return errors.E(op, errors.Str("payload is more that 4gb")) + return errors.Str("payload is more that 4gb") } fr.WritePayloadLen(fr.Header(), uint32(len(data))) @@ -54,14 +52,14 @@ func SendControl(rl relay.Relay, payload interface{}) error { err := rl.Send(fr) if err != nil { - return errors.E(op, err) + return err } return nil } data, err := json.Marshal(payload) if err != nil { - return errors.E(op, errors.Errorf("invalid payload: %s", err)) + return errors.Errorf("invalid payload: %s", err) } fr.WritePayloadLen(fr.Header(), uint32(len(data))) @@ -71,47 +69,47 @@ func SendControl(rl relay.Relay, payload interface{}) error { // we don't need a copy here, because frame copy the data before send err = rl.Send(fr) if err != nil { - return errors.E(op, err) + return errors.E(errors.FileNotFound, err) } return nil } func Pid(rl relay.Relay) (int64, error) { - const op = errors.Op("fetch_pid") err := SendControl(rl, pidCommand{Pid: os.Getpid()}) if err != nil { - return 0, errors.E(op, err) + return 0, err } fr := getFrame() defer putFrame(fr) err = rl.Receive(fr) - if !fr.VerifyCRC(fr.Header()) { - return 0, errors.E(op, errors.Str("CRC mismatch")) - } if err != nil { - return 0, errors.E(op, err) + return 0, err + } + + if !fr.VerifyCRC(fr.Header()) { + return 0, errors.Str("CRC mismatch") } if fr == nil { - return 0, errors.E(op, errors.Str("nil frame received")) + return 0, errors.Str("nil frame received") } flags := fr.ReadFlags() if flags&frame.CONTROL == 0 { - return 0, errors.E(op, errors.Str("unexpected response, header is missing, no CONTROL flag")) + return 0, errors.Str("unexpected response, header is missing, no CONTROL flag") } link := &pidCommand{} err = json.Unmarshal(fr.Payload(), link) if err != nil { - return 0, errors.E(op, err) + return 0, err } if link.Pid <= 0 { - return 0, errors.E(op, errors.Str("pid should be greater than 0")) + return 0, errors.Str("pid should be greater than 0") } return int64(link.Pid), nil diff --git a/pool/static_pool.go b/pool/static_pool.go index 25097395..6c2df4e3 100755 --- a/pool/static_pool.go +++ b/pool/static_pool.go @@ -52,9 +52,8 @@ type StaticPool struct { // Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker. func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg *Config, options ...Options) (Pool, error) { - const op = errors.Op("static_pool_initialize") if factory == nil { - return nil, errors.E(op, errors.Str("no factory initialized")) + return nil, errors.Str("no factory initialized") } cfg.InitDefaults() @@ -83,13 +82,13 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg // allocate requested number of workers workers, err := p.allocateWorkers(p.cfg.NumWorkers) if err != nil { - return nil, errors.E(op, err) + return nil, err } // add workers to the watcher err = p.ww.Watch(workers) if err != nil { - return nil, errors.E(op, err) + return nil, err } p.errEncoder = defaultErrEncoder(p) @@ -245,16 +244,15 @@ func (sp *StaticPool) Destroy(ctx context.Context) { func defaultErrEncoder(sp *StaticPool) ErrorEncoder { return func(err error, w worker.BaseProcess) (*payload.Payload, error) { - const op = errors.Op("error_encoder") // just push event if on any stage was timeout error switch { case errors.Is(errors.ExecTTL, err): - sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Error: errors.E(op, err)}) + sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Error: err}) w.State().Set(worker.StateInvalid) return nil, err case errors.Is(errors.SoftJob, err): - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err}) // if max jobs exceed if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { @@ -262,7 +260,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { w.State().Set(worker.StateInvalid) errS := w.Stop() if errS != nil { - return nil, errors.E(op, errors.SoftJob, errors.Errorf("err: %v\nerrStop: %v", err, errS)) + return nil, errors.E(errors.SoftJob, errors.Errorf("err: %v\nerrStop: %v", err, errS)) } return nil, err @@ -275,7 +273,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { case errors.Is(errors.Network, err): // in case of network error, we can't stop the worker, we should kill it w.State().Set(worker.StateInvalid) - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err}) // kill the worker instead of sending net packet to it _ = w.Kill() @@ -287,10 +285,10 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { // stop the worker, worker here might be in the broken state (network) errS := w.Stop() if errS != nil { - return nil, errors.E(op, errors.Errorf("err: %v\nerrStop: %v", err, errS)) + return nil, errors.E(errors.Errorf("err: %v\nerrStop: %v", err, errS)) } - return nil, errors.E(op, err) + return nil, err } } } @@ -317,7 +315,6 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio // execDebug used when debug mode was not set and exec_ttl is 0 func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) { - const op = errors.Op("static_pool_exec_debug") sw, err := sp.allocator() if err != nil { return nil, err @@ -326,7 +323,7 @@ func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) { // redirect call to the workers' exec method (without ttl) r, err := sw.Exec(p) if err != nil { - return nil, errors.E(op, err) + return nil, err } // destroy the worker @@ -334,7 +331,7 @@ func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) { err = sw.Kill() if err != nil { sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err}) - return nil, errors.E(op, err) + return nil, err } return r, nil @@ -358,14 +355,13 @@ func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload) // allocate required number of stack func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.BaseProcess, error) { - const op = errors.Op("static_pool_allocate_workers") workers := make([]worker.BaseProcess, 0, numWorkers) // constant number of stack simplify logic for i := uint64(0); i < numWorkers; i++ { w, err := sp.allocator() if err != nil { - return nil, errors.E(op, errors.WorkerAllocate, err) + return nil, err } workers = append(workers, w) diff --git a/transport/pipe/pipe_factory.go b/transport/pipe/pipe_factory.go index 84a9d311..3ea8fd98 100755 --- a/transport/pipe/pipe_factory.go +++ b/transport/pipe/pipe_factory.go @@ -4,12 +4,10 @@ import ( "context" "os/exec" - "github.com/spiral/errors" "github.com/spiral/goridge/v3/pkg/pipe" "github.com/spiral/roadrunner/v2/events" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/worker" - "go.uber.org/multierr" ) // Factory connects to stack using standard @@ -31,14 +29,13 @@ type sr struct { // method Wait() must be handled on level above. func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { spCh := make(chan sr) - const op = errors.Op("factory_spawn_worker_with_timeout") go func() { w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) if err != nil { select { case spCh <- sr{ w: nil, - err: errors.E(op, err), + err: err, }: return default: @@ -51,7 +48,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis select { case spCh <- sr{ w: nil, - err: errors.E(op, err), + err: err, }: return default: @@ -64,7 +61,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis select { case spCh <- sr{ w: nil, - err: errors.E(op, err), + err: err, }: return default: @@ -82,7 +79,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis select { case spCh <- sr{ w: nil, - err: errors.E(op, err), + err: err, }: return default: @@ -93,15 +90,11 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis // used as a ping _, err = internal.Pid(relay) if err != nil { - err = multierr.Combine( - err, - w.Kill(), - w.Wait(), - ) + _ = w.Kill() select { case spCh <- sr{ w: nil, - err: errors.E(op, err), + err: err, }: return default: @@ -138,20 +131,19 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis } func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { - const op = errors.Op("factory_spawn_worker") w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) if err != nil { - return nil, errors.E(op, err) + return nil, err } in, err := cmd.StdoutPipe() if err != nil { - return nil, errors.E(op, err) + return nil, err } out, err := cmd.StdinPipe() if err != nil { - return nil, errors.E(op, err) + return nil, err } // Init new PIPE relay @@ -161,18 +153,14 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor // Start the worker err = w.Start() if err != nil { - return nil, errors.E(op, err) + return nil, err } // errors bundle _, err = internal.Pid(relay) if err != nil { - err = multierr.Combine( - err, - w.Kill(), - w.Wait(), - ) - return nil, errors.E(op, err) + _ = w.Kill() + return nil, err } // everything ok, set ready state diff --git a/transport/socket/socket_factory.go b/transport/socket/socket_factory.go index 39c04eac..dfffdf4e 100755 --- a/transport/socket/socket_factory.go +++ b/transport/socket/socket_factory.go @@ -16,7 +16,6 @@ import ( "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/worker" - "go.uber.org/multierr" "golang.org/x/sync/errgroup" ) @@ -85,7 +84,6 @@ type socketSpawn struct { // SpawnWorkerWithTimeout creates Process and connects it to appropriate relay or return an error func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { - const op = errors.Op("factory_spawn_worker_with_timeout") c := make(chan socketSpawn) go func() { ctxT, cancel := context.WithTimeout(ctx, f.tout) @@ -95,7 +93,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis select { case c <- socketSpawn{ w: nil, - err: errors.E(op, err), + err: err, }: return default: @@ -108,7 +106,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis select { case c <- socketSpawn{ w: nil, - err: errors.E(op, err), + err: err, }: return default: @@ -118,17 +116,12 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis rl, err := f.findRelayWithContext(ctxT, w) if err != nil { - err = multierr.Combine( - err, - w.Kill(), - w.Wait(), - ) - + _ = w.Kill() select { // try to write result case c <- socketSpawn{ w: nil, - err: errors.E(op, err), + err: err, }: return // if no receivers - return @@ -165,7 +158,6 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis } func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { - const op = errors.Op("factory_spawn_worker") w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) if err != nil { return nil, err @@ -173,16 +165,12 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor err = w.Start() if err != nil { - return nil, errors.E(op, err) + return nil, err } rl, err := f.findRelay(w) if err != nil { - err = multierr.Combine( - err, - w.Kill(), - w.Wait(), - ) + _ = w.Kill() return nil, err } @@ -191,12 +179,8 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor // errors bundle _, err = internal.Pid(rl) if err != nil { - err = multierr.Combine( - err, - w.Kill(), - w.Wait(), - ) - return nil, errors.E(op, err) + _ = w.Kill() + return nil, err } w.State().Set(worker.StateReady) -- cgit v1.2.3 From c23ea80b8aa4c5b72ec869f516ad6ac06f0a2e44 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Mon, 18 Oct 2021 15:50:31 +0300 Subject: use proper type of error Signed-off-by: Valery Piashchynski --- pool/static_pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pool/static_pool.go b/pool/static_pool.go index 6c2df4e3..f500f998 100755 --- a/pool/static_pool.go +++ b/pool/static_pool.go @@ -361,7 +361,7 @@ func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.BaseProcess, for i := uint64(0); i < numWorkers; i++ { w, err := sp.allocator() if err != nil { - return nil, err + return nil, errors.E(errors.WorkerAllocate, err) } workers = append(workers, w) -- cgit v1.2.3 From 7baf8d9732810e97b0c2b43a8bb20c3e06985b8c Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Mon, 18 Oct 2021 15:55:14 +0300 Subject: update go version in the CI Signed-off-by: Valery Piashchynski --- .github/workflows/linux.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index be74d606..9d2610ea 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -23,7 +23,7 @@ jobs: fail-fast: true matrix: php: ["7.4", "8.0", "8.1"] - go: ["1.17.1"] + go: ["1.17.2"] os: ["ubuntu-latest"] steps: - name: Set up Go ${{ matrix.go }} -- cgit v1.2.3 From 6a0394a011e8b799d036a80e72d172162122c9f5 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Mon, 18 Oct 2021 16:09:50 +0300 Subject: bump goridge dep Signed-off-by: Valery Piashchynski --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 23905e0f..e39ab00a 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/shirou/gopsutil v3.21.9+incompatible // spiral github.com/spiral/errors v1.0.12 - github.com/spiral/goridge/v3 v3.2.2 + github.com/spiral/goridge/v3 v3.2.3 // spiral github.com/stretchr/testify v1.7.0 github.com/valyala/tcplisten v1.0.0 diff --git a/go.sum b/go.sum index 6195a83c..c58616f8 100644 --- a/go.sum +++ b/go.sum @@ -77,8 +77,8 @@ github.com/shirou/gopsutil v3.21.9+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spiral/errors v1.0.12 h1:38Waf8ZL/Xvxg4HTYGmrUbvi7TCHivmuatNQZlBhQ8s= github.com/spiral/errors v1.0.12/go.mod h1:j5UReqxZxfkwXkI9mFY87VhEXcXmSg7kAk5Sswy1eEA= -github.com/spiral/goridge/v3 v3.2.2 h1:plrzrISm3Q3Z74p+tsi90XwU9n4859LWDXK0PtGNoFE= -github.com/spiral/goridge/v3 v3.2.2/go.mod h1:DA4Ekw9qVcTvVouUNJgxESXURBHZ2SfkliCEIpEl9lA= +github.com/spiral/goridge/v3 v3.2.3 h1:iNz6aD/c00hC50wo+qT8uP5ZZ3VdCAERXUtNiyDE3Yo= +github.com/spiral/goridge/v3 v3.2.3/go.mod h1:DA4Ekw9qVcTvVouUNJgxESXURBHZ2SfkliCEIpEl9lA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -- cgit v1.2.3