summaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-02-06 01:39:15 +0300
committerValery Piashchynski <[email protected]>2021-02-06 01:39:15 +0300
commit7b63af9b205be85ed2a50408f762344e1aba1a46 (patch)
tree6a0d9c05f8e49abde402afac35fdb26b51d18f45 /pkg
parent11b357c4457dfcbc1ef79478c200b794b5486b13 (diff)
Correct memcached Plugin RPC methods (remove redundant checks, user
errors.E)
Diffstat (limited to 'pkg')
-rwxr-xr-xpkg/payload/payload.go2
-rwxr-xr-xpkg/pool/static_pool.go16
2 files changed, 10 insertions, 8 deletions
diff --git a/pkg/payload/payload.go b/pkg/payload/payload.go
index bebe8df1..bf3972aa 100755
--- a/pkg/payload/payload.go
+++ b/pkg/payload/payload.go
@@ -20,4 +20,4 @@ func (p *Payload) String() string {
// unsafe, but lightning fast []byte to string conversion
func toString(data []byte) string {
return *(*string)(unsafe.Pointer(&data))
-} \ No newline at end of file
+}
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index dd52f313..c667dc94 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -75,15 +75,18 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
options[i](p)
}
+ // set up workers allocator
p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd)
+ // set up workers watcher
p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events)
+ // allocate requested number of workers
workers, err := p.allocateWorkers(p.cfg.NumWorkers)
if err != nil {
return nil, errors.E(op, err)
}
- // put stack in the pool
+ // add workers to the watcher
err = p.ww.Watch(workers)
if err != nil {
return nil, errors.E(op, err)
@@ -239,11 +242,12 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
return func(err error, w worker.SyncWorker) (payload.Payload, error) {
const op = errors.Op("error encoder")
// just push event if on any stage was timeout error
- if errors.Is(errors.ExecTTL, err) {
+
+ switch {
+ case errors.Is(errors.ExecTTL, err):
sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Payload: errors.E(op, err)})
- }
- // soft job errors are allowed
- if errors.Is(errors.SoftJob, err) {
+
+ case errors.Is(errors.SoftJob, err):
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
err = sp.ww.Allocate()
if err != nil {
@@ -258,8 +262,6 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
} else {
sp.ww.Push(w)
}
-
- return payload.Payload{}, errors.E(op, err)
}
w.State().Set(worker.StateInvalid)