summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xMakefile33
-rwxr-xr-xpkg/payload/payload.go2
-rwxr-xr-xpkg/pool/static_pool.go16
-rw-r--r--plugins/kv/memcached/plugin.go40
4 files changed, 67 insertions, 24 deletions
diff --git a/Makefile b/Makefile
index fead8744..9182f840 100755
--- a/Makefile
+++ b/Makefile
@@ -93,9 +93,6 @@ test_1.14: ## Run application tests
go1.14.14 test -v -race -tags=debug ./pkg/pool
go1.14.14 test -v -race -tags=debug ./pkg/worker
go1.14.14 test -v -race -tags=debug ./pkg/worker_watcher
- go1.14.14 test -v -race -tags=debug ./tests/plugins/temporal
- go1.14.14 test -v -race -tags=debug ./plugins/temporal/protocol
- go1.14.14 test -v -race -tags=debug ./plugins/temporal/workflow
go1.14.14 test -v -race -tags=debug ./tests/plugins/http
go1.14.14 test -v -race -tags=debug ./plugins/http/config
go1.14.14 test -v -race -tags=debug ./tests/plugins/informer
@@ -119,4 +116,34 @@ test_1.14: ## Run application tests
go1.14.14 test -v -race -tags=debug ./tests/plugins/kv/memcached
docker-compose -f tests/docker-compose.yaml down
+test_1.16: ## Run application tests
+ docker-compose -f tests/docker-compose.yaml up -d
+ go1.16rc1 test -v -race -tags=debug ./pkg/transport/pipe
+ go1.16rc1 test -v -race -tags=debug ./pkg/transport/socket
+ go1.16rc1 test -v -race -tags=debug ./pkg/pool
+ go1.16rc1 test -v -race -tags=debug ./pkg/worker
+ go1.16rc1 test -v -race -tags=debug ./pkg/worker_watcher
+ go1.16rc1 test -v -race -tags=debug ./tests/plugins/http
+ go1.16rc1 test -v -race -tags=debug ./plugins/http/config
+ go1.16rc1 test -v -race -tags=debug ./tests/plugins/informer
+ go1.16rc1 test -v -race -tags=debug ./tests/plugins/reload
+ go1.16rc1 test -v -race -tags=debug ./tests/plugins/server
+ go1.16rc1 test -v -race -tags=debug ./tests/plugins/checker
+ go1.16rc1 test -v -race -tags=debug ./tests/plugins/config
+ go1.16rc1 test -v -race -tags=debug ./tests/plugins/gzip
+ go1.16rc1 test -v -race -tags=debug ./tests/plugins/headers
+ go1.16rc1 test -v -race -tags=debug ./tests/plugins/logger
+ go1.16rc1 test -v -race -tags=debug ./tests/plugins/metrics
+ go1.16rc1 test -v -race -tags=debug ./tests/plugins/redis
+ go1.16rc1 test -v -race -tags=debug ./tests/plugins/resetter
+ go1.16rc1 test -v -race -tags=debug ./tests/plugins/rpc
+ go1.16rc1 test -v -race -tags=debug ./tests/plugins/static
+ go1.16rc1 test -v -race -tags=debug ./plugins/kv/boltdb
+ go1.16rc1 test -v -race -tags=debug ./plugins/kv/memory
+ go1.16rc1 test -v -race -tags=debug ./plugins/kv/memcached
+ go1.16rc1 test -v -race -tags=debug ./tests/plugins/kv/boltdb
+ go1.16rc1 test -v -race -tags=debug ./tests/plugins/kv/memory
+ go1.16rc1 test -v -race -tags=debug ./tests/plugins/kv/memcached
+ docker-compose -f tests/docker-compose.yaml down
+
test_pipeline: test_1.14 test
diff --git a/pkg/payload/payload.go b/pkg/payload/payload.go
index bebe8df1..bf3972aa 100755
--- a/pkg/payload/payload.go
+++ b/pkg/payload/payload.go
@@ -20,4 +20,4 @@ func (p *Payload) String() string {
// unsafe, but lightning fast []byte to string conversion
func toString(data []byte) string {
return *(*string)(unsafe.Pointer(&data))
-} \ No newline at end of file
+}
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index dd52f313..c667dc94 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -75,15 +75,18 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
options[i](p)
}
+ // set up workers allocator
p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd)
+ // set up workers watcher
p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events)
+ // allocate requested number of workers
workers, err := p.allocateWorkers(p.cfg.NumWorkers)
if err != nil {
return nil, errors.E(op, err)
}
- // put stack in the pool
+ // add workers to the watcher
err = p.ww.Watch(workers)
if err != nil {
return nil, errors.E(op, err)
@@ -239,11 +242,12 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
return func(err error, w worker.SyncWorker) (payload.Payload, error) {
const op = errors.Op("error encoder")
// just push event if on any stage was timeout error
- if errors.Is(errors.ExecTTL, err) {
+
+ switch {
+ case errors.Is(errors.ExecTTL, err):
sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Payload: errors.E(op, err)})
- }
- // soft job errors are allowed
- if errors.Is(errors.SoftJob, err) {
+
+ case errors.Is(errors.SoftJob, err):
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
err = sp.ww.Allocate()
if err != nil {
@@ -258,8 +262,6 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
} else {
sp.ww.Push(w)
}
-
- return payload.Payload{}, errors.E(op, err)
}
w.State().Set(worker.StateInvalid)
diff --git a/plugins/kv/memcached/plugin.go b/plugins/kv/memcached/plugin.go
index 181b8a49..b8392f9e 100644
--- a/plugins/kv/memcached/plugin.go
+++ b/plugins/kv/memcached/plugin.go
@@ -84,9 +84,13 @@ func (s *Plugin) Has(keys ...string) (map[string]bool, error) {
return nil, errors.E(op, errors.EmptyKey)
}
exist, err := s.client.Get(keys[i])
- // ErrCacheMiss means that a Get failed because the item wasn't present.
- if err != nil && err != memcache.ErrCacheMiss {
- return nil, err
+
+ if err != nil {
+ // ErrCacheMiss means that a Get failed because the item wasn't present.
+ if err == memcache.ErrCacheMiss {
+ continue
+ }
+ return nil, errors.E(op, err)
}
if exist != nil {
m[keys[i]] = true
@@ -105,9 +109,12 @@ func (s *Plugin) Get(key string) ([]byte, error) {
return nil, errors.E(op, errors.EmptyKey)
}
data, err := s.client.Get(key)
- // ErrCacheMiss means that a Get failed because the item wasn't present.
- if err != nil && err != memcache.ErrCacheMiss {
- return nil, err
+ if err != nil {
+ // ErrCacheMiss means that a Get failed because the item wasn't present.
+ if err == memcache.ErrCacheMiss {
+ return nil, nil
+ }
+ return nil, errors.E(op, err)
}
if data != nil {
// return the value by the key
@@ -137,9 +144,12 @@ func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) {
for i := range keys {
// Here also MultiGet
data, err := s.client.Get(keys[i])
- // ErrCacheMiss means that a Get failed because the item wasn't present.
- if err != nil && err != memcache.ErrCacheMiss {
- return nil, err
+ if err != nil {
+ // ErrCacheMiss means that a Get failed because the item wasn't present.
+ if err == memcache.ErrCacheMiss {
+ continue
+ }
+ return nil, errors.E(op, err)
}
if data != nil {
m[keys[i]] = data.Value
@@ -205,7 +215,7 @@ func (s *Plugin) MExpire(items ...kv.Item) error {
// verify provided TTL
t, err := time.Parse(time.RFC3339, items[i].TTL)
if err != nil {
- return err
+ return errors.E(op, err)
}
// Touch updates the expiry for the given key. The seconds parameter is either
@@ -215,7 +225,7 @@ func (s *Plugin) MExpire(items ...kv.Item) error {
// The key must be at most 250 bytes in length.
err = s.client.Touch(items[i].Key, int32(t.Unix()))
if err != nil {
- return err
+ return errors.E(op, err)
}
}
return nil
@@ -244,8 +254,12 @@ func (s *Plugin) Delete(keys ...string) error {
for i := range keys {
err := s.client.Delete(keys[i])
// ErrCacheMiss means that a Get failed because the item wasn't present.
- if err != nil && err != memcache.ErrCacheMiss {
- return err
+ if err != nil {
+ // ErrCacheMiss means that a Get failed because the item wasn't present.
+ if err == memcache.ErrCacheMiss {
+ continue
+ }
+ return errors.E(op, err)
}
}
return nil