diff options
Diffstat (limited to 'sync_worker.go')
-rwxr-xr-x | sync_worker.go | 116 |
1 files changed, 26 insertions, 90 deletions
diff --git a/sync_worker.go b/sync_worker.go index de9491d6..d7c15e88 100755 --- a/sync_worker.go +++ b/sync_worker.go @@ -5,6 +5,8 @@ import ( "fmt" "time" + "github.com/spiral/roadrunner/v2/util" + "github.com/pkg/errors" "github.com/spiral/goridge/v2" ) @@ -14,90 +16,24 @@ var EmptyPayload = Payload{} type SyncWorker interface { // WorkerBase provides basic functionality for the SyncWorker WorkerBase + // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS Exec(rqs Payload) (Payload, error) - - // ExecWithContext allow to set ExecTTL - ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) } -type taskWorker struct { +type syncWorker struct { w WorkerBase } func NewSyncWorker(w WorkerBase) (SyncWorker, error) { - return &taskWorker{ + return &syncWorker{ w: w, }, nil } -type twexec struct { - payload Payload - err error -} - -func (tw *taskWorker) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) { - c := make(chan twexec) - go func() { - if len(rqs.Body) == 0 && len(rqs.Context) == 0 { - c <- twexec{ - payload: EmptyPayload, - err: fmt.Errorf("payload can not be empty"), - } - return - } - - if tw.w.State().Value() != StateReady { - c <- twexec{ - payload: EmptyPayload, - err: fmt.Errorf("WorkerProcess is not ready (%s)", tw.w.State().String()), - } - return - } - - // set last used time - tw.w.State().SetLastUsed(uint64(time.Now().UnixNano())) - tw.w.State().Set(StateWorking) - - rsp, err := tw.execPayload(rqs) - if err != nil { - if _, ok := err.(TaskError); !ok { - tw.w.State().Set(StateErrored) - tw.w.State().RegisterExec() - } - c <- twexec{ - payload: EmptyPayload, - err: err, - } - return - } - - tw.w.State().Set(StateReady) - tw.w.State().RegisterExec() - c <- twexec{ - payload: rsp, - err: nil, - } - return - }() - - for { - select { - case <-ctx.Done(): - return EmptyPayload, ctx.Err() - case res := <-c: - if res.err != nil { - return EmptyPayload, res.err - } - - return res.payload, nil - } - } -} - -// -func (tw *taskWorker) Exec(rqs Payload) (Payload, error) { - if len(rqs.Body) == 0 && len(rqs.Context) == 0 { +// Exec payload without TTL timeout. +func (tw *syncWorker) Exec(p Payload) (Payload, error) { + if len(p.Body) == 0 && len(p.Context) == 0 { return EmptyPayload, fmt.Errorf("payload can not be empty") } @@ -109,9 +45,9 @@ func (tw *taskWorker) Exec(rqs Payload) (Payload, error) { tw.w.State().SetLastUsed(uint64(time.Now().UnixNano())) tw.w.State().Set(StateWorking) - rsp, err := tw.execPayload(rqs) + rsp, err := tw.execPayload(p) if err != nil { - if _, ok := err.(TaskError); !ok { + if _, ok := err.(JobError); !ok { tw.w.State().Set(StateErrored) tw.w.State().RegisterExec() } @@ -124,7 +60,7 @@ func (tw *taskWorker) Exec(rqs Payload) (Payload, error) { return rsp, nil } -func (tw *taskWorker) execPayload(rqs Payload) (Payload, error) { +func (tw *syncWorker) execPayload(rqs Payload) (Payload, error) { // two things; todo: merge if err := sendControl(tw.w.Relay(), rqs.Context); err != nil { return EmptyPayload, errors.Wrap(err, "header error") @@ -147,7 +83,7 @@ func (tw *taskWorker) execPayload(rqs Payload) (Payload, error) { } if pr.HasFlag(goridge.PayloadError) { - return EmptyPayload, TaskError(rsp.Context) + return EmptyPayload, JobError(rsp.Context) } // add streaming support :) @@ -158,46 +94,46 @@ func (tw *taskWorker) execPayload(rqs Payload) (Payload, error) { return rsp, nil } -func (tw *taskWorker) String() string { +func (tw *syncWorker) String() string { return tw.w.String() } -func (tw *taskWorker) Created() time.Time { - return tw.w.Created() +func (tw *syncWorker) Pid() int64 { + return tw.w.Pid() } -func (tw *taskWorker) Events() <-chan WorkerEvent { - return tw.w.Events() +func (tw *syncWorker) Created() time.Time { + return tw.w.Created() } -func (tw *taskWorker) Pid() int64 { - return tw.w.Pid() +func (tw *syncWorker) AddListener(listener util.EventListener) { + tw.w.AddListener(listener) } -func (tw *taskWorker) State() State { +func (tw *syncWorker) State() State { return tw.w.State() } -func (tw *taskWorker) Start() error { +func (tw *syncWorker) Start() error { return tw.w.Start() } -func (tw *taskWorker) Wait(ctx context.Context) error { +func (tw *syncWorker) Wait(ctx context.Context) error { return tw.w.Wait(ctx) } -func (tw *taskWorker) Stop(ctx context.Context) error { +func (tw *syncWorker) Stop(ctx context.Context) error { return tw.w.Stop(ctx) } -func (tw *taskWorker) Kill(ctx context.Context) error { +func (tw *syncWorker) Kill(ctx context.Context) error { return tw.w.Kill(ctx) } -func (tw *taskWorker) Relay() goridge.Relay { +func (tw *syncWorker) Relay() goridge.Relay { return tw.w.Relay() } -func (tw *taskWorker) AttachRelay(rl goridge.Relay) { +func (tw *syncWorker) AttachRelay(rl goridge.Relay) { tw.w.AttachRelay(rl) } |