diff options
-rwxr-xr-x | Makefile | 33 | ||||
-rwxr-xr-x | pkg/payload/payload.go | 2 | ||||
-rwxr-xr-x | pkg/pool/static_pool.go | 16 | ||||
-rw-r--r-- | plugins/kv/memcached/plugin.go | 40 |
4 files changed, 67 insertions, 24 deletions
@@ -93,9 +93,6 @@ test_1.14: ## Run application tests go1.14.14 test -v -race -tags=debug ./pkg/pool go1.14.14 test -v -race -tags=debug ./pkg/worker go1.14.14 test -v -race -tags=debug ./pkg/worker_watcher - go1.14.14 test -v -race -tags=debug ./tests/plugins/temporal - go1.14.14 test -v -race -tags=debug ./plugins/temporal/protocol - go1.14.14 test -v -race -tags=debug ./plugins/temporal/workflow go1.14.14 test -v -race -tags=debug ./tests/plugins/http go1.14.14 test -v -race -tags=debug ./plugins/http/config go1.14.14 test -v -race -tags=debug ./tests/plugins/informer @@ -119,4 +116,34 @@ test_1.14: ## Run application tests go1.14.14 test -v -race -tags=debug ./tests/plugins/kv/memcached docker-compose -f tests/docker-compose.yaml down +test_1.16: ## Run application tests + docker-compose -f tests/docker-compose.yaml up -d + go1.16rc1 test -v -race -tags=debug ./pkg/transport/pipe + go1.16rc1 test -v -race -tags=debug ./pkg/transport/socket + go1.16rc1 test -v -race -tags=debug ./pkg/pool + go1.16rc1 test -v -race -tags=debug ./pkg/worker + go1.16rc1 test -v -race -tags=debug ./pkg/worker_watcher + go1.16rc1 test -v -race -tags=debug ./tests/plugins/http + go1.16rc1 test -v -race -tags=debug ./plugins/http/config + go1.16rc1 test -v -race -tags=debug ./tests/plugins/informer + go1.16rc1 test -v -race -tags=debug ./tests/plugins/reload + go1.16rc1 test -v -race -tags=debug ./tests/plugins/server + go1.16rc1 test -v -race -tags=debug ./tests/plugins/checker + go1.16rc1 test -v -race -tags=debug ./tests/plugins/config + go1.16rc1 test -v -race -tags=debug ./tests/plugins/gzip + go1.16rc1 test -v -race -tags=debug ./tests/plugins/headers + go1.16rc1 test -v -race -tags=debug ./tests/plugins/logger + go1.16rc1 test -v -race -tags=debug ./tests/plugins/metrics + go1.16rc1 test -v -race -tags=debug ./tests/plugins/redis + go1.16rc1 test -v -race -tags=debug ./tests/plugins/resetter + go1.16rc1 test -v -race -tags=debug ./tests/plugins/rpc + go1.16rc1 test -v -race -tags=debug ./tests/plugins/static + go1.16rc1 test -v -race -tags=debug ./plugins/kv/boltdb + go1.16rc1 test -v -race -tags=debug ./plugins/kv/memory + go1.16rc1 test -v -race -tags=debug ./plugins/kv/memcached + go1.16rc1 test -v -race -tags=debug ./tests/plugins/kv/boltdb + go1.16rc1 test -v -race -tags=debug ./tests/plugins/kv/memory + go1.16rc1 test -v -race -tags=debug ./tests/plugins/kv/memcached + docker-compose -f tests/docker-compose.yaml down + test_pipeline: test_1.14 test diff --git a/pkg/payload/payload.go b/pkg/payload/payload.go index bebe8df1..bf3972aa 100755 --- a/pkg/payload/payload.go +++ b/pkg/payload/payload.go @@ -20,4 +20,4 @@ func (p *Payload) String() string { // unsafe, but lightning fast []byte to string conversion func toString(data []byte) string { return *(*string)(unsafe.Pointer(&data)) -}
\ No newline at end of file +} diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index dd52f313..c667dc94 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -75,15 +75,18 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg options[i](p) } + // set up workers allocator p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd) + // set up workers watcher p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events) + // allocate requested number of workers workers, err := p.allocateWorkers(p.cfg.NumWorkers) if err != nil { return nil, errors.E(op, err) } - // put stack in the pool + // add workers to the watcher err = p.ww.Watch(workers) if err != nil { return nil, errors.E(op, err) @@ -239,11 +242,12 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { return func(err error, w worker.SyncWorker) (payload.Payload, error) { const op = errors.Op("error encoder") // just push event if on any stage was timeout error - if errors.Is(errors.ExecTTL, err) { + + switch { + case errors.Is(errors.ExecTTL, err): sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Payload: errors.E(op, err)}) - } - // soft job errors are allowed - if errors.Is(errors.SoftJob, err) { + + case errors.Is(errors.SoftJob, err): if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { err = sp.ww.Allocate() if err != nil { @@ -258,8 +262,6 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { } else { sp.ww.Push(w) } - - return payload.Payload{}, errors.E(op, err) } w.State().Set(worker.StateInvalid) diff --git a/plugins/kv/memcached/plugin.go b/plugins/kv/memcached/plugin.go index 181b8a49..b8392f9e 100644 --- a/plugins/kv/memcached/plugin.go +++ b/plugins/kv/memcached/plugin.go @@ -84,9 +84,13 @@ func (s *Plugin) Has(keys ...string) (map[string]bool, error) { return nil, errors.E(op, errors.EmptyKey) } exist, err := s.client.Get(keys[i]) - // ErrCacheMiss means that a Get failed because the item wasn't present. - if err != nil && err != memcache.ErrCacheMiss { - return nil, err + + if err != nil { + // ErrCacheMiss means that a Get failed because the item wasn't present. + if err == memcache.ErrCacheMiss { + continue + } + return nil, errors.E(op, err) } if exist != nil { m[keys[i]] = true @@ -105,9 +109,12 @@ func (s *Plugin) Get(key string) ([]byte, error) { return nil, errors.E(op, errors.EmptyKey) } data, err := s.client.Get(key) - // ErrCacheMiss means that a Get failed because the item wasn't present. - if err != nil && err != memcache.ErrCacheMiss { - return nil, err + if err != nil { + // ErrCacheMiss means that a Get failed because the item wasn't present. + if err == memcache.ErrCacheMiss { + return nil, nil + } + return nil, errors.E(op, err) } if data != nil { // return the value by the key @@ -137,9 +144,12 @@ func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) { for i := range keys { // Here also MultiGet data, err := s.client.Get(keys[i]) - // ErrCacheMiss means that a Get failed because the item wasn't present. - if err != nil && err != memcache.ErrCacheMiss { - return nil, err + if err != nil { + // ErrCacheMiss means that a Get failed because the item wasn't present. + if err == memcache.ErrCacheMiss { + continue + } + return nil, errors.E(op, err) } if data != nil { m[keys[i]] = data.Value @@ -205,7 +215,7 @@ func (s *Plugin) MExpire(items ...kv.Item) error { // verify provided TTL t, err := time.Parse(time.RFC3339, items[i].TTL) if err != nil { - return err + return errors.E(op, err) } // Touch updates the expiry for the given key. The seconds parameter is either @@ -215,7 +225,7 @@ func (s *Plugin) MExpire(items ...kv.Item) error { // The key must be at most 250 bytes in length. err = s.client.Touch(items[i].Key, int32(t.Unix())) if err != nil { - return err + return errors.E(op, err) } } return nil @@ -244,8 +254,12 @@ func (s *Plugin) Delete(keys ...string) error { for i := range keys { err := s.client.Delete(keys[i]) // ErrCacheMiss means that a Get failed because the item wasn't present. - if err != nil && err != memcache.ErrCacheMiss { - return err + if err != nil { + // ErrCacheMiss means that a Get failed because the item wasn't present. + if err == memcache.ErrCacheMiss { + continue + } + return errors.E(op, err) } } return nil |