From 50944620fabb8cfa1d2bbbc50f64db457c4837ad Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Tue, 14 Dec 2021 12:11:19 +0300 Subject: protect pool operations and sync with workers checks Signed-off-by: Valery Piashchynski --- worker/sync_worker.go | 28 +++++++++++----------------- 1 file changed, 11 insertions(+), 17 deletions(-) (limited to 'worker') diff --git a/worker/sync_worker.go b/worker/sync_worker.go index 12937eac..81d8c5bf 100755 --- a/worker/sync_worker.go +++ b/worker/sync_worker.go @@ -83,25 +83,19 @@ func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p *payload.Payload) ( const op = errors.Op("sync_worker_exec_worker_with_timeout") c := make(chan wexec, 1) - go func() { - if len(p.Body) == 0 && len(p.Context) == 0 { - c <- wexec{ - err: errors.E(op, errors.Str("payload can not be empty")), - } - return - } - - if tw.process.State().Value() != StateReady { - c <- wexec{ - err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())), - } - return - } + // worker was killed before it started to work (supervisor) + if tw.process.State().Value() != StateReady { + return nil, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())) + } + // set last used time + tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) + tw.process.State().Set(StateWorking) - // set last used time - tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) - tw.process.State().Set(StateWorking) + if len(p.Body) == 0 && len(p.Context) == 0 { + return nil, errors.E(op, errors.Str("payload can not be empty")) + } + go func() { rsp, err := tw.execPayload(p) if err != nil { // just to be more verbose -- cgit v1.2.3