diff options
author | Valery Piashchynski <[email protected]> | 2020-11-09 15:11:10 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2020-11-09 15:11:10 +0300 |
commit | 0874bcb2f6b284a940ba4f3507eb8c4619c27868 (patch) | |
tree | c99d15624cd080cad22b7c8fb7d4714b2dc124fb | |
parent | 9fbe7726dd55cfedda724b7644e1b6bf7c1a6cb4 (diff) | |
parent | f218dcbd7e55d9ad1df8336e2331cdaa62d9ded3 (diff) |
Merge pull request #390 from spiral/feature/switch_to_spiral_errorsv2.0.0-alpha17
Feature/switch to spiral errors
-rwxr-xr-x | .github/workflows/ci-build.yml | 26 | ||||
-rwxr-xr-x | errors.go | 24 | ||||
-rwxr-xr-x | errors_test.go | 18 | ||||
-rwxr-xr-x | go.mod | 2 | ||||
-rwxr-xr-x | go.sum | 4 | ||||
-rwxr-xr-x | pipe_factory.go | 44 | ||||
-rw-r--r-- | plugins/app/plugin.go | 15 | ||||
-rwxr-xr-x | process_state.go | 4 | ||||
-rwxr-xr-x | process_state_test.go | 1 | ||||
-rwxr-xr-x | socket_factory_test.go | 2 | ||||
-rwxr-xr-x | static_pool.go | 34 | ||||
-rwxr-xr-x | static_pool_test.go | 86 | ||||
-rwxr-xr-x | supervisor_pool.go | 16 | ||||
-rwxr-xr-x | sync_worker.go | 10 | ||||
-rwxr-xr-x | sync_worker_test.go | 7 | ||||
-rwxr-xr-x | util/isolate.go | 11 | ||||
-rwxr-xr-x | worker_watcher.go | 15 |
17 files changed, 140 insertions, 179 deletions
diff --git a/.github/workflows/ci-build.yml b/.github/workflows/ci-build.yml index e3e09d88..99eb8834 100755 --- a/.github/workflows/ci-build.yml +++ b/.github/workflows/ci-build.yml @@ -57,8 +57,8 @@ jobs: - name: Install Composer dependencies run: composer install --prefer-dist --no-interaction --no-suggest # --prefer-source -# - name: Analyze PHP sources -# run: composer analyze + # - name: Analyze PHP sources + # run: composer analyze - name: Install Go dependencies run: go mod download @@ -73,19 +73,15 @@ jobs: go test -v -race ./plugins/app/tests -tags=debug -coverprofile=app.txt -covermode=atomic - name: Run code coverage - env: - CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} - run: | - if [[ "$CODECOV_TOKEN" != "" ]]; then - curl https://codecov.io/bash -o codecov-bash - chmod +x codecov-bash - ./codecov-bash -f lib.txt - ./codecov-bash -f rpc_config.txt - ./codecov-bash -f rpc.txt - ./codecov-bash -f plugin_config.txt - ./codecov-bash -f logger.txt - ./codecov-bash -f app.txt - fi + uses: codecov/codecov-action@v1 + with: + token: ${{ secrets.CODECOV_TOKEN }} + files: lib.txt, rpc_config.txt, rpc.txt, plugin_config.txt, logger.txt, app.txt + flags: unittests + name: codecov-umbrella + fail_ci_if_error: false + verbose: true + golangci-check: name: runner / golangci-lint diff --git a/errors.go b/errors.go deleted file mode 100755 index 7c91a92b..00000000 --- a/errors.go +++ /dev/null @@ -1,24 +0,0 @@ -package roadrunner - -// ExecError is job level error (no WorkerProcess halt), wraps at top -// of error context -type ExecError []byte - -// Error converts error context to string -func (te ExecError) Error() string { - return string(te) -} - -// WorkerError is WorkerProcess related error -type WorkerError struct { - // Worker - Worker WorkerBase - - // Caused error - Caused error -} - -// Error converts error context to string -func (e WorkerError) Error() string { - return e.Caused.Error() -} diff --git a/errors_test.go b/errors_test.go deleted file mode 100755 index 86ab908d..00000000 --- a/errors_test.go +++ /dev/null @@ -1,18 +0,0 @@ -package roadrunner - -import ( - "errors" - "testing" - - "github.com/stretchr/testify/assert" -) - -func Test_JobError_Error(t *testing.T) { - e := ExecError([]byte("error")) - assert.Equal(t, "error", e.Error()) -} - -func Test_WorkerError_Error(t *testing.T) { - e := WorkerError{Worker: nil, Caused: errors.New("error")} - assert.Equal(t, "error", e.Error()) -} @@ -11,7 +11,7 @@ require ( github.com/shirou/gopsutil v3.20.10+incompatible github.com/spf13/viper v1.7.1 github.com/spiral/endure v1.0.0-beta15 - github.com/spiral/errors v1.0.2 + github.com/spiral/errors v1.0.4 github.com/spiral/goridge/v2 v2.4.6 github.com/stretchr/testify v1.6.1 github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a @@ -193,8 +193,8 @@ github.com/spiral/endure v1.0.0-beta15 h1:pBFn9LKQLPSzrG7kGE30T0VEjp2A6yT6p2BRRU github.com/spiral/endure v1.0.0-beta15/go.mod h1:iGh1Zf1cckkJa5J9Obm8d/96kKYvlJBv/D0iHOuYWLQ= github.com/spiral/errors v1.0.1 h1:OyKLwQH+42hhaRYuXGzfPKCFOmawA/PYXTY9wsK99n4= github.com/spiral/errors v1.0.1/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= -github.com/spiral/errors v1.0.2 h1:i/XMmA2VJt9sD64N4/zgQ9Y0cwlNQRLDaxOZPZV09D4= -github.com/spiral/errors v1.0.2/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= +github.com/spiral/errors v1.0.4 h1:Y6Bop9GszdDh+Dn3s5aqsGebNLydqZ1F6OdOIQ9EpU0= +github.com/spiral/errors v1.0.4/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= github.com/spiral/goridge/v2 v2.4.6 h1:9u/mrxCtOSy0lnumrpPCSOlGBX/Vprid/hFsnzWrd6k= github.com/spiral/goridge/v2 v2.4.6/go.mod h1:mYjL+Ny7nVfLqjRwIYV2pUSQ61eazvVclHII6FfZfYc= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/pipe_factory.go b/pipe_factory.go index d6242775..15f38e42 100755 --- a/pipe_factory.go +++ b/pipe_factory.go @@ -2,9 +2,7 @@ package roadrunner import ( "context" - "fmt" "os/exec" - "strings" "github.com/spiral/errors" "github.com/spiral/goridge/v2" @@ -33,12 +31,13 @@ type SpawnResult struct { // method Wait() must be handled on level above. func (f *PipeFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) (WorkerBase, error) { c := make(chan SpawnResult) + const op = errors.Op("spawn worker with context") go func() { w, err := InitBaseWorker(cmd) if err != nil { c <- SpawnResult{ w: nil, - err: err, + err: errors.E(op, err), } return } @@ -48,7 +47,7 @@ func (f *PipeFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) if err != nil { c <- SpawnResult{ w: nil, - err: err, + err: errors.E(op, err), } return } @@ -58,7 +57,7 @@ func (f *PipeFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) if err != nil { c <- SpawnResult{ w: nil, - err: err, + err: errors.E(op, err), } return } @@ -72,7 +71,7 @@ func (f *PipeFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) if err != nil { c <- SpawnResult{ w: nil, - err: errors.E(err, "process error"), + err: errors.E(op, err, "process error"), } return } @@ -87,7 +86,7 @@ func (f *PipeFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) ) c <- SpawnResult{ w: nil, - err: err, + err: errors.E(op, err), } return } @@ -117,19 +116,19 @@ func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (WorkerBase, error) { const op = errors.Op("spawn worker") w, err := InitBaseWorker(cmd) if err != nil { - return nil, err + return nil, errors.E(op, err) } // TODO why out is in? in, err := cmd.StdoutPipe() if err != nil { - return nil, err + return nil, errors.E(op, err) } // TODO why in is out? out, err := cmd.StdinPipe() if err != nil { - return nil, err + return nil, errors.E(op, err) } // Init new PIPE relay @@ -143,24 +142,13 @@ func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (WorkerBase, error) { } // errors bundle - var errs []string - if pid, errF := fetchPID(relay); pid != w.Pid() { - if errF != nil { - errs = append(errs, errF.Error()) - } - - errK := w.Kill() - if errK != nil { - errs = append(errs, fmt.Errorf("error killing the worker with PID number %d, Created: %s", w.Pid(), w.Created()).Error()) - } - - if wErr := w.Wait(context.Background()); wErr != nil { - errs = append(errs, wErr.Error()) - } - - if len(errs) > 0 { - return nil, errors.E(op, strings.Join(errs, "/")) - } + if pid, err := fetchPID(relay); pid != w.Pid() { + err = multierr.Combine( + err, + w.Kill(), + w.Wait(context.Background()), + ) + return nil, errors.E(op, err) } // everything ok, set ready state diff --git a/plugins/app/plugin.go b/plugins/app/plugin.go index 839685bd..d76961ca 100644 --- a/plugins/app/plugin.go +++ b/plugins/app/plugin.go @@ -34,9 +34,10 @@ type Plugin struct { // Init application provider. func (app *Plugin) Init(cfg config.Configurer, log log.Logger) error { + const op = errors.Op("Init") err := cfg.UnmarshalKey(ServiceName, &app.cfg) if err != nil { - return err + return errors.E(op, errors.Init, err) } app.cfg.InitDefaults() app.log = log @@ -97,14 +98,15 @@ func (app *Plugin) CmdFactory(env Env) (func() *exec.Cmd, error) { // NewWorker issues new standalone worker. func (app *Plugin) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) { + const op = errors.Op("new worker") spawnCmd, err := app.CmdFactory(env) if err != nil { - return nil, err + return nil, errors.E(op, err) } w, err := app.factory.SpawnWorkerWithContext(ctx, spawnCmd()) if err != nil { - return nil, err + return nil, errors.E(op, err) } w.AddListener(app.collectLogs) @@ -131,18 +133,19 @@ func (app *Plugin) NewWorkerPool(ctx context.Context, opt roadrunner.Config, env // creates relay and worker factory. func (app *Plugin) initFactory() (roadrunner.Factory, error) { + const op = errors.Op("network factory init") if app.cfg.Relay == "" || app.cfg.Relay == "pipes" { return roadrunner.NewPipeFactory(), nil } dsn := strings.Split(app.cfg.Relay, "://") if len(dsn) != 2 { - return nil, errors.E(errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) + return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) } lsn, err := util.CreateListener(app.cfg.Relay) if err != nil { - return nil, err + return nil, errors.E(op, errors.Network, err) } switch dsn[0] { @@ -152,7 +155,7 @@ func (app *Plugin) initFactory() (roadrunner.Factory, error) { case "tcp": return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil default: - return nil, errors.E(errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) + return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) } } diff --git a/process_state.go b/process_state.go index 1a4c4d65..1291a904 100755 --- a/process_state.go +++ b/process_state.go @@ -2,6 +2,7 @@ package roadrunner import ( "github.com/shirou/gopsutil/process" + "github.com/spiral/errors" ) // ProcessState provides information about specific worker. @@ -25,10 +26,11 @@ type ProcessState struct { // WorkerProcessState creates new worker state definition. func WorkerProcessState(w WorkerBase) (ProcessState, error) { + const op = errors.Op("worker_process state") p, _ := process.NewProcess(int32(w.Pid())) i, err := p.MemoryInfo() if err != nil { - return ProcessState{}, err + return ProcessState{}, errors.E(op, err) } return ProcessState{ diff --git a/process_state_test.go b/process_state_test.go deleted file mode 100755 index 3f283dce..00000000 --- a/process_state_test.go +++ /dev/null @@ -1 +0,0 @@ -package roadrunner diff --git a/socket_factory_test.go b/socket_factory_test.go index 6ab87872..f7b2e69a 100755 --- a/socket_factory_test.go +++ b/socket_factory_test.go @@ -116,7 +116,7 @@ func Test_Tcp_Failboot(t *testing.T) { cmd := exec.Command("php", "tests/failboot.php") - w, err2 := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) + w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithContext(ctx, cmd) assert.Nil(t, w) assert.Error(t, err2) assert.Contains(t, err2.Error(), "failboot") diff --git a/static_pool.go b/static_pool.go index 66dac7c3..f64a2c9a 100755 --- a/static_pool.go +++ b/static_pool.go @@ -118,9 +118,9 @@ func (sp *StaticPool) Exec(p Payload) (Payload, error) { rsp, err := sw.Exec(p) if err != nil { // soft job errors are allowed - if _, jobError := err.(ExecError); jobError { + if errors.Is(errors.Exec, err) { if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { - err := sp.ww.AllocateNew(bCtx) + err = sp.ww.AllocateNew(bCtx) if err != nil { sp.events.Push(PoolEvent{Event: EventPoolError, Payload: err}) } @@ -188,10 +188,8 @@ func (sp *StaticPool) execDebug(p Payload) (Payload, error) { func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) { const op = errors.Op("Exec") w, err := sp.ww.GetFreeWorker(context.Background()) - if err != nil && errors.Is(errors.ErrWatcherStopped, err) { + if err != nil { return EmptyPayload, errors.E(op, err) - } else if err != nil { - return EmptyPayload, err } sw := w.(SyncWorker) @@ -199,23 +197,23 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload rsp, err := sw.ExecWithContext(ctx, rqs) if err != nil { // soft job errors are allowed - if _, jobError := err.(ExecError); jobError { + if errors.Is(errors.Exec, err) { if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { - err := sp.ww.AllocateNew(bCtx) + err = sp.ww.AllocateNew(bCtx) if err != nil { - sp.events.Push(PoolEvent{Event: EventPoolError, Payload: err}) + sp.events.Push(PoolEvent{Event: EventPoolError, Payload: errors.E(op, err)}) } w.State().Set(StateInvalid) err = w.Stop(bCtx) if err != nil { - sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err}) + sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: errors.E(op, err)}) } } else { sp.ww.PushWorker(w) } - return EmptyPayload, err + return EmptyPayload, errors.E(op, err) } sw.State().Set(StateInvalid) @@ -223,10 +221,10 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload errS := w.Stop(bCtx) if errS != nil { - return EmptyPayload, fmt.Errorf("%v, %v", err, errS) + return EmptyPayload, errors.E(op, errors.Errorf("%v, %v", err, errS)) } - return EmptyPayload, err + return EmptyPayload, errors.E(op, err) } // worker want's to be terminated @@ -234,7 +232,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload w.State().Set(StateInvalid) err = w.Stop(bCtx) if err != nil { - sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err}) + sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: errors.E(op, err)}) } return sp.Exec(rqs) @@ -243,7 +241,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { err = sp.ww.AllocateNew(bCtx) if err != nil { - return EmptyPayload, err + return EmptyPayload, errors.E(op, err) } } else { sp.ww.PushWorker(w) @@ -258,6 +256,7 @@ func (sp *StaticPool) Destroy(ctx context.Context) { // allocate required number of stack func (sp *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]WorkerBase, error) { + const op = errors.Op("allocate workers") var workers []WorkerBase // constant number of stack simplify logic @@ -266,20 +265,21 @@ func (sp *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([] w, err := sp.factory.SpawnWorkerWithContext(ctx, sp.cmd()) if err != nil { cancel() - return nil, err + return nil, errors.E(op, err) } - cancel() workers = append(workers, w) + cancel() } return workers, nil } func (sp *StaticPool) checkMaxJobs(ctx context.Context, w WorkerBase) error { + const op = errors.Op("check max jobs") if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { err := sp.ww.AllocateNew(ctx) if err != nil { sp.events.Push(PoolEvent{Event: EventPoolError, Payload: err}) - return err + return errors.E(op, err) } } return nil diff --git a/static_pool_test.go b/static_pool_test.go index f1e3e4e4..8f8a6f56 100755 --- a/static_pool_test.go +++ b/static_pool_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/spiral/errors" "github.com/stretchr/testify/assert" ) @@ -152,52 +153,49 @@ func Test_StaticPool_JobError(t *testing.T) { assert.Nil(t, res.Body) assert.Nil(t, res.Context) - assert.IsType(t, ExecError{}, err) - assert.Equal(t, "hello", err.Error()) + if errors.Is(errors.Exec, err) == false { + t.Fatal("error should be of type errors.Exec") + } + + assert.Contains(t, err.Error(), "exec payload: Exec: hello") } -// TODO temporary commented, figure out later -// func Test_StaticPool_Broken_Replace(t *testing.T) { -// ctx := context.Background() -// p, err := NewPool( -// ctx, -// func() *exec.Cmd { return exec.Command("php", "tests/client.php", "broken", "pipes") }, -// NewPipeFactory(), -// cfg, -// ) -// assert.NoError(t, err) -// assert.NotNil(t, p) -// -// wg := &sync.WaitGroup{} -// wg.Add(1) -// var i int64 -// atomic.StoreInt64(&i, 10) -// -// p.AddListener(func(event interface{}) { -// -// }) -// -// go func() { -// for { -// select { -// case ev := <-p.Events(): -// wev := ev.Payload.(WorkerEvent) -// if _, ok := wev.Payload.([]byte); ok { -// assert.Contains(t, string(wev.Payload.([]byte)), "undefined_function()") -// wg.Done() -// return -// } -// } -// } -// }() -// res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) -// assert.Error(t, err) -// assert.Nil(t, res.Context) -// assert.Nil(t, res.Body) -// wg.Wait() -// -// p.Destroy(ctx) -// } +func Test_StaticPool_Broken_Replace(t *testing.T) { + ctx := context.Background() + p, err := NewPool( + ctx, + func() *exec.Cmd { return exec.Command("php", "tests/client.php", "broken", "pipes") }, + NewPipeFactory(), + cfg, + ) + assert.NoError(t, err) + assert.NotNil(t, p) + + wg := &sync.WaitGroup{} + wg.Add(1) + + workers := p.Workers() + for i := 0; i < len(workers); i++ { + workers[i].AddListener(func(event interface{}) { + if wev, ok := event.(WorkerEvent); ok { + if wev.Event == EventWorkerLog { + assert.Contains(t, string(wev.Payload.([]byte)), "undefined_function()") + wg.Done() + return + } + } + }) + } + + res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + assert.Error(t, err) + assert.Nil(t, res.Context) + assert.Nil(t, res.Body) + + wg.Wait() + + p.Destroy(ctx) +} // func Test_StaticPool_Broken_FromOutside(t *testing.T) { diff --git a/supervisor_pool.go b/supervisor_pool.go index 92d03e77..e23abdd1 100755 --- a/supervisor_pool.go +++ b/supervisor_pool.go @@ -54,7 +54,7 @@ func (sp *supervisedPool) ExecWithContext(ctx context.Context, rqs Payload) (Pay res, err := sp.pool.ExecWithContext(ctx, rqs) if err != nil { c <- ttlExec{ - err: err, + err: errors.E(op, err), p: EmptyPayload, } } @@ -80,7 +80,12 @@ func (sp *supervisedPool) ExecWithContext(ctx context.Context, rqs Payload) (Pay } func (sp *supervisedPool) Exec(p Payload) (Payload, error) { - return sp.pool.Exec(p) + const op = errors.Op("supervised exec") + rsp, err := sp.pool.Exec(p) + if err != nil { + return EmptyPayload, errors.E(op, err) + } + return rsp, nil } func (sp *supervisedPool) AddListener(listener util.EventListener) { @@ -130,6 +135,7 @@ func (sp *supervisedPool) Stop() { func (sp *supervisedPool) control() { now := time.Now() ctx := context.TODO() + const op = errors.Op("supervised pool control tick") // THIS IS A COPY OF WORKERS workers := sp.pool.Workers() @@ -148,7 +154,7 @@ func (sp *supervisedPool) control() { if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sp.cfg.TTL) { err = sp.pool.RemoveWorker(ctx, workers[i]) if err != nil { - sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err}) + sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: errors.E(op, err)}) return } else { sp.events.Push(PoolEvent{Event: EventTTL, Payload: workers[i]}) @@ -160,7 +166,7 @@ func (sp *supervisedPool) control() { if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB { err = sp.pool.RemoveWorker(ctx, workers[i]) if err != nil { - sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err}) + sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: errors.E(op, err)}) return } else { sp.events.Push(PoolEvent{Event: EventMaxMemory, Payload: workers[i]}) @@ -193,7 +199,7 @@ func (sp *supervisedPool) control() { if sp.cfg.IdleTTL-uint64(res) <= 0 { err = sp.pool.RemoveWorker(ctx, workers[i]) if err != nil { - sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err}) + sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: errors.E(op, err)}) return } else { sp.events.Push(PoolEvent{Event: EventIdleTTL, Payload: workers[i]}) diff --git a/sync_worker.go b/sync_worker.go index 282254e5..a9c53553 100755 --- a/sync_worker.go +++ b/sync_worker.go @@ -50,7 +50,8 @@ func (tw *syncWorker) Exec(p Payload) (Payload, error) { rsp, err := tw.execPayload(p) if err != nil { - if _, ok := err.(ExecError); !ok { + // just to be more verbose + if errors.Is(errors.Exec, err) == false { tw.w.State().Set(StateErrored) tw.w.State().RegisterExec() } @@ -95,7 +96,8 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p Payload) (Payload, rsp, err := tw.execPayload(p) if err != nil { - if _, ok := err.(ExecError); !ok { + // just to be more verbose + if errors.Is(errors.Exec, err) == false { tw.w.State().Set(StateErrored) tw.w.State().RegisterExec() } @@ -131,7 +133,7 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p Payload) (Payload, } func (tw *syncWorker) execPayload(p Payload) (Payload, error) { - const op = errors.Op("exec_payload") + const op = errors.Op("exec payload") // two things; todo: merge if err := sendControl(tw.w.Relay(), p.Context); err != nil { return EmptyPayload, errors.E(op, err, "header error") @@ -154,7 +156,7 @@ func (tw *syncWorker) execPayload(p Payload) (Payload, error) { } if pr.HasFlag(goridge.PayloadError) { - return EmptyPayload, ExecError(rsp.Context) + return EmptyPayload, errors.E(op, errors.Exec, errors.Str(string(rsp.Context))) } // add streaming support :) diff --git a/sync_worker_test.go b/sync_worker_test.go index 1bc2deb1..add0a066 100755 --- a/sync_worker_test.go +++ b/sync_worker_test.go @@ -6,6 +6,7 @@ import ( "sync" "testing" + "github.com/spiral/errors" "github.com/stretchr/testify/assert" ) @@ -206,8 +207,10 @@ func Test_Error(t *testing.T) { assert.Nil(t, res.Body) assert.Nil(t, res.Context) - assert.IsType(t, ExecError{}, err) - assert.Equal(t, "hello", err.Error()) + if errors.Is(errors.Exec, err) == false { + t.Fatal("error should be of type errors.Exec") + } + assert.Contains(t, err.Error(), "exec payload: Exec: hello") } func Test_NumExecs(t *testing.T) { diff --git a/util/isolate.go b/util/isolate.go index 005c430e..2bdb9d6c 100755 --- a/util/isolate.go +++ b/util/isolate.go @@ -9,6 +9,8 @@ import ( "os/user" "strconv" "syscall" + + "github.com/spiral/errors" ) // IsolateProcess change gpid for the process to avoid bypassing signals to php processes. @@ -18,19 +20,20 @@ func IsolateProcess(cmd *exec.Cmd) { // ExecuteFromUser may work only if run RR under root user func ExecuteFromUser(cmd *exec.Cmd, u string) error { + const op = errors.Op("execute from user") usr, err := user.Lookup(u) if err != nil { - return err + return errors.E(op, err) } usrI32, err := strconv.Atoi(usr.Uid) if err != nil { - return err + return errors.E(op, err) } grI32, err := strconv.Atoi(usr.Gid) if err != nil { - return err + return errors.E(op, err) } // For more information: @@ -44,7 +47,7 @@ func ExecuteFromUser(cmd *exec.Cmd, u string) error { return fmt.Errorf("unable to test user namespaces due to permissions") } - return fmt.Errorf("failed to stat /proc/self/ns/user: %v", err) + return errors.E(op, errors.Errorf("failed to stat /proc/self/ns/user: %v", err)) } cmd.SysProcAttr.Credential = &syscall.Credential{ diff --git a/worker_watcher.go b/worker_watcher.go index 36b3e029..3a89554d 100755 --- a/worker_watcher.go +++ b/worker_watcher.go @@ -158,7 +158,7 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) ww.ReduceWorkersCount() return w, nil case <-tout.C: - return nil, errors.Str("no free stack") + return nil, errors.E(op, errors.Str("no free workers in the stack")) } } } @@ -169,9 +169,10 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) func (ww *workerWatcher) AllocateNew(ctx context.Context) error { ww.stack.mutex.Lock() + const op = errors.Op("allocate new worker") sw, err := ww.allocator() if err != nil { - return err + return errors.E(op, err) } ww.addToWatch(sw) @@ -188,6 +189,7 @@ func (ww *workerWatcher) AllocateNew(ctx context.Context) error { func (ww *workerWatcher) RemoveWorker(ctx context.Context, wb WorkerBase) error { ww.stack.mutex.Lock() + const op = errors.Op("remove worker") defer ww.stack.mutex.Unlock() pid := wb.Pid() for i := 0; i < len(ww.stack.workers); i++ { @@ -200,7 +202,7 @@ func (ww *workerWatcher) RemoveWorker(ctx context.Context, wb WorkerBase) error wb.State().Set(StateInvalid) err := wb.Kill() if err != nil { - return err + return errors.E(op, err) } break } @@ -274,12 +276,13 @@ func (ww *workerWatcher) WorkersList() []WorkerBase { } func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) { + const op = errors.Op("process wait") err := w.Wait(ctx) if err != nil { ww.events.Push(WorkerEvent{ Event: EventWorkerError, Worker: w, - Payload: err, + Payload: errors.E(op, err), }) } @@ -301,7 +304,7 @@ func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) { if err != nil { ww.events.Push(PoolEvent{ Event: EventPoolError, - Payload: err, + Payload: errors.E(op, err), }) } @@ -316,7 +319,7 @@ func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) { if err != nil { ww.events.Push(PoolEvent{ Event: EventPoolError, - Payload: err, + Payload: errors.E(op, err), }) return } |