summaryrefslogtreecommitdiff
path: root/supervisor_pool.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-11-09 15:11:10 +0300
committerGitHub <[email protected]>2020-11-09 15:11:10 +0300
commit0874bcb2f6b284a940ba4f3507eb8c4619c27868 (patch)
treec99d15624cd080cad22b7c8fb7d4714b2dc124fb /supervisor_pool.go
parent9fbe7726dd55cfedda724b7644e1b6bf7c1a6cb4 (diff)
parentf218dcbd7e55d9ad1df8336e2331cdaa62d9ded3 (diff)
Merge pull request #390 from spiral/feature/switch_to_spiral_errorsv2.0.0-alpha17
Feature/switch to spiral errors
Diffstat (limited to 'supervisor_pool.go')
-rwxr-xr-xsupervisor_pool.go16
1 files changed, 11 insertions, 5 deletions
diff --git a/supervisor_pool.go b/supervisor_pool.go
index 92d03e77..e23abdd1 100755
--- a/supervisor_pool.go
+++ b/supervisor_pool.go
@@ -54,7 +54,7 @@ func (sp *supervisedPool) ExecWithContext(ctx context.Context, rqs Payload) (Pay
res, err := sp.pool.ExecWithContext(ctx, rqs)
if err != nil {
c <- ttlExec{
- err: err,
+ err: errors.E(op, err),
p: EmptyPayload,
}
}
@@ -80,7 +80,12 @@ func (sp *supervisedPool) ExecWithContext(ctx context.Context, rqs Payload) (Pay
}
func (sp *supervisedPool) Exec(p Payload) (Payload, error) {
- return sp.pool.Exec(p)
+ const op = errors.Op("supervised exec")
+ rsp, err := sp.pool.Exec(p)
+ if err != nil {
+ return EmptyPayload, errors.E(op, err)
+ }
+ return rsp, nil
}
func (sp *supervisedPool) AddListener(listener util.EventListener) {
@@ -130,6 +135,7 @@ func (sp *supervisedPool) Stop() {
func (sp *supervisedPool) control() {
now := time.Now()
ctx := context.TODO()
+ const op = errors.Op("supervised pool control tick")
// THIS IS A COPY OF WORKERS
workers := sp.pool.Workers()
@@ -148,7 +154,7 @@ func (sp *supervisedPool) control() {
if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sp.cfg.TTL) {
err = sp.pool.RemoveWorker(ctx, workers[i])
if err != nil {
- sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err})
+ sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: errors.E(op, err)})
return
} else {
sp.events.Push(PoolEvent{Event: EventTTL, Payload: workers[i]})
@@ -160,7 +166,7 @@ func (sp *supervisedPool) control() {
if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB {
err = sp.pool.RemoveWorker(ctx, workers[i])
if err != nil {
- sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err})
+ sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: errors.E(op, err)})
return
} else {
sp.events.Push(PoolEvent{Event: EventMaxMemory, Payload: workers[i]})
@@ -193,7 +199,7 @@ func (sp *supervisedPool) control() {
if sp.cfg.IdleTTL-uint64(res) <= 0 {
err = sp.pool.RemoveWorker(ctx, workers[i])
if err != nil {
- sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err})
+ sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: errors.E(op, err)})
return
} else {
sp.events.Push(PoolEvent{Event: EventIdleTTL, Payload: workers[i]})