diff options
author | Valery Piashchynski <[email protected]> | 2020-12-20 18:28:08 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2020-12-20 18:28:08 +0300 |
commit | a10d20d20e910ed8fcfbc3bc690aaf17ee338ff3 (patch) | |
tree | 6f017d59c92a59308c00aa3c338ea32b96f2d13d /pkg | |
parent | 0e02ec510acca9cd8691071850ba95a6cb91d78d (diff) | |
parent | 35ee3b921be6e4416ff26a610f2f95044ff80cbd (diff) |
Merge pull request #456 from spiral/reload/tests_stabilization
Reload/tests stabilization
Diffstat (limited to 'pkg')
-rwxr-xr-x | pkg/pipe/pipe_factory.go | 6 | ||||
-rwxr-xr-x | pkg/pipe/pipe_factory_test.go | 2 | ||||
-rwxr-xr-x | pkg/socket/socket_factory.go | 15 | ||||
-rwxr-xr-x | pkg/worker/sync_worker.go | 38 | ||||
-rwxr-xr-x | pkg/worker/worker.go | 8 |
5 files changed, 35 insertions, 34 deletions
diff --git a/pkg/pipe/pipe_factory.go b/pkg/pipe/pipe_factory.go index c86d78c4..34735fe6 100755 --- a/pkg/pipe/pipe_factory.go +++ b/pkg/pipe/pipe_factory.go @@ -5,7 +5,7 @@ import ( "os/exec" "github.com/spiral/errors" - "github.com/spiral/goridge/v3" + "github.com/spiral/goridge/v3/pkg/pipe" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" workerImpl "github.com/spiral/roadrunner/v2/pkg/worker" @@ -65,7 +65,7 @@ func (f *Factory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) (wo } // Init new PIPE relay - relay := goridge.NewPipeRelay(in, out) + relay := pipe.NewPipeRelay(in, out) w.AttachRelay(relay) // Start the worker @@ -134,7 +134,7 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd) (worker.BaseProcess, error) { } // Init new PIPE relay - relay := goridge.NewPipeRelay(in, out) + relay := pipe.NewPipeRelay(in, out) w.AttachRelay(relay) // Start the worker diff --git a/pkg/pipe/pipe_factory_test.go b/pkg/pipe/pipe_factory_test.go index 9453de1d..40797747 100755 --- a/pkg/pipe/pipe_factory_test.go +++ b/pkg/pipe/pipe_factory_test.go @@ -469,7 +469,7 @@ func Test_Error(t *testing.T) { if errors.Is(errors.ErrSoftJob, err) == false { t.Fatal("error should be of type errors.ErrSoftJob") } - assert.Contains(t, err.Error(), "exec payload: SoftJobError: hello") + assert.Contains(t, err.Error(), "hello") } func Test_NumExecs(t *testing.T) { diff --git a/pkg/socket/socket_factory.go b/pkg/socket/socket_factory.go index f721ad66..b08d24e4 100755 --- a/pkg/socket/socket_factory.go +++ b/pkg/socket/socket_factory.go @@ -9,11 +9,12 @@ import ( "github.com/shirou/gopsutil/process" "github.com/spiral/errors" + "github.com/spiral/goridge/v3/interfaces/relay" + "github.com/spiral/goridge/v3/pkg/socket" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" workerImpl "github.com/spiral/roadrunner/v2/pkg/worker" - "github.com/spiral/goridge/v3" "go.uber.org/multierr" "golang.org/x/sync/errgroup" ) @@ -65,7 +66,7 @@ func (f *Factory) listen() error { return err } - rl := goridge.NewSocketRelay(conn) + rl := socket.NewSocketRelay(conn) pid, err := internal.FetchPID(rl) if err != nil { return err @@ -178,7 +179,7 @@ func (f *Factory) Close(ctx context.Context) error { } // waits for Process to connect over socket and returns associated relay of timeout -func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess) (*goridge.SocketRelay, error) { +func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess) (*socket.Relay, error) { ticker := time.NewTicker(time.Millisecond * 100) for { select { @@ -194,12 +195,12 @@ func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess if !ok { continue } - return tmp.(*goridge.SocketRelay), nil + return tmp.(*socket.Relay), nil } } } -func (f *Factory) findRelay(w worker.BaseProcess) (*goridge.SocketRelay, error) { +func (f *Factory) findRelay(w worker.BaseProcess) (*socket.Relay, error) { const op = errors.Op("find_relay") // poll every 1ms for the relay pollDone := time.NewTimer(f.tout) @@ -212,13 +213,13 @@ func (f *Factory) findRelay(w worker.BaseProcess) (*goridge.SocketRelay, error) if !ok { continue } - return tmp.(*goridge.SocketRelay), nil + return tmp.(*socket.Relay), nil } } } // chan to store relay associated with specific pid -func (f *Factory) attachRelayToPid(pid int64, relay goridge.Relay) { +func (f *Factory) attachRelayToPid(pid int64, relay relay.Relay) { f.relays.Store(pid, relay) } diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index daa07186..eacb8a8a 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -6,13 +6,13 @@ import ( "time" "github.com/spiral/errors" + "github.com/spiral/goridge/v3/interfaces/relay" + "github.com/spiral/goridge/v3/pkg/frame" "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/payload" "go.uber.org/multierr" - - "github.com/spiral/goridge/v3" ) type syncWorker struct { @@ -126,10 +126,10 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p payload.Payload) (p } func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) { - const op = errors.Op("exec payload") + const op = errors.Op("exec pl") - frame := goridge.NewFrame() - frame.WriteVersion(goridge.VERSION_1) + fr := frame.NewFrame() + fr.WriteVersion(frame.VERSION_1) // can be 0 here buf := new(bytes.Buffer) @@ -137,28 +137,28 @@ func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) { buf.Write(p.Body) // Context offset - frame.WriteOptions(uint32(len(p.Context))) - frame.WritePayloadLen(uint32(buf.Len())) - frame.WritePayload(buf.Bytes()) + fr.WriteOptions(uint32(len(p.Context))) + fr.WritePayloadLen(uint32(buf.Len())) + fr.WritePayload(buf.Bytes()) - frame.WriteCRC() + fr.WriteCRC() // empty and free the buffer buf.Truncate(0) - err := tw.Relay().Send(frame) + err := tw.Relay().Send(fr) if err != nil { return payload.Payload{}, err } - frameR := goridge.NewFrame() + frameR := frame.NewFrame() err = tw.w.Relay().Receive(frameR) if err != nil { return payload.Payload{}, errors.E(op, err) } if frameR == nil { - return payload.Payload{}, errors.E(op, errors.Str("nil frame received")) + return payload.Payload{}, errors.E(op, errors.Str("nil fr received")) } if !frameR.VerifyCRC() { @@ -167,7 +167,7 @@ func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) { flags := frameR.ReadFlags() - if flags&byte(goridge.ERROR) != byte(0) { + if flags&byte(frame.ERROR) != byte(0) { return payload.Payload{}, errors.E(op, errors.ErrSoftJob, errors.Str(string(frameR.Payload()))) } @@ -176,11 +176,11 @@ func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) { return payload.Payload{}, errors.E(op, errors.Str("options length should be equal 1 (body offset)")) } - payload := payload.Payload{} - payload.Context = frameR.Payload()[:options[0]] - payload.Body = frameR.Payload()[options[0]:] + pl := payload.Payload{} + pl.Context = frameR.Payload()[:options[0]] + pl.Body = frameR.Payload()[options[0]:] - return payload, nil + return pl, nil } func (tw *syncWorker) String() string { @@ -219,10 +219,10 @@ func (tw *syncWorker) Kill() error { return tw.w.Kill() } -func (tw *syncWorker) Relay() goridge.Relay { +func (tw *syncWorker) Relay() relay.Relay { return tw.w.Relay() } -func (tw *syncWorker) AttachRelay(rl goridge.Relay) { +func (tw *syncWorker) AttachRelay(rl relay.Relay) { tw.w.AttachRelay(rl) } diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 95fa6e06..ae59d611 100755 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -13,7 +13,7 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/goridge/v3" + "github.com/spiral/goridge/v3/interfaces/relay" "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" @@ -74,7 +74,7 @@ type Process struct { mu sync.RWMutex // communication bus with underlying process. - relay goridge.Relay + relay relay.Relay // rd in a second part of pipe to read from stderr rd io.Reader // stop signal terminates io.Pipe from reading from stderr @@ -131,13 +131,13 @@ func (w *Process) State() internal.State { // State return receive-only Process state object, state can be used to safely access // Process status, time when status changed and number of Process executions. -func (w *Process) AttachRelay(rl goridge.Relay) { +func (w *Process) AttachRelay(rl relay.Relay) { w.relay = rl } // State return receive-only Process state object, state can be used to safely access // Process status, time when status changed and number of Process executions. -func (w *Process) Relay() goridge.Relay { +func (w *Process) Relay() relay.Relay { return w.relay } |