author    Valery Piashchynski <[email protected]>  2021-10-18 16:19:28 +0300
committer GitHub <[email protected]>                2021-10-18 16:19:28 +0300
commit    c1c4136e86bf71837c7995ff75d859b4b68f0616 (patch)
tree      086e2e8338e47dd0efae3704812db65e67104e38
parent    d5474764f095fc2d829654272d5b5bf3662d0241 (diff)
parent    6a0394a011e8b799d036a80e72d172162122c9f5 (diff)
tag       v2.5.0-rc.2

[#830]: refactoring(logger): update error stacktrace
-rw-r--r--  .github/workflows/linux.yml          2
-rw-r--r--  go.mod                               2
-rw-r--r--  go.sum                               4
-rwxr-xr-x  internal/protocol.go                30
-rwxr-xr-x  pool/static_pool.go                 28
-rwxr-xr-x  transport/pipe/pipe_factory.go      36
-rwxr-xr-x  transport/socket/socket_factory.go  32
7 files changed, 50 insertions(+), 84 deletions(-)
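The bulk of the change below drops the per-function `errors.Op` labels and returns the underlying error directly, relying on the stack trace recorded by github.com/spiral/errors rather than hand-written operation names. A minimal, self-contained sketch of the before/after shape (doWork, before and after are illustrative names, not code from this commit):

package main

import (
	"fmt"

	"github.com/spiral/errors"
)

// doWork stands in for any operation that can fail; the error text is illustrative.
func doWork() error {
	return errors.Str("CRC mismatch")
}

// before mirrors the old style: each function declared an Op label and wrapped
// every returned error with errors.E(op, err).
func before() error {
	const op = errors.Op("fetch_pid")
	if err := doWork(); err != nil {
		return errors.E(op, err)
	}
	return nil
}

// after mirrors the new style: the error is returned as-is, and the stack trace
// captured by spiral/errors identifies the failing call site.
func after() error {
	if err := doWork(); err != nil {
		return err
	}
	return nil
}

func main() {
	fmt.Println(before())
	fmt.Println(after())
}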
diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml
index be74d606..9d2610ea 100644
--- a/.github/workflows/linux.yml
+++ b/.github/workflows/linux.yml
@@ -23,7 +23,7 @@ jobs:
fail-fast: true
matrix:
php: ["7.4", "8.0", "8.1"]
- go: ["1.17.1"]
+ go: ["1.17.2"]
os: ["ubuntu-latest"]
steps:
- name: Set up Go ${{ matrix.go }}
diff --git a/go.mod b/go.mod
index 23905e0f..e39ab00a 100644
--- a/go.mod
+++ b/go.mod
@@ -8,7 +8,7 @@ require (
github.com/shirou/gopsutil v3.21.9+incompatible
// spiral
github.com/spiral/errors v1.0.12
- github.com/spiral/goridge/v3 v3.2.2
+ github.com/spiral/goridge/v3 v3.2.3
// spiral
github.com/stretchr/testify v1.7.0
github.com/valyala/tcplisten v1.0.0
diff --git a/go.sum b/go.sum
index 6195a83c..c58616f8 100644
--- a/go.sum
+++ b/go.sum
@@ -77,8 +77,8 @@ github.com/shirou/gopsutil v3.21.9+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spiral/errors v1.0.12 h1:38Waf8ZL/Xvxg4HTYGmrUbvi7TCHivmuatNQZlBhQ8s=
github.com/spiral/errors v1.0.12/go.mod h1:j5UReqxZxfkwXkI9mFY87VhEXcXmSg7kAk5Sswy1eEA=
-github.com/spiral/goridge/v3 v3.2.2 h1:plrzrISm3Q3Z74p+tsi90XwU9n4859LWDXK0PtGNoFE=
-github.com/spiral/goridge/v3 v3.2.2/go.mod h1:DA4Ekw9qVcTvVouUNJgxESXURBHZ2SfkliCEIpEl9lA=
+github.com/spiral/goridge/v3 v3.2.3 h1:iNz6aD/c00hC50wo+qT8uP5ZZ3VdCAERXUtNiyDE3Yo=
+github.com/spiral/goridge/v3 v3.2.3/go.mod h1:DA4Ekw9qVcTvVouUNJgxESXURBHZ2SfkliCEIpEl9lA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
diff --git a/internal/protocol.go b/internal/protocol.go
index 73cb960e..e6e6e7ab 100755
--- a/internal/protocol.go
+++ b/internal/protocol.go
@@ -34,8 +34,6 @@ func putFrame(f *frame.Frame) {
}
func SendControl(rl relay.Relay, payload interface{}) error {
- const op = errors.Op("send_control")
-
fr := getFrame()
defer putFrame(fr)
@@ -45,7 +43,7 @@ func SendControl(rl relay.Relay, payload interface{}) error {
if data, ok := payload.([]byte); ok {
// check if payload no more that 4Gb
if uint32(len(data)) > ^uint32(0) {
- return errors.E(op, errors.Str("payload is more that 4gb"))
+ return errors.Str("payload is more that 4gb")
}
fr.WritePayloadLen(fr.Header(), uint32(len(data)))
@@ -54,14 +52,14 @@ func SendControl(rl relay.Relay, payload interface{}) error {
err := rl.Send(fr)
if err != nil {
- return errors.E(op, err)
+ return err
}
return nil
}
data, err := json.Marshal(payload)
if err != nil {
- return errors.E(op, errors.Errorf("invalid payload: %s", err))
+ return errors.Errorf("invalid payload: %s", err)
}
fr.WritePayloadLen(fr.Header(), uint32(len(data)))
@@ -71,47 +69,47 @@ func SendControl(rl relay.Relay, payload interface{}) error {
// we don't need a copy here, because frame copy the data before send
err = rl.Send(fr)
if err != nil {
- return errors.E(op, err)
+ return errors.E(errors.FileNotFound, err)
}
return nil
}
func Pid(rl relay.Relay) (int64, error) {
- const op = errors.Op("fetch_pid")
err := SendControl(rl, pidCommand{Pid: os.Getpid()})
if err != nil {
- return 0, errors.E(op, err)
+ return 0, err
}
fr := getFrame()
defer putFrame(fr)
err = rl.Receive(fr)
- if !fr.VerifyCRC(fr.Header()) {
- return 0, errors.E(op, errors.Str("CRC mismatch"))
- }
if err != nil {
- return 0, errors.E(op, err)
+ return 0, err
+ }
+
+ if !fr.VerifyCRC(fr.Header()) {
+ return 0, errors.Str("CRC mismatch")
}
if fr == nil {
- return 0, errors.E(op, errors.Str("nil frame received"))
+ return 0, errors.Str("nil frame received")
}
flags := fr.ReadFlags()
if flags&frame.CONTROL == 0 {
- return 0, errors.E(op, errors.Str("unexpected response, header is missing, no CONTROL flag"))
+ return 0, errors.Str("unexpected response, header is missing, no CONTROL flag")
}
link := &pidCommand{}
err = json.Unmarshal(fr.Payload(), link)
if err != nil {
- return 0, errors.E(op, err)
+ return 0, err
}
if link.Pid <= 0 {
- return 0, errors.E(op, errors.Str("pid should be greater than 0"))
+ return 0, errors.Str("pid should be greater than 0")
}
return int64(link.Pid), nil
diff --git a/pool/static_pool.go b/pool/static_pool.go
index 25097395..f500f998 100755
--- a/pool/static_pool.go
+++ b/pool/static_pool.go
@@ -52,9 +52,8 @@ type StaticPool struct {
// Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg *Config, options ...Options) (Pool, error) {
- const op = errors.Op("static_pool_initialize")
if factory == nil {
- return nil, errors.E(op, errors.Str("no factory initialized"))
+ return nil, errors.Str("no factory initialized")
}
cfg.InitDefaults()
@@ -83,13 +82,13 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
// allocate requested number of workers
workers, err := p.allocateWorkers(p.cfg.NumWorkers)
if err != nil {
- return nil, errors.E(op, err)
+ return nil, err
}
// add workers to the watcher
err = p.ww.Watch(workers)
if err != nil {
- return nil, errors.E(op, err)
+ return nil, err
}
p.errEncoder = defaultErrEncoder(p)
@@ -245,16 +244,15 @@ func (sp *StaticPool) Destroy(ctx context.Context) {
func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
return func(err error, w worker.BaseProcess) (*payload.Payload, error) {
- const op = errors.Op("error_encoder")
// just push event if on any stage was timeout error
switch {
case errors.Is(errors.ExecTTL, err):
- sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Error: errors.E(op, err)})
+ sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Error: err})
w.State().Set(worker.StateInvalid)
return nil, err
case errors.Is(errors.SoftJob, err):
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err})
// if max jobs exceed
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
@@ -262,7 +260,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
w.State().Set(worker.StateInvalid)
errS := w.Stop()
if errS != nil {
- return nil, errors.E(op, errors.SoftJob, errors.Errorf("err: %v\nerrStop: %v", err, errS))
+ return nil, errors.E(errors.SoftJob, errors.Errorf("err: %v\nerrStop: %v", err, errS))
}
return nil, err
@@ -275,7 +273,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
case errors.Is(errors.Network, err):
// in case of network error, we can't stop the worker, we should kill it
w.State().Set(worker.StateInvalid)
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err})
// kill the worker instead of sending net packet to it
_ = w.Kill()
@@ -287,10 +285,10 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
// stop the worker, worker here might be in the broken state (network)
errS := w.Stop()
if errS != nil {
- return nil, errors.E(op, errors.Errorf("err: %v\nerrStop: %v", err, errS))
+ return nil, errors.E(errors.Errorf("err: %v\nerrStop: %v", err, errS))
}
- return nil, errors.E(op, err)
+ return nil, err
}
}
}
@@ -317,7 +315,6 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio
// execDebug used when debug mode was not set and exec_ttl is 0
func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) {
- const op = errors.Op("static_pool_exec_debug")
sw, err := sp.allocator()
if err != nil {
return nil, err
@@ -326,7 +323,7 @@ func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) {
// redirect call to the workers' exec method (without ttl)
r, err := sw.Exec(p)
if err != nil {
- return nil, errors.E(op, err)
+ return nil, err
}
// destroy the worker
@@ -334,7 +331,7 @@ func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) {
err = sw.Kill()
if err != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err})
- return nil, errors.E(op, err)
+ return nil, err
}
return r, nil
@@ -358,14 +355,13 @@ func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload)
// allocate required number of stack
func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.BaseProcess, error) {
- const op = errors.Op("static_pool_allocate_workers")
workers := make([]worker.BaseProcess, 0, numWorkers)
// constant number of stack simplify logic
for i := uint64(0); i < numWorkers; i++ {
w, err := sp.allocator()
if err != nil {
- return nil, errors.E(op, errors.WorkerAllocate, err)
+ return nil, errors.E(errors.WorkerAllocate, err)
}
workers = append(workers, w)
diff --git a/transport/pipe/pipe_factory.go b/transport/pipe/pipe_factory.go
index 84a9d311..3ea8fd98 100755
--- a/transport/pipe/pipe_factory.go
+++ b/transport/pipe/pipe_factory.go
@@ -4,12 +4,10 @@ import (
"context"
"os/exec"
- "github.com/spiral/errors"
"github.com/spiral/goridge/v3/pkg/pipe"
"github.com/spiral/roadrunner/v2/events"
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/worker"
- "go.uber.org/multierr"
)
// Factory connects to stack using standard
@@ -31,14 +29,13 @@ type sr struct {
// method Wait() must be handled on level above.
func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
spCh := make(chan sr)
- const op = errors.Op("factory_spawn_worker_with_timeout")
go func() {
w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
if err != nil {
select {
case spCh <- sr{
w: nil,
- err: errors.E(op, err),
+ err: err,
}:
return
default:
@@ -51,7 +48,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
select {
case spCh <- sr{
w: nil,
- err: errors.E(op, err),
+ err: err,
}:
return
default:
@@ -64,7 +61,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
select {
case spCh <- sr{
w: nil,
- err: errors.E(op, err),
+ err: err,
}:
return
default:
@@ -82,7 +79,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
select {
case spCh <- sr{
w: nil,
- err: errors.E(op, err),
+ err: err,
}:
return
default:
@@ -93,15 +90,11 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
// used as a ping
_, err = internal.Pid(relay)
if err != nil {
- err = multierr.Combine(
- err,
- w.Kill(),
- w.Wait(),
- )
+ _ = w.Kill()
select {
case spCh <- sr{
w: nil,
- err: errors.E(op, err),
+ err: err,
}:
return
default:
@@ -138,20 +131,19 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
}
func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
- const op = errors.Op("factory_spawn_worker")
w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
if err != nil {
- return nil, errors.E(op, err)
+ return nil, err
}
in, err := cmd.StdoutPipe()
if err != nil {
- return nil, errors.E(op, err)
+ return nil, err
}
out, err := cmd.StdinPipe()
if err != nil {
- return nil, errors.E(op, err)
+ return nil, err
}
// Init new PIPE relay
@@ -161,18 +153,14 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor
// Start the worker
err = w.Start()
if err != nil {
- return nil, errors.E(op, err)
+ return nil, err
}
// errors bundle
_, err = internal.Pid(relay)
if err != nil {
- err = multierr.Combine(
- err,
- w.Kill(),
- w.Wait(),
- )
- return nil, errors.E(op, err)
+ _ = w.Kill()
+ return nil, err
}
// everything ok, set ready state
diff --git a/transport/socket/socket_factory.go b/transport/socket/socket_factory.go
index 39c04eac..dfffdf4e 100755
--- a/transport/socket/socket_factory.go
+++ b/transport/socket/socket_factory.go
@@ -16,7 +16,6 @@ import (
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/worker"
- "go.uber.org/multierr"
"golang.org/x/sync/errgroup"
)
@@ -85,7 +84,6 @@ type socketSpawn struct {
// SpawnWorkerWithTimeout creates Process and connects it to appropriate relay or return an error
func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
- const op = errors.Op("factory_spawn_worker_with_timeout")
c := make(chan socketSpawn)
go func() {
ctxT, cancel := context.WithTimeout(ctx, f.tout)
@@ -95,7 +93,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
select {
case c <- socketSpawn{
w: nil,
- err: errors.E(op, err),
+ err: err,
}:
return
default:
@@ -108,7 +106,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
select {
case c <- socketSpawn{
w: nil,
- err: errors.E(op, err),
+ err: err,
}:
return
default:
@@ -118,17 +116,12 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
rl, err := f.findRelayWithContext(ctxT, w)
if err != nil {
- err = multierr.Combine(
- err,
- w.Kill(),
- w.Wait(),
- )
-
+ _ = w.Kill()
select {
// try to write result
case c <- socketSpawn{
w: nil,
- err: errors.E(op, err),
+ err: err,
}:
return
// if no receivers - return
@@ -165,7 +158,6 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
}
func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
- const op = errors.Op("factory_spawn_worker")
w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
if err != nil {
return nil, err
@@ -173,16 +165,12 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor
err = w.Start()
if err != nil {
- return nil, errors.E(op, err)
+ return nil, err
}
rl, err := f.findRelay(w)
if err != nil {
- err = multierr.Combine(
- err,
- w.Kill(),
- w.Wait(),
- )
+ _ = w.Kill()
return nil, err
}
@@ -191,12 +179,8 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor
// errors bundle
_, err = internal.Pid(rl)
if err != nil {
- err = multierr.Combine(
- err,
- w.Kill(),
- w.Wait(),
- )
- return nil, errors.E(op, err)
+ _ = w.Kill()
+ return nil, err
}
w.State().Set(worker.StateReady)
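Both factories also replace the multierr cleanup bundle with a best-effort Kill, returning the spawn error untouched and dropping go.uber.org/multierr from the imports. A self-contained sketch of the two cleanup shapes (the proc type and function names are made up for illustration):

package main

import (
	"errors"
	"fmt"

	"go.uber.org/multierr"
)

// proc is a stand-in for worker.Process; the type and its methods are illustrative.
type proc struct{}

func (p *proc) Kill() error { return nil }
func (p *proc) Wait() error { return errors.New("signal: killed") }

// cleanupBefore mirrors the old factories: Kill/Wait errors were folded into the
// returned error with multierr.Combine.
func cleanupBefore(w *proc, err error) error {
	return multierr.Combine(err, w.Kill(), w.Wait())
}

// cleanupAfter mirrors the new factories: the worker is killed best-effort and the
// original error is returned unchanged, keeping its stack trace intact.
func cleanupAfter(w *proc, err error) error {
	_ = w.Kill()
	return err
}

func main() {
	spawnErr := errors.New("relay handshake failed")
	fmt.Println(cleanupBefore(&proc{}, spawnErr))
	fmt.Println(cleanupAfter(&proc{}, spawnErr))
}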