summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xerrors.go24
-rwxr-xr-xerrors_test.go18
-rwxr-xr-xgo.mod2
-rwxr-xr-xgo.sum4
-rwxr-xr-xpipe_factory.go17
-rw-r--r--plugins/app/plugin.go15
-rwxr-xr-xsocket_factory_test.go2
-rwxr-xr-xstatic_pool.go9
-rwxr-xr-xstatic_pool_test.go84
-rwxr-xr-xsync_worker.go8
-rwxr-xr-xsync_worker_test.go7
-rwxr-xr-xutil/isolate.go11
12 files changed, 84 insertions, 117 deletions
diff --git a/errors.go b/errors.go
deleted file mode 100755
index 7c91a92b..00000000
--- a/errors.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package roadrunner
-
-// ExecError is job level error (no WorkerProcess halt), wraps at top
-// of error context
-type ExecError []byte
-
-// Error converts error context to string
-func (te ExecError) Error() string {
- return string(te)
-}
-
-// WorkerError is WorkerProcess related error
-type WorkerError struct {
- // Worker
- Worker WorkerBase
-
- // Caused error
- Caused error
-}
-
-// Error converts error context to string
-func (e WorkerError) Error() string {
- return e.Caused.Error()
-}
diff --git a/errors_test.go b/errors_test.go
deleted file mode 100755
index 86ab908d..00000000
--- a/errors_test.go
+++ /dev/null
@@ -1,18 +0,0 @@
-package roadrunner
-
-import (
- "errors"
- "testing"
-
- "github.com/stretchr/testify/assert"
-)
-
-func Test_JobError_Error(t *testing.T) {
- e := ExecError([]byte("error"))
- assert.Equal(t, "error", e.Error())
-}
-
-func Test_WorkerError_Error(t *testing.T) {
- e := WorkerError{Worker: nil, Caused: errors.New("error")}
- assert.Equal(t, "error", e.Error())
-}
diff --git a/go.mod b/go.mod
index cb7a7a1f..1162b589 100755
--- a/go.mod
+++ b/go.mod
@@ -11,7 +11,7 @@ require (
github.com/shirou/gopsutil v3.20.10+incompatible
github.com/spf13/viper v1.7.1
github.com/spiral/endure v1.0.0-beta15
- github.com/spiral/errors v1.0.2
+ github.com/spiral/errors v1.0.4
github.com/spiral/goridge/v2 v2.4.6
github.com/stretchr/testify v1.6.1
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a
diff --git a/go.sum b/go.sum
index 8433a2c9..aa2ccfd3 100755
--- a/go.sum
+++ b/go.sum
@@ -193,8 +193,8 @@ github.com/spiral/endure v1.0.0-beta15 h1:pBFn9LKQLPSzrG7kGE30T0VEjp2A6yT6p2BRRU
github.com/spiral/endure v1.0.0-beta15/go.mod h1:iGh1Zf1cckkJa5J9Obm8d/96kKYvlJBv/D0iHOuYWLQ=
github.com/spiral/errors v1.0.1 h1:OyKLwQH+42hhaRYuXGzfPKCFOmawA/PYXTY9wsK99n4=
github.com/spiral/errors v1.0.1/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
-github.com/spiral/errors v1.0.2 h1:i/XMmA2VJt9sD64N4/zgQ9Y0cwlNQRLDaxOZPZV09D4=
-github.com/spiral/errors v1.0.2/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
+github.com/spiral/errors v1.0.4 h1:Y6Bop9GszdDh+Dn3s5aqsGebNLydqZ1F6OdOIQ9EpU0=
+github.com/spiral/errors v1.0.4/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
github.com/spiral/goridge/v2 v2.4.6 h1:9u/mrxCtOSy0lnumrpPCSOlGBX/Vprid/hFsnzWrd6k=
github.com/spiral/goridge/v2 v2.4.6/go.mod h1:mYjL+Ny7nVfLqjRwIYV2pUSQ61eazvVclHII6FfZfYc=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
diff --git a/pipe_factory.go b/pipe_factory.go
index d6242775..9f85bf05 100755
--- a/pipe_factory.go
+++ b/pipe_factory.go
@@ -33,12 +33,13 @@ type SpawnResult struct {
// method Wait() must be handled on level above.
func (f *PipeFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) (WorkerBase, error) {
c := make(chan SpawnResult)
+ const op = errors.Op("spawn worker with context")
go func() {
w, err := InitBaseWorker(cmd)
if err != nil {
c <- SpawnResult{
w: nil,
- err: err,
+ err: errors.E(op, err),
}
return
}
@@ -48,7 +49,7 @@ func (f *PipeFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd)
if err != nil {
c <- SpawnResult{
w: nil,
- err: err,
+ err: errors.E(op, err),
}
return
}
@@ -58,7 +59,7 @@ func (f *PipeFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd)
if err != nil {
c <- SpawnResult{
w: nil,
- err: err,
+ err: errors.E(op, err),
}
return
}
@@ -72,7 +73,7 @@ func (f *PipeFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd)
if err != nil {
c <- SpawnResult{
w: nil,
- err: errors.E(err, "process error"),
+ err: errors.E(op, err, "process error"),
}
return
}
@@ -87,7 +88,7 @@ func (f *PipeFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd)
)
c <- SpawnResult{
w: nil,
- err: err,
+ err: errors.E(op, err),
}
return
}
@@ -117,19 +118,19 @@ func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (WorkerBase, error) {
const op = errors.Op("spawn worker")
w, err := InitBaseWorker(cmd)
if err != nil {
- return nil, err
+ return nil, errors.E(op, err)
}
// TODO why out is in?
in, err := cmd.StdoutPipe()
if err != nil {
- return nil, err
+ return nil, errors.E(op, err)
}
// TODO why in is out?
out, err := cmd.StdinPipe()
if err != nil {
- return nil, err
+ return nil, errors.E(op, err)
}
// Init new PIPE relay
diff --git a/plugins/app/plugin.go b/plugins/app/plugin.go
index 839685bd..d76961ca 100644
--- a/plugins/app/plugin.go
+++ b/plugins/app/plugin.go
@@ -34,9 +34,10 @@ type Plugin struct {
// Init application provider.
func (app *Plugin) Init(cfg config.Configurer, log log.Logger) error {
+ const op = errors.Op("Init")
err := cfg.UnmarshalKey(ServiceName, &app.cfg)
if err != nil {
- return err
+ return errors.E(op, errors.Init, err)
}
app.cfg.InitDefaults()
app.log = log
@@ -97,14 +98,15 @@ func (app *Plugin) CmdFactory(env Env) (func() *exec.Cmd, error) {
// NewWorker issues new standalone worker.
func (app *Plugin) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) {
+ const op = errors.Op("new worker")
spawnCmd, err := app.CmdFactory(env)
if err != nil {
- return nil, err
+ return nil, errors.E(op, err)
}
w, err := app.factory.SpawnWorkerWithContext(ctx, spawnCmd())
if err != nil {
- return nil, err
+ return nil, errors.E(op, err)
}
w.AddListener(app.collectLogs)
@@ -131,18 +133,19 @@ func (app *Plugin) NewWorkerPool(ctx context.Context, opt roadrunner.Config, env
// creates relay and worker factory.
func (app *Plugin) initFactory() (roadrunner.Factory, error) {
+ const op = errors.Op("network factory init")
if app.cfg.Relay == "" || app.cfg.Relay == "pipes" {
return roadrunner.NewPipeFactory(), nil
}
dsn := strings.Split(app.cfg.Relay, "://")
if len(dsn) != 2 {
- return nil, errors.E(errors.Str("invalid DSN (tcp://:6001, unix://file.sock)"))
+ return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)"))
}
lsn, err := util.CreateListener(app.cfg.Relay)
if err != nil {
- return nil, err
+ return nil, errors.E(op, errors.Network, err)
}
switch dsn[0] {
@@ -152,7 +155,7 @@ func (app *Plugin) initFactory() (roadrunner.Factory, error) {
case "tcp":
return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil
default:
- return nil, errors.E(errors.Str("invalid DSN (tcp://:6001, unix://file.sock)"))
+ return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)"))
}
}
diff --git a/socket_factory_test.go b/socket_factory_test.go
index 6ab87872..b664662c 100755
--- a/socket_factory_test.go
+++ b/socket_factory_test.go
@@ -116,7 +116,7 @@ func Test_Tcp_Failboot(t *testing.T) {
cmd := exec.Command("php", "tests/failboot.php")
- w, err2 := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
+ w, err2 := NewSocketServer(ls, time.Second * 5).SpawnWorkerWithContext(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err2)
assert.Contains(t, err2.Error(), "failboot")
diff --git a/static_pool.go b/static_pool.go
index 66dac7c3..ee81fd39 100755
--- a/static_pool.go
+++ b/static_pool.go
@@ -118,9 +118,9 @@ func (sp *StaticPool) Exec(p Payload) (Payload, error) {
rsp, err := sw.Exec(p)
if err != nil {
// soft job errors are allowed
- if _, jobError := err.(ExecError); jobError {
+ if errors.Is(errors.Exec, err) {
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
- err := sp.ww.AllocateNew(bCtx)
+ err = sp.ww.AllocateNew(bCtx)
if err != nil {
sp.events.Push(PoolEvent{Event: EventPoolError, Payload: err})
}
@@ -135,6 +135,7 @@ func (sp *StaticPool) Exec(p Payload) (Payload, error) {
}
return EmptyPayload, err
+
}
sw.State().Set(StateInvalid)
@@ -199,9 +200,9 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload
rsp, err := sw.ExecWithContext(ctx, rqs)
if err != nil {
// soft job errors are allowed
- if _, jobError := err.(ExecError); jobError {
+ if errors.Is(errors.Exec, err) {
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
- err := sp.ww.AllocateNew(bCtx)
+ err = sp.ww.AllocateNew(bCtx)
if err != nil {
sp.events.Push(PoolEvent{Event: EventPoolError, Payload: err})
}
diff --git a/static_pool_test.go b/static_pool_test.go
index f1e3e4e4..309449ab 100755
--- a/static_pool_test.go
+++ b/static_pool_test.go
@@ -10,6 +10,7 @@ import (
"testing"
"time"
+ "github.com/spiral/errors"
"github.com/stretchr/testify/assert"
)
@@ -152,52 +153,47 @@ func Test_StaticPool_JobError(t *testing.T) {
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
- assert.IsType(t, ExecError{}, err)
- assert.Equal(t, "hello", err.Error())
+ if errors.Is(errors.Exec, err) == false {
+ t.Fatal("error should be of type errors.Exec")
+ }
+
+ assert.Contains(t, err.Error(), "exec_payload: Exec: hello")
}
-// TODO temporary commented, figure out later
-// func Test_StaticPool_Broken_Replace(t *testing.T) {
-// ctx := context.Background()
-// p, err := NewPool(
-// ctx,
-// func() *exec.Cmd { return exec.Command("php", "tests/client.php", "broken", "pipes") },
-// NewPipeFactory(),
-// cfg,
-// )
-// assert.NoError(t, err)
-// assert.NotNil(t, p)
-//
-// wg := &sync.WaitGroup{}
-// wg.Add(1)
-// var i int64
-// atomic.StoreInt64(&i, 10)
-//
-// p.AddListener(func(event interface{}) {
-//
-// })
-//
-// go func() {
-// for {
-// select {
-// case ev := <-p.Events():
-// wev := ev.Payload.(WorkerEvent)
-// if _, ok := wev.Payload.([]byte); ok {
-// assert.Contains(t, string(wev.Payload.([]byte)), "undefined_function()")
-// wg.Done()
-// return
-// }
-// }
-// }
-// }()
-// res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
-// assert.Error(t, err)
-// assert.Nil(t, res.Context)
-// assert.Nil(t, res.Body)
-// wg.Wait()
-//
-// p.Destroy(ctx)
-// }
+func Test_StaticPool_Broken_Replace(t *testing.T) {
+ ctx := context.Background()
+ p, err := NewPool(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "tests/client.php", "broken", "pipes") },
+ NewPipeFactory(),
+ cfg,
+ )
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ p.AddListener(func(event interface{}) {
+ if pev, ok := event.(PoolEvent); ok {
+ sw := pev.Payload.(SyncWorker)
+ sw.AddListener(func(event interface{}) {
+ if wev, ok := event.(WorkerEvent); ok {
+ assert.Contains(t, string(wev.Payload.([]byte)), "undefined_function()")
+ wg.Done()
+ return
+ }
+ })
+ }
+ })
+ res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ assert.Error(t, err)
+ assert.Nil(t, res.Context)
+ assert.Nil(t, res.Body)
+ wg.Wait()
+
+ p.Destroy(ctx)
+}
//
func Test_StaticPool_Broken_FromOutside(t *testing.T) {
diff --git a/sync_worker.go b/sync_worker.go
index 282254e5..56953fe6 100755
--- a/sync_worker.go
+++ b/sync_worker.go
@@ -50,7 +50,8 @@ func (tw *syncWorker) Exec(p Payload) (Payload, error) {
rsp, err := tw.execPayload(p)
if err != nil {
- if _, ok := err.(ExecError); !ok {
+ // just to be more verbose
+ if errors.Is(errors.Exec, err) == false {
tw.w.State().Set(StateErrored)
tw.w.State().RegisterExec()
}
@@ -95,7 +96,8 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p Payload) (Payload,
rsp, err := tw.execPayload(p)
if err != nil {
- if _, ok := err.(ExecError); !ok {
+ // just to be more verbose
+ if errors.Is(errors.Exec, err) == false {
tw.w.State().Set(StateErrored)
tw.w.State().RegisterExec()
}
@@ -154,7 +156,7 @@ func (tw *syncWorker) execPayload(p Payload) (Payload, error) {
}
if pr.HasFlag(goridge.PayloadError) {
- return EmptyPayload, ExecError(rsp.Context)
+ return EmptyPayload, errors.E(op, errors.Exec, errors.Str(string(rsp.Context))) //ExecError(rsp.Context)
}
// add streaming support :)
diff --git a/sync_worker_test.go b/sync_worker_test.go
index 1bc2deb1..f93b1356 100755
--- a/sync_worker_test.go
+++ b/sync_worker_test.go
@@ -6,6 +6,7 @@ import (
"sync"
"testing"
+ "github.com/spiral/errors"
"github.com/stretchr/testify/assert"
)
@@ -206,8 +207,10 @@ func Test_Error(t *testing.T) {
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
- assert.IsType(t, ExecError{}, err)
- assert.Equal(t, "hello", err.Error())
+ if errors.Is(errors.Exec, err) == false {
+ t.Fatal("error should be of type errors.Exec")
+ }
+ assert.Contains(t, err.Error(), "exec_payload: Exec: hello")
}
func Test_NumExecs(t *testing.T) {
diff --git a/util/isolate.go b/util/isolate.go
index 005c430e..2bdb9d6c 100755
--- a/util/isolate.go
+++ b/util/isolate.go
@@ -9,6 +9,8 @@ import (
"os/user"
"strconv"
"syscall"
+
+ "github.com/spiral/errors"
)
// IsolateProcess change gpid for the process to avoid bypassing signals to php processes.
@@ -18,19 +20,20 @@ func IsolateProcess(cmd *exec.Cmd) {
// ExecuteFromUser may work only if run RR under root user
func ExecuteFromUser(cmd *exec.Cmd, u string) error {
+ const op = errors.Op("execute from user")
usr, err := user.Lookup(u)
if err != nil {
- return err
+ return errors.E(op, err)
}
usrI32, err := strconv.Atoi(usr.Uid)
if err != nil {
- return err
+ return errors.E(op, err)
}
grI32, err := strconv.Atoi(usr.Gid)
if err != nil {
- return err
+ return errors.E(op, err)
}
// For more information:
@@ -44,7 +47,7 @@ func ExecuteFromUser(cmd *exec.Cmd, u string) error {
return fmt.Errorf("unable to test user namespaces due to permissions")
}
- return fmt.Errorf("failed to stat /proc/self/ns/user: %v", err)
+ return errors.E(op, errors.Errorf("failed to stat /proc/self/ns/user: %v", err))
}
cmd.SysProcAttr.Credential = &syscall.Credential{