diff options
author | Valery Piashchynski <[email protected]> | 2021-07-22 14:26:13 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-22 14:26:13 +0300 |
commit | fedf012e632a31d2d0837c22832c7683547ad379 (patch) | |
tree | bcb5634dfacccc6d34e49aa7337ac8d1f18b693c | |
parent | 609e61426b137834ac589c88f1124574f939fa67 (diff) |
BC for the Pool, worker interfaces, pass/return payload by pointer
Signed-off-by: Valery Piashchynski <[email protected]>
24 files changed, 170 insertions, 170 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 5fa1abfd..8000a622 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,28 @@ CHANGELOG ========= +v2.4.0 (_.08.2021) +------------------- + +## 💔 Internal BC: + +- 🔨 Pool, worker interfaces: payload now passed and returned by pointer. + +## 👀 New: + +- ✏️ Long awaited, reworked `Jobs` plugin with pluggable drivers. Now you can allocate/destroy pipelines in the runtime. + Drivers included in the initial release: `RabbitMQ (0-9-1)`, `SQS v2`, `beanstalk`, `ephemeral`. All jobs can be prioritized now --> [PR](https://github.com/spiral/roadrunner/pull/726) + +## 🩹 Fixes: + +- 🐛 Fix: + +## 📈 Summary: + +- RR Milestone [2.4.0]() + +--- + v2.3.2 (14.07.2021) ------------------- diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go index a040d27a..4049122c 100644 --- a/pkg/pool/interface.go +++ b/pkg/pool/interface.go @@ -13,7 +13,7 @@ type Pool interface { GetConfig() interface{} // Exec executes task with payload - Exec(rqs payload.Payload) (payload.Payload, error) + Exec(rqs *payload.Payload) (*payload.Payload, error) // Workers returns worker list associated with the pool. Workers() (workers []worker.BaseProcess) @@ -25,7 +25,7 @@ type Pool interface { Destroy(ctx context.Context) // ExecWithContext executes task with context which is used with timeout - execWithTTL(ctx context.Context, rqs payload.Payload) (payload.Payload, error) + execWithTTL(ctx context.Context, rqs *payload.Payload) (*payload.Payload, error) } // Watcher is an interface for the Sync workers lifecycle diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index 037294ea..5990f929 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -18,7 +18,7 @@ import ( const StopRequest = "{\"stop\":true}" // ErrorEncoder encode error or make a decision based on the error type -type ErrorEncoder func(err error, w worker.BaseProcess) (payload.Payload, error) +type ErrorEncoder func(err error, w worker.BaseProcess) (*payload.Payload, error) type Options func(p *StaticPool) @@ -135,7 +135,7 @@ func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error { } // Exec executes provided payload on the worker -func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) { +func (sp *StaticPool) Exec(p *payload.Payload) (*payload.Payload, error) { const op = errors.Op("static_pool_exec") if sp.cfg.Debug { return sp.execDebug(p) @@ -144,7 +144,7 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) { defer cancel() w, err := sp.takeWorker(ctxGetFree, op) if err != nil { - return payload.Payload{}, errors.E(op, err) + return nil, errors.E(op, err) } rsp, err := w.(worker.SyncWorker).Exec(p) @@ -168,7 +168,7 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) { } // Be careful, sync with pool.Exec method -func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) { +func (sp *StaticPool) execWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) { const op = errors.Op("static_pool_exec_with_context") if sp.cfg.Debug { return sp.execDebugWithTTL(ctx, p) @@ -178,7 +178,7 @@ func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (paylo defer cancel() w, err := sp.takeWorker(ctxAlloc, op) if err != nil { - return payload.Payload{}, errors.E(op, err) + return nil, errors.E(op, err) } rsp, err := w.(worker.SyncWorker).ExecWithTTL(ctx, p) @@ -244,7 +244,7 @@ func (sp *StaticPool) Destroy(ctx context.Context) { } func defaultErrEncoder(sp *StaticPool) ErrorEncoder { - return func(err error, w worker.BaseProcess) (payload.Payload, error) { + return func(err error, w worker.BaseProcess) (*payload.Payload, error) { const op = errors.Op("error_encoder") // just push event if on any stage was timeout error switch { @@ -273,10 +273,10 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) errS := w.Stop() if errS != nil { - return payload.Payload{}, errors.E(op, err, errS) + return nil, errors.E(op, err, errS) } - return payload.Payload{}, errors.E(op, err) + return nil, errors.E(op, err) } } @@ -300,10 +300,10 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio } // execDebug used when debug mode was not set and exec_ttl is 0 -func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) { +func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) { sw, err := sp.allocator() if err != nil { - return payload.Payload{}, err + return nil, err } // redirect call to the workers exec method (without ttl) @@ -316,10 +316,10 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) { } // execDebugWithTTL used when user set debug mode and exec_ttl -func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) { +func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) { sw, err := sp.allocator() if err != nil { - return payload.Payload{}, err + return nil, err } // redirect call to the worker with TTL diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go index 3df773ab..c57b1683 100755 --- a/pkg/pool/static_pool_test.go +++ b/pkg/pool/static_pool_test.go @@ -82,7 +82,7 @@ func Test_StaticPool_Echo(t *testing.T) { assert.NotNil(t, p) - res, err := p.Exec(payload.Payload{Body: []byte("hello")}) + res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -106,7 +106,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) { assert.NotNil(t, p) - res, err := p.Exec(payload.Payload{Body: []byte("hello"), Context: nil}) + res, err := p.Exec(&payload.Payload{Body: []byte("hello"), Context: nil}) assert.NoError(t, err) assert.NotNil(t, res) @@ -130,7 +130,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) { assert.NotNil(t, p) - res, err := p.Exec(payload.Payload{Body: []byte("hello"), Context: []byte("world")}) + res, err := p.Exec(&payload.Payload{Body: []byte("hello"), Context: []byte("world")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -151,11 +151,10 @@ func Test_StaticPool_JobError(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, p) - res, err := p.Exec(payload.Payload{Body: []byte("hello")}) + res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) assert.Error(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) + assert.Nil(t, res) if errors.Is(errors.SoftJob, err) == false { t.Fatal("error should be of type errors.Exec") @@ -192,10 +191,9 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { assert.NotNil(t, p) time.Sleep(time.Second) - res, err := p.execWithTTL(ctx, payload.Payload{Body: []byte("hello")}) + res, err := p.execWithTTL(ctx, &payload.Payload{Body: []byte("hello")}) assert.Error(t, err) - assert.Nil(t, res.Context) - assert.Nil(t, res.Body) + assert.Nil(t, res) <-block @@ -232,7 +230,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { assert.NotNil(t, p) - res, err := p.Exec(payload.Payload{Body: []byte("hello")}) + res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -298,11 +296,11 @@ func Test_StaticPool_Replace_Worker(t *testing.T) { var lastPID string lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) - res, _ := p.Exec(payload.Payload{Body: []byte("hello")}) + res, _ := p.Exec(&payload.Payload{Body: []byte("hello")}) assert.Equal(t, lastPID, string(res.Body)) for i := 0; i < 10; i++ { - res, err := p.Exec(payload.Payload{Body: []byte("hello")}) + res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -334,14 +332,14 @@ func Test_StaticPool_Debug_Worker(t *testing.T) { assert.Len(t, p.Workers(), 0) var lastPID string - res, _ := p.Exec(payload.Payload{Body: []byte("hello")}) + res, _ := p.Exec(&payload.Payload{Body: []byte("hello")}) assert.NotEqual(t, lastPID, string(res.Body)) assert.Len(t, p.Workers(), 0) for i := 0; i < 10; i++ { assert.Len(t, p.Workers(), 0) - res, err := p.Exec(payload.Payload{Body: []byte("hello")}) + res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -374,14 +372,14 @@ func Test_StaticPool_Stop_Worker(t *testing.T) { var lastPID string lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) - res, err := p.Exec(payload.Payload{Body: []byte("hello")}) + res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) if err != nil { t.Fatal(err) } assert.Equal(t, lastPID, string(res.Body)) for i := 0; i < 10; i++ { - res, err := p.Exec(payload.Payload{Body: []byte("hello")}) + res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -411,7 +409,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) { assert.NoError(t, err) p.Destroy(ctx) - _, err = p.Exec(payload.Payload{Body: []byte("100")}) + _, err = p.Exec(&payload.Payload{Body: []byte("100")}) assert.Error(t, err) } @@ -433,7 +431,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { assert.NoError(t, err) go func() { - _, errP := p.Exec(payload.Payload{Body: []byte("100")}) + _, errP := p.Exec(&payload.Payload{Body: []byte("100")}) if errP != nil { t.Errorf("error executing payload: error %v", err) } @@ -441,7 +439,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { time.Sleep(time.Millisecond * 100) p.Destroy(ctx) - _, err = p.Exec(payload.Payload{Body: []byte("100")}) + _, err = p.Exec(&payload.Payload{Body: []byte("100")}) assert.Error(t, err) } @@ -465,7 +463,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) { p.Workers()[i].State().Set(worker.StateErrored) } - _, err = p.Exec(payload.Payload{Body: []byte("hello")}) + _, err = p.Exec(&payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) p.Destroy(ctx) } @@ -519,14 +517,13 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) { assert.NotNil(t, p) go func() { - _, _ = p.execWithTTL(ctx, payload.Payload{Body: []byte("hello")}) + _, _ = p.execWithTTL(ctx, &payload.Payload{Body: []byte("hello")}) }() time.Sleep(time.Second) - res, err := p.execWithTTL(ctx, payload.Payload{Body: []byte("hello")}) + res, err := p.execWithTTL(ctx, &payload.Payload{Body: []byte("hello")}) assert.Error(t, err) - assert.Nil(t, res.Context) - assert.Nil(t, res.Body) + assert.Nil(t, res) <-block @@ -582,7 +579,7 @@ func Benchmark_Pool_Echo(b *testing.B) { b.ResetTimer() b.ReportAllocs() for n := 0; n < b.N; n++ { - if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := p.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -609,7 +606,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) { wg.Add(1) go func() { defer wg.Done() - if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := p.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() log.Println(err) } @@ -639,7 +636,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) { b.ReportAllocs() for n := 0; n < b.N; n++ { - if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := p.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() log.Println(err) } diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go index aa6c7cfa..cbb7ad7b 100755 --- a/pkg/pool/supervisor_pool.go +++ b/pkg/pool/supervisor_pool.go @@ -43,11 +43,11 @@ func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig) return sp } -func (sp *supervised) execWithTTL(_ context.Context, _ payload.Payload) (payload.Payload, error) { +func (sp *supervised) execWithTTL(_ context.Context, _ *payload.Payload) (*payload.Payload, error) { panic("used to satisfy pool interface") } -func (sp *supervised) Exec(rqs payload.Payload) (payload.Payload, error) { +func (sp *supervised) Exec(rqs *payload.Payload) (*payload.Payload, error) { const op = errors.Op("supervised_exec_with_context") if sp.cfg.ExecTTL == 0 { return sp.pool.Exec(rqs) @@ -58,7 +58,7 @@ func (sp *supervised) Exec(rqs payload.Payload) (payload.Payload, error) { res, err := sp.pool.execWithTTL(ctx, rqs) if err != nil { - return payload.Payload{}, errors.E(op, err) + return nil, errors.E(op, err) } return res, nil diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go index 234eec3f..0702a71f 100644 --- a/pkg/pool/supervisor_test.go +++ b/pkg/pool/supervisor_test.go @@ -43,7 +43,7 @@ func TestSupervisedPool_Exec(t *testing.T) { for i := 0; i < 100; i++ { time.Sleep(time.Millisecond * 100) - _, err = p.Exec(payload.Payload{ + _, err = p.Exec(&payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -73,7 +73,7 @@ func TestSupervisedPool_ExecWithDebugMode(t *testing.T) { for i := 0; i < 100; i++ { time.Sleep(time.Millisecond * 100) - _, err = p.Exec(payload.Payload{ + _, err = p.Exec(&payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -110,14 +110,13 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { pid := p.Workers()[0].Pid() - resp, err := p.Exec(payload.Payload{ + resp, err := p.Exec(&payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) assert.Error(t, err) - assert.Empty(t, resp.Body) - assert.Empty(t, resp.Context) + assert.Empty(t, resp) time.Sleep(time.Second * 1) // should be new worker with new pid @@ -145,7 +144,7 @@ func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) { pid := p.Workers()[0].Pid() - resp, err := p.Exec(payload.Payload{ + resp, err := p.Exec(&payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -159,7 +158,7 @@ func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) { require.Equal(t, p.Workers()[0].State().Value(), worker.StateReady) pid = p.Workers()[0].Pid() - resp, err = p.Exec(payload.Payload{ + resp, err = p.Exec(&payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -202,7 +201,7 @@ func TestSupervisedPool_Idle(t *testing.T) { pid := p.Workers()[0].Pid() - resp, err := p.Exec(payload.Payload{ + resp, err := p.Exec(&payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -214,7 +213,7 @@ func TestSupervisedPool_Idle(t *testing.T) { time.Sleep(time.Second * 5) // worker should be marked as invalid and reallocated - _, err = p.Exec(payload.Payload{ + _, err = p.Exec(&payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -251,7 +250,7 @@ func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) { pid := p.Workers()[0].Pid() time.Sleep(time.Millisecond * 100) - resp, err := p.Exec(payload.Payload{ + resp, err := p.Exec(&payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -294,7 +293,7 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) { pid := p.Workers()[0].Pid() time.Sleep(time.Millisecond * 100) - resp, err := p.Exec(payload.Payload{ + resp, err := p.Exec(&payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -346,7 +345,7 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, p) - resp, err := p.Exec(payload.Payload{ + resp, err := p.Exec(&payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) diff --git a/pkg/transport/pipe/pipe_factory_spawn_test.go b/pkg/transport/pipe/pipe_factory_spawn_test.go index 51befb1e..f5e9669b 100644 --- a/pkg/transport/pipe/pipe_factory_spawn_test.go +++ b/pkg/transport/pipe/pipe_factory_spawn_test.go @@ -144,7 +144,7 @@ func Test_Pipe_Echo2(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -168,11 +168,10 @@ func Test_Pipe_Broken2(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.Error(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) + assert.Nil(t, res) } func Benchmark_Pipe_SpawnWorker_Stop2(b *testing.B) { @@ -215,7 +214,7 @@ func Benchmark_Pipe_Worker_ExecEcho2(b *testing.B) { }() for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -238,7 +237,7 @@ func Benchmark_Pipe_Worker_ExecEcho4(b *testing.B) { sw := worker.From(w) for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -261,7 +260,7 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext2(b *testing.B) { sw := worker.From(w) for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -287,7 +286,7 @@ func Test_Echo2(t *testing.T) { } }() - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.Nil(t, err) assert.NotNil(t, res) @@ -314,11 +313,10 @@ func Test_BadPayload2(t *testing.T) { } }() - res, err := sw.Exec(payload.Payload{}) + res, err := sw.Exec(&payload.Payload{}) assert.Error(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) + assert.Nil(t, res) assert.Contains(t, err.Error(), "payload can not be empty") } @@ -358,7 +356,7 @@ func Test_Echo_Slow2(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.Nil(t, err) assert.NotNil(t, res) @@ -387,10 +385,9 @@ func Test_Broken2(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.NotNil(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) + assert.Nil(t, res) time.Sleep(time.Second * 3) mu.Lock() @@ -418,10 +415,9 @@ func Test_Error2(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.NotNil(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) + assert.Nil(t, res) if errors.Is(errors.SoftJob, err) == false { t.Fatal("error should be of type errors.ErrSoftJob") @@ -445,19 +441,19 @@ func Test_NumExecs2(t *testing.T) { sw := worker.From(w) - _, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } assert.Equal(t, uint64(1), w.State().NumExecs()) - _, err = sw.Exec(payload.Payload{Body: []byte("hello")}) + _, err = sw.Exec(&payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } assert.Equal(t, uint64(2), w.State().NumExecs()) - _, err = sw.Exec(payload.Payload{Body: []byte("hello")}) + _, err = sw.Exec(&payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } diff --git a/pkg/transport/pipe/pipe_factory_test.go b/pkg/transport/pipe/pipe_factory_test.go index 3ef65be8..5c937a97 100755 --- a/pkg/transport/pipe/pipe_factory_test.go +++ b/pkg/transport/pipe/pipe_factory_test.go @@ -159,7 +159,7 @@ func Test_Pipe_Echo(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -184,11 +184,10 @@ func Test_Pipe_Broken(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.Error(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) + assert.Nil(t, res) } func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) { @@ -231,7 +230,7 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) { }() for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -255,7 +254,7 @@ func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) { sw := worker.From(w) for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -279,7 +278,7 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) { sw := worker.From(w) for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -305,7 +304,7 @@ func Test_Echo(t *testing.T) { } }() - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.Nil(t, err) assert.NotNil(t, res) @@ -333,11 +332,10 @@ func Test_BadPayload(t *testing.T) { } }() - res, err := sw.Exec(payload.Payload{}) + res, err := sw.Exec(&payload.Payload{}) assert.Error(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) + assert.Nil(t, res) assert.Contains(t, err.Error(), "payload can not be empty") } @@ -379,7 +377,7 @@ func Test_Echo_Slow(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.Nil(t, err) assert.NotNil(t, res) @@ -409,10 +407,9 @@ func Test_Broken(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.NotNil(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) + assert.Nil(t, res) time.Sleep(time.Second * 3) mu.Lock() @@ -441,10 +438,9 @@ func Test_Error(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.NotNil(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) + assert.Nil(t, res) if errors.Is(errors.SoftJob, err) == false { t.Fatal("error should be of type errors.ErrSoftJob") @@ -469,19 +465,19 @@ func Test_NumExecs(t *testing.T) { sw := worker.From(w) - _, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } assert.Equal(t, uint64(1), w.State().NumExecs()) - _, err = sw.Exec(payload.Payload{Body: []byte("hello")}) + _, err = sw.Exec(&payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } assert.Equal(t, uint64(2), w.State().NumExecs()) - _, err = sw.Exec(payload.Payload{Body: []byte("hello")}) + _, err = sw.Exec(&payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } diff --git a/pkg/transport/socket/socket_factory_spawn_test.go b/pkg/transport/socket/socket_factory_spawn_test.go index b875e2c8..23506291 100644 --- a/pkg/transport/socket/socket_factory_spawn_test.go +++ b/pkg/transport/socket/socket_factory_spawn_test.go @@ -194,10 +194,9 @@ func Test_Tcp_Broken2(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.Error(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) + assert.Nil(t, res) wg.Wait() <-finish } @@ -230,7 +229,7 @@ func Test_Tcp_Echo2(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -363,11 +362,10 @@ func Test_Unix_Broken2(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.Error(t, err) - assert.Nil(t, res.Context) - assert.Nil(t, res.Body) + assert.Nil(t, res) wg.Wait() <-finish } @@ -398,7 +396,7 @@ func Test_Unix_Echo2(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -459,7 +457,7 @@ func Benchmark_Tcp_Worker_ExecEcho2(b *testing.B) { sw := worker.From(w) for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -528,7 +526,7 @@ func Benchmark_Unix_Worker_ExecEcho2(b *testing.B) { sw := worker.From(w) for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } diff --git a/pkg/transport/socket/socket_factory_test.go b/pkg/transport/socket/socket_factory_test.go index 34fe088b..91da595d 100755 --- a/pkg/transport/socket/socket_factory_test.go +++ b/pkg/transport/socket/socket_factory_test.go @@ -231,10 +231,9 @@ func Test_Tcp_Broken(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.Error(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) + assert.Nil(t, res) wg.Wait() <-finish } @@ -269,7 +268,7 @@ func Test_Tcp_Echo(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -434,11 +433,10 @@ func Test_Unix_Broken(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.Error(t, err) - assert.Nil(t, res.Context) - assert.Nil(t, res.Body) + assert.Nil(t, res) <-block wg.Wait() } @@ -475,7 +473,7 @@ func Test_Unix_Echo(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -548,7 +546,7 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) { sw := worker.From(w) for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -613,7 +611,7 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) { sw := worker.From(w) for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } diff --git a/pkg/worker/interface.go b/pkg/worker/interface.go index d2cfe2cd..ed8704bb 100644 --- a/pkg/worker/interface.go +++ b/pkg/worker/interface.go @@ -68,7 +68,7 @@ type SyncWorker interface { // BaseProcess provides basic functionality for the SyncWorker BaseProcess // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS - Exec(rqs payload.Payload) (payload.Payload, error) + Exec(rqs *payload.Payload) (*payload.Payload, error) // ExecWithTTL used to handle Exec with TTL - ExecWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) + ExecWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) } diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index 380bfff7..d20b7ae0 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -36,14 +36,14 @@ func From(process *Process) *SyncWorkerImpl { } // Exec payload without TTL timeout. -func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) { +func (tw *SyncWorkerImpl) Exec(p *payload.Payload) (*payload.Payload, error) { const op = errors.Op("sync_worker_exec") if len(p.Body) == 0 && len(p.Context) == 0 { - return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty")) + return nil, errors.E(op, errors.Str("payload can not be empty")) } if tw.process.State().Value() != StateReady { - return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())) + return nil, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())) } // set last used time @@ -57,7 +57,7 @@ func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) { tw.process.State().Set(StateErrored) tw.process.State().RegisterExec() } - return payload.Payload{}, errors.E(op, err) + return nil, errors.E(op, err) } // supervisor may set state of the worker during the work @@ -74,28 +74,26 @@ func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) { } type wexec struct { - payload payload.Payload + payload *payload.Payload err error } // ExecWithTTL executes payload without TTL timeout. -func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) { +func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) { const op = errors.Op("sync_worker_exec_worker_with_timeout") c := make(chan wexec, 1) go func() { if len(p.Body) == 0 && len(p.Context) == 0 { c <- wexec{ - payload: payload.Payload{}, - err: errors.E(op, errors.Str("payload can not be empty")), + err: errors.E(op, errors.Str("payload can not be empty")), } return } if tw.process.State().Value() != StateReady { c <- wexec{ - payload: payload.Payload{}, - err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())), + err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())), } return } @@ -112,8 +110,7 @@ func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p payload.Payload) (p tw.process.State().RegisterExec() } c <- wexec{ - payload: payload.Payload{}, - err: errors.E(op, err), + err: errors.E(op, err), } return } @@ -143,18 +140,18 @@ func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p payload.Payload) (p if err != nil { // append timeout error err = multierr.Append(err, errors.E(op, errors.ExecTTL)) - return payload.Payload{}, multierr.Append(err, ctx.Err()) + return nil, multierr.Append(err, ctx.Err()) } - return payload.Payload{}, errors.E(op, errors.ExecTTL, ctx.Err()) + return nil, errors.E(op, errors.ExecTTL, ctx.Err()) case res := <-c: if res.err != nil { - return payload.Payload{}, res.err + return nil, res.err } return res.payload, nil } } -func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error) { +func (tw *SyncWorkerImpl) execPayload(p *payload.Payload) (*payload.Payload, error) { const op = errors.Op("sync_worker_exec_payload") // get a frame @@ -182,7 +179,7 @@ func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error err := tw.Relay().Send(fr) if err != nil { - return payload.Payload{}, errors.E(op, errors.Network, err) + return nil, errors.E(op, errors.Network, err) } frameR := tw.getFrame() @@ -190,28 +187,28 @@ func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error err = tw.process.Relay().Receive(frameR) if err != nil { - return payload.Payload{}, errors.E(op, errors.Network, err) + return nil, errors.E(op, errors.Network, err) } if frameR == nil { - return payload.Payload{}, errors.E(op, errors.Network, errors.Str("nil fr received")) + return nil, errors.E(op, errors.Network, errors.Str("nil fr received")) } if !frameR.VerifyCRC() { - return payload.Payload{}, errors.E(op, errors.Network, errors.Str("failed to verify CRC")) + return nil, errors.E(op, errors.Network, errors.Str("failed to verify CRC")) } flags := frameR.ReadFlags() if flags&frame.ERROR != byte(0) { - return payload.Payload{}, errors.E(op, errors.SoftJob, errors.Str(string(frameR.Payload()))) + return nil, errors.E(op, errors.SoftJob, errors.Str(string(frameR.Payload()))) } options := frameR.ReadOptions() if len(options) != 1 { - return payload.Payload{}, errors.E(op, errors.Decode, errors.Str("options length should be equal 1 (body offset)")) + return nil, errors.E(op, errors.Decode, errors.Str("options length should be equal 1 (body offset)")) } - pld := payload.Payload{ + pld := &payload.Payload{ Body: make([]byte, len(frameR.Payload()[options[0]:])), Context: make([]byte, len(frameR.Payload()[:options[0]])), } diff --git a/pkg/worker/sync_worker_test.go b/pkg/worker/sync_worker_test.go index df556e93..64580f9f 100755 --- a/pkg/worker/sync_worker_test.go +++ b/pkg/worker/sync_worker_test.go @@ -24,11 +24,10 @@ func Test_NotStarted_Exec(t *testing.T) { sw := From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.Error(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) + assert.Nil(t, res) assert.Contains(t, err.Error(), "Process is not ready (inactive)") } diff --git a/pkg/worker_handler/request.go b/pkg/worker_handler/request.go index 44c466bb..3d60897b 100644 --- a/pkg/worker_handler/request.go +++ b/pkg/worker_handler/request.go @@ -138,18 +138,18 @@ func (r *Request) Close(log logger.Logger) { // Payload request marshaled RoadRunner payload based on PSR7 data. values encode method is JSON. Make sure to open // files prior to calling this method. -func (r *Request) Payload() (payload.Payload, error) { +func (r *Request) Payload() (*payload.Payload, error) { const op = errors.Op("marshal_payload") - p := payload.Payload{} + p := &payload.Payload{} var err error if p.Context, err = json.Marshal(r); err != nil { - return payload.Payload{}, errors.E(op, errors.Encode, err) + return nil, errors.E(op, errors.Encode, err) } if r.Parsed { if p.Body, err = json.Marshal(r.body); err != nil { - return payload.Payload{}, errors.E(op, errors.Encode, err) + return nil, errors.E(op, errors.Encode, err) } } else if r.body != nil { p.Body = r.body.([]byte) diff --git a/pkg/worker_handler/response.go b/pkg/worker_handler/response.go index cbf22794..d22f09d4 100644 --- a/pkg/worker_handler/response.go +++ b/pkg/worker_handler/response.go @@ -22,7 +22,7 @@ type Response struct { } // NewResponse creates new response based on given pool payload. -func NewResponse(p payload.Payload) (*Response, error) { +func NewResponse(p *payload.Payload) (*Response, error) { const op = errors.Op("http_response") r := &Response{Body: p.Body} if err := json.Unmarshal(p.Context, r); err != nil { diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go index fc659902..6cc50c07 100644 --- a/plugins/jobs/drivers/beanstalk/connection.go +++ b/plugins/jobs/drivers/beanstalk/connection.go @@ -167,8 +167,7 @@ var connErrors = map[string]struct{}{"EOF": {}} func (cp *ConnPool) checkAndRedial(err error) error { const op = errors.Op("connection_pool_check_redial") - switch et := err.(type) { - + switch et := err.(type) { //nolint:gocritic // check if the error case beanstalk.ConnError: switch bErr := et.Err.(type) { diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/jobs/drivers/beanstalk/listen.go index 0f98312a..3e9061a3 100644 --- a/plugins/jobs/drivers/beanstalk/listen.go +++ b/plugins/jobs/drivers/beanstalk/listen.go @@ -12,7 +12,7 @@ func (j *JobConsumer) listen() { id, body, err := j.pool.Reserve(j.reserveTimeout) if err != nil { if errB, ok := err.(beanstalk.ConnError); ok { - switch errB.Err { + switch errB.Err { //nolint:gocritic case beanstalk.ErrTimeout: j.log.Info("beanstalk reserve timeout", "warn", errB.Op) continue diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index c8973f1e..219799b8 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -83,7 +83,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.stopCh = make(chan struct{}, 1) p.pldPool = sync.Pool{New: func() interface{} { // with nil fields - return payload.Payload{} + return &payload.Payload{} }} // initial set of pipelines @@ -104,11 +104,11 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se return nil } -func (p *Plugin) getPayload() payload.Payload { - return p.pldPool.Get().(payload.Payload) +func (p *Plugin) getPayload() *payload.Payload { + return p.pldPool.Get().(*payload.Payload) } -func (p *Plugin) putPayload(pld payload.Payload) { +func (p *Plugin) putPayload(pld *payload.Payload) { pld.Body = nil pld.Context = nil p.pldPool.Put(pld) diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index 6c119e57..2df23f11 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -337,7 +337,7 @@ func (p *Plugin) defaultAccessValidator(pool phpPool.Pool) validator.AccessValid func exec(ctx []byte, pool phpPool.Pool) (*validator.AccessValidator, error) { const op = errors.Op("exec") - pd := payload.Payload{ + pd := &payload.Payload{ Context: ctx, } diff --git a/tests/plugins/http/response_test.go b/tests/plugins/http/response_test.go index 276c22ef..f754429d 100644 --- a/tests/plugins/http/response_test.go +++ b/tests/plugins/http/response_test.go @@ -45,13 +45,13 @@ func (tw *testWriter) Push(target string, opts *http.PushOptions) error { } func TestNewResponse_Error(t *testing.T) { - r, err := handler.NewResponse(payload.Payload{Context: []byte(`invalid payload`)}) + r, err := handler.NewResponse(&payload.Payload{Context: []byte(`invalid payload`)}) assert.Error(t, err) assert.Nil(t, r) } func TestNewResponse_Write(t *testing.T) { - r, err := handler.NewResponse(payload.Payload{ + r, err := handler.NewResponse(&payload.Payload{ Context: []byte(`{"headers":{"key":["value"]},"status": 301}`), Body: []byte(`sample body`), }) @@ -68,7 +68,7 @@ func TestNewResponse_Write(t *testing.T) { } func TestNewResponse_Stream(t *testing.T) { - r, err := handler.NewResponse(payload.Payload{ + r, err := handler.NewResponse(&payload.Payload{ Context: []byte(`{"headers":{"key":["value"]},"status": 301}`), }) @@ -93,7 +93,7 @@ func TestNewResponse_Stream(t *testing.T) { } func TestNewResponse_StreamError(t *testing.T) { - r, err := handler.NewResponse(payload.Payload{ + r, err := handler.NewResponse(&payload.Payload{ Context: []byte(`{"headers":{"key":["value"]},"status": 301}`), }) @@ -114,7 +114,7 @@ func TestNewResponse_StreamError(t *testing.T) { } func TestWrite_HandlesPush(t *testing.T) { - r, err := handler.NewResponse(payload.Payload{ + r, err := handler.NewResponse(&payload.Payload{ Context: []byte(`{"headers":{"Http2-Push":["/test.js"],"content-type":["text/html"]},"status": 200}`), }) @@ -129,7 +129,7 @@ func TestWrite_HandlesPush(t *testing.T) { } func TestWrite_HandlesTrailers(t *testing.T) { - r, err := handler.NewResponse(payload.Payload{ + r, err := handler.NewResponse(&payload.Payload{ Context: []byte(`{"headers":{"Trailer":["foo, bar", "baz"],"foo":["test"],"bar":["demo"]},"status": 200}`), }) @@ -148,7 +148,7 @@ func TestWrite_HandlesTrailers(t *testing.T) { } func TestWrite_HandlesHandlesWhitespacesInTrailer(t *testing.T) { - r, err := handler.NewResponse(payload.Payload{ + r, err := handler.NewResponse(&payload.Payload{ Context: []byte( `{"headers":{"Trailer":["foo\t,bar , baz"],"foo":["a"],"bar":["b"],"baz":["c"]},"status": 200}`), }) diff --git a/tests/plugins/server/plugin_pipes.go b/tests/plugins/server/plugin_pipes.go index e813e456..d136da1e 100644 --- a/tests/plugins/server/plugin_pipes.go +++ b/tests/plugins/server/plugin_pipes.go @@ -45,7 +45,7 @@ func (f *Foo) Serve() chan error { const op = errors.Op("serve") // test payload for echo - r := payload.Payload{ + r := &payload.Payload{ Context: nil, Body: []byte(Response), } diff --git a/tests/plugins/server/plugin_sockets.go b/tests/plugins/server/plugin_sockets.go index 0b2857e3..143a604c 100644 --- a/tests/plugins/server/plugin_sockets.go +++ b/tests/plugins/server/plugin_sockets.go @@ -30,7 +30,7 @@ func (f *Foo2) Serve() chan error { conf := &server.Config{} // test payload for echo - r := payload.Payload{ + r := &payload.Payload{ Context: nil, Body: []byte(Response), } diff --git a/tests/plugins/server/plugin_tcp.go b/tests/plugins/server/plugin_tcp.go index ef4cea39..57a2e6ea 100644 --- a/tests/plugins/server/plugin_tcp.go +++ b/tests/plugins/server/plugin_tcp.go @@ -30,7 +30,7 @@ func (f *Foo3) Serve() chan error { conf := &server.Config{} // test payload for echo - r := payload.Payload{ + r := &payload.Payload{ Context: nil, Body: []byte(Response), } diff --git a/tests/plugins/service/placeholder.go b/tests/plugins/service/placeholder.go deleted file mode 100644 index 6d43c336..00000000 --- a/tests/plugins/service/placeholder.go +++ /dev/null @@ -1 +0,0 @@ -package service |