diff options
Diffstat (limited to 'sync_worker.go')
-rwxr-xr-x | sync_worker.go | 86 |
1 files changed, 78 insertions, 8 deletions
diff --git a/sync_worker.go b/sync_worker.go index 2f3eb1e4..6dd8d8e8 100755 --- a/sync_worker.go +++ b/sync_worker.go @@ -5,9 +5,10 @@ import ( "fmt" "time" + "github.com/spiral/roadrunner/v2/errors" "github.com/spiral/roadrunner/v2/util" + "go.uber.org/multierr" - "github.com/pkg/errors" "github.com/spiral/goridge/v2" ) @@ -18,7 +19,8 @@ type SyncWorker interface { WorkerBase // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS - Exec(p Payload) (Payload, error) + Exec(rqs Payload) (Payload, error) + ExecWithContext(ctx context.Context, p Payload) (Payload, error) } type syncWorker struct { @@ -60,14 +62,82 @@ func (tw *syncWorker) Exec(p Payload) (Payload, error) { return rsp, nil } +type wexec struct { + payload Payload + err error +} + +// Exec payload without TTL timeout. +func (tw *syncWorker) ExecWithContext(ctx context.Context, p Payload) (Payload, error) { + const op = errors.Op("exec_with_context") + c := make(chan wexec, 1) + go func() { + if len(p.Body) == 0 && len(p.Context) == 0 { + c <- wexec{ + payload: EmptyPayload, + err: errors.E(op, errors.Str("payload can not be empty")), + } + return + } + + if tw.w.State().Value() != StateReady { + c <- wexec{ + payload: EmptyPayload, + err: errors.E(op, errors.Errorf("WorkerProcess is not ready (%s)", tw.w.State().String())), + } + return + } + + // set last used time + tw.w.State().SetLastUsed(uint64(time.Now().UnixNano())) + tw.w.State().Set(StateWorking) + + rsp, err := tw.execPayload(p) + if err != nil { + if _, ok := err.(JobError); !ok { + tw.w.State().Set(StateErrored) + tw.w.State().RegisterExec() + } + c <- wexec{ + payload: EmptyPayload, + err: errors.E(op, err), + } + return + } + + tw.w.State().Set(StateReady) + tw.w.State().RegisterExec() + + c <- wexec{ + payload: rsp, + err: nil, + } + }() + + select { + case <-ctx.Done(): + err := multierr.Combine(tw.Kill()) + if err != nil { + return EmptyPayload, multierr.Append(err, ctx.Err()) + } + return EmptyPayload, ctx.Err() + case res := <-c: + if res.err != nil { + return EmptyPayload, res.err + } + return res.payload, nil + } +} + func (tw *syncWorker) execPayload(p Payload) (Payload, error) { + const op = errors.Op("exec_payload") // two things; todo: merge if err := sendControl(tw.w.Relay(), p.Context); err != nil { - return EmptyPayload, errors.Wrap(err, "header error") + return EmptyPayload, errors.E(op, err, "header error") } if err := tw.w.Relay().Send(p.Body, 0); err != nil { - return EmptyPayload, errors.Wrap(err, "sender error") + return EmptyPayload, errors.E(op, err, "sender error") } var pr goridge.Prefix @@ -75,7 +145,7 @@ func (tw *syncWorker) execPayload(p Payload) (Payload, error) { var err error if rsp.Context, pr, err = tw.w.Relay().Receive(); err != nil { - return EmptyPayload, errors.Wrap(err, "WorkerProcess error") + return EmptyPayload, errors.E(op, err, "WorkerProcess error") } if !pr.HasFlag(goridge.PayloadControl) { @@ -88,7 +158,7 @@ func (tw *syncWorker) execPayload(p Payload) (Payload, error) { // add streaming support :) if rsp.Body, pr, err = tw.w.Relay().Receive(); err != nil { - return EmptyPayload, errors.Wrap(err, "WorkerProcess error") + return EmptyPayload, errors.E(op, err, "WorkerProcess error") } return rsp, nil @@ -126,8 +196,8 @@ func (tw *syncWorker) Stop(ctx context.Context) error { return tw.w.Stop(ctx) } -func (tw *syncWorker) Kill(ctx context.Context) error { - return tw.w.Kill(ctx) +func (tw *syncWorker) Kill() error { + return tw.w.Kill() } func (tw *syncWorker) Relay() goridge.Relay { |