summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-22 14:26:13 +0300
committerValery Piashchynski <[email protected]>2021-07-22 14:26:13 +0300
commitfedf012e632a31d2d0837c22832c7683547ad379 (patch)
treebcb5634dfacccc6d34e49aa7337ac8d1f18b693c
parent609e61426b137834ac589c88f1124574f939fa67 (diff)
BC for the Pool, worker interfaces, pass/return payload by pointer
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r--CHANGELOG.md22
-rw-r--r--pkg/pool/interface.go4
-rwxr-xr-xpkg/pool/static_pool.go24
-rwxr-xr-xpkg/pool/static_pool_test.go51
-rwxr-xr-xpkg/pool/supervisor_pool.go6
-rw-r--r--pkg/pool/supervisor_test.go23
-rw-r--r--pkg/transport/pipe/pipe_factory_spawn_test.go38
-rwxr-xr-xpkg/transport/pipe/pipe_factory_test.go38
-rw-r--r--pkg/transport/socket/socket_factory_spawn_test.go18
-rwxr-xr-xpkg/transport/socket/socket_factory_test.go18
-rw-r--r--pkg/worker/interface.go4
-rwxr-xr-xpkg/worker/sync_worker.go43
-rwxr-xr-xpkg/worker/sync_worker_test.go5
-rw-r--r--pkg/worker_handler/request.go8
-rw-r--r--pkg/worker_handler/response.go2
-rw-r--r--plugins/jobs/drivers/beanstalk/connection.go3
-rw-r--r--plugins/jobs/drivers/beanstalk/listen.go2
-rw-r--r--plugins/jobs/plugin.go8
-rw-r--r--plugins/websockets/plugin.go2
-rw-r--r--tests/plugins/http/response_test.go14
-rw-r--r--tests/plugins/server/plugin_pipes.go2
-rw-r--r--tests/plugins/server/plugin_sockets.go2
-rw-r--r--tests/plugins/server/plugin_tcp.go2
-rw-r--r--tests/plugins/service/placeholder.go1
24 files changed, 170 insertions, 170 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5fa1abfd..8000a622 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,28 @@
CHANGELOG
=========
+v2.4.0 (_.08.2021)
+-------------------
+
+## 💔 Internal BC:
+
+- 🔨 Pool, worker interfaces: payload now passed and returned by pointer.
+
+## 👀 New:
+
+- ✏️ Long awaited, reworked `Jobs` plugin with pluggable drivers. Now you can allocate/destroy pipelines in the runtime.
+ Drivers included in the initial release: `RabbitMQ (0-9-1)`, `SQS v2`, `beanstalk`, `ephemeral`. All jobs can be prioritized now --> [PR](https://github.com/spiral/roadrunner/pull/726)
+
+## 🩹 Fixes:
+
+- 🐛 Fix:
+
+## 📈 Summary:
+
+- RR Milestone [2.4.0]()
+
+---
+
v2.3.2 (14.07.2021)
-------------------
diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go
index a040d27a..4049122c 100644
--- a/pkg/pool/interface.go
+++ b/pkg/pool/interface.go
@@ -13,7 +13,7 @@ type Pool interface {
GetConfig() interface{}
// Exec executes task with payload
- Exec(rqs payload.Payload) (payload.Payload, error)
+ Exec(rqs *payload.Payload) (*payload.Payload, error)
// Workers returns worker list associated with the pool.
Workers() (workers []worker.BaseProcess)
@@ -25,7 +25,7 @@ type Pool interface {
Destroy(ctx context.Context)
// ExecWithContext executes task with context which is used with timeout
- execWithTTL(ctx context.Context, rqs payload.Payload) (payload.Payload, error)
+ execWithTTL(ctx context.Context, rqs *payload.Payload) (*payload.Payload, error)
}
// Watcher is an interface for the Sync workers lifecycle
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 037294ea..5990f929 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -18,7 +18,7 @@ import (
const StopRequest = "{\"stop\":true}"
// ErrorEncoder encode error or make a decision based on the error type
-type ErrorEncoder func(err error, w worker.BaseProcess) (payload.Payload, error)
+type ErrorEncoder func(err error, w worker.BaseProcess) (*payload.Payload, error)
type Options func(p *StaticPool)
@@ -135,7 +135,7 @@ func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error {
}
// Exec executes provided payload on the worker
-func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
+func (sp *StaticPool) Exec(p *payload.Payload) (*payload.Payload, error) {
const op = errors.Op("static_pool_exec")
if sp.cfg.Debug {
return sp.execDebug(p)
@@ -144,7 +144,7 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
defer cancel()
w, err := sp.takeWorker(ctxGetFree, op)
if err != nil {
- return payload.Payload{}, errors.E(op, err)
+ return nil, errors.E(op, err)
}
rsp, err := w.(worker.SyncWorker).Exec(p)
@@ -168,7 +168,7 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
}
// Be careful, sync with pool.Exec method
-func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) {
+func (sp *StaticPool) execWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) {
const op = errors.Op("static_pool_exec_with_context")
if sp.cfg.Debug {
return sp.execDebugWithTTL(ctx, p)
@@ -178,7 +178,7 @@ func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (paylo
defer cancel()
w, err := sp.takeWorker(ctxAlloc, op)
if err != nil {
- return payload.Payload{}, errors.E(op, err)
+ return nil, errors.E(op, err)
}
rsp, err := w.(worker.SyncWorker).ExecWithTTL(ctx, p)
@@ -244,7 +244,7 @@ func (sp *StaticPool) Destroy(ctx context.Context) {
}
func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
- return func(err error, w worker.BaseProcess) (payload.Payload, error) {
+ return func(err error, w worker.BaseProcess) (*payload.Payload, error) {
const op = errors.Op("error_encoder")
// just push event if on any stage was timeout error
switch {
@@ -273,10 +273,10 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
errS := w.Stop()
if errS != nil {
- return payload.Payload{}, errors.E(op, err, errS)
+ return nil, errors.E(op, err, errS)
}
- return payload.Payload{}, errors.E(op, err)
+ return nil, errors.E(op, err)
}
}
@@ -300,10 +300,10 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio
}
// execDebug used when debug mode was not set and exec_ttl is 0
-func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
+func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) {
sw, err := sp.allocator()
if err != nil {
- return payload.Payload{}, err
+ return nil, err
}
// redirect call to the workers exec method (without ttl)
@@ -316,10 +316,10 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
}
// execDebugWithTTL used when user set debug mode and exec_ttl
-func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) {
+func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) {
sw, err := sp.allocator()
if err != nil {
- return payload.Payload{}, err
+ return nil, err
}
// redirect call to the worker with TTL
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index 3df773ab..c57b1683 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -82,7 +82,7 @@ func Test_StaticPool_Echo(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -106,7 +106,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(payload.Payload{Body: []byte("hello"), Context: nil})
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello"), Context: nil})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -130,7 +130,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(payload.Payload{Body: []byte("hello"), Context: []byte("world")})
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello"), Context: []byte("world")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -151,11 +151,10 @@ func Test_StaticPool_JobError(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, p)
- res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
if errors.Is(errors.SoftJob, err) == false {
t.Fatal("error should be of type errors.Exec")
@@ -192,10 +191,9 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
assert.NotNil(t, p)
time.Sleep(time.Second)
- res, err := p.execWithTTL(ctx, payload.Payload{Body: []byte("hello")})
+ res, err := p.execWithTTL(ctx, &payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res.Context)
- assert.Nil(t, res.Body)
+ assert.Nil(t, res)
<-block
@@ -232,7 +230,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -298,11 +296,11 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
var lastPID string
lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
- res, _ := p.Exec(payload.Payload{Body: []byte("hello")})
+ res, _ := p.Exec(&payload.Payload{Body: []byte("hello")})
assert.Equal(t, lastPID, string(res.Body))
for i := 0; i < 10; i++ {
- res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -334,14 +332,14 @@ func Test_StaticPool_Debug_Worker(t *testing.T) {
assert.Len(t, p.Workers(), 0)
var lastPID string
- res, _ := p.Exec(payload.Payload{Body: []byte("hello")})
+ res, _ := p.Exec(&payload.Payload{Body: []byte("hello")})
assert.NotEqual(t, lastPID, string(res.Body))
assert.Len(t, p.Workers(), 0)
for i := 0; i < 10; i++ {
assert.Len(t, p.Workers(), 0)
- res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -374,14 +372,14 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
var lastPID string
lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
- res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
if err != nil {
t.Fatal(err)
}
assert.Equal(t, lastPID, string(res.Body))
for i := 0; i < 10; i++ {
- res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -411,7 +409,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
assert.NoError(t, err)
p.Destroy(ctx)
- _, err = p.Exec(payload.Payload{Body: []byte("100")})
+ _, err = p.Exec(&payload.Payload{Body: []byte("100")})
assert.Error(t, err)
}
@@ -433,7 +431,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
assert.NoError(t, err)
go func() {
- _, errP := p.Exec(payload.Payload{Body: []byte("100")})
+ _, errP := p.Exec(&payload.Payload{Body: []byte("100")})
if errP != nil {
t.Errorf("error executing payload: error %v", err)
}
@@ -441,7 +439,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
time.Sleep(time.Millisecond * 100)
p.Destroy(ctx)
- _, err = p.Exec(payload.Payload{Body: []byte("100")})
+ _, err = p.Exec(&payload.Payload{Body: []byte("100")})
assert.Error(t, err)
}
@@ -465,7 +463,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
p.Workers()[i].State().Set(worker.StateErrored)
}
- _, err = p.Exec(payload.Payload{Body: []byte("hello")})
+ _, err = p.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
p.Destroy(ctx)
}
@@ -519,14 +517,13 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) {
assert.NotNil(t, p)
go func() {
- _, _ = p.execWithTTL(ctx, payload.Payload{Body: []byte("hello")})
+ _, _ = p.execWithTTL(ctx, &payload.Payload{Body: []byte("hello")})
}()
time.Sleep(time.Second)
- res, err := p.execWithTTL(ctx, payload.Payload{Body: []byte("hello")})
+ res, err := p.execWithTTL(ctx, &payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res.Context)
- assert.Nil(t, res.Body)
+ assert.Nil(t, res)
<-block
@@ -582,7 +579,7 @@ func Benchmark_Pool_Echo(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for n := 0; n < b.N; n++ {
- if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := p.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -609,7 +606,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
wg.Add(1)
go func() {
defer wg.Done()
- if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := p.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
log.Println(err)
}
@@ -639,7 +636,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) {
b.ReportAllocs()
for n := 0; n < b.N; n++ {
- if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := p.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
log.Println(err)
}
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index aa6c7cfa..cbb7ad7b 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -43,11 +43,11 @@ func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig)
return sp
}
-func (sp *supervised) execWithTTL(_ context.Context, _ payload.Payload) (payload.Payload, error) {
+func (sp *supervised) execWithTTL(_ context.Context, _ *payload.Payload) (*payload.Payload, error) {
panic("used to satisfy pool interface")
}
-func (sp *supervised) Exec(rqs payload.Payload) (payload.Payload, error) {
+func (sp *supervised) Exec(rqs *payload.Payload) (*payload.Payload, error) {
const op = errors.Op("supervised_exec_with_context")
if sp.cfg.ExecTTL == 0 {
return sp.pool.Exec(rqs)
@@ -58,7 +58,7 @@ func (sp *supervised) Exec(rqs payload.Payload) (payload.Payload, error) {
res, err := sp.pool.execWithTTL(ctx, rqs)
if err != nil {
- return payload.Payload{}, errors.E(op, err)
+ return nil, errors.E(op, err)
}
return res, nil
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
index 234eec3f..0702a71f 100644
--- a/pkg/pool/supervisor_test.go
+++ b/pkg/pool/supervisor_test.go
@@ -43,7 +43,7 @@ func TestSupervisedPool_Exec(t *testing.T) {
for i := 0; i < 100; i++ {
time.Sleep(time.Millisecond * 100)
- _, err = p.Exec(payload.Payload{
+ _, err = p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -73,7 +73,7 @@ func TestSupervisedPool_ExecWithDebugMode(t *testing.T) {
for i := 0; i < 100; i++ {
time.Sleep(time.Millisecond * 100)
- _, err = p.Exec(payload.Payload{
+ _, err = p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -110,14 +110,13 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
pid := p.Workers()[0].Pid()
- resp, err := p.Exec(payload.Payload{
+ resp, err := p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
assert.Error(t, err)
- assert.Empty(t, resp.Body)
- assert.Empty(t, resp.Context)
+ assert.Empty(t, resp)
time.Sleep(time.Second * 1)
// should be new worker with new pid
@@ -145,7 +144,7 @@ func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) {
pid := p.Workers()[0].Pid()
- resp, err := p.Exec(payload.Payload{
+ resp, err := p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -159,7 +158,7 @@ func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) {
require.Equal(t, p.Workers()[0].State().Value(), worker.StateReady)
pid = p.Workers()[0].Pid()
- resp, err = p.Exec(payload.Payload{
+ resp, err = p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -202,7 +201,7 @@ func TestSupervisedPool_Idle(t *testing.T) {
pid := p.Workers()[0].Pid()
- resp, err := p.Exec(payload.Payload{
+ resp, err := p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -214,7 +213,7 @@ func TestSupervisedPool_Idle(t *testing.T) {
time.Sleep(time.Second * 5)
// worker should be marked as invalid and reallocated
- _, err = p.Exec(payload.Payload{
+ _, err = p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -251,7 +250,7 @@ func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) {
pid := p.Workers()[0].Pid()
time.Sleep(time.Millisecond * 100)
- resp, err := p.Exec(payload.Payload{
+ resp, err := p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -294,7 +293,7 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
pid := p.Workers()[0].Pid()
time.Sleep(time.Millisecond * 100)
- resp, err := p.Exec(payload.Payload{
+ resp, err := p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -346,7 +345,7 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, p)
- resp, err := p.Exec(payload.Payload{
+ resp, err := p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
diff --git a/pkg/transport/pipe/pipe_factory_spawn_test.go b/pkg/transport/pipe/pipe_factory_spawn_test.go
index 51befb1e..f5e9669b 100644
--- a/pkg/transport/pipe/pipe_factory_spawn_test.go
+++ b/pkg/transport/pipe/pipe_factory_spawn_test.go
@@ -144,7 +144,7 @@ func Test_Pipe_Echo2(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -168,11 +168,10 @@ func Test_Pipe_Broken2(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
}
func Benchmark_Pipe_SpawnWorker_Stop2(b *testing.B) {
@@ -215,7 +214,7 @@ func Benchmark_Pipe_Worker_ExecEcho2(b *testing.B) {
}()
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -238,7 +237,7 @@ func Benchmark_Pipe_Worker_ExecEcho4(b *testing.B) {
sw := worker.From(w)
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -261,7 +260,7 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext2(b *testing.B) {
sw := worker.From(w)
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -287,7 +286,7 @@ func Test_Echo2(t *testing.T) {
}
}()
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Nil(t, err)
assert.NotNil(t, res)
@@ -314,11 +313,10 @@ func Test_BadPayload2(t *testing.T) {
}
}()
- res, err := sw.Exec(payload.Payload{})
+ res, err := sw.Exec(&payload.Payload{})
assert.Error(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
assert.Contains(t, err.Error(), "payload can not be empty")
}
@@ -358,7 +356,7 @@ func Test_Echo_Slow2(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Nil(t, err)
assert.NotNil(t, res)
@@ -387,10 +385,9 @@ func Test_Broken2(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.NotNil(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
time.Sleep(time.Second * 3)
mu.Lock()
@@ -418,10 +415,9 @@ func Test_Error2(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.NotNil(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
if errors.Is(errors.SoftJob, err) == false {
t.Fatal("error should be of type errors.ErrSoftJob")
@@ -445,19 +441,19 @@ func Test_NumExecs2(t *testing.T) {
sw := worker.From(w)
- _, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ _, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, uint64(1), w.State().NumExecs())
- _, err = sw.Exec(payload.Payload{Body: []byte("hello")})
+ _, err = sw.Exec(&payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, uint64(2), w.State().NumExecs())
- _, err = sw.Exec(payload.Payload{Body: []byte("hello")})
+ _, err = sw.Exec(&payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
diff --git a/pkg/transport/pipe/pipe_factory_test.go b/pkg/transport/pipe/pipe_factory_test.go
index 3ef65be8..5c937a97 100755
--- a/pkg/transport/pipe/pipe_factory_test.go
+++ b/pkg/transport/pipe/pipe_factory_test.go
@@ -159,7 +159,7 @@ func Test_Pipe_Echo(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -184,11 +184,10 @@ func Test_Pipe_Broken(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
}
func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) {
@@ -231,7 +230,7 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) {
}()
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -255,7 +254,7 @@ func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) {
sw := worker.From(w)
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -279,7 +278,7 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) {
sw := worker.From(w)
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -305,7 +304,7 @@ func Test_Echo(t *testing.T) {
}
}()
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Nil(t, err)
assert.NotNil(t, res)
@@ -333,11 +332,10 @@ func Test_BadPayload(t *testing.T) {
}
}()
- res, err := sw.Exec(payload.Payload{})
+ res, err := sw.Exec(&payload.Payload{})
assert.Error(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
assert.Contains(t, err.Error(), "payload can not be empty")
}
@@ -379,7 +377,7 @@ func Test_Echo_Slow(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Nil(t, err)
assert.NotNil(t, res)
@@ -409,10 +407,9 @@ func Test_Broken(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.NotNil(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
time.Sleep(time.Second * 3)
mu.Lock()
@@ -441,10 +438,9 @@ func Test_Error(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.NotNil(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
if errors.Is(errors.SoftJob, err) == false {
t.Fatal("error should be of type errors.ErrSoftJob")
@@ -469,19 +465,19 @@ func Test_NumExecs(t *testing.T) {
sw := worker.From(w)
- _, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ _, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, uint64(1), w.State().NumExecs())
- _, err = sw.Exec(payload.Payload{Body: []byte("hello")})
+ _, err = sw.Exec(&payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, uint64(2), w.State().NumExecs())
- _, err = sw.Exec(payload.Payload{Body: []byte("hello")})
+ _, err = sw.Exec(&payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
diff --git a/pkg/transport/socket/socket_factory_spawn_test.go b/pkg/transport/socket/socket_factory_spawn_test.go
index b875e2c8..23506291 100644
--- a/pkg/transport/socket/socket_factory_spawn_test.go
+++ b/pkg/transport/socket/socket_factory_spawn_test.go
@@ -194,10 +194,9 @@ func Test_Tcp_Broken2(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
wg.Wait()
<-finish
}
@@ -230,7 +229,7 @@ func Test_Tcp_Echo2(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -363,11 +362,10 @@ func Test_Unix_Broken2(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res.Context)
- assert.Nil(t, res.Body)
+ assert.Nil(t, res)
wg.Wait()
<-finish
}
@@ -398,7 +396,7 @@ func Test_Unix_Echo2(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -459,7 +457,7 @@ func Benchmark_Tcp_Worker_ExecEcho2(b *testing.B) {
sw := worker.From(w)
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -528,7 +526,7 @@ func Benchmark_Unix_Worker_ExecEcho2(b *testing.B) {
sw := worker.From(w)
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
diff --git a/pkg/transport/socket/socket_factory_test.go b/pkg/transport/socket/socket_factory_test.go
index 34fe088b..91da595d 100755
--- a/pkg/transport/socket/socket_factory_test.go
+++ b/pkg/transport/socket/socket_factory_test.go
@@ -231,10 +231,9 @@ func Test_Tcp_Broken(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
wg.Wait()
<-finish
}
@@ -269,7 +268,7 @@ func Test_Tcp_Echo(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -434,11 +433,10 @@ func Test_Unix_Broken(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res.Context)
- assert.Nil(t, res.Body)
+ assert.Nil(t, res)
<-block
wg.Wait()
}
@@ -475,7 +473,7 @@ func Test_Unix_Echo(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -548,7 +546,7 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) {
sw := worker.From(w)
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -613,7 +611,7 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) {
sw := worker.From(w)
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
diff --git a/pkg/worker/interface.go b/pkg/worker/interface.go
index d2cfe2cd..ed8704bb 100644
--- a/pkg/worker/interface.go
+++ b/pkg/worker/interface.go
@@ -68,7 +68,7 @@ type SyncWorker interface {
// BaseProcess provides basic functionality for the SyncWorker
BaseProcess
// Exec used to execute payload on the SyncWorker, there is no TIMEOUTS
- Exec(rqs payload.Payload) (payload.Payload, error)
+ Exec(rqs *payload.Payload) (*payload.Payload, error)
// ExecWithTTL used to handle Exec with TTL
- ExecWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error)
+ ExecWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error)
}
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index 380bfff7..d20b7ae0 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -36,14 +36,14 @@ func From(process *Process) *SyncWorkerImpl {
}
// Exec payload without TTL timeout.
-func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) {
+func (tw *SyncWorkerImpl) Exec(p *payload.Payload) (*payload.Payload, error) {
const op = errors.Op("sync_worker_exec")
if len(p.Body) == 0 && len(p.Context) == 0 {
- return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty"))
+ return nil, errors.E(op, errors.Str("payload can not be empty"))
}
if tw.process.State().Value() != StateReady {
- return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String()))
+ return nil, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String()))
}
// set last used time
@@ -57,7 +57,7 @@ func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) {
tw.process.State().Set(StateErrored)
tw.process.State().RegisterExec()
}
- return payload.Payload{}, errors.E(op, err)
+ return nil, errors.E(op, err)
}
// supervisor may set state of the worker during the work
@@ -74,28 +74,26 @@ func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) {
}
type wexec struct {
- payload payload.Payload
+ payload *payload.Payload
err error
}
// ExecWithTTL executes payload without TTL timeout.
-func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) {
+func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) {
const op = errors.Op("sync_worker_exec_worker_with_timeout")
c := make(chan wexec, 1)
go func() {
if len(p.Body) == 0 && len(p.Context) == 0 {
c <- wexec{
- payload: payload.Payload{},
- err: errors.E(op, errors.Str("payload can not be empty")),
+ err: errors.E(op, errors.Str("payload can not be empty")),
}
return
}
if tw.process.State().Value() != StateReady {
c <- wexec{
- payload: payload.Payload{},
- err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())),
+ err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())),
}
return
}
@@ -112,8 +110,7 @@ func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p payload.Payload) (p
tw.process.State().RegisterExec()
}
c <- wexec{
- payload: payload.Payload{},
- err: errors.E(op, err),
+ err: errors.E(op, err),
}
return
}
@@ -143,18 +140,18 @@ func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p payload.Payload) (p
if err != nil {
// append timeout error
err = multierr.Append(err, errors.E(op, errors.ExecTTL))
- return payload.Payload{}, multierr.Append(err, ctx.Err())
+ return nil, multierr.Append(err, ctx.Err())
}
- return payload.Payload{}, errors.E(op, errors.ExecTTL, ctx.Err())
+ return nil, errors.E(op, errors.ExecTTL, ctx.Err())
case res := <-c:
if res.err != nil {
- return payload.Payload{}, res.err
+ return nil, res.err
}
return res.payload, nil
}
}
-func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error) {
+func (tw *SyncWorkerImpl) execPayload(p *payload.Payload) (*payload.Payload, error) {
const op = errors.Op("sync_worker_exec_payload")
// get a frame
@@ -182,7 +179,7 @@ func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error
err := tw.Relay().Send(fr)
if err != nil {
- return payload.Payload{}, errors.E(op, errors.Network, err)
+ return nil, errors.E(op, errors.Network, err)
}
frameR := tw.getFrame()
@@ -190,28 +187,28 @@ func (tw *SyncWorkerImpl) execPayload(p payload.Payload) (payload.Payload, error
err = tw.process.Relay().Receive(frameR)
if err != nil {
- return payload.Payload{}, errors.E(op, errors.Network, err)
+ return nil, errors.E(op, errors.Network, err)
}
if frameR == nil {
- return payload.Payload{}, errors.E(op, errors.Network, errors.Str("nil fr received"))
+ return nil, errors.E(op, errors.Network, errors.Str("nil fr received"))
}
if !frameR.VerifyCRC() {
- return payload.Payload{}, errors.E(op, errors.Network, errors.Str("failed to verify CRC"))
+ return nil, errors.E(op, errors.Network, errors.Str("failed to verify CRC"))
}
flags := frameR.ReadFlags()
if flags&frame.ERROR != byte(0) {
- return payload.Payload{}, errors.E(op, errors.SoftJob, errors.Str(string(frameR.Payload())))
+ return nil, errors.E(op, errors.SoftJob, errors.Str(string(frameR.Payload())))
}
options := frameR.ReadOptions()
if len(options) != 1 {
- return payload.Payload{}, errors.E(op, errors.Decode, errors.Str("options length should be equal 1 (body offset)"))
+ return nil, errors.E(op, errors.Decode, errors.Str("options length should be equal 1 (body offset)"))
}
- pld := payload.Payload{
+ pld := &payload.Payload{
Body: make([]byte, len(frameR.Payload()[options[0]:])),
Context: make([]byte, len(frameR.Payload()[:options[0]])),
}
diff --git a/pkg/worker/sync_worker_test.go b/pkg/worker/sync_worker_test.go
index df556e93..64580f9f 100755
--- a/pkg/worker/sync_worker_test.go
+++ b/pkg/worker/sync_worker_test.go
@@ -24,11 +24,10 @@ func Test_NotStarted_Exec(t *testing.T) {
sw := From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
assert.Contains(t, err.Error(), "Process is not ready (inactive)")
}
diff --git a/pkg/worker_handler/request.go b/pkg/worker_handler/request.go
index 44c466bb..3d60897b 100644
--- a/pkg/worker_handler/request.go
+++ b/pkg/worker_handler/request.go
@@ -138,18 +138,18 @@ func (r *Request) Close(log logger.Logger) {
// Payload request marshaled RoadRunner payload based on PSR7 data. values encode method is JSON. Make sure to open
// files prior to calling this method.
-func (r *Request) Payload() (payload.Payload, error) {
+func (r *Request) Payload() (*payload.Payload, error) {
const op = errors.Op("marshal_payload")
- p := payload.Payload{}
+ p := &payload.Payload{}
var err error
if p.Context, err = json.Marshal(r); err != nil {
- return payload.Payload{}, errors.E(op, errors.Encode, err)
+ return nil, errors.E(op, errors.Encode, err)
}
if r.Parsed {
if p.Body, err = json.Marshal(r.body); err != nil {
- return payload.Payload{}, errors.E(op, errors.Encode, err)
+ return nil, errors.E(op, errors.Encode, err)
}
} else if r.body != nil {
p.Body = r.body.([]byte)
diff --git a/pkg/worker_handler/response.go b/pkg/worker_handler/response.go
index cbf22794..d22f09d4 100644
--- a/pkg/worker_handler/response.go
+++ b/pkg/worker_handler/response.go
@@ -22,7 +22,7 @@ type Response struct {
}
// NewResponse creates new response based on given pool payload.
-func NewResponse(p payload.Payload) (*Response, error) {
+func NewResponse(p *payload.Payload) (*Response, error) {
const op = errors.Op("http_response")
r := &Response{Body: p.Body}
if err := json.Unmarshal(p.Context, r); err != nil {
diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go
index fc659902..6cc50c07 100644
--- a/plugins/jobs/drivers/beanstalk/connection.go
+++ b/plugins/jobs/drivers/beanstalk/connection.go
@@ -167,8 +167,7 @@ var connErrors = map[string]struct{}{"EOF": {}}
func (cp *ConnPool) checkAndRedial(err error) error {
const op = errors.Op("connection_pool_check_redial")
- switch et := err.(type) {
-
+ switch et := err.(type) { //nolint:gocritic
// check if the error
case beanstalk.ConnError:
switch bErr := et.Err.(type) {
diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/jobs/drivers/beanstalk/listen.go
index 0f98312a..3e9061a3 100644
--- a/plugins/jobs/drivers/beanstalk/listen.go
+++ b/plugins/jobs/drivers/beanstalk/listen.go
@@ -12,7 +12,7 @@ func (j *JobConsumer) listen() {
id, body, err := j.pool.Reserve(j.reserveTimeout)
if err != nil {
if errB, ok := err.(beanstalk.ConnError); ok {
- switch errB.Err {
+ switch errB.Err { //nolint:gocritic
case beanstalk.ErrTimeout:
j.log.Info("beanstalk reserve timeout", "warn", errB.Op)
continue
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index c8973f1e..219799b8 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -83,7 +83,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.stopCh = make(chan struct{}, 1)
p.pldPool = sync.Pool{New: func() interface{} {
// with nil fields
- return payload.Payload{}
+ return &payload.Payload{}
}}
// initial set of pipelines
@@ -104,11 +104,11 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
return nil
}
-func (p *Plugin) getPayload() payload.Payload {
- return p.pldPool.Get().(payload.Payload)
+func (p *Plugin) getPayload() *payload.Payload {
+ return p.pldPool.Get().(*payload.Payload)
}
-func (p *Plugin) putPayload(pld payload.Payload) {
+func (p *Plugin) putPayload(pld *payload.Payload) {
pld.Body = nil
pld.Context = nil
p.pldPool.Put(pld)
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index 6c119e57..2df23f11 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -337,7 +337,7 @@ func (p *Plugin) defaultAccessValidator(pool phpPool.Pool) validator.AccessValid
func exec(ctx []byte, pool phpPool.Pool) (*validator.AccessValidator, error) {
const op = errors.Op("exec")
- pd := payload.Payload{
+ pd := &payload.Payload{
Context: ctx,
}
diff --git a/tests/plugins/http/response_test.go b/tests/plugins/http/response_test.go
index 276c22ef..f754429d 100644
--- a/tests/plugins/http/response_test.go
+++ b/tests/plugins/http/response_test.go
@@ -45,13 +45,13 @@ func (tw *testWriter) Push(target string, opts *http.PushOptions) error {
}
func TestNewResponse_Error(t *testing.T) {
- r, err := handler.NewResponse(payload.Payload{Context: []byte(`invalid payload`)})
+ r, err := handler.NewResponse(&payload.Payload{Context: []byte(`invalid payload`)})
assert.Error(t, err)
assert.Nil(t, r)
}
func TestNewResponse_Write(t *testing.T) {
- r, err := handler.NewResponse(payload.Payload{
+ r, err := handler.NewResponse(&payload.Payload{
Context: []byte(`{"headers":{"key":["value"]},"status": 301}`),
Body: []byte(`sample body`),
})
@@ -68,7 +68,7 @@ func TestNewResponse_Write(t *testing.T) {
}
func TestNewResponse_Stream(t *testing.T) {
- r, err := handler.NewResponse(payload.Payload{
+ r, err := handler.NewResponse(&payload.Payload{
Context: []byte(`{"headers":{"key":["value"]},"status": 301}`),
})
@@ -93,7 +93,7 @@ func TestNewResponse_Stream(t *testing.T) {
}
func TestNewResponse_StreamError(t *testing.T) {
- r, err := handler.NewResponse(payload.Payload{
+ r, err := handler.NewResponse(&payload.Payload{
Context: []byte(`{"headers":{"key":["value"]},"status": 301}`),
})
@@ -114,7 +114,7 @@ func TestNewResponse_StreamError(t *testing.T) {
}
func TestWrite_HandlesPush(t *testing.T) {
- r, err := handler.NewResponse(payload.Payload{
+ r, err := handler.NewResponse(&payload.Payload{
Context: []byte(`{"headers":{"Http2-Push":["/test.js"],"content-type":["text/html"]},"status": 200}`),
})
@@ -129,7 +129,7 @@ func TestWrite_HandlesPush(t *testing.T) {
}
func TestWrite_HandlesTrailers(t *testing.T) {
- r, err := handler.NewResponse(payload.Payload{
+ r, err := handler.NewResponse(&payload.Payload{
Context: []byte(`{"headers":{"Trailer":["foo, bar", "baz"],"foo":["test"],"bar":["demo"]},"status": 200}`),
})
@@ -148,7 +148,7 @@ func TestWrite_HandlesTrailers(t *testing.T) {
}
func TestWrite_HandlesHandlesWhitespacesInTrailer(t *testing.T) {
- r, err := handler.NewResponse(payload.Payload{
+ r, err := handler.NewResponse(&payload.Payload{
Context: []byte(
`{"headers":{"Trailer":["foo\t,bar , baz"],"foo":["a"],"bar":["b"],"baz":["c"]},"status": 200}`),
})
diff --git a/tests/plugins/server/plugin_pipes.go b/tests/plugins/server/plugin_pipes.go
index e813e456..d136da1e 100644
--- a/tests/plugins/server/plugin_pipes.go
+++ b/tests/plugins/server/plugin_pipes.go
@@ -45,7 +45,7 @@ func (f *Foo) Serve() chan error {
const op = errors.Op("serve")
// test payload for echo
- r := payload.Payload{
+ r := &payload.Payload{
Context: nil,
Body: []byte(Response),
}
diff --git a/tests/plugins/server/plugin_sockets.go b/tests/plugins/server/plugin_sockets.go
index 0b2857e3..143a604c 100644
--- a/tests/plugins/server/plugin_sockets.go
+++ b/tests/plugins/server/plugin_sockets.go
@@ -30,7 +30,7 @@ func (f *Foo2) Serve() chan error {
conf := &server.Config{}
// test payload for echo
- r := payload.Payload{
+ r := &payload.Payload{
Context: nil,
Body: []byte(Response),
}
diff --git a/tests/plugins/server/plugin_tcp.go b/tests/plugins/server/plugin_tcp.go
index ef4cea39..57a2e6ea 100644
--- a/tests/plugins/server/plugin_tcp.go
+++ b/tests/plugins/server/plugin_tcp.go
@@ -30,7 +30,7 @@ func (f *Foo3) Serve() chan error {
conf := &server.Config{}
// test payload for echo
- r := payload.Payload{
+ r := &payload.Payload{
Context: nil,
Body: []byte(Response),
}
diff --git a/tests/plugins/service/placeholder.go b/tests/plugins/service/placeholder.go
deleted file mode 100644
index 6d43c336..00000000
--- a/tests/plugins/service/placeholder.go
+++ /dev/null
@@ -1 +0,0 @@
-package service