summaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-22 14:26:13 +0300
committerValery Piashchynski <[email protected]>2021-07-22 14:26:13 +0300
commitfedf012e632a31d2d0837c22832c7683547ad379 (patch)
treebcb5634dfacccc6d34e49aa7337ac8d1f18b693c /pkg
parent609e61426b137834ac589c88f1124574f939fa67 (diff)
BC for the Pool, worker interfaces, pass/return payload by pointer
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg')
-rw-r--r--pkg/pool/interface.go4
-rwxr-xr-xpkg/pool/static_pool.go24
-rwxr-xr-xpkg/pool/static_pool_test.go51
-rwxr-xr-xpkg/pool/supervisor_pool.go6
-rw-r--r--pkg/pool/supervisor_test.go23
-rw-r--r--pkg/transport/pipe/pipe_factory_spawn_test.go38
-rwxr-xr-xpkg/transport/pipe/pipe_factory_test.go38
-rw-r--r--pkg/transport/socket/socket_factory_spawn_test.go18
-rwxr-xr-xpkg/transport/socket/socket_factory_test.go18
-rw-r--r--pkg/worker/interface.go4
-rwxr-xr-xpkg/worker/sync_worker.go43
-rwxr-xr-xpkg/worker/sync_worker_test.go5
-rw-r--r--pkg/worker_handler/request.go8
-rw-r--r--pkg/worker_handler/response.go2
14 files changed, 131 insertions, 151 deletions
diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go
index a040d27a..4049122c 100644
--- a/pkg/pool/interface.go
+++ b/pkg/pool/interface.go
@@ -13,7 +13,7 @@ type Pool interface {
GetConfig() interface{}
// Exec executes task with payload
- Exec(rqs payload.Payload) (payload.Payload, error)
+ Exec(rqs *payload.Payload) (*payload.Payload, error)
// Workers returns worker list associated with the pool.
Workers() (workers []worker.BaseProcess)
@@ -25,7 +25,7 @@ type Pool interface {
Destroy(ctx context.Context)
// ExecWithContext executes task with context which is used with timeout
- execWithTTL(ctx context.Context, rqs payload.Payload) (payload.Payload, error)
+ execWithTTL(ctx context.Context, rqs *payload.Payload) (*payload.Payload, error)
}
// Watcher is an interface for the Sync workers lifecycle
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 037294ea..5990f929 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -18,7 +18,7 @@ import (
const StopRequest = "{\"stop\":true}"
// ErrorEncoder encode error or make a decision based on the error type
-type ErrorEncoder func(err error, w worker.BaseProcess) (payload.Payload, error)
+type ErrorEncoder func(err error, w worker.BaseProcess) (*payload.Payload, error)
type Options func(p *StaticPool)
@@ -135,7 +135,7 @@ func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error {
}
// Exec executes provided payload on the worker
-func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
+func (sp *StaticPool) Exec(p *payload.Payload) (*payload.Payload, error) {
const op = errors.Op("static_pool_exec")
if sp.cfg.Debug {
return sp.execDebug(p)
@@ -144,7 +144,7 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
defer cancel()
w, err := sp.takeWorker(ctxGetFree, op)
if err != nil {
- return payload.Payload{}, errors.E(op, err)
+ return nil, errors.E(op, err)
}
rsp, err := w.(worker.SyncWorker).Exec(p)
@@ -168,7 +168,7 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
}
// Be careful, sync with pool.Exec method
-func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) {
+func (sp *StaticPool) execWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) {
const op = errors.Op("static_pool_exec_with_context")
if sp.cfg.Debug {
return sp.execDebugWithTTL(ctx, p)
@@ -178,7 +178,7 @@ func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (paylo
defer cancel()
w, err := sp.takeWorker(ctxAlloc, op)
if err != nil {
- return payload.Payload{}, errors.E(op, err)
+ return nil, errors.E(op, err)
}
rsp, err := w.(worker.SyncWorker).ExecWithTTL(ctx, p)
@@ -244,7 +244,7 @@ func (sp *StaticPool) Destroy(ctx context.Context) {
}
func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
- return func(err error, w worker.BaseProcess) (payload.Payload, error) {
+ return func(err error, w worker.BaseProcess) (*payload.Payload, error) {
const op = errors.Op("error_encoder")
// just push event if on any stage was timeout error
switch {
@@ -273,10 +273,10 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
errS := w.Stop()
if errS != nil {
- return payload.Payload{}, errors.E(op, err, errS)
+ return nil, errors.E(op, err, errS)
}
- return payload.Payload{}, errors.E(op, err)
+ return nil, errors.E(op, err)
}
}
@@ -300,10 +300,10 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio
}
// execDebug used when debug mode was not set and exec_ttl is 0
-func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
+func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) {
sw, err := sp.allocator()
if err != nil {
- return payload.Payload{}, err
+ return nil, err
}
// redirect call to the workers exec method (without ttl)
@@ -316,10 +316,10 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
}
// execDebugWithTTL used when user set debug mode and exec_ttl
-func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) {
+func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) {
sw, err := sp.allocator()
if err != nil {
- return payload.Payload{}, err
+ return nil, err
}
// redirect call to the worker with TTL
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index 3df773ab..c57b1683 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -82,7 +82,7 @@ func Test_StaticPool_Echo(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -106,7 +106,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(payload.Payload{Body: []byte("hello"), Context: nil})
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello"), Context: nil})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -130,7 +130,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(payload.Payload{Body: []byte("hello"), Context: []byte("world")})
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello"), Context: []byte("world")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -151,11 +151,10 @@ func Test_StaticPool_JobError(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, p)
- res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
if errors.Is(errors.SoftJob, err) == false {
t.Fatal("error should be of type errors.Exec")
@@ -192,10 +191,9 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
assert.NotNil(t, p)
time.Sleep(time.Second)
- res, err := p.execWithTTL(ctx, payload.Payload{Body: []byte("hello")})
+ res, err := p.execWithTTL(ctx, &payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res.Context)
- assert.Nil(t, res.Body)
+ assert.Nil(t, res)
<-block
@@ -232,7 +230,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -298,11 +296,11 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
var lastPID string
lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
- res, _ := p.Exec(payload.Payload{Body: []byte("hello")})
+ res, _ := p.Exec(&payload.Payload{Body: []byte("hello")})
assert.Equal(t, lastPID, string(res.Body))
for i := 0; i < 10; i++ {
- res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -334,14 +332,14 @@ func Test_StaticPool_Debug_Worker(t *testing.T) {
assert.Len(t, p.Workers(), 0)
var lastPID string
- res, _ := p.Exec(payload.Payload{Body: []byte("hello")})
+ res, _ := p.Exec(&payload.Payload{Body: []byte("hello")})
assert.NotEqual(t, lastPID, string(res.Body))
assert.Len(t, p.Workers(), 0)
for i := 0; i < 10; i++ {
assert.Len(t, p.Workers(), 0)
- res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -374,14 +372,14 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
var lastPID string
lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
- res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
if err != nil {
t.Fatal(err)
}
assert.Equal(t, lastPID, string(res.Body))
for i := 0; i < 10; i++ {
- res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -411,7 +409,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
assert.NoError(t, err)
p.Destroy(ctx)
- _, err = p.Exec(payload.Payload{Body: []byte("100")})
+ _, err = p.Exec(&payload.Payload{Body: []byte("100")})
assert.Error(t, err)
}
@@ -433,7 +431,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
assert.NoError(t, err)
go func() {
- _, errP := p.Exec(payload.Payload{Body: []byte("100")})
+ _, errP := p.Exec(&payload.Payload{Body: []byte("100")})
if errP != nil {
t.Errorf("error executing payload: error %v", err)
}
@@ -441,7 +439,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
time.Sleep(time.Millisecond * 100)
p.Destroy(ctx)
- _, err = p.Exec(payload.Payload{Body: []byte("100")})
+ _, err = p.Exec(&payload.Payload{Body: []byte("100")})
assert.Error(t, err)
}
@@ -465,7 +463,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
p.Workers()[i].State().Set(worker.StateErrored)
}
- _, err = p.Exec(payload.Payload{Body: []byte("hello")})
+ _, err = p.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
p.Destroy(ctx)
}
@@ -519,14 +517,13 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) {
assert.NotNil(t, p)
go func() {
- _, _ = p.execWithTTL(ctx, payload.Payload{Body: []byte("hello")})
+ _, _ = p.execWithTTL(ctx, &payload.Payload{Body: []byte("hello")})
}()
time.Sleep(time.Second)
- res, err := p.execWithTTL(ctx, payload.Payload{Body: []byte("hello")})
+ res, err := p.execWithTTL(ctx, &payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res.Context)
- assert.Nil(t, res.Body)
+ assert.Nil(t, res)
<-block
@@ -582,7 +579,7 @@ func Benchmark_Pool_Echo(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for n := 0; n < b.N; n++ {
- if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := p.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -609,7 +606,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
wg.Add(1)
go func() {
defer wg.Done()
- if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := p.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
log.Println(err)
}
@@ -639,7 +636,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) {
b.ReportAllocs()
for n := 0; n < b.N; n++ {
- if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := p.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
log.Println(err)
}
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index aa6c7cfa..cbb7ad7b 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -43,11 +43,11 @@ func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig)
return sp
}
-func (sp *supervised) execWithTTL(_ context.Context, _ payload.Payload) (payload.Payload, error) {
+func (sp *supervised) execWithTTL(_ context.Context, _ *payload.Payload) (*payload.Payload, error) {
panic("used to satisfy pool interface")
}
-func (sp *supervised) Exec(rqs payload.Payload) (payload.Payload, error) {
+func (sp *supervised) Exec(rqs *payload.Payload) (*payload.Payload, error) {
const op = errors.Op("supervised_exec_with_context")
if sp.cfg.ExecTTL == 0 {
return sp.pool.Exec(rqs)
@@ -58,7 +58,7 @@ func (sp *supervised) Exec(rqs payload.Payload) (payload.Payload, error) {
res, err := sp.pool.execWithTTL(ctx, rqs)
if err != nil {
- return payload.Payload{}, errors.E(op, err)
+ return nil, errors.E(op, err)
}
return res, nil
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
index 234eec3f..0702a71f 100644
--- a/pkg/pool/supervisor_test.go
+++ b/pkg/pool/supervisor_test.go
@@ -43,7 +43,7 @@ func TestSupervisedPool_Exec(t *testing.T) {
for i := 0; i < 100; i++ {
time.Sleep(time.Millisecond * 100)
- _, err = p.Exec(payload.Payload{
+ _, err = p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -73,7 +73,7 @@ func TestSupervisedPool_ExecWithDebugMode(t *testing.T) {
for i := 0; i < 100; i++ {
time.Sleep(time.Millisecond * 100)
- _, err = p.Exec(payload.Payload{
+ _, err = p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -110,14 +110,13 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
pid := p.Workers()[0].Pid()
- resp, err := p.Exec(payload.Payload{
+ resp, err := p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
assert.Error(t, err)
- assert.Empty(t, resp.Body)
- assert.Empty(t, resp.Context)
+ assert.Empty(t, resp)
time.Sleep(time.Second * 1)
// should be new worker with new pid
@@ -145,7 +144,7 @@ func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) {
pid := p.Workers()[0].Pid()
- resp, err := p.Exec(payload.Payload{
+ resp, err := p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -159,7 +158,7 @@ func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) {
require.Equal(t, p.Workers()[0].State().Value(), worker.StateReady)
pid = p.Workers()[0].Pid()
- resp, err = p.Exec(payload.Payload{
+ resp, err = p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -202,7 +201,7 @@ func TestSupervisedPool_Idle(t *testing.T) {
pid := p.Workers()[0].Pid()
- resp, err := p.Exec(payload.Payload{
+ resp, err := p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -214,7 +213,7 @@ func TestSupervisedPool_Idle(t *testing.T) {
time.Sleep(time.Second * 5)
// worker should be marked as invalid and reallocated
- _, err = p.Exec(payload.Payload{
+ _, err = p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -251,7 +250,7 @@ func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) {
pid := p.Workers()[0].Pid()
time.Sleep(time.Millisecond * 100)
- resp, err := p.Exec(payload.Payload{
+ resp, err := p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -294,7 +293,7 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
pid := p.Workers()[0].Pid()
time.Sleep(time.Millisecond * 100)
- resp, err := p.Exec(payload.Payload{
+ resp, err := p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -346,7 +345,7 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, p)
- resp, err := p.Exec(payload.Payload{
+ resp, err := p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
diff --git a/pkg/transport/pipe/pipe_factory_spawn_test.go b/pkg/transport/pipe/pipe_factory_spawn_test.go
index 51befb1e..f5e9669b 100644
--- a/pkg/transport/pipe/pipe_factory_spawn_test.go
+++ b/pkg/transport/pipe/pipe_factory_spawn_test.go
@@ -144,7 +144,7 @@ func Test_Pipe_Echo2(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -168,11 +168,10 @@ func Test_Pipe_Broken2(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
}
func Benchmark_Pipe_SpawnWorker_Stop2(b *testing.B) {
@@ -215,7 +214,7 @@ func Benchmark_Pipe_Worker_ExecEcho2(b *testing.B) {
}()
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -238,7 +237,7 @@ func Benchmark_Pipe_Worker_ExecEcho4(b *testing.B) {
sw := worker.From(w)
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -261,7 +260,7 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext2(b *testing.B) {
sw := worker.From(w)
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -287,7 +286,7 @@ func Test_Echo2(t *testing.T) {
}
}()
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Nil(t, err)
assert.NotNil(t, res)
@@ -314,11 +313,10 @@ func Test_BadPayload2(t *testing.T) {
}
}()
- res, err := sw.Exec(payload.Payload{})
+ res, err := sw.Exec(&payload.Payload{})
assert.Error(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
assert.Contains(t, err.Error(), "payload can not be empty")
}
@@ -358,7 +356,7 @@ func Test_Echo_Slow2(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Nil(t, err)
assert.NotNil(t, res)
@@ -387,10 +385,9 @@ func Test_Broken2(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.NotNil(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
time.Sleep(time.Second * 3)
mu.Lock()
@@ -418,10 +415,9 @@ func Test_Error2(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.NotNil(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
if errors.Is(errors.SoftJob, err) == false {
t.Fatal("error should be of type errors.ErrSoftJob")
@@ -445,19 +441,19 @@ func Test_NumExecs2(t *testing.T) {
sw := worker.From(w)
- _, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ _, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, uint64(1), w.State().NumExecs())
- _, err = sw.Exec(payload.Payload{Body: []byte("hello")})
+ _, err = sw.Exec(&payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, uint64(2), w.State().NumExecs())
- _, err = sw.Exec(payload.Payload{Body: []byte("hello")})
+ _, err = sw.Exec(&payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
diff --git a/pkg/transport/pipe/pipe_factory_test.go b/pkg/transport/pipe/pipe_factory_test.go
index 3ef65be8..5c937a97 100755
--- a/pkg/transport/pipe/pipe_factory_test.go
+++ b/pkg/transport/pipe/pipe_factory_test.go
@@ -159,7 +159,7 @@ func Test_Pipe_Echo(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -184,11 +184,10 @@ func Test_Pipe_Broken(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
}
func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) {
@@ -231,7 +230,7 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) {
}()
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -255,7 +254,7 @@ func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) {
sw := worker.From(w)
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -279,7 +278,7 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) {
sw := worker.From(w)
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -305,7 +304,7 @@ func Test_Echo(t *testing.T) {
}
}()
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Nil(t, err)
assert.NotNil(t, res)
@@ -333,11 +332,10 @@ func Test_BadPayload(t *testing.T) {
}
}()
- res, err := sw.Exec(payload.Payload{})
+ res, err := sw.Exec(&payload.Payload{})
assert.Error(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
assert.Contains(t, err.Error(), "payload can not be empty")
}
@@ -379,7 +377,7 @@ func Test_Echo_Slow(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Nil(t, err)
assert.NotNil(t, res)
@@ -409,10 +407,9 @@ func Test_Broken(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.NotNil(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
time.Sleep(time.Second * 3)
mu.Lock()
@@ -441,10 +438,9 @@ func Test_Error(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.NotNil(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
if errors.Is(errors.SoftJob, err) == false {
t.Fatal("error should be of type errors.ErrSoftJob")
@@ -469,19 +465,19 @@ func Test_NumExecs(t *testing.T) {
sw := worker.From(w)
- _, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ _, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, uint64(1), w.State().NumExecs())
- _, err = sw.Exec(payload.Payload{Body: []byte("hello")})
+ _, err = sw.Exec(&payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, uint64(2), w.State().NumExecs())
- _, err = sw.Exec(payload.Payload{Body: []byte("hello")})
+ _, err = sw.Exec(&payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
diff --git a/pkg/transport/socket/socket_factory_spawn_test.go b/pkg/transport/socket/socket_factory_spawn_test.go
index b875e2c8..23506291 100644
--- a/pkg/transport/socket/socket_factory_spawn_test.go
+++ b/pkg/transport/socket/socket_factory_spawn_test.go
@@ -194,10 +194,9 @@ func Test_Tcp_Broken2(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
wg.Wait()
<-finish
}
@@ -230,7 +229,7 @@ func Test_Tcp_Echo2(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -363,11 +362,10 @@ func Test_Unix_Broken2(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res.Context)
- assert.Nil(t, res.Body)
+ assert.Nil(t, res)
wg.Wait()
<-finish
}
@@ -398,7 +396,7 @@ func Test_Unix_Echo2(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -459,7 +457,7 @@ func Benchmark_Tcp_Worker_ExecEcho2(b *testing.B) {
sw := worker.From(w)
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -528,7 +526,7 @@ func Benchmark_Unix_Worker_ExecEcho2(b *testing.B) {
sw := worker.From(w)
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
diff --git a/pkg/transport/socket/socket_factory_test.go b/pkg/transport/socket/socket_factory_test.go
index 34fe088b..91da595d 100755
--- a/pkg/transport/socket/socket_factory_test.go
+++ b/pkg/transport/socket/socket_factory_test.go
@@ -231,10 +231,9 @@ func Test_Tcp_Broken(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
wg.Wait()
<-finish
}
@@ -269,7 +268,7 @@ func Test_Tcp_Echo(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -434,11 +433,10 @@ func Test_Unix_Broken(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res.Context)
- assert.Nil(t, res.Body)
+ assert.Nil(t, res)
<-block
wg.Wait()
}
@@ -475,7 +473,7 @@ func Test_Unix_Echo(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -548,7 +546,7 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) {
sw := worker.From(w)
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -613,7 +611,7 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) {
sw := worker.From(w)
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
diff --git a/pkg/worker/interface.go b/pkg/worker/interface.go
index d2cfe2cd..ed8704bb 100644
--- a/pkg/worker/interface.go
+++ b/pkg/worker/interface.go
@@ -68,7 +68,7 @@ type SyncWorker interface {
// BaseProcess provides basic functionality for the SyncWorker
BaseProcess
// Exec used to execute payload on the SyncWorker, there is no TIMEOUTS
- Exec(rqs payload.Payload) (payload.Payload, error)
+ Exec(rqs *payload.Payload) (*payload.Payload, error)
// ExecWithTTL used to handle Exec with TTL
- ExecWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error)
+ ExecWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error)
}
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index 380bfff7..d20b7ae0 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -36,14 +36,14 @@ func From(process *Process) *SyncWorkerImpl {
}
// Exec payload without TTL timeout.
-func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) {
+func (tw *SyncWorkerImpl) Exec(p *payload.Payload) (*payload.Payload, error) {
const op = errors.Op("sync_worker_exec")
if len(p.Body) == 0 && len(p.Context) == 0 {
- return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty"))
+ return nil, errors.E(op, errors.Str("payload can not be empty"))
}
if tw.process.State().Value() != StateReady {
- return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String()))
+ return nil, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String()))
}
// set last used time
@@ -57,7 +57,7 @@ func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) {
tw.process.State().Set(StateErrored)
tw.process.State().RegisterExec()
}
- return payload.Payload{}, errors.E(op, err)
+ return nil, errors.E(op, err)
}
// supervisor may set state of the worker during the work
@@ -74,28 +74,26 @@ func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) {
}
type wexec struct {
- payload payload.Payload
+ payload *payload.Payload
err error
}
// ExecWithTTL executes payload without TTL timeout.
-func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) {
+func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) {
const op = errors.Op("sync_worker_exec_worker_with_timeout")
c := make(chan wexec, 1)
go func() {
if len(p.Body) == 0 && len(p.Context) == 0 {
c <- wexec{
- payload: payload.Payload{},
- err: errors.E(op, errors.Str("payload can not be empty")),
+ err: errors.E(op, errors.Str("payload can not be empty")),
}
return
}
if tw.process.State().Value() != StateReady {
c <- wexec{
- payload: payload.Payload{},
- err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())),
+ err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())),
}
return
}
@@ -112,8 +110,7 @@ func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p payload.Payload) (p
tw.process.State().RegisterExec()
}
c <- wexec{
- payload: payload.Payload{},
- err: errors.E(op, err),
+ err: errors.E(op, err),
}
return
}
@@ -143,18 +140,18 @@ func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p payload.Payload) (p
if err != nil {
// append timeout error
err = multierr.Append(err, errors.E(op, errors.ExecTTL))
- return payload.Payload{}, multierr.Append(err, ctx.Err())
+ return nil, multierr.Append(err, ctx.Err())
}
- return payload.Payload{}, errors.E(op, errors.ExecTTL, ctx.Err())
+ return nil, errors.E(op, errors.ExecTTL, ctx.Err())
case res := <-c:
if res.err != nil {
- return payload.Payload{}, res.err
+ return nil, res.err
}
return res.payload, nil
}
}
-func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error) {
+func (tw *SyncWorkerImpl) execPayload(p *payload.Payload) (*payload.Payload, error) {
const op = errors.Op("sync_worker_exec_payload")
// get a frame
@@ -182,7 +179,7 @@ func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error
err := tw.Relay().Send(fr)
if err != nil {
- return payload.Payload{}, errors.E(op, errors.Network, err)
+ return nil, errors.E(op, errors.Network, err)
}
frameR := tw.getFrame()
@@ -190,28 +187,28 @@ func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error
err = tw.process.Relay().Receive(frameR)
if err != nil {
- return payload.Payload{}, errors.E(op, errors.Network, err)
+ return nil, errors.E(op, errors.Network, err)
}
if frameR == nil {
- return payload.Payload{}, errors.E(op, errors.Network, errors.Str("nil fr received"))
+ return nil, errors.E(op, errors.Network, errors.Str("nil fr received"))
}
if !frameR.VerifyCRC() {
- return payload.Payload{}, errors.E(op, errors.Network, errors.Str("failed to verify CRC"))
+ return nil, errors.E(op, errors.Network, errors.Str("failed to verify CRC"))
}
flags := frameR.ReadFlags()
if flags&frame.ERROR != byte(0) {
- return payload.Payload{}, errors.E(op, errors.SoftJob, errors.Str(string(frameR.Payload())))
+ return nil, errors.E(op, errors.SoftJob, errors.Str(string(frameR.Payload())))
}
options := frameR.ReadOptions()
if len(options) != 1 {
- return payload.Payload{}, errors.E(op, errors.Decode, errors.Str("options length should be equal 1 (body offset)"))
+ return nil, errors.E(op, errors.Decode, errors.Str("options length should be equal 1 (body offset)"))
}
- pld := payload.Payload{
+ pld := &payload.Payload{
Body: make([]byte, len(frameR.Payload()[options[0]:])),
Context: make([]byte, len(frameR.Payload()[:options[0]])),
}
diff --git a/pkg/worker/sync_worker_test.go b/pkg/worker/sync_worker_test.go
index df556e93..64580f9f 100755
--- a/pkg/worker/sync_worker_test.go
+++ b/pkg/worker/sync_worker_test.go
@@ -24,11 +24,10 @@ func Test_NotStarted_Exec(t *testing.T) {
sw := From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
assert.Contains(t, err.Error(), "Process is not ready (inactive)")
}
diff --git a/pkg/worker_handler/request.go b/pkg/worker_handler/request.go
index 44c466bb..3d60897b 100644
--- a/pkg/worker_handler/request.go
+++ b/pkg/worker_handler/request.go
@@ -138,18 +138,18 @@ func (r *Request) Close(log logger.Logger) {
// Payload request marshaled RoadRunner payload based on PSR7 data. values encode method is JSON. Make sure to open
// files prior to calling this method.
-func (r *Request) Payload() (payload.Payload, error) {
+func (r *Request) Payload() (*payload.Payload, error) {
const op = errors.Op("marshal_payload")
- p := payload.Payload{}
+ p := &payload.Payload{}
var err error
if p.Context, err = json.Marshal(r); err != nil {
- return payload.Payload{}, errors.E(op, errors.Encode, err)
+ return nil, errors.E(op, errors.Encode, err)
}
if r.Parsed {
if p.Body, err = json.Marshal(r.body); err != nil {
- return payload.Payload{}, errors.E(op, errors.Encode, err)
+ return nil, errors.E(op, errors.Encode, err)
}
} else if r.body != nil {
p.Body = r.body.([]byte)
diff --git a/pkg/worker_handler/response.go b/pkg/worker_handler/response.go
index cbf22794..d22f09d4 100644
--- a/pkg/worker_handler/response.go
+++ b/pkg/worker_handler/response.go
@@ -22,7 +22,7 @@ type Response struct {
}
// NewResponse creates new response based on given pool payload.
-func NewResponse(p payload.Payload) (*Response, error) {
+func NewResponse(p *payload.Payload) (*Response, error) {
const op = errors.Op("http_response")
r := &Response{Body: p.Body}
if err := json.Unmarshal(p.Context, r); err != nil {