summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-11-09 15:11:10 +0300
committerGitHub <[email protected]>2020-11-09 15:11:10 +0300
commit0874bcb2f6b284a940ba4f3507eb8c4619c27868 (patch)
treec99d15624cd080cad22b7c8fb7d4714b2dc124fb
parent9fbe7726dd55cfedda724b7644e1b6bf7c1a6cb4 (diff)
parentf218dcbd7e55d9ad1df8336e2331cdaa62d9ded3 (diff)
Merge pull request #390 from spiral/feature/switch_to_spiral_errorsv2.0.0-alpha17
Feature/switch to spiral errors
-rwxr-xr-x.github/workflows/ci-build.yml26
-rwxr-xr-xerrors.go24
-rwxr-xr-xerrors_test.go18
-rwxr-xr-xgo.mod2
-rwxr-xr-xgo.sum4
-rwxr-xr-xpipe_factory.go44
-rw-r--r--plugins/app/plugin.go15
-rwxr-xr-xprocess_state.go4
-rwxr-xr-xprocess_state_test.go1
-rwxr-xr-xsocket_factory_test.go2
-rwxr-xr-xstatic_pool.go34
-rwxr-xr-xstatic_pool_test.go86
-rwxr-xr-xsupervisor_pool.go16
-rwxr-xr-xsync_worker.go10
-rwxr-xr-xsync_worker_test.go7
-rwxr-xr-xutil/isolate.go11
-rwxr-xr-xworker_watcher.go15
17 files changed, 140 insertions, 179 deletions
diff --git a/.github/workflows/ci-build.yml b/.github/workflows/ci-build.yml
index e3e09d88..99eb8834 100755
--- a/.github/workflows/ci-build.yml
+++ b/.github/workflows/ci-build.yml
@@ -57,8 +57,8 @@ jobs:
- name: Install Composer dependencies
run: composer install --prefer-dist --no-interaction --no-suggest # --prefer-source
-# - name: Analyze PHP sources
-# run: composer analyze
+ # - name: Analyze PHP sources
+ # run: composer analyze
- name: Install Go dependencies
run: go mod download
@@ -73,19 +73,15 @@ jobs:
go test -v -race ./plugins/app/tests -tags=debug -coverprofile=app.txt -covermode=atomic
- name: Run code coverage
- env:
- CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
- run: |
- if [[ "$CODECOV_TOKEN" != "" ]]; then
- curl https://codecov.io/bash -o codecov-bash
- chmod +x codecov-bash
- ./codecov-bash -f lib.txt
- ./codecov-bash -f rpc_config.txt
- ./codecov-bash -f rpc.txt
- ./codecov-bash -f plugin_config.txt
- ./codecov-bash -f logger.txt
- ./codecov-bash -f app.txt
- fi
+ uses: codecov/codecov-action@v1
+ with:
+ token: ${{ secrets.CODECOV_TOKEN }}
+ files: lib.txt, rpc_config.txt, rpc.txt, plugin_config.txt, logger.txt, app.txt
+ flags: unittests
+ name: codecov-umbrella
+ fail_ci_if_error: false
+ verbose: true
+
golangci-check:
name: runner / golangci-lint
diff --git a/errors.go b/errors.go
deleted file mode 100755
index 7c91a92b..00000000
--- a/errors.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package roadrunner
-
-// ExecError is job level error (no WorkerProcess halt), wraps at top
-// of error context
-type ExecError []byte
-
-// Error converts error context to string
-func (te ExecError) Error() string {
- return string(te)
-}
-
-// WorkerError is WorkerProcess related error
-type WorkerError struct {
- // Worker
- Worker WorkerBase
-
- // Caused error
- Caused error
-}
-
-// Error converts error context to string
-func (e WorkerError) Error() string {
- return e.Caused.Error()
-}
diff --git a/errors_test.go b/errors_test.go
deleted file mode 100755
index 86ab908d..00000000
--- a/errors_test.go
+++ /dev/null
@@ -1,18 +0,0 @@
-package roadrunner
-
-import (
- "errors"
- "testing"
-
- "github.com/stretchr/testify/assert"
-)
-
-func Test_JobError_Error(t *testing.T) {
- e := ExecError([]byte("error"))
- assert.Equal(t, "error", e.Error())
-}
-
-func Test_WorkerError_Error(t *testing.T) {
- e := WorkerError{Worker: nil, Caused: errors.New("error")}
- assert.Equal(t, "error", e.Error())
-}
diff --git a/go.mod b/go.mod
index cb7a7a1f..1162b589 100755
--- a/go.mod
+++ b/go.mod
@@ -11,7 +11,7 @@ require (
github.com/shirou/gopsutil v3.20.10+incompatible
github.com/spf13/viper v1.7.1
github.com/spiral/endure v1.0.0-beta15
- github.com/spiral/errors v1.0.2
+ github.com/spiral/errors v1.0.4
github.com/spiral/goridge/v2 v2.4.6
github.com/stretchr/testify v1.6.1
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a
diff --git a/go.sum b/go.sum
index 8433a2c9..aa2ccfd3 100755
--- a/go.sum
+++ b/go.sum
@@ -193,8 +193,8 @@ github.com/spiral/endure v1.0.0-beta15 h1:pBFn9LKQLPSzrG7kGE30T0VEjp2A6yT6p2BRRU
github.com/spiral/endure v1.0.0-beta15/go.mod h1:iGh1Zf1cckkJa5J9Obm8d/96kKYvlJBv/D0iHOuYWLQ=
github.com/spiral/errors v1.0.1 h1:OyKLwQH+42hhaRYuXGzfPKCFOmawA/PYXTY9wsK99n4=
github.com/spiral/errors v1.0.1/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
-github.com/spiral/errors v1.0.2 h1:i/XMmA2VJt9sD64N4/zgQ9Y0cwlNQRLDaxOZPZV09D4=
-github.com/spiral/errors v1.0.2/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
+github.com/spiral/errors v1.0.4 h1:Y6Bop9GszdDh+Dn3s5aqsGebNLydqZ1F6OdOIQ9EpU0=
+github.com/spiral/errors v1.0.4/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
github.com/spiral/goridge/v2 v2.4.6 h1:9u/mrxCtOSy0lnumrpPCSOlGBX/Vprid/hFsnzWrd6k=
github.com/spiral/goridge/v2 v2.4.6/go.mod h1:mYjL+Ny7nVfLqjRwIYV2pUSQ61eazvVclHII6FfZfYc=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
diff --git a/pipe_factory.go b/pipe_factory.go
index d6242775..15f38e42 100755
--- a/pipe_factory.go
+++ b/pipe_factory.go
@@ -2,9 +2,7 @@ package roadrunner
import (
"context"
- "fmt"
"os/exec"
- "strings"
"github.com/spiral/errors"
"github.com/spiral/goridge/v2"
@@ -33,12 +31,13 @@ type SpawnResult struct {
// method Wait() must be handled on level above.
func (f *PipeFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) (WorkerBase, error) {
c := make(chan SpawnResult)
+ const op = errors.Op("spawn worker with context")
go func() {
w, err := InitBaseWorker(cmd)
if err != nil {
c <- SpawnResult{
w: nil,
- err: err,
+ err: errors.E(op, err),
}
return
}
@@ -48,7 +47,7 @@ func (f *PipeFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd)
if err != nil {
c <- SpawnResult{
w: nil,
- err: err,
+ err: errors.E(op, err),
}
return
}
@@ -58,7 +57,7 @@ func (f *PipeFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd)
if err != nil {
c <- SpawnResult{
w: nil,
- err: err,
+ err: errors.E(op, err),
}
return
}
@@ -72,7 +71,7 @@ func (f *PipeFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd)
if err != nil {
c <- SpawnResult{
w: nil,
- err: errors.E(err, "process error"),
+ err: errors.E(op, err, "process error"),
}
return
}
@@ -87,7 +86,7 @@ func (f *PipeFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd)
)
c <- SpawnResult{
w: nil,
- err: err,
+ err: errors.E(op, err),
}
return
}
@@ -117,19 +116,19 @@ func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (WorkerBase, error) {
const op = errors.Op("spawn worker")
w, err := InitBaseWorker(cmd)
if err != nil {
- return nil, err
+ return nil, errors.E(op, err)
}
// TODO why out is in?
in, err := cmd.StdoutPipe()
if err != nil {
- return nil, err
+ return nil, errors.E(op, err)
}
// TODO why in is out?
out, err := cmd.StdinPipe()
if err != nil {
- return nil, err
+ return nil, errors.E(op, err)
}
// Init new PIPE relay
@@ -143,24 +142,13 @@ func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (WorkerBase, error) {
}
// errors bundle
- var errs []string
- if pid, errF := fetchPID(relay); pid != w.Pid() {
- if errF != nil {
- errs = append(errs, errF.Error())
- }
-
- errK := w.Kill()
- if errK != nil {
- errs = append(errs, fmt.Errorf("error killing the worker with PID number %d, Created: %s", w.Pid(), w.Created()).Error())
- }
-
- if wErr := w.Wait(context.Background()); wErr != nil {
- errs = append(errs, wErr.Error())
- }
-
- if len(errs) > 0 {
- return nil, errors.E(op, strings.Join(errs, "/"))
- }
+ if pid, err := fetchPID(relay); pid != w.Pid() {
+ err = multierr.Combine(
+ err,
+ w.Kill(),
+ w.Wait(context.Background()),
+ )
+ return nil, errors.E(op, err)
}
// everything ok, set ready state
diff --git a/plugins/app/plugin.go b/plugins/app/plugin.go
index 839685bd..d76961ca 100644
--- a/plugins/app/plugin.go
+++ b/plugins/app/plugin.go
@@ -34,9 +34,10 @@ type Plugin struct {
// Init application provider.
func (app *Plugin) Init(cfg config.Configurer, log log.Logger) error {
+ const op = errors.Op("Init")
err := cfg.UnmarshalKey(ServiceName, &app.cfg)
if err != nil {
- return err
+ return errors.E(op, errors.Init, err)
}
app.cfg.InitDefaults()
app.log = log
@@ -97,14 +98,15 @@ func (app *Plugin) CmdFactory(env Env) (func() *exec.Cmd, error) {
// NewWorker issues new standalone worker.
func (app *Plugin) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) {
+ const op = errors.Op("new worker")
spawnCmd, err := app.CmdFactory(env)
if err != nil {
- return nil, err
+ return nil, errors.E(op, err)
}
w, err := app.factory.SpawnWorkerWithContext(ctx, spawnCmd())
if err != nil {
- return nil, err
+ return nil, errors.E(op, err)
}
w.AddListener(app.collectLogs)
@@ -131,18 +133,19 @@ func (app *Plugin) NewWorkerPool(ctx context.Context, opt roadrunner.Config, env
// creates relay and worker factory.
func (app *Plugin) initFactory() (roadrunner.Factory, error) {
+ const op = errors.Op("network factory init")
if app.cfg.Relay == "" || app.cfg.Relay == "pipes" {
return roadrunner.NewPipeFactory(), nil
}
dsn := strings.Split(app.cfg.Relay, "://")
if len(dsn) != 2 {
- return nil, errors.E(errors.Str("invalid DSN (tcp://:6001, unix://file.sock)"))
+ return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)"))
}
lsn, err := util.CreateListener(app.cfg.Relay)
if err != nil {
- return nil, err
+ return nil, errors.E(op, errors.Network, err)
}
switch dsn[0] {
@@ -152,7 +155,7 @@ func (app *Plugin) initFactory() (roadrunner.Factory, error) {
case "tcp":
return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil
default:
- return nil, errors.E(errors.Str("invalid DSN (tcp://:6001, unix://file.sock)"))
+ return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)"))
}
}
diff --git a/process_state.go b/process_state.go
index 1a4c4d65..1291a904 100755
--- a/process_state.go
+++ b/process_state.go
@@ -2,6 +2,7 @@ package roadrunner
import (
"github.com/shirou/gopsutil/process"
+ "github.com/spiral/errors"
)
// ProcessState provides information about specific worker.
@@ -25,10 +26,11 @@ type ProcessState struct {
// WorkerProcessState creates new worker state definition.
func WorkerProcessState(w WorkerBase) (ProcessState, error) {
+ const op = errors.Op("worker_process state")
p, _ := process.NewProcess(int32(w.Pid()))
i, err := p.MemoryInfo()
if err != nil {
- return ProcessState{}, err
+ return ProcessState{}, errors.E(op, err)
}
return ProcessState{
diff --git a/process_state_test.go b/process_state_test.go
deleted file mode 100755
index 3f283dce..00000000
--- a/process_state_test.go
+++ /dev/null
@@ -1 +0,0 @@
-package roadrunner
diff --git a/socket_factory_test.go b/socket_factory_test.go
index 6ab87872..f7b2e69a 100755
--- a/socket_factory_test.go
+++ b/socket_factory_test.go
@@ -116,7 +116,7 @@ func Test_Tcp_Failboot(t *testing.T) {
cmd := exec.Command("php", "tests/failboot.php")
- w, err2 := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
+ w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithContext(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err2)
assert.Contains(t, err2.Error(), "failboot")
diff --git a/static_pool.go b/static_pool.go
index 66dac7c3..f64a2c9a 100755
--- a/static_pool.go
+++ b/static_pool.go
@@ -118,9 +118,9 @@ func (sp *StaticPool) Exec(p Payload) (Payload, error) {
rsp, err := sw.Exec(p)
if err != nil {
// soft job errors are allowed
- if _, jobError := err.(ExecError); jobError {
+ if errors.Is(errors.Exec, err) {
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
- err := sp.ww.AllocateNew(bCtx)
+ err = sp.ww.AllocateNew(bCtx)
if err != nil {
sp.events.Push(PoolEvent{Event: EventPoolError, Payload: err})
}
@@ -188,10 +188,8 @@ func (sp *StaticPool) execDebug(p Payload) (Payload, error) {
func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
const op = errors.Op("Exec")
w, err := sp.ww.GetFreeWorker(context.Background())
- if err != nil && errors.Is(errors.ErrWatcherStopped, err) {
+ if err != nil {
return EmptyPayload, errors.E(op, err)
- } else if err != nil {
- return EmptyPayload, err
}
sw := w.(SyncWorker)
@@ -199,23 +197,23 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload
rsp, err := sw.ExecWithContext(ctx, rqs)
if err != nil {
// soft job errors are allowed
- if _, jobError := err.(ExecError); jobError {
+ if errors.Is(errors.Exec, err) {
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
- err := sp.ww.AllocateNew(bCtx)
+ err = sp.ww.AllocateNew(bCtx)
if err != nil {
- sp.events.Push(PoolEvent{Event: EventPoolError, Payload: err})
+ sp.events.Push(PoolEvent{Event: EventPoolError, Payload: errors.E(op, err)})
}
w.State().Set(StateInvalid)
err = w.Stop(bCtx)
if err != nil {
- sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
+ sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: errors.E(op, err)})
}
} else {
sp.ww.PushWorker(w)
}
- return EmptyPayload, err
+ return EmptyPayload, errors.E(op, err)
}
sw.State().Set(StateInvalid)
@@ -223,10 +221,10 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload
errS := w.Stop(bCtx)
if errS != nil {
- return EmptyPayload, fmt.Errorf("%v, %v", err, errS)
+ return EmptyPayload, errors.E(op, errors.Errorf("%v, %v", err, errS))
}
- return EmptyPayload, err
+ return EmptyPayload, errors.E(op, err)
}
// worker want's to be terminated
@@ -234,7 +232,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload
w.State().Set(StateInvalid)
err = w.Stop(bCtx)
if err != nil {
- sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
+ sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: errors.E(op, err)})
}
return sp.Exec(rqs)
@@ -243,7 +241,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
err = sp.ww.AllocateNew(bCtx)
if err != nil {
- return EmptyPayload, err
+ return EmptyPayload, errors.E(op, err)
}
} else {
sp.ww.PushWorker(w)
@@ -258,6 +256,7 @@ func (sp *StaticPool) Destroy(ctx context.Context) {
// allocate required number of stack
func (sp *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]WorkerBase, error) {
+ const op = errors.Op("allocate workers")
var workers []WorkerBase
// constant number of stack simplify logic
@@ -266,20 +265,21 @@ func (sp *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]
w, err := sp.factory.SpawnWorkerWithContext(ctx, sp.cmd())
if err != nil {
cancel()
- return nil, err
+ return nil, errors.E(op, err)
}
- cancel()
workers = append(workers, w)
+ cancel()
}
return workers, nil
}
func (sp *StaticPool) checkMaxJobs(ctx context.Context, w WorkerBase) error {
+ const op = errors.Op("check max jobs")
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
err := sp.ww.AllocateNew(ctx)
if err != nil {
sp.events.Push(PoolEvent{Event: EventPoolError, Payload: err})
- return err
+ return errors.E(op, err)
}
}
return nil
diff --git a/static_pool_test.go b/static_pool_test.go
index f1e3e4e4..8f8a6f56 100755
--- a/static_pool_test.go
+++ b/static_pool_test.go
@@ -10,6 +10,7 @@ import (
"testing"
"time"
+ "github.com/spiral/errors"
"github.com/stretchr/testify/assert"
)
@@ -152,52 +153,49 @@ func Test_StaticPool_JobError(t *testing.T) {
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
- assert.IsType(t, ExecError{}, err)
- assert.Equal(t, "hello", err.Error())
+ if errors.Is(errors.Exec, err) == false {
+ t.Fatal("error should be of type errors.Exec")
+ }
+
+ assert.Contains(t, err.Error(), "exec payload: Exec: hello")
}
-// TODO temporary commented, figure out later
-// func Test_StaticPool_Broken_Replace(t *testing.T) {
-// ctx := context.Background()
-// p, err := NewPool(
-// ctx,
-// func() *exec.Cmd { return exec.Command("php", "tests/client.php", "broken", "pipes") },
-// NewPipeFactory(),
-// cfg,
-// )
-// assert.NoError(t, err)
-// assert.NotNil(t, p)
-//
-// wg := &sync.WaitGroup{}
-// wg.Add(1)
-// var i int64
-// atomic.StoreInt64(&i, 10)
-//
-// p.AddListener(func(event interface{}) {
-//
-// })
-//
-// go func() {
-// for {
-// select {
-// case ev := <-p.Events():
-// wev := ev.Payload.(WorkerEvent)
-// if _, ok := wev.Payload.([]byte); ok {
-// assert.Contains(t, string(wev.Payload.([]byte)), "undefined_function()")
-// wg.Done()
-// return
-// }
-// }
-// }
-// }()
-// res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
-// assert.Error(t, err)
-// assert.Nil(t, res.Context)
-// assert.Nil(t, res.Body)
-// wg.Wait()
-//
-// p.Destroy(ctx)
-// }
+func Test_StaticPool_Broken_Replace(t *testing.T) {
+ ctx := context.Background()
+ p, err := NewPool(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "tests/client.php", "broken", "pipes") },
+ NewPipeFactory(),
+ cfg,
+ )
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ workers := p.Workers()
+ for i := 0; i < len(workers); i++ {
+ workers[i].AddListener(func(event interface{}) {
+ if wev, ok := event.(WorkerEvent); ok {
+ if wev.Event == EventWorkerLog {
+ assert.Contains(t, string(wev.Payload.([]byte)), "undefined_function()")
+ wg.Done()
+ return
+ }
+ }
+ })
+ }
+
+ res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ assert.Error(t, err)
+ assert.Nil(t, res.Context)
+ assert.Nil(t, res.Body)
+
+ wg.Wait()
+
+ p.Destroy(ctx)
+}
//
func Test_StaticPool_Broken_FromOutside(t *testing.T) {
diff --git a/supervisor_pool.go b/supervisor_pool.go
index 92d03e77..e23abdd1 100755
--- a/supervisor_pool.go
+++ b/supervisor_pool.go
@@ -54,7 +54,7 @@ func (sp *supervisedPool) ExecWithContext(ctx context.Context, rqs Payload) (Pay
res, err := sp.pool.ExecWithContext(ctx, rqs)
if err != nil {
c <- ttlExec{
- err: err,
+ err: errors.E(op, err),
p: EmptyPayload,
}
}
@@ -80,7 +80,12 @@ func (sp *supervisedPool) ExecWithContext(ctx context.Context, rqs Payload) (Pay
}
func (sp *supervisedPool) Exec(p Payload) (Payload, error) {
- return sp.pool.Exec(p)
+ const op = errors.Op("supervised exec")
+ rsp, err := sp.pool.Exec(p)
+ if err != nil {
+ return EmptyPayload, errors.E(op, err)
+ }
+ return rsp, nil
}
func (sp *supervisedPool) AddListener(listener util.EventListener) {
@@ -130,6 +135,7 @@ func (sp *supervisedPool) Stop() {
func (sp *supervisedPool) control() {
now := time.Now()
ctx := context.TODO()
+ const op = errors.Op("supervised pool control tick")
// THIS IS A COPY OF WORKERS
workers := sp.pool.Workers()
@@ -148,7 +154,7 @@ func (sp *supervisedPool) control() {
if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sp.cfg.TTL) {
err = sp.pool.RemoveWorker(ctx, workers[i])
if err != nil {
- sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err})
+ sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: errors.E(op, err)})
return
} else {
sp.events.Push(PoolEvent{Event: EventTTL, Payload: workers[i]})
@@ -160,7 +166,7 @@ func (sp *supervisedPool) control() {
if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB {
err = sp.pool.RemoveWorker(ctx, workers[i])
if err != nil {
- sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err})
+ sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: errors.E(op, err)})
return
} else {
sp.events.Push(PoolEvent{Event: EventMaxMemory, Payload: workers[i]})
@@ -193,7 +199,7 @@ func (sp *supervisedPool) control() {
if sp.cfg.IdleTTL-uint64(res) <= 0 {
err = sp.pool.RemoveWorker(ctx, workers[i])
if err != nil {
- sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err})
+ sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: errors.E(op, err)})
return
} else {
sp.events.Push(PoolEvent{Event: EventIdleTTL, Payload: workers[i]})
diff --git a/sync_worker.go b/sync_worker.go
index 282254e5..a9c53553 100755
--- a/sync_worker.go
+++ b/sync_worker.go
@@ -50,7 +50,8 @@ func (tw *syncWorker) Exec(p Payload) (Payload, error) {
rsp, err := tw.execPayload(p)
if err != nil {
- if _, ok := err.(ExecError); !ok {
+ // just to be more verbose
+ if errors.Is(errors.Exec, err) == false {
tw.w.State().Set(StateErrored)
tw.w.State().RegisterExec()
}
@@ -95,7 +96,8 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p Payload) (Payload,
rsp, err := tw.execPayload(p)
if err != nil {
- if _, ok := err.(ExecError); !ok {
+ // just to be more verbose
+ if errors.Is(errors.Exec, err) == false {
tw.w.State().Set(StateErrored)
tw.w.State().RegisterExec()
}
@@ -131,7 +133,7 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p Payload) (Payload,
}
func (tw *syncWorker) execPayload(p Payload) (Payload, error) {
- const op = errors.Op("exec_payload")
+ const op = errors.Op("exec payload")
// two things; todo: merge
if err := sendControl(tw.w.Relay(), p.Context); err != nil {
return EmptyPayload, errors.E(op, err, "header error")
@@ -154,7 +156,7 @@ func (tw *syncWorker) execPayload(p Payload) (Payload, error) {
}
if pr.HasFlag(goridge.PayloadError) {
- return EmptyPayload, ExecError(rsp.Context)
+ return EmptyPayload, errors.E(op, errors.Exec, errors.Str(string(rsp.Context)))
}
// add streaming support :)
diff --git a/sync_worker_test.go b/sync_worker_test.go
index 1bc2deb1..add0a066 100755
--- a/sync_worker_test.go
+++ b/sync_worker_test.go
@@ -6,6 +6,7 @@ import (
"sync"
"testing"
+ "github.com/spiral/errors"
"github.com/stretchr/testify/assert"
)
@@ -206,8 +207,10 @@ func Test_Error(t *testing.T) {
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
- assert.IsType(t, ExecError{}, err)
- assert.Equal(t, "hello", err.Error())
+ if errors.Is(errors.Exec, err) == false {
+ t.Fatal("error should be of type errors.Exec")
+ }
+ assert.Contains(t, err.Error(), "exec payload: Exec: hello")
}
func Test_NumExecs(t *testing.T) {
diff --git a/util/isolate.go b/util/isolate.go
index 005c430e..2bdb9d6c 100755
--- a/util/isolate.go
+++ b/util/isolate.go
@@ -9,6 +9,8 @@ import (
"os/user"
"strconv"
"syscall"
+
+ "github.com/spiral/errors"
)
// IsolateProcess change gpid for the process to avoid bypassing signals to php processes.
@@ -18,19 +20,20 @@ func IsolateProcess(cmd *exec.Cmd) {
// ExecuteFromUser may work only if run RR under root user
func ExecuteFromUser(cmd *exec.Cmd, u string) error {
+ const op = errors.Op("execute from user")
usr, err := user.Lookup(u)
if err != nil {
- return err
+ return errors.E(op, err)
}
usrI32, err := strconv.Atoi(usr.Uid)
if err != nil {
- return err
+ return errors.E(op, err)
}
grI32, err := strconv.Atoi(usr.Gid)
if err != nil {
- return err
+ return errors.E(op, err)
}
// For more information:
@@ -44,7 +47,7 @@ func ExecuteFromUser(cmd *exec.Cmd, u string) error {
return fmt.Errorf("unable to test user namespaces due to permissions")
}
- return fmt.Errorf("failed to stat /proc/self/ns/user: %v", err)
+ return errors.E(op, errors.Errorf("failed to stat /proc/self/ns/user: %v", err))
}
cmd.SysProcAttr.Credential = &syscall.Credential{
diff --git a/worker_watcher.go b/worker_watcher.go
index 36b3e029..3a89554d 100755
--- a/worker_watcher.go
+++ b/worker_watcher.go
@@ -158,7 +158,7 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error)
ww.ReduceWorkersCount()
return w, nil
case <-tout.C:
- return nil, errors.Str("no free stack")
+ return nil, errors.E(op, errors.Str("no free workers in the stack"))
}
}
}
@@ -169,9 +169,10 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error)
func (ww *workerWatcher) AllocateNew(ctx context.Context) error {
ww.stack.mutex.Lock()
+ const op = errors.Op("allocate new worker")
sw, err := ww.allocator()
if err != nil {
- return err
+ return errors.E(op, err)
}
ww.addToWatch(sw)
@@ -188,6 +189,7 @@ func (ww *workerWatcher) AllocateNew(ctx context.Context) error {
func (ww *workerWatcher) RemoveWorker(ctx context.Context, wb WorkerBase) error {
ww.stack.mutex.Lock()
+ const op = errors.Op("remove worker")
defer ww.stack.mutex.Unlock()
pid := wb.Pid()
for i := 0; i < len(ww.stack.workers); i++ {
@@ -200,7 +202,7 @@ func (ww *workerWatcher) RemoveWorker(ctx context.Context, wb WorkerBase) error
wb.State().Set(StateInvalid)
err := wb.Kill()
if err != nil {
- return err
+ return errors.E(op, err)
}
break
}
@@ -274,12 +276,13 @@ func (ww *workerWatcher) WorkersList() []WorkerBase {
}
func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) {
+ const op = errors.Op("process wait")
err := w.Wait(ctx)
if err != nil {
ww.events.Push(WorkerEvent{
Event: EventWorkerError,
Worker: w,
- Payload: err,
+ Payload: errors.E(op, err),
})
}
@@ -301,7 +304,7 @@ func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) {
if err != nil {
ww.events.Push(PoolEvent{
Event: EventPoolError,
- Payload: err,
+ Payload: errors.E(op, err),
})
}
@@ -316,7 +319,7 @@ func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) {
if err != nil {
ww.events.Push(PoolEvent{
Event: EventPoolError,
- Payload: err,
+ Payload: errors.E(op, err),
})
return
}