summaryrefslogtreecommitdiff
path: root/pkg/worker
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-12-20 17:42:56 +0300
committerValery Piashchynski <[email protected]>2020-12-20 17:42:56 +0300
commitfe0109a3b19ee82bd7bfbb57eae6b5b5166d7068 (patch)
tree52fd2960cb440c99163738fcfeeb6df465662155 /pkg/worker
parent349088db2081704e15699ce1e6f6b1f8898bd336 (diff)
Latest goridge version support
Diffstat (limited to 'pkg/worker')
-rwxr-xr-xpkg/worker/sync_worker.go38
-rwxr-xr-xpkg/worker/worker.go8
2 files changed, 23 insertions, 23 deletions
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index daa07186..eacb8a8a 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -6,13 +6,13 @@ import (
"time"
"github.com/spiral/errors"
+ "github.com/spiral/goridge/v3/interfaces/relay"
+ "github.com/spiral/goridge/v3/pkg/frame"
"github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/payload"
"go.uber.org/multierr"
-
- "github.com/spiral/goridge/v3"
)
type syncWorker struct {
@@ -126,10 +126,10 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p payload.Payload) (p
}
func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) {
- const op = errors.Op("exec payload")
+ const op = errors.Op("exec pl")
- frame := goridge.NewFrame()
- frame.WriteVersion(goridge.VERSION_1)
+ fr := frame.NewFrame()
+ fr.WriteVersion(frame.VERSION_1)
// can be 0 here
buf := new(bytes.Buffer)
@@ -137,28 +137,28 @@ func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) {
buf.Write(p.Body)
// Context offset
- frame.WriteOptions(uint32(len(p.Context)))
- frame.WritePayloadLen(uint32(buf.Len()))
- frame.WritePayload(buf.Bytes())
+ fr.WriteOptions(uint32(len(p.Context)))
+ fr.WritePayloadLen(uint32(buf.Len()))
+ fr.WritePayload(buf.Bytes())
- frame.WriteCRC()
+ fr.WriteCRC()
// empty and free the buffer
buf.Truncate(0)
- err := tw.Relay().Send(frame)
+ err := tw.Relay().Send(fr)
if err != nil {
return payload.Payload{}, err
}
- frameR := goridge.NewFrame()
+ frameR := frame.NewFrame()
err = tw.w.Relay().Receive(frameR)
if err != nil {
return payload.Payload{}, errors.E(op, err)
}
if frameR == nil {
- return payload.Payload{}, errors.E(op, errors.Str("nil frame received"))
+ return payload.Payload{}, errors.E(op, errors.Str("nil fr received"))
}
if !frameR.VerifyCRC() {
@@ -167,7 +167,7 @@ func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) {
flags := frameR.ReadFlags()
- if flags&byte(goridge.ERROR) != byte(0) {
+ if flags&byte(frame.ERROR) != byte(0) {
return payload.Payload{}, errors.E(op, errors.ErrSoftJob, errors.Str(string(frameR.Payload())))
}
@@ -176,11 +176,11 @@ func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) {
return payload.Payload{}, errors.E(op, errors.Str("options length should be equal 1 (body offset)"))
}
- payload := payload.Payload{}
- payload.Context = frameR.Payload()[:options[0]]
- payload.Body = frameR.Payload()[options[0]:]
+ pl := payload.Payload{}
+ pl.Context = frameR.Payload()[:options[0]]
+ pl.Body = frameR.Payload()[options[0]:]
- return payload, nil
+ return pl, nil
}
func (tw *syncWorker) String() string {
@@ -219,10 +219,10 @@ func (tw *syncWorker) Kill() error {
return tw.w.Kill()
}
-func (tw *syncWorker) Relay() goridge.Relay {
+func (tw *syncWorker) Relay() relay.Relay {
return tw.w.Relay()
}
-func (tw *syncWorker) AttachRelay(rl goridge.Relay) {
+func (tw *syncWorker) AttachRelay(rl relay.Relay) {
tw.w.AttachRelay(rl)
}
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index 95fa6e06..ae59d611 100755
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -13,7 +13,7 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/goridge/v3"
+ "github.com/spiral/goridge/v3/interfaces/relay"
"github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
@@ -74,7 +74,7 @@ type Process struct {
mu sync.RWMutex
// communication bus with underlying process.
- relay goridge.Relay
+ relay relay.Relay
// rd in a second part of pipe to read from stderr
rd io.Reader
// stop signal terminates io.Pipe from reading from stderr
@@ -131,13 +131,13 @@ func (w *Process) State() internal.State {
// State return receive-only Process state object, state can be used to safely access
// Process status, time when status changed and number of Process executions.
-func (w *Process) AttachRelay(rl goridge.Relay) {
+func (w *Process) AttachRelay(rl relay.Relay) {
w.relay = rl
}
// State return receive-only Process state object, state can be used to safely access
// Process status, time when status changed and number of Process executions.
-func (w *Process) Relay() goridge.Relay {
+func (w *Process) Relay() relay.Relay {
return w.relay
}