summaryrefslogtreecommitdiff
path: root/pool/static_pool.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-10-18 16:19:28 +0300
committerGitHub <[email protected]>2021-10-18 16:19:28 +0300
commitc1c4136e86bf71837c7995ff75d859b4b68f0616 (patch)
tree086e2e8338e47dd0efae3704812db65e67104e38 /pool/static_pool.go
parentd5474764f095fc2d829654272d5b5bf3662d0241 (diff)
parent6a0394a011e8b799d036a80e72d172162122c9f5 (diff)
[#830]: refactoring(logger): update error stacktracev2.5.0-rc.2
[#830]: refactoring(logger): update error stacktrace
Diffstat (limited to 'pool/static_pool.go')
-rwxr-xr-xpool/static_pool.go28
1 files changed, 12 insertions, 16 deletions
diff --git a/pool/static_pool.go b/pool/static_pool.go
index 25097395..f500f998 100755
--- a/pool/static_pool.go
+++ b/pool/static_pool.go
@@ -52,9 +52,8 @@ type StaticPool struct {
// Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg *Config, options ...Options) (Pool, error) {
- const op = errors.Op("static_pool_initialize")
if factory == nil {
- return nil, errors.E(op, errors.Str("no factory initialized"))
+ return nil, errors.Str("no factory initialized")
}
cfg.InitDefaults()
@@ -83,13 +82,13 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
// allocate requested number of workers
workers, err := p.allocateWorkers(p.cfg.NumWorkers)
if err != nil {
- return nil, errors.E(op, err)
+ return nil, err
}
// add workers to the watcher
err = p.ww.Watch(workers)
if err != nil {
- return nil, errors.E(op, err)
+ return nil, err
}
p.errEncoder = defaultErrEncoder(p)
@@ -245,16 +244,15 @@ func (sp *StaticPool) Destroy(ctx context.Context) {
func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
return func(err error, w worker.BaseProcess) (*payload.Payload, error) {
- const op = errors.Op("error_encoder")
// just push event if on any stage was timeout error
switch {
case errors.Is(errors.ExecTTL, err):
- sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Error: errors.E(op, err)})
+ sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Error: err})
w.State().Set(worker.StateInvalid)
return nil, err
case errors.Is(errors.SoftJob, err):
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err})
// if max jobs exceed
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
@@ -262,7 +260,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
w.State().Set(worker.StateInvalid)
errS := w.Stop()
if errS != nil {
- return nil, errors.E(op, errors.SoftJob, errors.Errorf("err: %v\nerrStop: %v", err, errS))
+ return nil, errors.E(errors.SoftJob, errors.Errorf("err: %v\nerrStop: %v", err, errS))
}
return nil, err
@@ -275,7 +273,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
case errors.Is(errors.Network, err):
// in case of network error, we can't stop the worker, we should kill it
w.State().Set(worker.StateInvalid)
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err})
// kill the worker instead of sending net packet to it
_ = w.Kill()
@@ -287,10 +285,10 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
// stop the worker, worker here might be in the broken state (network)
errS := w.Stop()
if errS != nil {
- return nil, errors.E(op, errors.Errorf("err: %v\nerrStop: %v", err, errS))
+ return nil, errors.E(errors.Errorf("err: %v\nerrStop: %v", err, errS))
}
- return nil, errors.E(op, err)
+ return nil, err
}
}
}
@@ -317,7 +315,6 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio
// execDebug used when debug mode was not set and exec_ttl is 0
func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) {
- const op = errors.Op("static_pool_exec_debug")
sw, err := sp.allocator()
if err != nil {
return nil, err
@@ -326,7 +323,7 @@ func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) {
// redirect call to the workers' exec method (without ttl)
r, err := sw.Exec(p)
if err != nil {
- return nil, errors.E(op, err)
+ return nil, err
}
// destroy the worker
@@ -334,7 +331,7 @@ func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) {
err = sw.Kill()
if err != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err})
- return nil, errors.E(op, err)
+ return nil, err
}
return r, nil
@@ -358,14 +355,13 @@ func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload)
// allocate required number of stack
func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.BaseProcess, error) {
- const op = errors.Op("static_pool_allocate_workers")
workers := make([]worker.BaseProcess, 0, numWorkers)
// constant number of stack simplify logic
for i := uint64(0); i < numWorkers; i++ {
w, err := sp.allocator()
if err != nil {
- return nil, errors.E(op, errors.WorkerAllocate, err)
+ return nil, errors.E(errors.WorkerAllocate, err)
}
workers = append(workers, w)