summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-12-14 12:11:19 +0300
committerValery Piashchynski <[email protected]>2021-12-14 14:15:45 +0300
commit253d7f7abf7a53b5249c08949372da5c9b687b04 (patch)
treea4ad76ae4fb8a953f7f2e465a99728775eb9179f
parent8c3420cb0d05036bc69f8dcc14ef832860a3c3d4 (diff)
protect pool operations and sync with workers checks
Signed-off-by: Valery Piashchynski <[email protected]>
-rwxr-xr-xpool/supervisor_pool.go4
-rwxr-xr-xworker/sync_worker.go28
2 files changed, 15 insertions, 17 deletions
diff --git a/pool/supervisor_pool.go b/pool/supervisor_pool.go
index a7a1ae52..b01520e2 100755
--- a/pool/supervisor_pool.go
+++ b/pool/supervisor_pool.go
@@ -54,12 +54,16 @@ func (sp *supervised) execWithTTL(_ context.Context, _ *payload.Payload) (*paylo
func (sp *supervised) Exec(rqs *payload.Payload) (*payload.Payload, error) {
const op = errors.Op("supervised_exec_with_context")
if sp.cfg.ExecTTL == 0 {
+ sp.mu.RLock()
+ defer sp.mu.RUnlock()
return sp.pool.Exec(rqs)
}
ctx, cancel := context.WithTimeout(context.Background(), sp.cfg.ExecTTL)
defer cancel()
+ sp.mu.RLock()
+ defer sp.mu.RUnlock()
res, err := sp.pool.execWithTTL(ctx, rqs)
if err != nil {
return nil, errors.E(op, err)
diff --git a/worker/sync_worker.go b/worker/sync_worker.go
index 12937eac..81d8c5bf 100755
--- a/worker/sync_worker.go
+++ b/worker/sync_worker.go
@@ -83,25 +83,19 @@ func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p *payload.Payload) (
const op = errors.Op("sync_worker_exec_worker_with_timeout")
c := make(chan wexec, 1)
- go func() {
- if len(p.Body) == 0 && len(p.Context) == 0 {
- c <- wexec{
- err: errors.E(op, errors.Str("payload can not be empty")),
- }
- return
- }
-
- if tw.process.State().Value() != StateReady {
- c <- wexec{
- err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())),
- }
- return
- }
+ // worker was killed before it started to work (supervisor)
+ if tw.process.State().Value() != StateReady {
+ return nil, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String()))
+ }
+ // set last used time
+ tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
+ tw.process.State().Set(StateWorking)
- // set last used time
- tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
- tw.process.State().Set(StateWorking)
+ if len(p.Body) == 0 && len(p.Context) == 0 {
+ return nil, errors.E(op, errors.Str("payload can not be empty"))
+ }
+ go func() {
rsp, err := tw.execPayload(p)
if err != nil {
// just to be more verbose