diff options
author | Valery Piashchynski <[email protected]> | 2020-10-19 14:01:59 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-10-19 14:01:59 +0300 |
commit | 77670fb7af0c892c9b3a589fd424534fad288e7a (patch) | |
tree | 3adcaa85db664a355abe2b28f1d7e4a3fc45689f /sync_worker.go | |
parent | 16fbf3104c3c34bd9355593052b686acd26a8efe (diff) |
Update according activity worker
Diffstat (limited to 'sync_worker.go')
-rw-r--r-- | sync_worker.go | 39 |
1 files changed, 36 insertions, 3 deletions
diff --git a/sync_worker.go b/sync_worker.go index 45629f3e..a6e1ed01 100644 --- a/sync_worker.go +++ b/sync_worker.go @@ -14,8 +14,11 @@ var EmptyPayload = Payload{} type SyncWorker interface { // WorkerBase provides basic functionality for the SyncWorker WorkerBase - // Exec used to execute payload on the SyncWorker - Exec(ctx context.Context, rqs Payload) (Payload, error) + // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS + Exec(rqs Payload) (Payload, error) + + // ExecWithContext allow to set ExecTTL + ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) } type taskWorker struct { @@ -33,7 +36,7 @@ type twexec struct { err error } -func (tw *taskWorker) Exec(ctx context.Context, rqs Payload) (Payload, error) { +func (tw *taskWorker) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) { c := make(chan twexec) go func() { if len(rqs.Body) == 0 && len(rqs.Context) == 0 { @@ -92,6 +95,36 @@ func (tw *taskWorker) Exec(ctx context.Context, rqs Payload) (Payload, error) { } } +// +func (tw *taskWorker) Exec(rqs Payload) (Payload, error) { + if len(rqs.Body) == 0 && len(rqs.Context) == 0 { + return EmptyPayload, fmt.Errorf("payload can not be empty") + } + + if tw.w.State().Value() != StateReady { + return EmptyPayload, fmt.Errorf("WorkerProcess is not ready (%s)", tw.w.State().String()) + } + + // set last used time + tw.w.State().SetLastUsed(uint64(time.Now().UnixNano())) + tw.w.State().Set(StateWorking) + + rsp, err := tw.execPayload(rqs) + if err != nil { + if _, ok := err.(TaskError); !ok { + tw.w.State().Set(StateErrored) + tw.w.State().RegisterExec() + } + return EmptyPayload, err + } + + tw.w.State().Set(StateReady) + tw.w.State().RegisterExec() + + return rsp, nil + +} + func (tw *taskWorker) execPayload(rqs Payload) (Payload, error) { // two things; todo: merge if err := sendControl(tw.w.Relay(), rqs.Context); err != nil { |