Diffstat (limited to 'pkg/pool')
-rw-r--r--   pkg/pool/config.go           |   2
-rw-r--r--   pkg/pool/interface.go        |  12
-rwxr-xr-x   pkg/pool/static_pool.go      |  63
-rwxr-xr-x   pkg/pool/static_pool_test.go | 124
-rwxr-xr-x   pkg/pool/supervisor_pool.go  |  14
-rw-r--r--   pkg/pool/supervisor_test.go  |  37
6 files changed, 148 insertions, 104 deletions
diff --git a/pkg/pool/config.go b/pkg/pool/config.go
index 2a3dabe4..3a058956 100644
--- a/pkg/pool/config.go
+++ b/pkg/pool/config.go
@@ -5,7 +5,7 @@ import (
     "time"
 )
 
-// Configures the pool behavior.
+// Config .. Pool config Configures the pool behavior.
 type Config struct {
     // Debug flag creates new fresh worker before every request.
     Debug bool
diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go
index bbf7653e..4049122c 100644
--- a/pkg/pool/interface.go
+++ b/pkg/pool/interface.go
@@ -13,7 +13,7 @@ type Pool interface {
     GetConfig() interface{}
 
     // Exec executes task with payload
-    Exec(rqs payload.Payload) (payload.Payload, error)
+    Exec(rqs *payload.Payload) (*payload.Payload, error)
 
     // Workers returns worker list associated with the pool.
     Workers() (workers []worker.BaseProcess)
@@ -25,7 +25,7 @@ type Pool interface {
     Destroy(ctx context.Context)
 
     // ExecWithContext executes task with context which is used with timeout
-    execWithTTL(ctx context.Context, rqs payload.Payload) (payload.Payload, error)
+    execWithTTL(ctx context.Context, rqs *payload.Payload) (*payload.Payload, error)
 }
 
 // Watcher is an interface for the Sync workers lifecycle
@@ -33,11 +33,11 @@ type Watcher interface {
     // Watch used to add workers to the container
     Watch(workers []worker.BaseProcess) error
 
-    // Get provide first free worker
-    Get(ctx context.Context) (worker.BaseProcess, error)
+    // Take takes the first free worker
+    Take(ctx context.Context) (worker.BaseProcess, error)
 
-    // Push enqueues worker back
-    Push(w worker.BaseProcess)
+    // Release releases the worker putting it back to the queue
+    Release(w worker.BaseProcess)
 
     // Allocate - allocates new worker and put it into the WorkerWatcher
     Allocate() error
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 5a6247b5..051e7a8a 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -18,7 +18,7 @@ import (
 const StopRequest = "{\"stop\":true}"
 
 // ErrorEncoder encode error or make a decision based on the error type
-type ErrorEncoder func(err error, w worker.BaseProcess) (payload.Payload, error)
+type ErrorEncoder func(err error, w worker.BaseProcess) (*payload.Payload, error)
 
 type Options func(p *StaticPool)
 
@@ -26,7 +26,7 @@ type Command func() *exec.Cmd
 
 // StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack.
 type StaticPool struct {
-    cfg Config
+    cfg *Config
 
     // worker command creator
     cmd Command
@@ -51,7 +51,7 @@ type StaticPool struct {
 }
 
 // Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
-func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg Config, options ...Options) (Pool, error) {
+func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg *Config, options ...Options) (Pool, error) {
     const op = errors.Op("static_pool_initialize")
     if factory == nil {
         return nil, errors.E(op, errors.Str("no factory initialized"))
@@ -135,16 +135,16 @@ func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error {
 }
 
 // Exec executes provided payload on the worker
-func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
+func (sp *StaticPool) Exec(p *payload.Payload) (*payload.Payload, error) {
     const op = errors.Op("static_pool_exec")
     if sp.cfg.Debug {
         return sp.execDebug(p)
     }
     ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout)
     defer cancel()
-    w, err := sp.getWorker(ctxGetFree, op)
+    w, err := sp.takeWorker(ctxGetFree, op)
     if err != nil {
-        return payload.Payload{}, errors.E(op, err)
+        return nil, errors.E(op, err)
     }
 
     rsp, err := w.(worker.SyncWorker).Exec(p)
@@ -163,12 +163,12 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
         return rsp, nil
     }
     // return worker back
-    sp.ww.Push(w)
+    sp.ww.Release(w)
     return rsp, nil
 }
 
 // Be careful, sync with pool.Exec method
-func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) {
+func (sp *StaticPool) execWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) {
     const op = errors.Op("static_pool_exec_with_context")
     if sp.cfg.Debug {
         return sp.execDebugWithTTL(ctx, p)
@@ -176,9 +176,9 @@ func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (paylo
 
     ctxAlloc, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout)
     defer cancel()
-    w, err := sp.getWorker(ctxAlloc, op)
+    w, err := sp.takeWorker(ctxAlloc, op)
     if err != nil {
-        return payload.Payload{}, errors.E(op, err)
+        return nil, errors.E(op, err)
     }
 
     rsp, err := w.(worker.SyncWorker).ExecWithTTL(ctx, p)
@@ -198,7 +198,7 @@ func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (paylo
     }
 
     // return worker back
-    sp.ww.Push(w)
+    sp.ww.Release(w)
     return rsp, nil
 }
 
@@ -216,16 +216,16 @@ func (sp *StaticPool) stopWorker(w worker.BaseProcess) {
 func (sp *StaticPool) checkMaxJobs(w worker.BaseProcess) {
     if w.State().NumExecs() >= sp.cfg.MaxJobs {
         w.State().Set(worker.StateMaxJobsReached)
-        sp.ww.Push(w)
+        sp.ww.Release(w)
         return
     }
 
-    sp.ww.Push(w)
+    sp.ww.Release(w)
 }
 
-func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) {
+func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) {
     // Get function consumes context with timeout
-    w, err := sp.ww.Get(ctxGetFree)
+    w, err := sp.ww.Take(ctxGetFree)
     if err != nil {
         // if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout
         if errors.Is(errors.NoFreeWorkers, err) {
@@ -244,7 +244,7 @@ func (sp *StaticPool) Destroy(ctx context.Context) {
 }
 
 func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
-    return func(err error, w worker.BaseProcess) (payload.Payload, error) {
+    return func(err error, w worker.BaseProcess) (*payload.Payload, error) {
         const op = errors.Op("error_encoder")
         // just push event if on any stage was timeout error
         switch {
@@ -253,6 +253,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
         case errors.Is(errors.SoftJob, err):
             if sp.cfg.MaxJobs != 0 &&
                 w.State().NumExecs() >= sp.cfg.MaxJobs {
+                // TODO suspicious logic, redesign
                 err = sp.ww.Allocate()
                 if err != nil {
                     sp.events.Push(events.PoolEvent{Event: events.EventWorkerConstruct, Payload: errors.E(op, err)})
@@ -265,7 +266,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
                 }
             } else {
                 sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err})
-                sp.ww.Push(w)
+                sp.ww.Release(w)
             }
         }
 
@@ -273,10 +274,10 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
         sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
         errS := w.Stop()
         if errS != nil {
-            return payload.Payload{}, errors.E(op, err, errS)
+            return nil, errors.E(op, err, errS)
         }
 
-        return payload.Payload{}, errors.E(op, err)
+        return nil, errors.E(op, err)
     }
 }
 
@@ -289,6 +290,7 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio
             return nil, err
         }
 
+        // wrap sync worker
         sw := worker.From(w)
 
         sp.events.Push(events.PoolEvent{
@@ -300,26 +302,33 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio
 }
 
 // execDebug used when debug mode was not set and exec_ttl is 0
-func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
+func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) {
+    const op = errors.Op("static_pool_exec_debug")
     sw, err := sp.allocator()
     if err != nil {
-        return payload.Payload{}, err
+        return nil, err
     }
 
-    // redirect call to the workers exec method (without ttl)
+    // redirect call to the workers' exec method (without ttl)
     r, err := sw.Exec(p)
-    if stopErr := sw.Stop(); stopErr != nil {
+    if err != nil {
+        return nil, errors.E(op, err)
+    }
+
+    err = sw.Stop()
+    if err != nil {
         sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err})
+        return nil, errors.E(op, err)
     }
 
-    return r, err
+    return r, nil
 }
 
 // execDebugWithTTL used when user set debug mode and exec_ttl
-func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) {
+func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) {
     sw, err := sp.allocator()
     if err != nil {
-        return payload.Payload{}, err
+        return nil, err
     }
 
     // redirect call to the worker with TTL
@@ -333,7 +342,7 @@ func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p payload.Payload) (
 
 // allocate required number of stack
 func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.BaseProcess, error) {
-    const op = errors.Op("allocate workers")
+    const op = errors.Op("static_pool_allocate_workers")
     workers := make([]worker.BaseProcess, 0, numWorkers)
 
     // constant number of stack simplify logic
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index 6f875072..2ac2093d 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -20,7 +20,7 @@ import (
     "github.com/stretchr/testify/assert"
 )
 
-var cfg = Config{
+var cfg = &Config{
     NumWorkers:      uint64(runtime.NumCPU()),
     AllocateTimeout: time.Second * 5,
     DestroyTimeout:  time.Second * 5,
@@ -58,7 +58,7 @@ func Test_ConfigNoErrorInitDefaults(t *testing.T) {
         context.Background(),
         func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
         pipe.NewPipeFactory(),
-        Config{
+        &Config{
             AllocateTimeout: time.Second,
             DestroyTimeout:  time.Second,
         },
@@ -82,7 +82,7 @@ func Test_StaticPool_Echo(t *testing.T) {
 
     assert.NotNil(t, p)
 
-    res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+    res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
 
     assert.NoError(t, err)
     assert.NotNil(t, res)
@@ -106,7 +106,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) {
 
     assert.NotNil(t, p)
 
-    res, err := p.Exec(payload.Payload{Body: []byte("hello"), Context: nil})
+    res, err := p.Exec(&payload.Payload{Body: []byte("hello"), Context: nil})
 
     assert.NoError(t, err)
     assert.NotNil(t, res)
@@ -130,7 +130,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) {
 
     assert.NotNil(t, p)
 
-    res, err := p.Exec(payload.Payload{Body: []byte("hello"), Context: []byte("world")})
+    res, err := p.Exec(&payload.Payload{Body: []byte("hello"), Context: []byte("world")})
 
     assert.NoError(t, err)
     assert.NotNil(t, res)
@@ -151,11 +151,10 @@ func Test_StaticPool_JobError(t *testing.T) {
     assert.NoError(t, err)
     assert.NotNil(t, p)
 
-    res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+    res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
 
     assert.Error(t, err)
-    assert.Nil(t, res.Body)
-    assert.Nil(t, res.Context)
+    assert.Nil(t, res)
 
     if errors.Is(errors.SoftJob, err) == false {
         t.Fatal("error should be of type errors.Exec")
@@ -192,10 +191,9 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
     assert.NotNil(t, p)
 
     time.Sleep(time.Second)
-    res, err := p.execWithTTL(ctx, payload.Payload{Body: []byte("hello")})
+    res, err := p.execWithTTL(ctx, &payload.Payload{Body: []byte("hello")})
     assert.Error(t, err)
-    assert.Nil(t, res.Context)
-    assert.Nil(t, res.Body)
+    assert.Nil(t, res)
 
     <-block
 
@@ -204,7 +202,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
     ctx := context.Background()
-    // Consume pool events
+    // Run pool events
     ev := make(chan struct{}, 1)
     listener := func(event interface{}) {
         if pe, ok := event.(events.PoolEvent); ok {
@@ -214,7 +212,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
         }
     }
 
-    var cfg2 = Config{
+    var cfg2 = &Config{
         NumWorkers:      1,
         AllocateTimeout: time.Second * 5,
         DestroyTimeout:  time.Second * 5,
@@ -232,7 +230,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
 
     assert.NotNil(t, p)
 
-    res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+    res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
 
     assert.NoError(t, err)
     assert.NotNil(t, res)
@@ -264,7 +262,7 @@ func Test_StaticPool_AllocateTimeout(t *testing.T) {
         context.Background(),
         func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
         pipe.NewPipeFactory(),
-        Config{
+        &Config{
             NumWorkers:      1,
             AllocateTimeout: time.Nanosecond * 1,
             DestroyTimeout:  time.Second * 2,
@@ -283,7 +281,7 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
         ctx,
         func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") },
         pipe.NewPipeFactory(),
-        Config{
+        &Config{
             NumWorkers:      1,
             MaxJobs:         1,
             AllocateTimeout: time.Second,
@@ -298,11 +296,11 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
     var lastPID string
     lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
 
-    res, _ := p.Exec(payload.Payload{Body: []byte("hello")})
+    res, _ := p.Exec(&payload.Payload{Body: []byte("hello")})
     assert.Equal(t, lastPID, string(res.Body))
 
     for i := 0; i < 10; i++ {
-        res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+        res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
 
         assert.NoError(t, err)
         assert.NotNil(t, res)
@@ -320,7 +318,7 @@ func Test_StaticPool_Debug_Worker(t *testing.T) {
         ctx,
         func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") },
         pipe.NewPipeFactory(),
-        Config{
+        &Config{
             Debug:           true,
             AllocateTimeout: time.Second,
             DestroyTimeout:  time.Second,
@@ -334,14 +332,14 @@ func Test_StaticPool_Debug_Worker(t *testing.T) {
     assert.Len(t, p.Workers(), 0)
 
     var lastPID string
-    res, _ := p.Exec(payload.Payload{Body: []byte("hello")})
+    res, _ := p.Exec(&payload.Payload{Body: []byte("hello")})
     assert.NotEqual(t, lastPID, string(res.Body))
 
     assert.Len(t, p.Workers(), 0)
 
     for i := 0; i < 10; i++ {
         assert.Len(t, p.Workers(), 0)
-        res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+        res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
 
         assert.NoError(t, err)
         assert.NotNil(t, res)
@@ -360,7 +358,7 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
         ctx,
         func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "stop", "pipes") },
         pipe.NewPipeFactory(),
-        Config{
+        &Config{
             NumWorkers:      1,
             AllocateTimeout: time.Second,
             DestroyTimeout:  time.Second,
@@ -374,14 +372,14 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
     var lastPID string
     lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
 
-    res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+    res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
     if err != nil {
         t.Fatal(err)
     }
     assert.Equal(t, lastPID, string(res.Body))
 
     for i := 0; i < 10; i++ {
-        res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+        res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
 
         assert.NoError(t, err)
         assert.NotNil(t, res)
@@ -400,7 +398,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
         ctx,
         func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
         pipe.NewPipeFactory(),
-        Config{
+        &Config{
             NumWorkers:      1,
             AllocateTimeout: time.Second,
             DestroyTimeout:  time.Second,
@@ -411,7 +409,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
     assert.NoError(t, err)
 
     p.Destroy(ctx)
-    _, err = p.Exec(payload.Payload{Body: []byte("100")})
+    _, err = p.Exec(&payload.Payload{Body: []byte("100")})
     assert.Error(t, err)
 }
 
@@ -422,7 +420,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
         ctx,
         func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
         pipe.NewPipeFactory(),
-        Config{
+        &Config{
             NumWorkers:      1,
             AllocateTimeout: time.Second,
             DestroyTimeout:  time.Second,
@@ -433,7 +431,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
     assert.NoError(t, err)
 
     go func() {
-        _, errP := p.Exec(payload.Payload{Body: []byte("100")})
+        _, errP := p.Exec(&payload.Payload{Body: []byte("100")})
         if errP != nil {
             t.Errorf("error executing payload: error %v", err)
         }
@@ -441,7 +439,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
     time.Sleep(time.Millisecond * 100)
 
     p.Destroy(ctx)
-    _, err = p.Exec(payload.Payload{Body: []byte("100")})
+    _, err = p.Exec(&payload.Payload{Body: []byte("100")})
     assert.Error(t, err)
 }
 
@@ -452,7 +450,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
         context.Background(),
         func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") },
         pipe.NewPipeFactory(),
-        Config{
+        &Config{
             NumWorkers:      5,
             AllocateTimeout: time.Second * 100,
             DestroyTimeout:  time.Second,
@@ -465,7 +463,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
         p.Workers()[i].State().Set(worker.StateErrored)
     }
 
-    _, err = p.Exec(payload.Payload{Body: []byte("hello")})
+    _, err = p.Exec(&payload.Payload{Body: []byte("hello")})
     assert.NoError(t, err)
     p.Destroy(ctx)
 }
@@ -476,7 +474,7 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) {
         context.Background(),
         func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") },
         pipe.NewPipeFactory(),
-        Config{
+        &Config{
             NumWorkers:      5,
             AllocateTimeout: time.Second,
             DestroyTimeout:  time.Second,
@@ -506,7 +504,7 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) {
         // sleep for the 3 seconds
         func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") },
         pipe.NewPipeFactory(),
-        Config{
+        &Config{
             Debug:           false,
             NumWorkers:      1,
             AllocateTimeout: time.Second,
@@ -519,14 +517,13 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) {
     assert.NotNil(t, p)
 
     go func() {
-        _, _ = p.execWithTTL(ctx, payload.Payload{Body: []byte("hello")})
+        _, _ = p.execWithTTL(ctx, &payload.Payload{Body: []byte("hello")})
     }()
 
     time.Sleep(time.Second)
-    res, err := p.execWithTTL(ctx, payload.Payload{Body: []byte("hello")})
+    res, err := p.execWithTTL(ctx, &payload.Payload{Body: []byte("hello")})
     assert.Error(t, err)
-    assert.Nil(t, res.Context)
-    assert.Nil(t, res.Body)
+    assert.Nil(t, res)
 
     <-block
 
@@ -539,7 +536,7 @@ func Test_Static_Pool_WrongCommand1(t *testing.T) {
         context.Background(),
         func() *exec.Cmd { return exec.Command("phg", "../../tests/slow-destroy.php", "echo", "pipes") },
         pipe.NewPipeFactory(),
-        Config{
+        &Config{
             NumWorkers:      5,
             AllocateTimeout: time.Second,
             DestroyTimeout:  time.Second,
@@ -556,7 +553,7 @@ func Test_Static_Pool_WrongCommand2(t *testing.T) {
         context.Background(),
         func() *exec.Cmd { return exec.Command("php", "", "echo", "pipes") },
         pipe.NewPipeFactory(),
-        Config{
+        &Config{
             NumWorkers:      5,
             AllocateTimeout: time.Second,
             DestroyTimeout:  time.Second,
@@ -567,6 +564,24 @@ func Test_Static_Pool_WrongCommand2(t *testing.T) {
     assert.Nil(t, p)
 }
 
+/* PTR:
+Benchmark_Pool_Echo-32    49076    29926 ns/op    8016 B/op    20 allocs/op
+Benchmark_Pool_Echo-32    47257    30779 ns/op    8047 B/op    20 allocs/op
+Benchmark_Pool_Echo-32    46737    29440 ns/op    8065 B/op    20 allocs/op
+Benchmark_Pool_Echo-32    51177    29074 ns/op    7981 B/op    20 allocs/op
+Benchmark_Pool_Echo-32    51764    28319 ns/op    8012 B/op    20 allocs/op
+Benchmark_Pool_Echo-32    54054    30714 ns/op    7987 B/op    20 allocs/op
+Benchmark_Pool_Echo-32    54391    30689 ns/op    8055 B/op    20 allocs/op
+
+VAL:
+Benchmark_Pool_Echo-32    47936    28679 ns/op    7942 B/op    19 allocs/op
+Benchmark_Pool_Echo-32    49010    29830 ns/op    7970 B/op    19 allocs/op
+Benchmark_Pool_Echo-32    46771    29031 ns/op    8014 B/op    19 allocs/op
+Benchmark_Pool_Echo-32    47760    30517 ns/op    7955 B/op    19 allocs/op
+Benchmark_Pool_Echo-32    48148    29816 ns/op    7950 B/op    19 allocs/op
+Benchmark_Pool_Echo-32    52705    29809 ns/op    7979 B/op    19 allocs/op
+Benchmark_Pool_Echo-32    54374    27776 ns/op    7947 B/op    19 allocs/op
+*/
 func Benchmark_Pool_Echo(b *testing.B) {
     ctx := context.Background()
     p, err := Initialize(
@@ -579,23 +594,33 @@ func Benchmark_Pool_Echo(b *testing.B) {
         b.Fatal(err)
     }
 
+    bd := make([]byte, 1024)
+    c := make([]byte, 1024)
+
+    pld := &payload.Payload{
+        Context: c,
+        Body:    bd,
+    }
+
     b.ResetTimer()
     b.ReportAllocs()
     for n := 0; n < b.N; n++ {
-        if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+        if _, err := p.Exec(pld); err != nil {
             b.Fail()
         }
     }
 }
 
 // Benchmark_Pool_Echo_Batched-32    366996    2873 ns/op    1233 B/op    24 allocs/op
+// PTR -> Benchmark_Pool_Echo_Batched-32    406839    2900 ns/op    1059 B/op    23 allocs/op
+// PTR -> Benchmark_Pool_Echo_Batched-32    413312    2904 ns/op    1067 B/op    23 allocs/op
 func Benchmark_Pool_Echo_Batched(b *testing.B) {
     ctx := context.Background()
     p, err := Initialize(
         ctx,
         func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
         pipe.NewPipeFactory(),
-        Config{
+        &Config{
             NumWorkers:      uint64(runtime.NumCPU()),
             AllocateTimeout: time.Second * 100,
             DestroyTimeout:  time.Second,
@@ -604,12 +629,23 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
     assert.NoError(b, err)
     defer p.Destroy(ctx)
 
+    bd := make([]byte, 1024)
+    c := make([]byte, 1024)
+
+    pld := &payload.Payload{
+        Context: c,
+        Body:    bd,
+    }
+
+    b.ResetTimer()
+    b.ReportAllocs()
+
     var wg sync.WaitGroup
     for i := 0; i < b.N; i++ {
         wg.Add(1)
         go func() {
             defer wg.Done()
-            if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+            if _, err := p.Exec(pld); err != nil {
                 b.Fail()
                 log.Println(err)
             }
@@ -626,7 +662,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) {
         ctx,
         func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
         pipe.NewPipeFactory(),
-        Config{
+        &Config{
             NumWorkers:      1,
             MaxJobs:         1,
             AllocateTimeout: time.Second,
@@ -639,7 +675,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) {
     b.ReportAllocs()
 
     for n := 0; n < b.N; n++ {
-        if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+        if _, err := p.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
             b.Fail()
             log.Println(err)
         }
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index 4b990dbe..bdaeade1 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -43,11 +43,11 @@ func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig)
     return sp
 }
 
-func (sp *supervised) execWithTTL(_ context.Context, _ payload.Payload) (payload.Payload, error) {
+func (sp *supervised) execWithTTL(_ context.Context, _ *payload.Payload) (*payload.Payload, error) {
     panic("used to satisfy pool interface")
 }
 
-func (sp *supervised) Exec(rqs payload.Payload) (payload.Payload, error) {
+func (sp *supervised) Exec(rqs *payload.Payload) (*payload.Payload, error) {
     const op = errors.Op("supervised_exec_with_context")
     if sp.cfg.ExecTTL == 0 {
         return sp.pool.Exec(rqs)
@@ -58,7 +58,7 @@ func (sp *supervised) Exec(rqs payload.Payload) (payload.Payload, error) {
 
     res, err := sp.pool.execWithTTL(ctx, rqs)
     if err != nil {
-        return payload.Payload{}, errors.E(op, err)
+        return nil, errors.E(op, err)
     }
 
     return res, nil
@@ -136,7 +136,7 @@ func (sp *supervised) control() { //nolint:gocognit
             /*
                 worker at this point might be in the middle of request execution:
-                ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Push
+                ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release
                                                                                                                         ^
                                                                                            TTL Reached, state - invalid |
                                                                                                                         -----> Worker Stopped here
@@ -156,7 +156,7 @@ func (sp *supervised) control() { //nolint:gocognit
             /*
                 worker at this point might be in the middle of request execution:
-                ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Push
+                ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release
                                                                                                                         ^
                                                                                            TTL Reached, state - invalid |
                                                                                                                         -----> Worker Stopped here
@@ -211,7 +211,7 @@ func (sp *supervised) control() { //nolint:gocognit
             /*
                 worker at this point might be in the middle of request execution:
-                ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Push
+                ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release
                                                                                                                         ^
                                                                                            TTL Reached, state - invalid |
                                                                                                                         -----> Worker Stopped here
@@ -221,7 +221,7 @@ func (sp *supervised) control() { //nolint:gocognit
                 workers[i].State().Set(worker.StateInvalid)
                 _ = workers[i].Stop()
             }
-            // just to double check
+            // just to double-check
             workers[i].State().Set(worker.StateInvalid)
             sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]})
         }
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
index 1cd301ba..0702a71f 100644
--- a/pkg/pool/supervisor_test.go
+++ b/pkg/pool/supervisor_test.go
@@ -14,7 +14,7 @@ import (
     "github.com/stretchr/testify/require"
 )
 
-var cfgSupervised = Config{
+var cfgSupervised = &Config{
     NumWorkers:      uint64(1),
     AllocateTimeout: time.Second,
     DestroyTimeout:  time.Second,
@@ -43,7 +43,7 @@ func TestSupervisedPool_Exec(t *testing.T) {
 
     for i := 0; i < 100; i++ {
         time.Sleep(time.Millisecond * 100)
-        _, err = p.Exec(payload.Payload{
+        _, err = p.Exec(&payload.Payload{
             Context: []byte(""),
             Body:    []byte("foo"),
         })
@@ -73,7 +73,7 @@ func TestSupervisedPool_ExecWithDebugMode(t *testing.T) {
 
     for i := 0; i < 100; i++ {
         time.Sleep(time.Millisecond * 100)
-        _, err = p.Exec(payload.Payload{
+        _, err = p.Exec(&payload.Payload{
             Context: []byte(""),
             Body:    []byte("foo"),
         })
@@ -84,7 +84,7 @@
 }
 
 func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
-    var cfgExecTTL = Config{
+    var cfgExecTTL = &Config{
         NumWorkers:      uint64(1),
         AllocateTimeout: time.Second,
         DestroyTimeout:  time.Second,
@@ -110,14 +110,13 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
 
     pid := p.Workers()[0].Pid()
 
-    resp, err := p.Exec(payload.Payload{
+    resp, err := p.Exec(&payload.Payload{
         Context: []byte(""),
         Body:    []byte("foo"),
     })
 
     assert.Error(t, err)
-    assert.Empty(t, resp.Body)
-    assert.Empty(t, resp.Context)
+    assert.Empty(t, resp)
 
     time.Sleep(time.Second * 1)
     // should be new worker with new pid
@@ -125,7 +124,7 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
 }
 
 func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) {
-    var cfgExecTTL = Config{
+    var cfgExecTTL = &Config{
         NumWorkers: uint64(1),
         Supervisor: &SupervisorConfig{
             WatchTick: 1 * time.Second,
@@ -145,7 +144,7 @@ func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) {
 
     pid := p.Workers()[0].Pid()
 
-    resp, err := p.Exec(payload.Payload{
+    resp, err := p.Exec(&payload.Payload{
         Context: []byte(""),
         Body:    []byte("foo"),
     })
@@ -159,7 +158,7 @@ func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) {
     require.Equal(t, p.Workers()[0].State().Value(), worker.StateReady)
     pid = p.Workers()[0].Pid()
 
-    resp, err = p.Exec(payload.Payload{
+    resp, err = p.Exec(&payload.Payload{
         Context: []byte(""),
         Body:    []byte("foo"),
     })
@@ -177,7 +176,7 @@ func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) {
 }
 
 func TestSupervisedPool_Idle(t *testing.T) {
-    var cfgExecTTL = Config{
+    var cfgExecTTL = &Config{
         NumWorkers:      uint64(1),
         AllocateTimeout: time.Second,
         DestroyTimeout:  time.Second,
@@ -202,7 +201,7 @@ func TestSupervisedPool_Idle(t *testing.T) {
 
     pid := p.Workers()[0].Pid()
 
-    resp, err := p.Exec(payload.Payload{
+    resp, err := p.Exec(&payload.Payload{
         Context: []byte(""),
         Body:    []byte("foo"),
     })
@@ -214,7 +213,7 @@ func TestSupervisedPool_Idle(t *testing.T) {
 
     time.Sleep(time.Second * 5)
     // worker should be marked as invalid and reallocated
-    _, err = p.Exec(payload.Payload{
+    _, err = p.Exec(&payload.Payload{
         Context: []byte(""),
         Body:    []byte("foo"),
     })
@@ -225,7 +224,7 @@
 }
 
 func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) {
-    var cfgExecTTL = Config{
+    var cfgExecTTL = &Config{
         NumWorkers:      uint64(1),
         AllocateTimeout: time.Second,
         DestroyTimeout:  time.Second,
@@ -251,7 +250,7 @@ func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) {
     pid := p.Workers()[0].Pid()
 
     time.Sleep(time.Millisecond * 100)
-    resp, err := p.Exec(payload.Payload{
+    resp, err := p.Exec(&payload.Payload{
         Context: []byte(""),
         Body:    []byte("foo"),
     })
@@ -267,7 +266,7 @@
 }
 
 func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
-    var cfgExecTTL = Config{
+    var cfgExecTTL = &Config{
         NumWorkers:      uint64(1),
         AllocateTimeout: time.Second,
         DestroyTimeout:  time.Second,
@@ -294,7 +293,7 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
     pid := p.Workers()[0].Pid()
 
     time.Sleep(time.Millisecond * 100)
-    resp, err := p.Exec(payload.Payload{
+    resp, err := p.Exec(&payload.Payload{
         Context: []byte(""),
         Body:    []byte("foo"),
     })
@@ -309,7 +308,7 @@
 }
 
 func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
-    var cfgExecTTL = Config{
+    var cfgExecTTL = &Config{
         NumWorkers:      uint64(1),
         AllocateTimeout: time.Second,
         DestroyTimeout:  time.Second,
@@ -346,7 +345,7 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
     assert.NoError(t, err)
     assert.NotNil(t, p)
 
-    resp, err := p.Exec(payload.Payload{
+    resp, err := p.Exec(&payload.Payload{
         Context: []byte(""),
         Body:    []byte("foo"),
     })
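The test changes above double as a migration guide for callers: Config is now passed by pointer, Exec takes and returns *payload.Payload, and a failed call yields a nil response instead of an empty value. A minimal caller-side sketch follows; the spiral/roadrunner v2 import paths (pkg/pool, pkg/payload, pkg/transport/pipe) and the worker.php script name are assumptions for illustration, not part of this diff:

package main

import (
    "context"
    "log"
    "os/exec"
    "time"

    // import paths assumed from the repository layout; adjust to your module
    "github.com/spiral/roadrunner/v2/pkg/payload"
    "github.com/spiral/roadrunner/v2/pkg/pool"
    "github.com/spiral/roadrunner/v2/pkg/transport/pipe"
)

func main() {
    ctx := context.Background()

    // The pool config is now passed by pointer (&Config{...}).
    p, err := pool.Initialize(
        ctx,
        func() *exec.Cmd { return exec.Command("php", "worker.php", "echo", "pipes") }, // hypothetical worker script
        pipe.NewPipeFactory(),
        &pool.Config{
            NumWorkers:      1,
            AllocateTimeout: time.Second,
            DestroyTimeout:  time.Second,
        },
    )
    if err != nil {
        log.Fatal(err)
    }
    defer p.Destroy(ctx)

    // Exec now takes and returns *payload.Payload; on error the response
    // is nil, so check err before touching res.
    res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
    if err != nil {
        log.Fatal(err)
    }
    log.Println(string(res.Body))
}

The PTR/VAL benchmark comment added to static_pool_test.go records the trade-off behind the change: pointer payloads cost one extra allocation per request (20 vs 19 allocs/op) at roughly the same latency, while allowing error paths to return nil. The Get/Push to Take/Release renames on the Watcher interface are pure renames in this diff: call sites change, behavior does not.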