summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xinternal/protocol.go30
-rwxr-xr-xpool/static_pool.go28
-rwxr-xr-xtransport/pipe/pipe_factory.go36
-rwxr-xr-xtransport/socket/socket_factory.go32
4 files changed, 46 insertions, 80 deletions
diff --git a/internal/protocol.go b/internal/protocol.go
index 73cb960e..e6e6e7ab 100755
--- a/internal/protocol.go
+++ b/internal/protocol.go
@@ -34,8 +34,6 @@ func putFrame(f *frame.Frame) {
}
func SendControl(rl relay.Relay, payload interface{}) error {
- const op = errors.Op("send_control")
-
fr := getFrame()
defer putFrame(fr)
@@ -45,7 +43,7 @@ func SendControl(rl relay.Relay, payload interface{}) error {
if data, ok := payload.([]byte); ok {
// check if payload no more that 4Gb
if uint32(len(data)) > ^uint32(0) {
- return errors.E(op, errors.Str("payload is more that 4gb"))
+ return errors.Str("payload is more that 4gb")
}
fr.WritePayloadLen(fr.Header(), uint32(len(data)))
@@ -54,14 +52,14 @@ func SendControl(rl relay.Relay, payload interface{}) error {
err := rl.Send(fr)
if err != nil {
- return errors.E(op, err)
+ return err
}
return nil
}
data, err := json.Marshal(payload)
if err != nil {
- return errors.E(op, errors.Errorf("invalid payload: %s", err))
+ return errors.Errorf("invalid payload: %s", err)
}
fr.WritePayloadLen(fr.Header(), uint32(len(data)))
@@ -71,47 +69,47 @@ func SendControl(rl relay.Relay, payload interface{}) error {
// we don't need a copy here, because frame copy the data before send
err = rl.Send(fr)
if err != nil {
- return errors.E(op, err)
+ return errors.E(errors.FileNotFound, err)
}
return nil
}
func Pid(rl relay.Relay) (int64, error) {
- const op = errors.Op("fetch_pid")
err := SendControl(rl, pidCommand{Pid: os.Getpid()})
if err != nil {
- return 0, errors.E(op, err)
+ return 0, err
}
fr := getFrame()
defer putFrame(fr)
err = rl.Receive(fr)
- if !fr.VerifyCRC(fr.Header()) {
- return 0, errors.E(op, errors.Str("CRC mismatch"))
- }
if err != nil {
- return 0, errors.E(op, err)
+ return 0, err
+ }
+
+ if !fr.VerifyCRC(fr.Header()) {
+ return 0, errors.Str("CRC mismatch")
}
if fr == nil {
- return 0, errors.E(op, errors.Str("nil frame received"))
+ return 0, errors.Str("nil frame received")
}
flags := fr.ReadFlags()
if flags&frame.CONTROL == 0 {
- return 0, errors.E(op, errors.Str("unexpected response, header is missing, no CONTROL flag"))
+ return 0, errors.Str("unexpected response, header is missing, no CONTROL flag")
}
link := &pidCommand{}
err = json.Unmarshal(fr.Payload(), link)
if err != nil {
- return 0, errors.E(op, err)
+ return 0, err
}
if link.Pid <= 0 {
- return 0, errors.E(op, errors.Str("pid should be greater than 0"))
+ return 0, errors.Str("pid should be greater than 0")
}
return int64(link.Pid), nil
diff --git a/pool/static_pool.go b/pool/static_pool.go
index 25097395..6c2df4e3 100755
--- a/pool/static_pool.go
+++ b/pool/static_pool.go
@@ -52,9 +52,8 @@ type StaticPool struct {
// Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg *Config, options ...Options) (Pool, error) {
- const op = errors.Op("static_pool_initialize")
if factory == nil {
- return nil, errors.E(op, errors.Str("no factory initialized"))
+ return nil, errors.Str("no factory initialized")
}
cfg.InitDefaults()
@@ -83,13 +82,13 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
// allocate requested number of workers
workers, err := p.allocateWorkers(p.cfg.NumWorkers)
if err != nil {
- return nil, errors.E(op, err)
+ return nil, err
}
// add workers to the watcher
err = p.ww.Watch(workers)
if err != nil {
- return nil, errors.E(op, err)
+ return nil, err
}
p.errEncoder = defaultErrEncoder(p)
@@ -245,16 +244,15 @@ func (sp *StaticPool) Destroy(ctx context.Context) {
func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
return func(err error, w worker.BaseProcess) (*payload.Payload, error) {
- const op = errors.Op("error_encoder")
// just push event if on any stage was timeout error
switch {
case errors.Is(errors.ExecTTL, err):
- sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Error: errors.E(op, err)})
+ sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Error: err})
w.State().Set(worker.StateInvalid)
return nil, err
case errors.Is(errors.SoftJob, err):
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err})
// if max jobs exceed
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
@@ -262,7 +260,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
w.State().Set(worker.StateInvalid)
errS := w.Stop()
if errS != nil {
- return nil, errors.E(op, errors.SoftJob, errors.Errorf("err: %v\nerrStop: %v", err, errS))
+ return nil, errors.E(errors.SoftJob, errors.Errorf("err: %v\nerrStop: %v", err, errS))
}
return nil, err
@@ -275,7 +273,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
case errors.Is(errors.Network, err):
// in case of network error, we can't stop the worker, we should kill it
w.State().Set(worker.StateInvalid)
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err})
// kill the worker instead of sending net packet to it
_ = w.Kill()
@@ -287,10 +285,10 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
// stop the worker, worker here might be in the broken state (network)
errS := w.Stop()
if errS != nil {
- return nil, errors.E(op, errors.Errorf("err: %v\nerrStop: %v", err, errS))
+ return nil, errors.E(errors.Errorf("err: %v\nerrStop: %v", err, errS))
}
- return nil, errors.E(op, err)
+ return nil, err
}
}
}
@@ -317,7 +315,6 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio
// execDebug used when debug mode was not set and exec_ttl is 0
func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) {
- const op = errors.Op("static_pool_exec_debug")
sw, err := sp.allocator()
if err != nil {
return nil, err
@@ -326,7 +323,7 @@ func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) {
// redirect call to the workers' exec method (without ttl)
r, err := sw.Exec(p)
if err != nil {
- return nil, errors.E(op, err)
+ return nil, err
}
// destroy the worker
@@ -334,7 +331,7 @@ func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) {
err = sw.Kill()
if err != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err})
- return nil, errors.E(op, err)
+ return nil, err
}
return r, nil
@@ -358,14 +355,13 @@ func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload)
// allocate required number of stack
func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.BaseProcess, error) {
- const op = errors.Op("static_pool_allocate_workers")
workers := make([]worker.BaseProcess, 0, numWorkers)
// constant number of stack simplify logic
for i := uint64(0); i < numWorkers; i++ {
w, err := sp.allocator()
if err != nil {
- return nil, errors.E(op, errors.WorkerAllocate, err)
+ return nil, err
}
workers = append(workers, w)
diff --git a/transport/pipe/pipe_factory.go b/transport/pipe/pipe_factory.go
index 84a9d311..3ea8fd98 100755
--- a/transport/pipe/pipe_factory.go
+++ b/transport/pipe/pipe_factory.go
@@ -4,12 +4,10 @@ import (
"context"
"os/exec"
- "github.com/spiral/errors"
"github.com/spiral/goridge/v3/pkg/pipe"
"github.com/spiral/roadrunner/v2/events"
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/worker"
- "go.uber.org/multierr"
)
// Factory connects to stack using standard
@@ -31,14 +29,13 @@ type sr struct {
// method Wait() must be handled on level above.
func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
spCh := make(chan sr)
- const op = errors.Op("factory_spawn_worker_with_timeout")
go func() {
w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
if err != nil {
select {
case spCh <- sr{
w: nil,
- err: errors.E(op, err),
+ err: err,
}:
return
default:
@@ -51,7 +48,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
select {
case spCh <- sr{
w: nil,
- err: errors.E(op, err),
+ err: err,
}:
return
default:
@@ -64,7 +61,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
select {
case spCh <- sr{
w: nil,
- err: errors.E(op, err),
+ err: err,
}:
return
default:
@@ -82,7 +79,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
select {
case spCh <- sr{
w: nil,
- err: errors.E(op, err),
+ err: err,
}:
return
default:
@@ -93,15 +90,11 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
// used as a ping
_, err = internal.Pid(relay)
if err != nil {
- err = multierr.Combine(
- err,
- w.Kill(),
- w.Wait(),
- )
+ _ = w.Kill()
select {
case spCh <- sr{
w: nil,
- err: errors.E(op, err),
+ err: err,
}:
return
default:
@@ -138,20 +131,19 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
}
func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
- const op = errors.Op("factory_spawn_worker")
w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
if err != nil {
- return nil, errors.E(op, err)
+ return nil, err
}
in, err := cmd.StdoutPipe()
if err != nil {
- return nil, errors.E(op, err)
+ return nil, err
}
out, err := cmd.StdinPipe()
if err != nil {
- return nil, errors.E(op, err)
+ return nil, err
}
// Init new PIPE relay
@@ -161,18 +153,14 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor
// Start the worker
err = w.Start()
if err != nil {
- return nil, errors.E(op, err)
+ return nil, err
}
// errors bundle
_, err = internal.Pid(relay)
if err != nil {
- err = multierr.Combine(
- err,
- w.Kill(),
- w.Wait(),
- )
- return nil, errors.E(op, err)
+ _ = w.Kill()
+ return nil, err
}
// everything ok, set ready state
diff --git a/transport/socket/socket_factory.go b/transport/socket/socket_factory.go
index 39c04eac..dfffdf4e 100755
--- a/transport/socket/socket_factory.go
+++ b/transport/socket/socket_factory.go
@@ -16,7 +16,6 @@ import (
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/worker"
- "go.uber.org/multierr"
"golang.org/x/sync/errgroup"
)
@@ -85,7 +84,6 @@ type socketSpawn struct {
// SpawnWorkerWithTimeout creates Process and connects it to appropriate relay or return an error
func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
- const op = errors.Op("factory_spawn_worker_with_timeout")
c := make(chan socketSpawn)
go func() {
ctxT, cancel := context.WithTimeout(ctx, f.tout)
@@ -95,7 +93,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
select {
case c <- socketSpawn{
w: nil,
- err: errors.E(op, err),
+ err: err,
}:
return
default:
@@ -108,7 +106,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
select {
case c <- socketSpawn{
w: nil,
- err: errors.E(op, err),
+ err: err,
}:
return
default:
@@ -118,17 +116,12 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
rl, err := f.findRelayWithContext(ctxT, w)
if err != nil {
- err = multierr.Combine(
- err,
- w.Kill(),
- w.Wait(),
- )
-
+ _ = w.Kill()
select {
// try to write result
case c <- socketSpawn{
w: nil,
- err: errors.E(op, err),
+ err: err,
}:
return
// if no receivers - return
@@ -165,7 +158,6 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
}
func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
- const op = errors.Op("factory_spawn_worker")
w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
if err != nil {
return nil, err
@@ -173,16 +165,12 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor
err = w.Start()
if err != nil {
- return nil, errors.E(op, err)
+ return nil, err
}
rl, err := f.findRelay(w)
if err != nil {
- err = multierr.Combine(
- err,
- w.Kill(),
- w.Wait(),
- )
+ _ = w.Kill()
return nil, err
}
@@ -191,12 +179,8 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor
// errors bundle
_, err = internal.Pid(rl)
if err != nil {
- err = multierr.Combine(
- err,
- w.Kill(),
- w.Wait(),
- )
- return nil, errors.E(op, err)
+ _ = w.Kill()
+ return nil, err
}
w.State().Set(worker.StateReady)