summaryrefslogtreecommitdiff
path: root/worker
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-12-14 12:11:19 +0300
committerValery Piashchynski <[email protected]>2021-12-14 12:11:19 +0300
commit50944620fabb8cfa1d2bbbc50f64db457c4837ad (patch)
treea4ad76ae4fb8a953f7f2e465a99728775eb9179f /worker
parentc588b446a45214577c76b185f4363853aa6ccb7c (diff)
protect pool operations and sync with workers checks
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'worker')
-rwxr-xr-xworker/sync_worker.go28
1 files changed, 11 insertions, 17 deletions
diff --git a/worker/sync_worker.go b/worker/sync_worker.go
index 12937eac..81d8c5bf 100755
--- a/worker/sync_worker.go
+++ b/worker/sync_worker.go
@@ -83,25 +83,19 @@ func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p *payload.Payload) (
const op = errors.Op("sync_worker_exec_worker_with_timeout")
c := make(chan wexec, 1)
- go func() {
- if len(p.Body) == 0 && len(p.Context) == 0 {
- c <- wexec{
- err: errors.E(op, errors.Str("payload can not be empty")),
- }
- return
- }
-
- if tw.process.State().Value() != StateReady {
- c <- wexec{
- err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())),
- }
- return
- }
+ // worker was killed before it started to work (supervisor)
+ if tw.process.State().Value() != StateReady {
+ return nil, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String()))
+ }
+ // set last used time
+ tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
+ tw.process.State().Set(StateWorking)
- // set last used time
- tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
- tw.process.State().Set(StateWorking)
+ if len(p.Body) == 0 && len(p.Context) == 0 {
+ return nil, errors.E(op, errors.Str("payload can not be empty"))
+ }
+ go func() {
rsp, err := tw.execPayload(p)
if err != nil {
// just to be more verbose