diff options
author | Valery Piashchynski <[email protected]> | 2021-02-06 01:39:15 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-02-06 01:39:15 +0300 |
commit | 7b63af9b205be85ed2a50408f762344e1aba1a46 (patch) | |
tree | 6a0d9c05f8e49abde402afac35fdb26b51d18f45 /pkg | |
parent | 11b357c4457dfcbc1ef79478c200b794b5486b13 (diff) |
Correct memcached Plugin RPC methods (remove redundant checks, user
errors.E)
Diffstat (limited to 'pkg')
-rwxr-xr-x | pkg/payload/payload.go | 2 | ||||
-rwxr-xr-x | pkg/pool/static_pool.go | 16 |
2 files changed, 10 insertions, 8 deletions
diff --git a/pkg/payload/payload.go b/pkg/payload/payload.go index bebe8df1..bf3972aa 100755 --- a/pkg/payload/payload.go +++ b/pkg/payload/payload.go @@ -20,4 +20,4 @@ func (p *Payload) String() string { // unsafe, but lightning fast []byte to string conversion func toString(data []byte) string { return *(*string)(unsafe.Pointer(&data)) -}
\ No newline at end of file +} diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index dd52f313..c667dc94 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -75,15 +75,18 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg options[i](p) } + // set up workers allocator p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd) + // set up workers watcher p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events) + // allocate requested number of workers workers, err := p.allocateWorkers(p.cfg.NumWorkers) if err != nil { return nil, errors.E(op, err) } - // put stack in the pool + // add workers to the watcher err = p.ww.Watch(workers) if err != nil { return nil, errors.E(op, err) @@ -239,11 +242,12 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { return func(err error, w worker.SyncWorker) (payload.Payload, error) { const op = errors.Op("error encoder") // just push event if on any stage was timeout error - if errors.Is(errors.ExecTTL, err) { + + switch { + case errors.Is(errors.ExecTTL, err): sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Payload: errors.E(op, err)}) - } - // soft job errors are allowed - if errors.Is(errors.SoftJob, err) { + + case errors.Is(errors.SoftJob, err): if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { err = sp.ww.Allocate() if err != nil { @@ -258,8 +262,6 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { } else { sp.ww.Push(w) } - - return payload.Payload{}, errors.E(op, err) } w.State().Set(worker.StateInvalid) |