diff options
author | Valery Piashchynski <[email protected]> | 2021-12-14 12:11:19 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-12-14 14:15:45 +0300 |
commit | 253d7f7abf7a53b5249c08949372da5c9b687b04 (patch) | |
tree | a4ad76ae4fb8a953f7f2e465a99728775eb9179f | |
parent | 8c3420cb0d05036bc69f8dcc14ef832860a3c3d4 (diff) |
protect pool operations and sync with workers checks
Signed-off-by: Valery Piashchynski <[email protected]>
-rwxr-xr-x | pool/supervisor_pool.go | 4 | ||||
-rwxr-xr-x | worker/sync_worker.go | 28 |
2 files changed, 15 insertions, 17 deletions
diff --git a/pool/supervisor_pool.go b/pool/supervisor_pool.go index a7a1ae52..b01520e2 100755 --- a/pool/supervisor_pool.go +++ b/pool/supervisor_pool.go @@ -54,12 +54,16 @@ func (sp *supervised) execWithTTL(_ context.Context, _ *payload.Payload) (*paylo func (sp *supervised) Exec(rqs *payload.Payload) (*payload.Payload, error) { const op = errors.Op("supervised_exec_with_context") if sp.cfg.ExecTTL == 0 { + sp.mu.RLock() + defer sp.mu.RUnlock() return sp.pool.Exec(rqs) } ctx, cancel := context.WithTimeout(context.Background(), sp.cfg.ExecTTL) defer cancel() + sp.mu.RLock() + defer sp.mu.RUnlock() res, err := sp.pool.execWithTTL(ctx, rqs) if err != nil { return nil, errors.E(op, err) diff --git a/worker/sync_worker.go b/worker/sync_worker.go index 12937eac..81d8c5bf 100755 --- a/worker/sync_worker.go +++ b/worker/sync_worker.go @@ -83,25 +83,19 @@ func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p *payload.Payload) ( const op = errors.Op("sync_worker_exec_worker_with_timeout") c := make(chan wexec, 1) - go func() { - if len(p.Body) == 0 && len(p.Context) == 0 { - c <- wexec{ - err: errors.E(op, errors.Str("payload can not be empty")), - } - return - } - - if tw.process.State().Value() != StateReady { - c <- wexec{ - err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())), - } - return - } + // worker was killed before it started to work (supervisor) + if tw.process.State().Value() != StateReady { + return nil, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())) + } + // set last used time + tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) + tw.process.State().Set(StateWorking) - // set last used time - tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) - tw.process.State().Set(StateWorking) + if len(p.Body) == 0 && len(p.Context) == 0 { + return nil, errors.E(op, errors.Str("payload can not be empty")) + } + go func() { rsp, err := tw.execPayload(p) if err != nil { // just to be more verbose |