diff options
author | Valery Piashchynski <[email protected]> | 2021-12-14 12:11:19 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-12-14 12:11:19 +0300 |
commit | 50944620fabb8cfa1d2bbbc50f64db457c4837ad (patch) | |
tree | a4ad76ae4fb8a953f7f2e465a99728775eb9179f /worker | |
parent | c588b446a45214577c76b185f4363853aa6ccb7c (diff) |
protect pool operations and sync with workers checks
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'worker')
-rwxr-xr-x | worker/sync_worker.go | 28 |
1 files changed, 11 insertions, 17 deletions
diff --git a/worker/sync_worker.go b/worker/sync_worker.go index 12937eac..81d8c5bf 100755 --- a/worker/sync_worker.go +++ b/worker/sync_worker.go @@ -83,25 +83,19 @@ func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p *payload.Payload) ( const op = errors.Op("sync_worker_exec_worker_with_timeout") c := make(chan wexec, 1) - go func() { - if len(p.Body) == 0 && len(p.Context) == 0 { - c <- wexec{ - err: errors.E(op, errors.Str("payload can not be empty")), - } - return - } - - if tw.process.State().Value() != StateReady { - c <- wexec{ - err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())), - } - return - } + // worker was killed before it started to work (supervisor) + if tw.process.State().Value() != StateReady { + return nil, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())) + } + // set last used time + tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) + tw.process.State().Set(StateWorking) - // set last used time - tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) - tw.process.State().Set(StateWorking) + if len(p.Body) == 0 && len(p.Context) == 0 { + return nil, errors.E(op, errors.Str("payload can not be empty")) + } + go func() { rsp, err := tw.execPayload(p) if err != nil { // just to be more verbose |