summaryrefslogtreecommitdiff
path: root/pkg/pool/static_pool.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-02-06 01:39:15 +0300
committerValery Piashchynski <[email protected]>2021-02-06 01:39:15 +0300
commit7b63af9b205be85ed2a50408f762344e1aba1a46 (patch)
tree6a0d9c05f8e49abde402afac35fdb26b51d18f45 /pkg/pool/static_pool.go
parent11b357c4457dfcbc1ef79478c200b794b5486b13 (diff)
Correct memcached Plugin RPC methods (remove redundant checks, user
errors.E)
Diffstat (limited to 'pkg/pool/static_pool.go')
-rwxr-xr-xpkg/pool/static_pool.go16
1 files changed, 9 insertions, 7 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index dd52f313..c667dc94 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -75,15 +75,18 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
options[i](p)
}
+ // set up workers allocator
p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd)
+ // set up workers watcher
p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events)
+ // allocate requested number of workers
workers, err := p.allocateWorkers(p.cfg.NumWorkers)
if err != nil {
return nil, errors.E(op, err)
}
- // put stack in the pool
+ // add workers to the watcher
err = p.ww.Watch(workers)
if err != nil {
return nil, errors.E(op, err)
@@ -239,11 +242,12 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
return func(err error, w worker.SyncWorker) (payload.Payload, error) {
const op = errors.Op("error encoder")
// just push event if on any stage was timeout error
- if errors.Is(errors.ExecTTL, err) {
+
+ switch {
+ case errors.Is(errors.ExecTTL, err):
sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Payload: errors.E(op, err)})
- }
- // soft job errors are allowed
- if errors.Is(errors.SoftJob, err) {
+
+ case errors.Is(errors.SoftJob, err):
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
err = sp.ww.Allocate()
if err != nil {
@@ -258,8 +262,6 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
} else {
sp.ww.Push(w)
}
-
- return payload.Payload{}, errors.E(op, err)
}
w.State().Set(worker.StateInvalid)