summaryrefslogtreecommitdiff
path: root/pool
diff options
context:
space:
mode:
Diffstat (limited to 'pool')
-rwxr-xr-xpool/static_pool.go28
1 files changed, 12 insertions, 16 deletions
diff --git a/pool/static_pool.go b/pool/static_pool.go
index 25097395..f500f998 100755
--- a/pool/static_pool.go
+++ b/pool/static_pool.go
@@ -52,9 +52,8 @@ type StaticPool struct {
// Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg *Config, options ...Options) (Pool, error) {
- const op = errors.Op("static_pool_initialize")
if factory == nil {
- return nil, errors.E(op, errors.Str("no factory initialized"))
+ return nil, errors.Str("no factory initialized")
}
cfg.InitDefaults()
@@ -83,13 +82,13 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
// allocate requested number of workers
workers, err := p.allocateWorkers(p.cfg.NumWorkers)
if err != nil {
- return nil, errors.E(op, err)
+ return nil, err
}
// add workers to the watcher
err = p.ww.Watch(workers)
if err != nil {
- return nil, errors.E(op, err)
+ return nil, err
}
p.errEncoder = defaultErrEncoder(p)
@@ -245,16 +244,15 @@ func (sp *StaticPool) Destroy(ctx context.Context) {
func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
return func(err error, w worker.BaseProcess) (*payload.Payload, error) {
- const op = errors.Op("error_encoder")
// just push event if on any stage was timeout error
switch {
case errors.Is(errors.ExecTTL, err):
- sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Error: errors.E(op, err)})
+ sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Error: err})
w.State().Set(worker.StateInvalid)
return nil, err
case errors.Is(errors.SoftJob, err):
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err})
// if max jobs exceed
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
@@ -262,7 +260,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
w.State().Set(worker.StateInvalid)
errS := w.Stop()
if errS != nil {
- return nil, errors.E(op, errors.SoftJob, errors.Errorf("err: %v\nerrStop: %v", err, errS))
+ return nil, errors.E(errors.SoftJob, errors.Errorf("err: %v\nerrStop: %v", err, errS))
}
return nil, err
@@ -275,7 +273,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
case errors.Is(errors.Network, err):
// in case of network error, we can't stop the worker, we should kill it
w.State().Set(worker.StateInvalid)
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err})
// kill the worker instead of sending net packet to it
_ = w.Kill()
@@ -287,10 +285,10 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
// stop the worker, worker here might be in the broken state (network)
errS := w.Stop()
if errS != nil {
- return nil, errors.E(op, errors.Errorf("err: %v\nerrStop: %v", err, errS))
+ return nil, errors.E(errors.Errorf("err: %v\nerrStop: %v", err, errS))
}
- return nil, errors.E(op, err)
+ return nil, err
}
}
}
@@ -317,7 +315,6 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio
// execDebug used when debug mode was not set and exec_ttl is 0
func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) {
- const op = errors.Op("static_pool_exec_debug")
sw, err := sp.allocator()
if err != nil {
return nil, err
@@ -326,7 +323,7 @@ func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) {
// redirect call to the workers' exec method (without ttl)
r, err := sw.Exec(p)
if err != nil {
- return nil, errors.E(op, err)
+ return nil, err
}
// destroy the worker
@@ -334,7 +331,7 @@ func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) {
err = sw.Kill()
if err != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err})
- return nil, errors.E(op, err)
+ return nil, err
}
return r, nil
@@ -358,14 +355,13 @@ func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload)
// allocate required number of stack
func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.BaseProcess, error) {
- const op = errors.Op("static_pool_allocate_workers")
workers := make([]worker.BaseProcess, 0, numWorkers)
// constant number of stack simplify logic
for i := uint64(0); i < numWorkers; i++ {
w, err := sp.allocator()
if err != nil {
- return nil, errors.E(op, errors.WorkerAllocate, err)
+ return nil, errors.E(errors.WorkerAllocate, err)
}
workers = append(workers, w)