diff options
Diffstat (limited to 'pkg/pool/static_pool.go')
-rwxr-xr-x | pkg/pool/static_pool.go | 24 |
1 files changed, 12 insertions, 12 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index 037294ea..5990f929 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -18,7 +18,7 @@ import ( const StopRequest = "{\"stop\":true}" // ErrorEncoder encode error or make a decision based on the error type -type ErrorEncoder func(err error, w worker.BaseProcess) (payload.Payload, error) +type ErrorEncoder func(err error, w worker.BaseProcess) (*payload.Payload, error) type Options func(p *StaticPool) @@ -135,7 +135,7 @@ func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error { } // Exec executes provided payload on the worker -func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) { +func (sp *StaticPool) Exec(p *payload.Payload) (*payload.Payload, error) { const op = errors.Op("static_pool_exec") if sp.cfg.Debug { return sp.execDebug(p) @@ -144,7 +144,7 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) { defer cancel() w, err := sp.takeWorker(ctxGetFree, op) if err != nil { - return payload.Payload{}, errors.E(op, err) + return nil, errors.E(op, err) } rsp, err := w.(worker.SyncWorker).Exec(p) @@ -168,7 +168,7 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) { } // Be careful, sync with pool.Exec method -func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) { +func (sp *StaticPool) execWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) { const op = errors.Op("static_pool_exec_with_context") if sp.cfg.Debug { return sp.execDebugWithTTL(ctx, p) @@ -178,7 +178,7 @@ func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (paylo defer cancel() w, err := sp.takeWorker(ctxAlloc, op) if err != nil { - return payload.Payload{}, errors.E(op, err) + return nil, errors.E(op, err) } rsp, err := w.(worker.SyncWorker).ExecWithTTL(ctx, p) @@ -244,7 +244,7 @@ func (sp *StaticPool) Destroy(ctx context.Context) { } func defaultErrEncoder(sp *StaticPool) ErrorEncoder { - return func(err error, w worker.BaseProcess) (payload.Payload, error) { + return func(err error, w worker.BaseProcess) (*payload.Payload, error) { const op = errors.Op("error_encoder") // just push event if on any stage was timeout error switch { @@ -273,10 +273,10 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) errS := w.Stop() if errS != nil { - return payload.Payload{}, errors.E(op, err, errS) + return nil, errors.E(op, err, errS) } - return payload.Payload{}, errors.E(op, err) + return nil, errors.E(op, err) } } @@ -300,10 +300,10 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio } // execDebug used when debug mode was not set and exec_ttl is 0 -func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) { +func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) { sw, err := sp.allocator() if err != nil { - return payload.Payload{}, err + return nil, err } // redirect call to the workers exec method (without ttl) @@ -316,10 +316,10 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) { } // execDebugWithTTL used when user set debug mode and exec_ttl -func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) { +func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) { sw, err := sp.allocator() if err != nil { - return payload.Payload{}, err + return nil, err } // redirect call to the worker with TTL |