diff options
Diffstat (limited to 'sync_worker.go')
-rw-r--r-- | sync_worker.go | 171 |
1 files changed, 171 insertions, 0 deletions
diff --git a/sync_worker.go b/sync_worker.go new file mode 100644 index 00000000..45629f3e --- /dev/null +++ b/sync_worker.go @@ -0,0 +1,171 @@ +package roadrunner + +import ( + "context" + "fmt" + "time" + + "github.com/pkg/errors" + "github.com/spiral/goridge/v2" +) + +var EmptyPayload = Payload{} + +type SyncWorker interface { + // WorkerBase provides basic functionality for the SyncWorker + WorkerBase + // Exec used to execute payload on the SyncWorker + Exec(ctx context.Context, rqs Payload) (Payload, error) +} + +type taskWorker struct { + w WorkerBase +} + +func NewSyncWorker(w WorkerBase) (SyncWorker, error) { + return &taskWorker{ + w: w, + }, nil +} + +type twexec struct { + payload Payload + err error +} + +func (tw *taskWorker) Exec(ctx context.Context, rqs Payload) (Payload, error) { + c := make(chan twexec) + go func() { + if len(rqs.Body) == 0 && len(rqs.Context) == 0 { + c <- twexec{ + payload: EmptyPayload, + err: fmt.Errorf("payload can not be empty"), + } + return + } + + if tw.w.State().Value() != StateReady { + c <- twexec{ + payload: EmptyPayload, + err: fmt.Errorf("WorkerProcess is not ready (%s)", tw.w.State().String()), + } + return + } + + // set last used time + tw.w.State().SetLastUsed(uint64(time.Now().UnixNano())) + tw.w.State().Set(StateWorking) + + rsp, err := tw.execPayload(rqs) + if err != nil { + if _, ok := err.(TaskError); !ok { + tw.w.State().Set(StateErrored) + tw.w.State().RegisterExec() + } + c <- twexec{ + payload: EmptyPayload, + err: err, + } + return + } + + tw.w.State().Set(StateReady) + tw.w.State().RegisterExec() + c <- twexec{ + payload: rsp, + err: nil, + } + return + }() + + for { + select { + case <-ctx.Done(): + return EmptyPayload, ctx.Err() + case res := <-c: + if res.err != nil { + return EmptyPayload, res.err + } + + return res.payload, nil + } + } +} + +func (tw *taskWorker) execPayload(rqs Payload) (Payload, error) { + // two things; todo: merge + if err := sendControl(tw.w.Relay(), rqs.Context); err != nil { + return EmptyPayload, errors.Wrap(err, "header error") + } + + if err := tw.w.Relay().Send(rqs.Body, 0); err != nil { + return EmptyPayload, errors.Wrap(err, "sender error") + } + + var pr goridge.Prefix + rsp := Payload{} + + var err error + if rsp.Context, pr, err = tw.w.Relay().Receive(); err != nil { + return EmptyPayload, errors.Wrap(err, "WorkerProcess error") + } + + if !pr.HasFlag(goridge.PayloadControl) { + return EmptyPayload, fmt.Errorf("malformed WorkerProcess response") + } + + if pr.HasFlag(goridge.PayloadError) { + return EmptyPayload, TaskError(rsp.Context) + } + + // add streaming support :) + if rsp.Body, pr, err = tw.w.Relay().Receive(); err != nil { + return EmptyPayload, errors.Wrap(err, "WorkerProcess error") + } + + return rsp, nil +} + +func (tw *taskWorker) String() string { + return tw.w.String() +} + +func (tw *taskWorker) Created() time.Time { + return tw.w.Created() +} + +func (tw *taskWorker) Events() <-chan WorkerEvent { + return tw.w.Events() +} + +func (tw *taskWorker) Pid() int64 { + return tw.w.Pid() +} + +func (tw *taskWorker) State() State { + return tw.w.State() +} + +func (tw *taskWorker) Start() error { + return tw.w.Start() +} + +func (tw *taskWorker) Wait(ctx context.Context) error { + return tw.w.Wait(ctx) +} + +func (tw *taskWorker) Stop(ctx context.Context) error { + return tw.w.Stop(ctx) +} + +func (tw *taskWorker) Kill(ctx context.Context) error { + return tw.w.Kill(ctx) +} + +func (tw *taskWorker) Relay() goridge.Relay { + return tw.w.Relay() +} + +func (tw *taskWorker) AttachRelay(rl goridge.Relay) { + tw.w.AttachRelay(rl) +} |