diff options
Diffstat (limited to 'pool/static_pool.go')
-rwxr-xr-x | pool/static_pool.go | 28 |
1 files changed, 12 insertions, 16 deletions
diff --git a/pool/static_pool.go b/pool/static_pool.go index 25097395..6c2df4e3 100755 --- a/pool/static_pool.go +++ b/pool/static_pool.go @@ -52,9 +52,8 @@ type StaticPool struct { // Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker. func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg *Config, options ...Options) (Pool, error) { - const op = errors.Op("static_pool_initialize") if factory == nil { - return nil, errors.E(op, errors.Str("no factory initialized")) + return nil, errors.Str("no factory initialized") } cfg.InitDefaults() @@ -83,13 +82,13 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg // allocate requested number of workers workers, err := p.allocateWorkers(p.cfg.NumWorkers) if err != nil { - return nil, errors.E(op, err) + return nil, err } // add workers to the watcher err = p.ww.Watch(workers) if err != nil { - return nil, errors.E(op, err) + return nil, err } p.errEncoder = defaultErrEncoder(p) @@ -245,16 +244,15 @@ func (sp *StaticPool) Destroy(ctx context.Context) { func defaultErrEncoder(sp *StaticPool) ErrorEncoder { return func(err error, w worker.BaseProcess) (*payload.Payload, error) { - const op = errors.Op("error_encoder") // just push event if on any stage was timeout error switch { case errors.Is(errors.ExecTTL, err): - sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Error: errors.E(op, err)}) + sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Error: err}) w.State().Set(worker.StateInvalid) return nil, err case errors.Is(errors.SoftJob, err): - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err}) // if max jobs exceed if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { @@ -262,7 +260,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { w.State().Set(worker.StateInvalid) errS := w.Stop() if errS != nil { - return nil, errors.E(op, errors.SoftJob, errors.Errorf("err: %v\nerrStop: %v", err, errS)) + return nil, errors.E(errors.SoftJob, errors.Errorf("err: %v\nerrStop: %v", err, errS)) } return nil, err @@ -275,7 +273,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { case errors.Is(errors.Network, err): // in case of network error, we can't stop the worker, we should kill it w.State().Set(worker.StateInvalid) - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err}) // kill the worker instead of sending net packet to it _ = w.Kill() @@ -287,10 +285,10 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { // stop the worker, worker here might be in the broken state (network) errS := w.Stop() if errS != nil { - return nil, errors.E(op, errors.Errorf("err: %v\nerrStop: %v", err, errS)) + return nil, errors.E(errors.Errorf("err: %v\nerrStop: %v", err, errS)) } - return nil, errors.E(op, err) + return nil, err } } } @@ -317,7 +315,6 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio // execDebug used when debug mode was not set and exec_ttl is 0 func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) { - const op = errors.Op("static_pool_exec_debug") sw, err := sp.allocator() if err != nil { return nil, err @@ -326,7 +323,7 @@ func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) { // redirect call to the workers' exec method (without ttl) r, err := sw.Exec(p) if err != nil { - return nil, errors.E(op, err) + return nil, err } // destroy the worker @@ -334,7 +331,7 @@ func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) { err = sw.Kill() if err != nil { sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err}) - return nil, errors.E(op, err) + return nil, err } return r, nil @@ -358,14 +355,13 @@ func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload) // allocate required number of stack func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.BaseProcess, error) { - const op = errors.Op("static_pool_allocate_workers") workers := make([]worker.BaseProcess, 0, numWorkers) // constant number of stack simplify logic for i := uint64(0); i < numWorkers; i++ { w, err := sp.allocator() if err != nil { - return nil, errors.E(op, errors.WorkerAllocate, err) + return nil, err } workers = append(workers, w) |