diff options
Diffstat (limited to 'sync_worker.go')
-rwxr-xr-x | sync_worker.go | 65 |
1 files changed, 44 insertions, 21 deletions
diff --git a/sync_worker.go b/sync_worker.go index c4d9aa9d..94a804a7 100755 --- a/sync_worker.go +++ b/sync_worker.go @@ -1,6 +1,7 @@ package roadrunner import ( + "bytes" "context" "time" @@ -8,7 +9,7 @@ import ( "github.com/spiral/roadrunner/v2/util" "go.uber.org/multierr" - "github.com/spiral/goridge/v2" + "github.com/spiral/goridge/v3" ) var EmptyPayload = Payload{} @@ -134,38 +135,60 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p Payload) (Payload, func (tw *syncWorker) execPayload(p Payload) (Payload, error) { const op = errors.Op("exec payload") - // two things; todo: merge - // NO CONTROL HERE - if err := sendControl(tw.w.Relay(), p.Context); err != nil { - return EmptyPayload, errors.E(op, err, "header error") - } - if err := tw.w.Relay().Send(p.Body, 0); err != nil { - return EmptyPayload, errors.E(op, err, "sender error") + frame := goridge.NewFrame() + frame.WriteVersion(goridge.VERSION_1) + // can be 0 here + + buf := new(bytes.Buffer) + buf.Write(p.Context) + buf.Write(p.Body) + + // Context offset + frame.WriteOptions(uint32(len(p.Context))) + frame.WritePayloadLen(uint32(buf.Len())) + frame.WritePayload(buf.Bytes()) + + frame.WriteCRC() + + // empty and free the buffer + buf.Truncate(0) + + err := tw.Relay().Send(frame) + if err != nil { + return EmptyPayload, err } - var pr goridge.Prefix - rsp := Payload{} + frameR := goridge.NewFrame() - var err error - if rsp.Context, pr, err = tw.w.Relay().Receive(); err != nil { - return EmptyPayload, errors.E(op, err, "WorkerProcess error") + err = tw.w.Relay().Receive(frameR) + if err != nil { + return EmptyPayload, errors.E(op, err) + } + if frameR == nil { + return EmptyPayload, errors.E(op, errors.Str("nil frame received")) } - if !pr.HasFlag(goridge.PayloadControl) { - return EmptyPayload, errors.E(op, errors.Str("malformed WorkerProcess response")) + if !frameR.VerifyCRC() { + return EmptyPayload, errors.E(op, errors.Str("failed to verify CRC")) } - if pr.HasFlag(goridge.PayloadError) { - return EmptyPayload, errors.E(op, errors.ErrSoftJob, errors.Str(string(rsp.Context))) + flags := frameR.ReadFlags() + + if flags&byte(goridge.ERROR) != byte(0) { + return EmptyPayload, errors.E(op, errors.ErrSoftJob, errors.Str(string(frameR.Payload()))) } - // add streaming support :) - if rsp.Body, pr, err = tw.w.Relay().Receive(); err != nil { - return EmptyPayload, errors.E(op, err, "WorkerProcess error") + options := frameR.ReadOptions() + if len(options) != 1 { + return EmptyPayload, errors.E(op, errors.Str("options length should be equal 1 (body offset)")) } - return rsp, nil + payload := Payload{} + payload.Context = frameR.Payload()[:options[0]] + payload.Body = frameR.Payload()[options[0]:] + + return payload, nil } func (tw *syncWorker) String() string { |