summaryrefslogtreecommitdiff
path: root/pkg/pool
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/pool')
-rw-r--r--pkg/pool/config.go2
-rw-r--r--pkg/pool/interface.go12
-rwxr-xr-xpkg/pool/static_pool.go63
-rwxr-xr-xpkg/pool/static_pool_test.go124
-rwxr-xr-xpkg/pool/supervisor_pool.go14
-rw-r--r--pkg/pool/supervisor_test.go37
6 files changed, 148 insertions, 104 deletions
diff --git a/pkg/pool/config.go b/pkg/pool/config.go
index 2a3dabe4..3a058956 100644
--- a/pkg/pool/config.go
+++ b/pkg/pool/config.go
@@ -5,7 +5,7 @@ import (
"time"
)
-// Configures the pool behavior.
+// Config .. Pool config Configures the pool behavior.
type Config struct {
// Debug flag creates new fresh worker before every request.
Debug bool
diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go
index bbf7653e..4049122c 100644
--- a/pkg/pool/interface.go
+++ b/pkg/pool/interface.go
@@ -13,7 +13,7 @@ type Pool interface {
GetConfig() interface{}
// Exec executes task with payload
- Exec(rqs payload.Payload) (payload.Payload, error)
+ Exec(rqs *payload.Payload) (*payload.Payload, error)
// Workers returns worker list associated with the pool.
Workers() (workers []worker.BaseProcess)
@@ -25,7 +25,7 @@ type Pool interface {
Destroy(ctx context.Context)
// ExecWithContext executes task with context which is used with timeout
- execWithTTL(ctx context.Context, rqs payload.Payload) (payload.Payload, error)
+ execWithTTL(ctx context.Context, rqs *payload.Payload) (*payload.Payload, error)
}
// Watcher is an interface for the Sync workers lifecycle
@@ -33,11 +33,11 @@ type Watcher interface {
// Watch used to add workers to the container
Watch(workers []worker.BaseProcess) error
- // Get provide first free worker
- Get(ctx context.Context) (worker.BaseProcess, error)
+ // Take takes the first free worker
+ Take(ctx context.Context) (worker.BaseProcess, error)
- // Push enqueues worker back
- Push(w worker.BaseProcess)
+ // Release releases the worker putting it back to the queue
+ Release(w worker.BaseProcess)
// Allocate - allocates new worker and put it into the WorkerWatcher
Allocate() error
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 5a6247b5..051e7a8a 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -18,7 +18,7 @@ import (
const StopRequest = "{\"stop\":true}"
// ErrorEncoder encode error or make a decision based on the error type
-type ErrorEncoder func(err error, w worker.BaseProcess) (payload.Payload, error)
+type ErrorEncoder func(err error, w worker.BaseProcess) (*payload.Payload, error)
type Options func(p *StaticPool)
@@ -26,7 +26,7 @@ type Command func() *exec.Cmd
// StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack.
type StaticPool struct {
- cfg Config
+ cfg *Config
// worker command creator
cmd Command
@@ -51,7 +51,7 @@ type StaticPool struct {
}
// Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
-func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg Config, options ...Options) (Pool, error) {
+func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg *Config, options ...Options) (Pool, error) {
const op = errors.Op("static_pool_initialize")
if factory == nil {
return nil, errors.E(op, errors.Str("no factory initialized"))
@@ -135,16 +135,16 @@ func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error {
}
// Exec executes provided payload on the worker
-func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
+func (sp *StaticPool) Exec(p *payload.Payload) (*payload.Payload, error) {
const op = errors.Op("static_pool_exec")
if sp.cfg.Debug {
return sp.execDebug(p)
}
ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout)
defer cancel()
- w, err := sp.getWorker(ctxGetFree, op)
+ w, err := sp.takeWorker(ctxGetFree, op)
if err != nil {
- return payload.Payload{}, errors.E(op, err)
+ return nil, errors.E(op, err)
}
rsp, err := w.(worker.SyncWorker).Exec(p)
@@ -163,12 +163,12 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
return rsp, nil
}
// return worker back
- sp.ww.Push(w)
+ sp.ww.Release(w)
return rsp, nil
}
// Be careful, sync with pool.Exec method
-func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) {
+func (sp *StaticPool) execWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) {
const op = errors.Op("static_pool_exec_with_context")
if sp.cfg.Debug {
return sp.execDebugWithTTL(ctx, p)
@@ -176,9 +176,9 @@ func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (paylo
ctxAlloc, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout)
defer cancel()
- w, err := sp.getWorker(ctxAlloc, op)
+ w, err := sp.takeWorker(ctxAlloc, op)
if err != nil {
- return payload.Payload{}, errors.E(op, err)
+ return nil, errors.E(op, err)
}
rsp, err := w.(worker.SyncWorker).ExecWithTTL(ctx, p)
@@ -198,7 +198,7 @@ func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (paylo
}
// return worker back
- sp.ww.Push(w)
+ sp.ww.Release(w)
return rsp, nil
}
@@ -216,16 +216,16 @@ func (sp *StaticPool) stopWorker(w worker.BaseProcess) {
func (sp *StaticPool) checkMaxJobs(w worker.BaseProcess) {
if w.State().NumExecs() >= sp.cfg.MaxJobs {
w.State().Set(worker.StateMaxJobsReached)
- sp.ww.Push(w)
+ sp.ww.Release(w)
return
}
- sp.ww.Push(w)
+ sp.ww.Release(w)
}
-func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) {
+func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) {
// Get function consumes context with timeout
- w, err := sp.ww.Get(ctxGetFree)
+ w, err := sp.ww.Take(ctxGetFree)
if err != nil {
// if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout
if errors.Is(errors.NoFreeWorkers, err) {
@@ -244,7 +244,7 @@ func (sp *StaticPool) Destroy(ctx context.Context) {
}
func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
- return func(err error, w worker.BaseProcess) (payload.Payload, error) {
+ return func(err error, w worker.BaseProcess) (*payload.Payload, error) {
const op = errors.Op("error_encoder")
// just push event if on any stage was timeout error
switch {
@@ -253,6 +253,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
case errors.Is(errors.SoftJob, err):
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+ // TODO suspicious logic, redesign
err = sp.ww.Allocate()
if err != nil {
sp.events.Push(events.PoolEvent{Event: events.EventWorkerConstruct, Payload: errors.E(op, err)})
@@ -265,7 +266,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
}
} else {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err})
- sp.ww.Push(w)
+ sp.ww.Release(w)
}
}
@@ -273,10 +274,10 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
errS := w.Stop()
if errS != nil {
- return payload.Payload{}, errors.E(op, err, errS)
+ return nil, errors.E(op, err, errS)
}
- return payload.Payload{}, errors.E(op, err)
+ return nil, errors.E(op, err)
}
}
@@ -289,6 +290,7 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio
return nil, err
}
+ // wrap sync worker
sw := worker.From(w)
sp.events.Push(events.PoolEvent{
@@ -300,26 +302,33 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio
}
// execDebug used when debug mode was not set and exec_ttl is 0
-func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
+func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) {
+ const op = errors.Op("static_pool_exec_debug")
sw, err := sp.allocator()
if err != nil {
- return payload.Payload{}, err
+ return nil, err
}
- // redirect call to the workers exec method (without ttl)
+ // redirect call to the workers' exec method (without ttl)
r, err := sw.Exec(p)
- if stopErr := sw.Stop(); stopErr != nil {
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ err = sw.Stop()
+ if err != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err})
+ return nil, errors.E(op, err)
}
- return r, err
+ return r, nil
}
// execDebugWithTTL used when user set debug mode and exec_ttl
-func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) {
+func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) {
sw, err := sp.allocator()
if err != nil {
- return payload.Payload{}, err
+ return nil, err
}
// redirect call to the worker with TTL
@@ -333,7 +342,7 @@ func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p payload.Payload) (
// allocate required number of stack
func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.BaseProcess, error) {
- const op = errors.Op("allocate workers")
+ const op = errors.Op("static_pool_allocate_workers")
workers := make([]worker.BaseProcess, 0, numWorkers)
// constant number of stack simplify logic
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index 6f875072..2ac2093d 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -20,7 +20,7 @@ import (
"github.com/stretchr/testify/assert"
)
-var cfg = Config{
+var cfg = &Config{
NumWorkers: uint64(runtime.NumCPU()),
AllocateTimeout: time.Second * 5,
DestroyTimeout: time.Second * 5,
@@ -58,7 +58,7 @@ func Test_ConfigNoErrorInitDefaults(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
},
@@ -82,7 +82,7 @@ func Test_StaticPool_Echo(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -106,7 +106,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(payload.Payload{Body: []byte("hello"), Context: nil})
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello"), Context: nil})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -130,7 +130,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(payload.Payload{Body: []byte("hello"), Context: []byte("world")})
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello"), Context: []byte("world")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -151,11 +151,10 @@ func Test_StaticPool_JobError(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, p)
- res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
if errors.Is(errors.SoftJob, err) == false {
t.Fatal("error should be of type errors.Exec")
@@ -192,10 +191,9 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
assert.NotNil(t, p)
time.Sleep(time.Second)
- res, err := p.execWithTTL(ctx, payload.Payload{Body: []byte("hello")})
+ res, err := p.execWithTTL(ctx, &payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res.Context)
- assert.Nil(t, res.Body)
+ assert.Nil(t, res)
<-block
@@ -204,7 +202,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
func Test_StaticPool_Broken_FromOutside(t *testing.T) {
ctx := context.Background()
- // Consume pool events
+ // Run pool events
ev := make(chan struct{}, 1)
listener := func(event interface{}) {
if pe, ok := event.(events.PoolEvent); ok {
@@ -214,7 +212,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
}
}
- var cfg2 = Config{
+ var cfg2 = &Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 5,
DestroyTimeout: time.Second * 5,
@@ -232,7 +230,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -264,7 +262,7 @@ func Test_StaticPool_AllocateTimeout(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 1,
AllocateTimeout: time.Nanosecond * 1,
DestroyTimeout: time.Second * 2,
@@ -283,7 +281,7 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 1,
MaxJobs: 1,
AllocateTimeout: time.Second,
@@ -298,11 +296,11 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
var lastPID string
lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
- res, _ := p.Exec(payload.Payload{Body: []byte("hello")})
+ res, _ := p.Exec(&payload.Payload{Body: []byte("hello")})
assert.Equal(t, lastPID, string(res.Body))
for i := 0; i < 10; i++ {
- res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -320,7 +318,7 @@ func Test_StaticPool_Debug_Worker(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
Debug: true,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -334,14 +332,14 @@ func Test_StaticPool_Debug_Worker(t *testing.T) {
assert.Len(t, p.Workers(), 0)
var lastPID string
- res, _ := p.Exec(payload.Payload{Body: []byte("hello")})
+ res, _ := p.Exec(&payload.Payload{Body: []byte("hello")})
assert.NotEqual(t, lastPID, string(res.Body))
assert.Len(t, p.Workers(), 0)
for i := 0; i < 10; i++ {
assert.Len(t, p.Workers(), 0)
- res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -360,7 +358,7 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "stop", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -374,14 +372,14 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
var lastPID string
lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
- res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
if err != nil {
t.Fatal(err)
}
assert.Equal(t, lastPID, string(res.Body))
for i := 0; i < 10; i++ {
- res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -400,7 +398,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -411,7 +409,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
assert.NoError(t, err)
p.Destroy(ctx)
- _, err = p.Exec(payload.Payload{Body: []byte("100")})
+ _, err = p.Exec(&payload.Payload{Body: []byte("100")})
assert.Error(t, err)
}
@@ -422,7 +420,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -433,7 +431,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
assert.NoError(t, err)
go func() {
- _, errP := p.Exec(payload.Payload{Body: []byte("100")})
+ _, errP := p.Exec(&payload.Payload{Body: []byte("100")})
if errP != nil {
t.Errorf("error executing payload: error %v", err)
}
@@ -441,7 +439,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
time.Sleep(time.Millisecond * 100)
p.Destroy(ctx)
- _, err = p.Exec(payload.Payload{Body: []byte("100")})
+ _, err = p.Exec(&payload.Payload{Body: []byte("100")})
assert.Error(t, err)
}
@@ -452,7 +450,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 5,
AllocateTimeout: time.Second * 100,
DestroyTimeout: time.Second,
@@ -465,7 +463,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
p.Workers()[i].State().Set(worker.StateErrored)
}
- _, err = p.Exec(payload.Payload{Body: []byte("hello")})
+ _, err = p.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
p.Destroy(ctx)
}
@@ -476,7 +474,7 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 5,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -506,7 +504,7 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) {
// sleep for the 3 seconds
func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
Debug: false,
NumWorkers: 1,
AllocateTimeout: time.Second,
@@ -519,14 +517,13 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) {
assert.NotNil(t, p)
go func() {
- _, _ = p.execWithTTL(ctx, payload.Payload{Body: []byte("hello")})
+ _, _ = p.execWithTTL(ctx, &payload.Payload{Body: []byte("hello")})
}()
time.Sleep(time.Second)
- res, err := p.execWithTTL(ctx, payload.Payload{Body: []byte("hello")})
+ res, err := p.execWithTTL(ctx, &payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res.Context)
- assert.Nil(t, res.Body)
+ assert.Nil(t, res)
<-block
@@ -539,7 +536,7 @@ func Test_Static_Pool_WrongCommand1(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("phg", "../../tests/slow-destroy.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 5,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -556,7 +553,7 @@ func Test_Static_Pool_WrongCommand2(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "", "echo", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 5,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -567,6 +564,24 @@ func Test_Static_Pool_WrongCommand2(t *testing.T) {
assert.Nil(t, p)
}
+/* PTR:
+Benchmark_Pool_Echo-32 49076 29926 ns/op 8016 B/op 20 allocs/op
+Benchmark_Pool_Echo-32 47257 30779 ns/op 8047 B/op 20 allocs/op
+Benchmark_Pool_Echo-32 46737 29440 ns/op 8065 B/op 20 allocs/op
+Benchmark_Pool_Echo-32 51177 29074 ns/op 7981 B/op 20 allocs/op
+Benchmark_Pool_Echo-32 51764 28319 ns/op 8012 B/op 20 allocs/op
+Benchmark_Pool_Echo-32 54054 30714 ns/op 7987 B/op 20 allocs/op
+Benchmark_Pool_Echo-32 54391 30689 ns/op 8055 B/op 20 allocs/op
+
+VAL:
+Benchmark_Pool_Echo-32 47936 28679 ns/op 7942 B/op 19 allocs/op
+Benchmark_Pool_Echo-32 49010 29830 ns/op 7970 B/op 19 allocs/op
+Benchmark_Pool_Echo-32 46771 29031 ns/op 8014 B/op 19 allocs/op
+Benchmark_Pool_Echo-32 47760 30517 ns/op 7955 B/op 19 allocs/op
+Benchmark_Pool_Echo-32 48148 29816 ns/op 7950 B/op 19 allocs/op
+Benchmark_Pool_Echo-32 52705 29809 ns/op 7979 B/op 19 allocs/op
+Benchmark_Pool_Echo-32 54374 27776 ns/op 7947 B/op 19 allocs/op
+*/
func Benchmark_Pool_Echo(b *testing.B) {
ctx := context.Background()
p, err := Initialize(
@@ -579,23 +594,33 @@ func Benchmark_Pool_Echo(b *testing.B) {
b.Fatal(err)
}
+ bd := make([]byte, 1024)
+ c := make([]byte, 1024)
+
+ pld := &payload.Payload{
+ Context: c,
+ Body: bd,
+ }
+
b.ResetTimer()
b.ReportAllocs()
for n := 0; n < b.N; n++ {
- if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := p.Exec(pld); err != nil {
b.Fail()
}
}
}
// Benchmark_Pool_Echo_Batched-32 366996 2873 ns/op 1233 B/op 24 allocs/op
+// PTR -> Benchmark_Pool_Echo_Batched-32 406839 2900 ns/op 1059 B/op 23 allocs/op
+// PTR -> Benchmark_Pool_Echo_Batched-32 413312 2904 ns/op 1067 B/op 23 allocs/op
func Benchmark_Pool_Echo_Batched(b *testing.B) {
ctx := context.Background()
p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: uint64(runtime.NumCPU()),
AllocateTimeout: time.Second * 100,
DestroyTimeout: time.Second,
@@ -604,12 +629,23 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
assert.NoError(b, err)
defer p.Destroy(ctx)
+ bd := make([]byte, 1024)
+ c := make([]byte, 1024)
+
+ pld := &payload.Payload{
+ Context: c,
+ Body: bd,
+ }
+
+ b.ResetTimer()
+ b.ReportAllocs()
+
var wg sync.WaitGroup
for i := 0; i < b.N; i++ {
wg.Add(1)
go func() {
defer wg.Done()
- if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := p.Exec(pld); err != nil {
b.Fail()
log.Println(err)
}
@@ -626,7 +662,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) {
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 1,
MaxJobs: 1,
AllocateTimeout: time.Second,
@@ -639,7 +675,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) {
b.ReportAllocs()
for n := 0; n < b.N; n++ {
- if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := p.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
log.Println(err)
}
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index 4b990dbe..bdaeade1 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -43,11 +43,11 @@ func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig)
return sp
}
-func (sp *supervised) execWithTTL(_ context.Context, _ payload.Payload) (payload.Payload, error) {
+func (sp *supervised) execWithTTL(_ context.Context, _ *payload.Payload) (*payload.Payload, error) {
panic("used to satisfy pool interface")
}
-func (sp *supervised) Exec(rqs payload.Payload) (payload.Payload, error) {
+func (sp *supervised) Exec(rqs *payload.Payload) (*payload.Payload, error) {
const op = errors.Op("supervised_exec_with_context")
if sp.cfg.ExecTTL == 0 {
return sp.pool.Exec(rqs)
@@ -58,7 +58,7 @@ func (sp *supervised) Exec(rqs payload.Payload) (payload.Payload, error) {
res, err := sp.pool.execWithTTL(ctx, rqs)
if err != nil {
- return payload.Payload{}, errors.E(op, err)
+ return nil, errors.E(op, err)
}
return res, nil
@@ -136,7 +136,7 @@ func (sp *supervised) control() { //nolint:gocognit
/*
worker at this point might be in the middle of request execution:
- ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Push
+ ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release
^
TTL Reached, state - invalid |
-----> Worker Stopped here
@@ -156,7 +156,7 @@ func (sp *supervised) control() { //nolint:gocognit
/*
worker at this point might be in the middle of request execution:
- ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Push
+ ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release
^
TTL Reached, state - invalid |
-----> Worker Stopped here
@@ -211,7 +211,7 @@ func (sp *supervised) control() { //nolint:gocognit
/*
worker at this point might be in the middle of request execution:
- ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Push
+ ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release
^
TTL Reached, state - invalid |
-----> Worker Stopped here
@@ -221,7 +221,7 @@ func (sp *supervised) control() { //nolint:gocognit
workers[i].State().Set(worker.StateInvalid)
_ = workers[i].Stop()
}
- // just to double check
+ // just to double-check
workers[i].State().Set(worker.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]})
}
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
index 1cd301ba..0702a71f 100644
--- a/pkg/pool/supervisor_test.go
+++ b/pkg/pool/supervisor_test.go
@@ -14,7 +14,7 @@ import (
"github.com/stretchr/testify/require"
)
-var cfgSupervised = Config{
+var cfgSupervised = &Config{
NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -43,7 +43,7 @@ func TestSupervisedPool_Exec(t *testing.T) {
for i := 0; i < 100; i++ {
time.Sleep(time.Millisecond * 100)
- _, err = p.Exec(payload.Payload{
+ _, err = p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -73,7 +73,7 @@ func TestSupervisedPool_ExecWithDebugMode(t *testing.T) {
for i := 0; i < 100; i++ {
time.Sleep(time.Millisecond * 100)
- _, err = p.Exec(payload.Payload{
+ _, err = p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -84,7 +84,7 @@ func TestSupervisedPool_ExecWithDebugMode(t *testing.T) {
}
func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
- var cfgExecTTL = Config{
+ var cfgExecTTL = &Config{
NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -110,14 +110,13 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
pid := p.Workers()[0].Pid()
- resp, err := p.Exec(payload.Payload{
+ resp, err := p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
assert.Error(t, err)
- assert.Empty(t, resp.Body)
- assert.Empty(t, resp.Context)
+ assert.Empty(t, resp)
time.Sleep(time.Second * 1)
// should be new worker with new pid
@@ -125,7 +124,7 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
}
func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) {
- var cfgExecTTL = Config{
+ var cfgExecTTL = &Config{
NumWorkers: uint64(1),
Supervisor: &SupervisorConfig{
WatchTick: 1 * time.Second,
@@ -145,7 +144,7 @@ func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) {
pid := p.Workers()[0].Pid()
- resp, err := p.Exec(payload.Payload{
+ resp, err := p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -159,7 +158,7 @@ func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) {
require.Equal(t, p.Workers()[0].State().Value(), worker.StateReady)
pid = p.Workers()[0].Pid()
- resp, err = p.Exec(payload.Payload{
+ resp, err = p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -177,7 +176,7 @@ func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) {
}
func TestSupervisedPool_Idle(t *testing.T) {
- var cfgExecTTL = Config{
+ var cfgExecTTL = &Config{
NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -202,7 +201,7 @@ func TestSupervisedPool_Idle(t *testing.T) {
pid := p.Workers()[0].Pid()
- resp, err := p.Exec(payload.Payload{
+ resp, err := p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -214,7 +213,7 @@ func TestSupervisedPool_Idle(t *testing.T) {
time.Sleep(time.Second * 5)
// worker should be marked as invalid and reallocated
- _, err = p.Exec(payload.Payload{
+ _, err = p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -225,7 +224,7 @@ func TestSupervisedPool_Idle(t *testing.T) {
}
func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) {
- var cfgExecTTL = Config{
+ var cfgExecTTL = &Config{
NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -251,7 +250,7 @@ func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) {
pid := p.Workers()[0].Pid()
time.Sleep(time.Millisecond * 100)
- resp, err := p.Exec(payload.Payload{
+ resp, err := p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -267,7 +266,7 @@ func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) {
}
func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
- var cfgExecTTL = Config{
+ var cfgExecTTL = &Config{
NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -294,7 +293,7 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
pid := p.Workers()[0].Pid()
time.Sleep(time.Millisecond * 100)
- resp, err := p.Exec(payload.Payload{
+ resp, err := p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -309,7 +308,7 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
}
func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
- var cfgExecTTL = Config{
+ var cfgExecTTL = &Config{
NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -346,7 +345,7 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, p)
- resp, err := p.Exec(payload.Payload{
+ resp, err := p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})