summaryrefslogtreecommitdiff
path: root/sync_worker.go
diff options
context:
space:
mode:
Diffstat (limited to 'sync_worker.go')
-rwxr-xr-xsync_worker.go65
1 files changed, 44 insertions, 21 deletions
diff --git a/sync_worker.go b/sync_worker.go
index c4d9aa9d..94a804a7 100755
--- a/sync_worker.go
+++ b/sync_worker.go
@@ -1,6 +1,7 @@
package roadrunner
import (
+ "bytes"
"context"
"time"
@@ -8,7 +9,7 @@ import (
"github.com/spiral/roadrunner/v2/util"
"go.uber.org/multierr"
- "github.com/spiral/goridge/v2"
+ "github.com/spiral/goridge/v3"
)
var EmptyPayload = Payload{}
@@ -134,38 +135,60 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p Payload) (Payload,
func (tw *syncWorker) execPayload(p Payload) (Payload, error) {
const op = errors.Op("exec payload")
- // two things; todo: merge
- // NO CONTROL HERE
- if err := sendControl(tw.w.Relay(), p.Context); err != nil {
- return EmptyPayload, errors.E(op, err, "header error")
- }
- if err := tw.w.Relay().Send(p.Body, 0); err != nil {
- return EmptyPayload, errors.E(op, err, "sender error")
+ frame := goridge.NewFrame()
+ frame.WriteVersion(goridge.VERSION_1)
+ // can be 0 here
+
+ buf := new(bytes.Buffer)
+ buf.Write(p.Context)
+ buf.Write(p.Body)
+
+ // Context offset
+ frame.WriteOptions(uint32(len(p.Context)))
+ frame.WritePayloadLen(uint32(buf.Len()))
+ frame.WritePayload(buf.Bytes())
+
+ frame.WriteCRC()
+
+ // empty and free the buffer
+ buf.Truncate(0)
+
+ err := tw.Relay().Send(frame)
+ if err != nil {
+ return EmptyPayload, err
}
- var pr goridge.Prefix
- rsp := Payload{}
+ frameR := goridge.NewFrame()
- var err error
- if rsp.Context, pr, err = tw.w.Relay().Receive(); err != nil {
- return EmptyPayload, errors.E(op, err, "WorkerProcess error")
+ err = tw.w.Relay().Receive(frameR)
+ if err != nil {
+ return EmptyPayload, errors.E(op, err)
+ }
+ if frameR == nil {
+ return EmptyPayload, errors.E(op, errors.Str("nil frame received"))
}
- if !pr.HasFlag(goridge.PayloadControl) {
- return EmptyPayload, errors.E(op, errors.Str("malformed WorkerProcess response"))
+ if !frameR.VerifyCRC() {
+ return EmptyPayload, errors.E(op, errors.Str("failed to verify CRC"))
}
- if pr.HasFlag(goridge.PayloadError) {
- return EmptyPayload, errors.E(op, errors.ErrSoftJob, errors.Str(string(rsp.Context)))
+ flags := frameR.ReadFlags()
+
+ if flags&byte(goridge.ERROR) != byte(0) {
+ return EmptyPayload, errors.E(op, errors.ErrSoftJob, errors.Str(string(frameR.Payload())))
}
- // add streaming support :)
- if rsp.Body, pr, err = tw.w.Relay().Receive(); err != nil {
- return EmptyPayload, errors.E(op, err, "WorkerProcess error")
+ options := frameR.ReadOptions()
+ if len(options) != 1 {
+ return EmptyPayload, errors.E(op, errors.Str("options length should be equal 1 (body offset)"))
}
- return rsp, nil
+ payload := Payload{}
+ payload.Context = frameR.Payload()[:options[0]]
+ payload.Body = frameR.Payload()[options[0]:]
+
+ return payload, nil
}
func (tw *syncWorker) String() string {