diff options
author | Valery Piashchynski <[email protected]> | 2021-07-22 14:26:13 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-22 14:26:13 +0300 |
commit | fedf012e632a31d2d0837c22832c7683547ad379 (patch) | |
tree | bcb5634dfacccc6d34e49aa7337ac8d1f18b693c /pkg/pool | |
parent | 609e61426b137834ac589c88f1124574f939fa67 (diff) |
BC for the Pool, worker interfaces, pass/return payload by pointer
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg/pool')
-rw-r--r-- | pkg/pool/interface.go | 4 | ||||
-rwxr-xr-x | pkg/pool/static_pool.go | 24 | ||||
-rwxr-xr-x | pkg/pool/static_pool_test.go | 51 | ||||
-rwxr-xr-x | pkg/pool/supervisor_pool.go | 6 | ||||
-rw-r--r-- | pkg/pool/supervisor_test.go | 23 |
5 files changed, 52 insertions, 56 deletions
diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go index a040d27a..4049122c 100644 --- a/pkg/pool/interface.go +++ b/pkg/pool/interface.go @@ -13,7 +13,7 @@ type Pool interface { GetConfig() interface{} // Exec executes task with payload - Exec(rqs payload.Payload) (payload.Payload, error) + Exec(rqs *payload.Payload) (*payload.Payload, error) // Workers returns worker list associated with the pool. Workers() (workers []worker.BaseProcess) @@ -25,7 +25,7 @@ type Pool interface { Destroy(ctx context.Context) // ExecWithContext executes task with context which is used with timeout - execWithTTL(ctx context.Context, rqs payload.Payload) (payload.Payload, error) + execWithTTL(ctx context.Context, rqs *payload.Payload) (*payload.Payload, error) } // Watcher is an interface for the Sync workers lifecycle diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index 037294ea..5990f929 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -18,7 +18,7 @@ import ( const StopRequest = "{\"stop\":true}" // ErrorEncoder encode error or make a decision based on the error type -type ErrorEncoder func(err error, w worker.BaseProcess) (payload.Payload, error) +type ErrorEncoder func(err error, w worker.BaseProcess) (*payload.Payload, error) type Options func(p *StaticPool) @@ -135,7 +135,7 @@ func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error { } // Exec executes provided payload on the worker -func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) { +func (sp *StaticPool) Exec(p *payload.Payload) (*payload.Payload, error) { const op = errors.Op("static_pool_exec") if sp.cfg.Debug { return sp.execDebug(p) @@ -144,7 +144,7 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) { defer cancel() w, err := sp.takeWorker(ctxGetFree, op) if err != nil { - return payload.Payload{}, errors.E(op, err) + return nil, errors.E(op, err) } rsp, err := w.(worker.SyncWorker).Exec(p) @@ -168,7 +168,7 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) { } // Be careful, sync with pool.Exec method -func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) { +func (sp *StaticPool) execWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) { const op = errors.Op("static_pool_exec_with_context") if sp.cfg.Debug { return sp.execDebugWithTTL(ctx, p) @@ -178,7 +178,7 @@ func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (paylo defer cancel() w, err := sp.takeWorker(ctxAlloc, op) if err != nil { - return payload.Payload{}, errors.E(op, err) + return nil, errors.E(op, err) } rsp, err := w.(worker.SyncWorker).ExecWithTTL(ctx, p) @@ -244,7 +244,7 @@ func (sp *StaticPool) Destroy(ctx context.Context) { } func defaultErrEncoder(sp *StaticPool) ErrorEncoder { - return func(err error, w worker.BaseProcess) (payload.Payload, error) { + return func(err error, w worker.BaseProcess) (*payload.Payload, error) { const op = errors.Op("error_encoder") // just push event if on any stage was timeout error switch { @@ -273,10 +273,10 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) errS := w.Stop() if errS != nil { - return payload.Payload{}, errors.E(op, err, errS) + return nil, errors.E(op, err, errS) } - return payload.Payload{}, errors.E(op, err) + return nil, errors.E(op, err) } } @@ -300,10 +300,10 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio } // execDebug used when debug mode was not set and exec_ttl is 0 -func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) { +func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) { sw, err := sp.allocator() if err != nil { - return payload.Payload{}, err + return nil, err } // redirect call to the workers exec method (without ttl) @@ -316,10 +316,10 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) { } // execDebugWithTTL used when user set debug mode and exec_ttl -func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) { +func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) { sw, err := sp.allocator() if err != nil { - return payload.Payload{}, err + return nil, err } // redirect call to the worker with TTL diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go index 3df773ab..c57b1683 100755 --- a/pkg/pool/static_pool_test.go +++ b/pkg/pool/static_pool_test.go @@ -82,7 +82,7 @@ func Test_StaticPool_Echo(t *testing.T) { assert.NotNil(t, p) - res, err := p.Exec(payload.Payload{Body: []byte("hello")}) + res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -106,7 +106,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) { assert.NotNil(t, p) - res, err := p.Exec(payload.Payload{Body: []byte("hello"), Context: nil}) + res, err := p.Exec(&payload.Payload{Body: []byte("hello"), Context: nil}) assert.NoError(t, err) assert.NotNil(t, res) @@ -130,7 +130,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) { assert.NotNil(t, p) - res, err := p.Exec(payload.Payload{Body: []byte("hello"), Context: []byte("world")}) + res, err := p.Exec(&payload.Payload{Body: []byte("hello"), Context: []byte("world")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -151,11 +151,10 @@ func Test_StaticPool_JobError(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, p) - res, err := p.Exec(payload.Payload{Body: []byte("hello")}) + res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) assert.Error(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) + assert.Nil(t, res) if errors.Is(errors.SoftJob, err) == false { t.Fatal("error should be of type errors.Exec") @@ -192,10 +191,9 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { assert.NotNil(t, p) time.Sleep(time.Second) - res, err := p.execWithTTL(ctx, payload.Payload{Body: []byte("hello")}) + res, err := p.execWithTTL(ctx, &payload.Payload{Body: []byte("hello")}) assert.Error(t, err) - assert.Nil(t, res.Context) - assert.Nil(t, res.Body) + assert.Nil(t, res) <-block @@ -232,7 +230,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { assert.NotNil(t, p) - res, err := p.Exec(payload.Payload{Body: []byte("hello")}) + res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -298,11 +296,11 @@ func Test_StaticPool_Replace_Worker(t *testing.T) { var lastPID string lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) - res, _ := p.Exec(payload.Payload{Body: []byte("hello")}) + res, _ := p.Exec(&payload.Payload{Body: []byte("hello")}) assert.Equal(t, lastPID, string(res.Body)) for i := 0; i < 10; i++ { - res, err := p.Exec(payload.Payload{Body: []byte("hello")}) + res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -334,14 +332,14 @@ func Test_StaticPool_Debug_Worker(t *testing.T) { assert.Len(t, p.Workers(), 0) var lastPID string - res, _ := p.Exec(payload.Payload{Body: []byte("hello")}) + res, _ := p.Exec(&payload.Payload{Body: []byte("hello")}) assert.NotEqual(t, lastPID, string(res.Body)) assert.Len(t, p.Workers(), 0) for i := 0; i < 10; i++ { assert.Len(t, p.Workers(), 0) - res, err := p.Exec(payload.Payload{Body: []byte("hello")}) + res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -374,14 +372,14 @@ func Test_StaticPool_Stop_Worker(t *testing.T) { var lastPID string lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) - res, err := p.Exec(payload.Payload{Body: []byte("hello")}) + res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) if err != nil { t.Fatal(err) } assert.Equal(t, lastPID, string(res.Body)) for i := 0; i < 10; i++ { - res, err := p.Exec(payload.Payload{Body: []byte("hello")}) + res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -411,7 +409,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) { assert.NoError(t, err) p.Destroy(ctx) - _, err = p.Exec(payload.Payload{Body: []byte("100")}) + _, err = p.Exec(&payload.Payload{Body: []byte("100")}) assert.Error(t, err) } @@ -433,7 +431,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { assert.NoError(t, err) go func() { - _, errP := p.Exec(payload.Payload{Body: []byte("100")}) + _, errP := p.Exec(&payload.Payload{Body: []byte("100")}) if errP != nil { t.Errorf("error executing payload: error %v", err) } @@ -441,7 +439,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { time.Sleep(time.Millisecond * 100) p.Destroy(ctx) - _, err = p.Exec(payload.Payload{Body: []byte("100")}) + _, err = p.Exec(&payload.Payload{Body: []byte("100")}) assert.Error(t, err) } @@ -465,7 +463,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) { p.Workers()[i].State().Set(worker.StateErrored) } - _, err = p.Exec(payload.Payload{Body: []byte("hello")}) + _, err = p.Exec(&payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) p.Destroy(ctx) } @@ -519,14 +517,13 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) { assert.NotNil(t, p) go func() { - _, _ = p.execWithTTL(ctx, payload.Payload{Body: []byte("hello")}) + _, _ = p.execWithTTL(ctx, &payload.Payload{Body: []byte("hello")}) }() time.Sleep(time.Second) - res, err := p.execWithTTL(ctx, payload.Payload{Body: []byte("hello")}) + res, err := p.execWithTTL(ctx, &payload.Payload{Body: []byte("hello")}) assert.Error(t, err) - assert.Nil(t, res.Context) - assert.Nil(t, res.Body) + assert.Nil(t, res) <-block @@ -582,7 +579,7 @@ func Benchmark_Pool_Echo(b *testing.B) { b.ResetTimer() b.ReportAllocs() for n := 0; n < b.N; n++ { - if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := p.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -609,7 +606,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) { wg.Add(1) go func() { defer wg.Done() - if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := p.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() log.Println(err) } @@ -639,7 +636,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) { b.ReportAllocs() for n := 0; n < b.N; n++ { - if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := p.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() log.Println(err) } diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go index aa6c7cfa..cbb7ad7b 100755 --- a/pkg/pool/supervisor_pool.go +++ b/pkg/pool/supervisor_pool.go @@ -43,11 +43,11 @@ func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig) return sp } -func (sp *supervised) execWithTTL(_ context.Context, _ payload.Payload) (payload.Payload, error) { +func (sp *supervised) execWithTTL(_ context.Context, _ *payload.Payload) (*payload.Payload, error) { panic("used to satisfy pool interface") } -func (sp *supervised) Exec(rqs payload.Payload) (payload.Payload, error) { +func (sp *supervised) Exec(rqs *payload.Payload) (*payload.Payload, error) { const op = errors.Op("supervised_exec_with_context") if sp.cfg.ExecTTL == 0 { return sp.pool.Exec(rqs) @@ -58,7 +58,7 @@ func (sp *supervised) Exec(rqs payload.Payload) (payload.Payload, error) { res, err := sp.pool.execWithTTL(ctx, rqs) if err != nil { - return payload.Payload{}, errors.E(op, err) + return nil, errors.E(op, err) } return res, nil diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go index 234eec3f..0702a71f 100644 --- a/pkg/pool/supervisor_test.go +++ b/pkg/pool/supervisor_test.go @@ -43,7 +43,7 @@ func TestSupervisedPool_Exec(t *testing.T) { for i := 0; i < 100; i++ { time.Sleep(time.Millisecond * 100) - _, err = p.Exec(payload.Payload{ + _, err = p.Exec(&payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -73,7 +73,7 @@ func TestSupervisedPool_ExecWithDebugMode(t *testing.T) { for i := 0; i < 100; i++ { time.Sleep(time.Millisecond * 100) - _, err = p.Exec(payload.Payload{ + _, err = p.Exec(&payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -110,14 +110,13 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { pid := p.Workers()[0].Pid() - resp, err := p.Exec(payload.Payload{ + resp, err := p.Exec(&payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) assert.Error(t, err) - assert.Empty(t, resp.Body) - assert.Empty(t, resp.Context) + assert.Empty(t, resp) time.Sleep(time.Second * 1) // should be new worker with new pid @@ -145,7 +144,7 @@ func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) { pid := p.Workers()[0].Pid() - resp, err := p.Exec(payload.Payload{ + resp, err := p.Exec(&payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -159,7 +158,7 @@ func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) { require.Equal(t, p.Workers()[0].State().Value(), worker.StateReady) pid = p.Workers()[0].Pid() - resp, err = p.Exec(payload.Payload{ + resp, err = p.Exec(&payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -202,7 +201,7 @@ func TestSupervisedPool_Idle(t *testing.T) { pid := p.Workers()[0].Pid() - resp, err := p.Exec(payload.Payload{ + resp, err := p.Exec(&payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -214,7 +213,7 @@ func TestSupervisedPool_Idle(t *testing.T) { time.Sleep(time.Second * 5) // worker should be marked as invalid and reallocated - _, err = p.Exec(payload.Payload{ + _, err = p.Exec(&payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -251,7 +250,7 @@ func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) { pid := p.Workers()[0].Pid() time.Sleep(time.Millisecond * 100) - resp, err := p.Exec(payload.Payload{ + resp, err := p.Exec(&payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -294,7 +293,7 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) { pid := p.Workers()[0].Pid() time.Sleep(time.Millisecond * 100) - resp, err := p.Exec(payload.Payload{ + resp, err := p.Exec(&payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -346,7 +345,7 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, p) - resp, err := p.Exec(payload.Payload{ + resp, err := p.Exec(&payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) |