diff options
-rwxr-xr-x | errors.go | 24 | ||||
-rwxr-xr-x | errors_test.go | 18 | ||||
-rwxr-xr-x | go.mod | 2 | ||||
-rwxr-xr-x | go.sum | 4 | ||||
-rwxr-xr-x | pipe_factory.go | 17 | ||||
-rw-r--r-- | plugins/app/plugin.go | 15 | ||||
-rwxr-xr-x | socket_factory_test.go | 2 | ||||
-rwxr-xr-x | static_pool.go | 9 | ||||
-rwxr-xr-x | static_pool_test.go | 84 | ||||
-rwxr-xr-x | sync_worker.go | 8 | ||||
-rwxr-xr-x | sync_worker_test.go | 7 | ||||
-rwxr-xr-x | util/isolate.go | 11 |
12 files changed, 84 insertions, 117 deletions
diff --git a/errors.go b/errors.go deleted file mode 100755 index 7c91a92b..00000000 --- a/errors.go +++ /dev/null @@ -1,24 +0,0 @@ -package roadrunner - -// ExecError is job level error (no WorkerProcess halt), wraps at top -// of error context -type ExecError []byte - -// Error converts error context to string -func (te ExecError) Error() string { - return string(te) -} - -// WorkerError is WorkerProcess related error -type WorkerError struct { - // Worker - Worker WorkerBase - - // Caused error - Caused error -} - -// Error converts error context to string -func (e WorkerError) Error() string { - return e.Caused.Error() -} diff --git a/errors_test.go b/errors_test.go deleted file mode 100755 index 86ab908d..00000000 --- a/errors_test.go +++ /dev/null @@ -1,18 +0,0 @@ -package roadrunner - -import ( - "errors" - "testing" - - "github.com/stretchr/testify/assert" -) - -func Test_JobError_Error(t *testing.T) { - e := ExecError([]byte("error")) - assert.Equal(t, "error", e.Error()) -} - -func Test_WorkerError_Error(t *testing.T) { - e := WorkerError{Worker: nil, Caused: errors.New("error")} - assert.Equal(t, "error", e.Error()) -} @@ -11,7 +11,7 @@ require ( github.com/shirou/gopsutil v3.20.10+incompatible github.com/spf13/viper v1.7.1 github.com/spiral/endure v1.0.0-beta15 - github.com/spiral/errors v1.0.2 + github.com/spiral/errors v1.0.4 github.com/spiral/goridge/v2 v2.4.6 github.com/stretchr/testify v1.6.1 github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a @@ -193,8 +193,8 @@ github.com/spiral/endure v1.0.0-beta15 h1:pBFn9LKQLPSzrG7kGE30T0VEjp2A6yT6p2BRRU github.com/spiral/endure v1.0.0-beta15/go.mod h1:iGh1Zf1cckkJa5J9Obm8d/96kKYvlJBv/D0iHOuYWLQ= github.com/spiral/errors v1.0.1 h1:OyKLwQH+42hhaRYuXGzfPKCFOmawA/PYXTY9wsK99n4= github.com/spiral/errors v1.0.1/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= -github.com/spiral/errors v1.0.2 h1:i/XMmA2VJt9sD64N4/zgQ9Y0cwlNQRLDaxOZPZV09D4= -github.com/spiral/errors v1.0.2/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= +github.com/spiral/errors v1.0.4 h1:Y6Bop9GszdDh+Dn3s5aqsGebNLydqZ1F6OdOIQ9EpU0= +github.com/spiral/errors v1.0.4/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= github.com/spiral/goridge/v2 v2.4.6 h1:9u/mrxCtOSy0lnumrpPCSOlGBX/Vprid/hFsnzWrd6k= github.com/spiral/goridge/v2 v2.4.6/go.mod h1:mYjL+Ny7nVfLqjRwIYV2pUSQ61eazvVclHII6FfZfYc= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/pipe_factory.go b/pipe_factory.go index d6242775..9f85bf05 100755 --- a/pipe_factory.go +++ b/pipe_factory.go @@ -33,12 +33,13 @@ type SpawnResult struct { // method Wait() must be handled on level above. func (f *PipeFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) (WorkerBase, error) { c := make(chan SpawnResult) + const op = errors.Op("spawn worker with context") go func() { w, err := InitBaseWorker(cmd) if err != nil { c <- SpawnResult{ w: nil, - err: err, + err: errors.E(op, err), } return } @@ -48,7 +49,7 @@ func (f *PipeFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) if err != nil { c <- SpawnResult{ w: nil, - err: err, + err: errors.E(op, err), } return } @@ -58,7 +59,7 @@ func (f *PipeFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) if err != nil { c <- SpawnResult{ w: nil, - err: err, + err: errors.E(op, err), } return } @@ -72,7 +73,7 @@ func (f *PipeFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) if err != nil { c <- SpawnResult{ w: nil, - err: errors.E(err, "process error"), + err: errors.E(op, err, "process error"), } return } @@ -87,7 +88,7 @@ func (f *PipeFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) ) c <- SpawnResult{ w: nil, - err: err, + err: errors.E(op, err), } return } @@ -117,19 +118,19 @@ func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (WorkerBase, error) { const op = errors.Op("spawn worker") w, err := InitBaseWorker(cmd) if err != nil { - return nil, err + return nil, errors.E(op, err) } // TODO why out is in? in, err := cmd.StdoutPipe() if err != nil { - return nil, err + return nil, errors.E(op, err) } // TODO why in is out? out, err := cmd.StdinPipe() if err != nil { - return nil, err + return nil, errors.E(op, err) } // Init new PIPE relay diff --git a/plugins/app/plugin.go b/plugins/app/plugin.go index 839685bd..d76961ca 100644 --- a/plugins/app/plugin.go +++ b/plugins/app/plugin.go @@ -34,9 +34,10 @@ type Plugin struct { // Init application provider. func (app *Plugin) Init(cfg config.Configurer, log log.Logger) error { + const op = errors.Op("Init") err := cfg.UnmarshalKey(ServiceName, &app.cfg) if err != nil { - return err + return errors.E(op, errors.Init, err) } app.cfg.InitDefaults() app.log = log @@ -97,14 +98,15 @@ func (app *Plugin) CmdFactory(env Env) (func() *exec.Cmd, error) { // NewWorker issues new standalone worker. func (app *Plugin) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) { + const op = errors.Op("new worker") spawnCmd, err := app.CmdFactory(env) if err != nil { - return nil, err + return nil, errors.E(op, err) } w, err := app.factory.SpawnWorkerWithContext(ctx, spawnCmd()) if err != nil { - return nil, err + return nil, errors.E(op, err) } w.AddListener(app.collectLogs) @@ -131,18 +133,19 @@ func (app *Plugin) NewWorkerPool(ctx context.Context, opt roadrunner.Config, env // creates relay and worker factory. func (app *Plugin) initFactory() (roadrunner.Factory, error) { + const op = errors.Op("network factory init") if app.cfg.Relay == "" || app.cfg.Relay == "pipes" { return roadrunner.NewPipeFactory(), nil } dsn := strings.Split(app.cfg.Relay, "://") if len(dsn) != 2 { - return nil, errors.E(errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) + return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) } lsn, err := util.CreateListener(app.cfg.Relay) if err != nil { - return nil, err + return nil, errors.E(op, errors.Network, err) } switch dsn[0] { @@ -152,7 +155,7 @@ func (app *Plugin) initFactory() (roadrunner.Factory, error) { case "tcp": return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil default: - return nil, errors.E(errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) + return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) } } diff --git a/socket_factory_test.go b/socket_factory_test.go index 6ab87872..b664662c 100755 --- a/socket_factory_test.go +++ b/socket_factory_test.go @@ -116,7 +116,7 @@ func Test_Tcp_Failboot(t *testing.T) { cmd := exec.Command("php", "tests/failboot.php") - w, err2 := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) + w, err2 := NewSocketServer(ls, time.Second * 5).SpawnWorkerWithContext(ctx, cmd) assert.Nil(t, w) assert.Error(t, err2) assert.Contains(t, err2.Error(), "failboot") diff --git a/static_pool.go b/static_pool.go index 66dac7c3..ee81fd39 100755 --- a/static_pool.go +++ b/static_pool.go @@ -118,9 +118,9 @@ func (sp *StaticPool) Exec(p Payload) (Payload, error) { rsp, err := sw.Exec(p) if err != nil { // soft job errors are allowed - if _, jobError := err.(ExecError); jobError { + if errors.Is(errors.Exec, err) { if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { - err := sp.ww.AllocateNew(bCtx) + err = sp.ww.AllocateNew(bCtx) if err != nil { sp.events.Push(PoolEvent{Event: EventPoolError, Payload: err}) } @@ -135,6 +135,7 @@ func (sp *StaticPool) Exec(p Payload) (Payload, error) { } return EmptyPayload, err + } sw.State().Set(StateInvalid) @@ -199,9 +200,9 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload rsp, err := sw.ExecWithContext(ctx, rqs) if err != nil { // soft job errors are allowed - if _, jobError := err.(ExecError); jobError { + if errors.Is(errors.Exec, err) { if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { - err := sp.ww.AllocateNew(bCtx) + err = sp.ww.AllocateNew(bCtx) if err != nil { sp.events.Push(PoolEvent{Event: EventPoolError, Payload: err}) } diff --git a/static_pool_test.go b/static_pool_test.go index f1e3e4e4..309449ab 100755 --- a/static_pool_test.go +++ b/static_pool_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/spiral/errors" "github.com/stretchr/testify/assert" ) @@ -152,52 +153,47 @@ func Test_StaticPool_JobError(t *testing.T) { assert.Nil(t, res.Body) assert.Nil(t, res.Context) - assert.IsType(t, ExecError{}, err) - assert.Equal(t, "hello", err.Error()) + if errors.Is(errors.Exec, err) == false { + t.Fatal("error should be of type errors.Exec") + } + + assert.Contains(t, err.Error(), "exec_payload: Exec: hello") } -// TODO temporary commented, figure out later -// func Test_StaticPool_Broken_Replace(t *testing.T) { -// ctx := context.Background() -// p, err := NewPool( -// ctx, -// func() *exec.Cmd { return exec.Command("php", "tests/client.php", "broken", "pipes") }, -// NewPipeFactory(), -// cfg, -// ) -// assert.NoError(t, err) -// assert.NotNil(t, p) -// -// wg := &sync.WaitGroup{} -// wg.Add(1) -// var i int64 -// atomic.StoreInt64(&i, 10) -// -// p.AddListener(func(event interface{}) { -// -// }) -// -// go func() { -// for { -// select { -// case ev := <-p.Events(): -// wev := ev.Payload.(WorkerEvent) -// if _, ok := wev.Payload.([]byte); ok { -// assert.Contains(t, string(wev.Payload.([]byte)), "undefined_function()") -// wg.Done() -// return -// } -// } -// } -// }() -// res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) -// assert.Error(t, err) -// assert.Nil(t, res.Context) -// assert.Nil(t, res.Body) -// wg.Wait() -// -// p.Destroy(ctx) -// } +func Test_StaticPool_Broken_Replace(t *testing.T) { + ctx := context.Background() + p, err := NewPool( + ctx, + func() *exec.Cmd { return exec.Command("php", "tests/client.php", "broken", "pipes") }, + NewPipeFactory(), + cfg, + ) + assert.NoError(t, err) + assert.NotNil(t, p) + + wg := &sync.WaitGroup{} + wg.Add(1) + + p.AddListener(func(event interface{}) { + if pev, ok := event.(PoolEvent); ok { + sw := pev.Payload.(SyncWorker) + sw.AddListener(func(event interface{}) { + if wev, ok := event.(WorkerEvent); ok { + assert.Contains(t, string(wev.Payload.([]byte)), "undefined_function()") + wg.Done() + return + } + }) + } + }) + res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + assert.Error(t, err) + assert.Nil(t, res.Context) + assert.Nil(t, res.Body) + wg.Wait() + + p.Destroy(ctx) +} // func Test_StaticPool_Broken_FromOutside(t *testing.T) { diff --git a/sync_worker.go b/sync_worker.go index 282254e5..56953fe6 100755 --- a/sync_worker.go +++ b/sync_worker.go @@ -50,7 +50,8 @@ func (tw *syncWorker) Exec(p Payload) (Payload, error) { rsp, err := tw.execPayload(p) if err != nil { - if _, ok := err.(ExecError); !ok { + // just to be more verbose + if errors.Is(errors.Exec, err) == false { tw.w.State().Set(StateErrored) tw.w.State().RegisterExec() } @@ -95,7 +96,8 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p Payload) (Payload, rsp, err := tw.execPayload(p) if err != nil { - if _, ok := err.(ExecError); !ok { + // just to be more verbose + if errors.Is(errors.Exec, err) == false { tw.w.State().Set(StateErrored) tw.w.State().RegisterExec() } @@ -154,7 +156,7 @@ func (tw *syncWorker) execPayload(p Payload) (Payload, error) { } if pr.HasFlag(goridge.PayloadError) { - return EmptyPayload, ExecError(rsp.Context) + return EmptyPayload, errors.E(op, errors.Exec, errors.Str(string(rsp.Context))) //ExecError(rsp.Context) } // add streaming support :) diff --git a/sync_worker_test.go b/sync_worker_test.go index 1bc2deb1..f93b1356 100755 --- a/sync_worker_test.go +++ b/sync_worker_test.go @@ -6,6 +6,7 @@ import ( "sync" "testing" + "github.com/spiral/errors" "github.com/stretchr/testify/assert" ) @@ -206,8 +207,10 @@ func Test_Error(t *testing.T) { assert.Nil(t, res.Body) assert.Nil(t, res.Context) - assert.IsType(t, ExecError{}, err) - assert.Equal(t, "hello", err.Error()) + if errors.Is(errors.Exec, err) == false { + t.Fatal("error should be of type errors.Exec") + } + assert.Contains(t, err.Error(), "exec_payload: Exec: hello") } func Test_NumExecs(t *testing.T) { diff --git a/util/isolate.go b/util/isolate.go index 005c430e..2bdb9d6c 100755 --- a/util/isolate.go +++ b/util/isolate.go @@ -9,6 +9,8 @@ import ( "os/user" "strconv" "syscall" + + "github.com/spiral/errors" ) // IsolateProcess change gpid for the process to avoid bypassing signals to php processes. @@ -18,19 +20,20 @@ func IsolateProcess(cmd *exec.Cmd) { // ExecuteFromUser may work only if run RR under root user func ExecuteFromUser(cmd *exec.Cmd, u string) error { + const op = errors.Op("execute from user") usr, err := user.Lookup(u) if err != nil { - return err + return errors.E(op, err) } usrI32, err := strconv.Atoi(usr.Uid) if err != nil { - return err + return errors.E(op, err) } grI32, err := strconv.Atoi(usr.Gid) if err != nil { - return err + return errors.E(op, err) } // For more information: @@ -44,7 +47,7 @@ func ExecuteFromUser(cmd *exec.Cmd, u string) error { return fmt.Errorf("unable to test user namespaces due to permissions") } - return fmt.Errorf("failed to stat /proc/self/ns/user: %v", err) + return errors.E(op, errors.Errorf("failed to stat /proc/self/ns/user: %v", err)) } cmd.SysProcAttr.Credential = &syscall.Credential{ |