summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-12-18 22:57:33 +0300
committerValery Piashchynski <[email protected]>2020-12-18 22:57:33 +0300
commitad39466afb39fac49977cfb97f20f682f54bf35e (patch)
tree59b630ad0ef9b52eb13e0df5c8e41d92ce277950
parentee0cb478c74c393a35155c2bf51e1ef260e0e5e2 (diff)
Move roadrunner payload out of internal to pkgv2.0.0-alpha25
-rw-r--r--interfaces/pool/pool.go6
-rw-r--r--interfaces/worker/worker.go5
-rwxr-xr-xpkg/payload/payload.go (renamed from internal/payload.go)2
-rwxr-xr-xpkg/pipe/pipe_factory_test.go27
-rwxr-xr-xpkg/pool/static_pool.go35
-rwxr-xr-xpkg/pool/static_pool_test.go39
-rwxr-xr-xpkg/pool/supervisor_pool.go15
-rw-r--r--pkg/pool/supervisor_test.go8
-rwxr-xr-xpkg/socket/socket_factory_test.go14
-rwxr-xr-xpkg/worker/sync_worker.go41
-rwxr-xr-xpkg/worker/sync_worker_test.go4
-rwxr-xr-xpkg/worker/worker.go4
-rw-r--r--plugins/http/request.go10
-rw-r--r--plugins/http/response.go4
-rw-r--r--plugins/http/tests/response_test.go16
-rw-r--r--plugins/server/tests/plugin_pipes.go4
-rw-r--r--plugins/server/tests/plugin_sockets.go4
-rw-r--r--plugins/server/tests/plugin_tcp.go4
18 files changed, 124 insertions, 118 deletions
diff --git a/interfaces/pool/pool.go b/interfaces/pool/pool.go
index a1015fd6..72da9597 100644
--- a/interfaces/pool/pool.go
+++ b/interfaces/pool/pool.go
@@ -7,7 +7,7 @@ import (
"github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/interfaces/worker"
- "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
)
// Pool managed set of inner worker processes.
@@ -19,9 +19,9 @@ type Pool interface {
GetConfig() interface{}
// Exec
- Exec(rqs internal.Payload) (internal.Payload, error)
+ Exec(rqs payload.Payload) (payload.Payload, error)
- ExecWithContext(ctx context.Context, rqs internal.Payload) (internal.Payload, error)
+ ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error)
// Workers returns worker list associated with the pool.
Workers() (workers []worker.BaseProcess)
diff --git a/interfaces/worker/worker.go b/interfaces/worker/worker.go
index edbc68d9..773dd044 100644
--- a/interfaces/worker/worker.go
+++ b/interfaces/worker/worker.go
@@ -8,6 +8,7 @@ import (
"github.com/spiral/goridge/v3"
"github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
)
// Allocator is responsible for worker allocation in the pool
@@ -56,7 +57,7 @@ type SyncWorker interface {
// BaseProcess provides basic functionality for the SyncWorker
BaseProcess
// Exec used to execute payload on the SyncWorker, there is no TIMEOUTS
- Exec(rqs internal.Payload) (internal.Payload, error)
+ Exec(rqs payload.Payload) (payload.Payload, error)
// ExecWithContext used to handle Exec with TTL
- ExecWithContext(ctx context.Context, p internal.Payload) (internal.Payload, error)
+ ExecWithContext(ctx context.Context, p payload.Payload) (payload.Payload, error)
}
diff --git a/internal/payload.go b/pkg/payload/payload.go
index 63983bad..fac36852 100755
--- a/internal/payload.go
+++ b/pkg/payload/payload.go
@@ -1,4 +1,4 @@
-package internal
+package payload
// Payload carries binary header and body to stack and
// back to the server.
diff --git a/pkg/pipe/pipe_factory_test.go b/pkg/pipe/pipe_factory_test.go
index 99212ff8..9453de1d 100755
--- a/pkg/pipe/pipe_factory_test.go
+++ b/pkg/pipe/pipe_factory_test.go
@@ -11,6 +11,7 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
workerImpl "github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/stretchr/testify/assert"
)
@@ -151,7 +152,7 @@ func Test_Pipe_Echo(t *testing.T) {
t.Fatal(err)
}
- res, err := sw.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -179,7 +180,7 @@ func Test_Pipe_Broken(t *testing.T) {
t.Fatal(err)
}
- res, err := sw.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Body)
@@ -228,7 +229,7 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) {
}()
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(internal.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -255,7 +256,7 @@ func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) {
}
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(internal.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -282,7 +283,7 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) {
}
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(internal.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -311,7 +312,7 @@ func Test_Echo(t *testing.T) {
}
}()
- res, err := syncWorker.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")})
assert.Nil(t, err)
assert.NotNil(t, res)
@@ -342,7 +343,7 @@ func Test_BadPayload(t *testing.T) {
}
}()
- res, err := syncWorker.Exec(internal.Payload{})
+ res, err := syncWorker.Exec(payload.Payload{})
assert.Error(t, err)
assert.Nil(t, res.Body)
@@ -391,7 +392,7 @@ func Test_Echo_Slow(t *testing.T) {
t.Fatal(err)
}
- res, err := syncWorker.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")})
assert.Nil(t, err)
assert.NotNil(t, res)
@@ -425,7 +426,7 @@ func Test_Broken(t *testing.T) {
t.Fatal(err)
}
- res, err := syncWorker.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")})
assert.NotNil(t, err)
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
@@ -460,7 +461,7 @@ func Test_Error(t *testing.T) {
t.Fatal(err)
}
- res, err := syncWorker.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")})
assert.NotNil(t, err)
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
@@ -491,19 +492,19 @@ func Test_NumExecs(t *testing.T) {
t.Fatal(err)
}
- _, err = syncWorker.Exec(internal.Payload{Body: []byte("hello")})
+ _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, int64(1), w.State().NumExecs())
- _, err = syncWorker.Exec(internal.Payload{Body: []byte("hello")})
+ _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, int64(2), w.State().NumExecs())
- _, err = syncWorker.Exec(internal.Payload{Body: []byte("hello")})
+ _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 6cc42143..9cf79fd4 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -9,7 +9,8 @@ import (
"github.com/spiral/roadrunner/v2/interfaces/pool"
"github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
- events2 "github.com/spiral/roadrunner/v2/pkg/events"
+ eventsPkg "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
syncWorker "github.com/spiral/roadrunner/v2/pkg/worker"
workerWatcher "github.com/spiral/roadrunner/v2/pkg/worker_watcher"
)
@@ -20,13 +21,13 @@ const StopRequest = "{\"stop\":true}"
var bCtx = context.Background()
// ErrorEncoder encode error or make a decision based on the error type
-type ErrorEncoder func(err error, w worker.BaseProcess) (internal.Payload, error)
+type ErrorEncoder func(err error, w worker.BaseProcess) (payload.Payload, error)
// Before is set of functions that executes BEFORE Exec
-type Before func(req internal.Payload) internal.Payload
+type Before func(req payload.Payload) payload.Payload
// After is set of functions that executes AFTER Exec
-type After func(req internal.Payload, resp internal.Payload) internal.Payload
+type After func(req payload.Payload, resp payload.Payload) payload.Payload
type Options func(p *StaticPool)
@@ -71,7 +72,7 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory worker.Factory,
cfg: cfg,
cmd: cmd,
factory: factory,
- events: events2.NewEventsHandler(),
+ events: eventsPkg.NewEventsHandler(),
after: make([]After, 0, 0),
before: make([]Before, 0, 0),
}
@@ -139,7 +140,7 @@ func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error {
return sp.ww.RemoveWorker(wb)
}
-func (sp *StaticPool) Exec(p internal.Payload) (internal.Payload, error) {
+func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
const op = errors.Op("exec")
if sp.cfg.Debug {
return sp.execDebug(p)
@@ -148,7 +149,7 @@ func (sp *StaticPool) Exec(p internal.Payload) (internal.Payload, error) {
defer cancel()
w, err := sp.getWorker(ctxGetFree, op)
if err != nil {
- return internal.Payload{}, errors.E(op, err)
+ return payload.Payload{}, errors.E(op, err)
}
sw := w.(worker.SyncWorker)
@@ -179,7 +180,7 @@ func (sp *StaticPool) Exec(p internal.Payload) (internal.Payload, error) {
if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs {
err = sp.ww.AllocateNew()
if err != nil {
- return internal.Payload{}, errors.E(op, err)
+ return payload.Payload{}, errors.E(op, err)
}
} else {
sp.ww.PushWorker(sw)
@@ -194,13 +195,13 @@ func (sp *StaticPool) Exec(p internal.Payload) (internal.Payload, error) {
return rsp, nil
}
-func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs internal.Payload) (internal.Payload, error) {
+func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) {
const op = errors.Op("exec with context")
ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout)
defer cancel()
w, err := sp.getWorker(ctxGetFree, op)
if err != nil {
- return internal.Payload{}, errors.E(op, err)
+ return payload.Payload{}, errors.E(op, err)
}
sw := w.(worker.SyncWorker)
@@ -231,7 +232,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs internal.Payload)
if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs {
err = sp.ww.AllocateNew()
if err != nil {
- return internal.Payload{}, errors.E(op, err)
+ return payload.Payload{}, errors.E(op, err)
}
} else {
sp.ww.PushWorker(sw)
@@ -268,7 +269,7 @@ func (sp *StaticPool) Destroy(ctx context.Context) {
}
func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
- return func(err error, w worker.BaseProcess) (internal.Payload, error) {
+ return func(err error, w worker.BaseProcess) (payload.Payload, error) {
const op = errors.Op("error encoder")
// soft job errors are allowed
if errors.Is(errors.ErrSoftJob, err) {
@@ -287,7 +288,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
sp.ww.PushWorker(w)
}
- return internal.Payload{}, errors.E(op, err)
+ return payload.Payload{}, errors.E(op, err)
}
w.State().Set(internal.StateInvalid)
@@ -295,10 +296,10 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
errS := w.Stop(bCtx)
if errS != nil {
- return internal.Payload{}, errors.E(op, errors.Errorf("%v, %v", err, errS))
+ return payload.Payload{}, errors.E(op, errors.Errorf("%v, %v", err, errS))
}
- return internal.Payload{}, errors.E(op, err)
+ return payload.Payload{}, errors.E(op, err)
}
}
@@ -317,10 +318,10 @@ func newPoolAllocator(factory worker.Factory, cmd func() *exec.Cmd) worker.Alloc
}
}
-func (sp *StaticPool) execDebug(p internal.Payload) (internal.Payload, error) {
+func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
sw, err := sp.allocator()
if err != nil {
- return internal.Payload{}, err
+ return payload.Payload{}, err
}
r, err := sw.(worker.SyncWorker).Exec(p)
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index dd33a1a6..b96e9214 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -14,6 +14,7 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/pipe"
"github.com/stretchr/testify/assert"
)
@@ -80,7 +81,7 @@ func Test_StaticPool_Echo(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := p.Exec(payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -104,7 +105,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(internal.Payload{Body: []byte("hello"), Context: nil})
+ res, err := p.Exec(payload.Payload{Body: []byte("hello"), Context: nil})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -128,7 +129,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(internal.Payload{Body: []byte("hello"), Context: []byte("world")})
+ res, err := p.Exec(payload.Payload{Body: []byte("hello"), Context: []byte("world")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -151,7 +152,7 @@ func Test_StaticPool_JobError(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := p.Exec(payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Body)
@@ -189,7 +190,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
}
})
- res, err := p.ExecWithContext(ctx, internal.Payload{Body: []byte("hello")})
+ res, err := p.ExecWithContext(ctx, payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Context)
assert.Nil(t, res.Body)
@@ -212,7 +213,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := p.Exec(payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -287,11 +288,11 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
var lastPID string
lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
- res, _ := p.Exec(internal.Payload{Body: []byte("hello")})
+ res, _ := p.Exec(payload.Payload{Body: []byte("hello")})
assert.Equal(t, lastPID, string(res.Body))
for i := 0; i < 10; i++ {
- res, err := p.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := p.Exec(payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -323,14 +324,14 @@ func Test_StaticPool_Debug_Worker(t *testing.T) {
assert.Len(t, p.Workers(), 0)
var lastPID string
- res, _ := p.Exec(internal.Payload{Body: []byte("hello")})
+ res, _ := p.Exec(payload.Payload{Body: []byte("hello")})
assert.NotEqual(t, lastPID, string(res.Body))
assert.Len(t, p.Workers(), 0)
for i := 0; i < 10; i++ {
assert.Len(t, p.Workers(), 0)
- res, err := p.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := p.Exec(payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -363,14 +364,14 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
var lastPID string
lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
- res, err := p.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := p.Exec(payload.Payload{Body: []byte("hello")})
if err != nil {
t.Fatal(err)
}
assert.Equal(t, lastPID, string(res.Body))
for i := 0; i < 10; i++ {
- res, err := p.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := p.Exec(payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -400,7 +401,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
assert.NoError(t, err)
p.Destroy(ctx)
- _, err = p.Exec(internal.Payload{Body: []byte("100")})
+ _, err = p.Exec(payload.Payload{Body: []byte("100")})
assert.Error(t, err)
}
@@ -422,7 +423,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
assert.NoError(t, err)
go func() {
- _, err := p.Exec(internal.Payload{Body: []byte("100")})
+ _, err := p.Exec(payload.Payload{Body: []byte("100")})
if err != nil {
t.Errorf("error executing payload: error %v", err)
}
@@ -430,7 +431,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
time.Sleep(time.Millisecond * 10)
p.Destroy(ctx)
- _, err = p.Exec(internal.Payload{Body: []byte("100")})
+ _, err = p.Exec(payload.Payload{Body: []byte("100")})
assert.Error(t, err)
}
@@ -456,7 +457,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
w.State().Set(internal.StateErrored)
}
- _, err = p.Exec(internal.Payload{Body: []byte("hello")})
+ _, err = p.Exec(payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
}
@@ -494,7 +495,7 @@ func Benchmark_Pool_Echo(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for n := 0; n < b.N; n++ {
- if _, err := p.Exec(internal.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -520,7 +521,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
wg.Add(1)
go func() {
defer wg.Done()
- if _, err := p.Exec(internal.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
log.Println(err)
}
@@ -549,7 +550,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) {
b.ReportAllocs()
for n := 0; n < b.N; n++ {
- if _, err := p.Exec(internal.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
log.Println(err)
}
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index 6d1f0c58..6faa609c 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -11,6 +11,7 @@ import (
"github.com/spiral/roadrunner/v2/interfaces/pool"
"github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
)
const MB = 1024 * 1024
@@ -42,10 +43,10 @@ func newPoolWatcher(pool pool.Pool, events events.Handler, cfg *SupervisorConfig
type ttlExec struct {
err error
- p internal.Payload
+ p payload.Payload
}
-func (sp *supervised) ExecWithContext(ctx context.Context, rqs internal.Payload) (internal.Payload, error) {
+func (sp *supervised) ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) {
const op = errors.Op("exec_supervised")
if sp.cfg.ExecTTL == 0 {
return sp.pool.Exec(rqs)
@@ -59,7 +60,7 @@ func (sp *supervised) ExecWithContext(ctx context.Context, rqs internal.Payload)
if err != nil {
c <- ttlExec{
err: errors.E(op, err),
- p: internal.Payload{},
+ p: payload.Payload{},
}
}
@@ -72,10 +73,10 @@ func (sp *supervised) ExecWithContext(ctx context.Context, rqs internal.Payload)
for {
select {
case <-ctx.Done():
- return internal.Payload{}, errors.E(op, errors.TimeOut, ctx.Err())
+ return payload.Payload{}, errors.E(op, errors.TimeOut, ctx.Err())
case res := <-c:
if res.err != nil {
- return internal.Payload{}, res.err
+ return payload.Payload{}, res.err
}
return res.p, nil
@@ -83,11 +84,11 @@ func (sp *supervised) ExecWithContext(ctx context.Context, rqs internal.Payload)
}
}
-func (sp *supervised) Exec(p internal.Payload) (internal.Payload, error) {
+func (sp *supervised) Exec(p payload.Payload) (payload.Payload, error) {
const op = errors.Op("supervised exec")
rsp, err := sp.pool.Exec(p)
if err != nil {
- return internal.Payload{}, errors.E(op, err)
+ return payload.Payload{}, errors.E(op, err)
}
return rsp, nil
}
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
index 2e3e7fd2..7dd423b8 100644
--- a/pkg/pool/supervisor_test.go
+++ b/pkg/pool/supervisor_test.go
@@ -7,7 +7,7 @@ import (
"time"
"github.com/spiral/roadrunner/v2"
- "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/pipe"
"github.com/stretchr/testify/assert"
)
@@ -61,7 +61,7 @@ func TestSupervisedPool_Exec(t *testing.T) {
for i := 0; i < 100; i++ {
time.Sleep(time.Millisecond * 50)
- _, err = p.Exec(internal.Payload{
+ _, err = p.Exec(payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -98,7 +98,7 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
pid := p.Workers()[0].Pid()
- resp, err := p.ExecWithContext(context.Background(), internal.Payload{
+ resp, err := p.ExecWithContext(context.Background(), payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -139,7 +139,7 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
pid := p.Workers()[0].Pid()
time.Sleep(time.Millisecond * 100)
- resp, err := p.Exec(internal.Payload{
+ resp, err := p.Exec(payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
diff --git a/pkg/socket/socket_factory_test.go b/pkg/socket/socket_factory_test.go
index f1a7d637..6a88713a 100755
--- a/pkg/socket/socket_factory_test.go
+++ b/pkg/socket/socket_factory_test.go
@@ -8,7 +8,7 @@ import (
"testing"
"time"
- "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/stretchr/testify/assert"
)
@@ -211,7 +211,7 @@ func Test_Tcp_Broken(t *testing.T) {
t.Fatal(err)
}
- res, err := sw.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
@@ -251,7 +251,7 @@ func Test_Tcp_Echo(t *testing.T) {
t.Fatal(err)
}
- res, err := sw.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -396,7 +396,7 @@ func Test_Unix_Broken(t *testing.T) {
t.Fatal(err)
}
- res, err := sw.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Context)
@@ -439,7 +439,7 @@ func Test_Unix_Echo(t *testing.T) {
t.Fatal(err)
}
- res, err := sw.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -515,7 +515,7 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) {
}
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(internal.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -583,7 +583,7 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) {
}
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(internal.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index 1eb1396e..daa07186 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -9,6 +9,7 @@ import (
"github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
"go.uber.org/multierr"
"github.com/spiral/goridge/v3"
@@ -26,14 +27,14 @@ func From(w worker.BaseProcess) (worker.SyncWorker, error) {
}
// Exec payload without TTL timeout.
-func (tw *syncWorker) Exec(p internal.Payload) (internal.Payload, error) {
+func (tw *syncWorker) Exec(p payload.Payload) (payload.Payload, error) {
const op = errors.Op("sync worker Exec")
if len(p.Body) == 0 && len(p.Context) == 0 {
- return internal.Payload{}, errors.E(op, errors.Str("payload can not be empty"))
+ return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty"))
}
if tw.w.State().Value() != internal.StateReady {
- return internal.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String()))
+ return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String()))
}
// set last used time
@@ -47,7 +48,7 @@ func (tw *syncWorker) Exec(p internal.Payload) (internal.Payload, error) {
tw.w.State().Set(internal.StateErrored)
tw.w.State().RegisterExec()
}
- return internal.Payload{}, err
+ return payload.Payload{}, err
}
tw.w.State().Set(internal.StateReady)
@@ -57,18 +58,18 @@ func (tw *syncWorker) Exec(p internal.Payload) (internal.Payload, error) {
}
type wexec struct {
- payload internal.Payload
+ payload payload.Payload
err error
}
// Exec payload without TTL timeout.
-func (tw *syncWorker) ExecWithContext(ctx context.Context, p internal.Payload) (internal.Payload, error) {
+func (tw *syncWorker) ExecWithContext(ctx context.Context, p payload.Payload) (payload.Payload, error) {
const op = errors.Op("ExecWithContext")
c := make(chan wexec, 1)
go func() {
if len(p.Body) == 0 && len(p.Context) == 0 {
c <- wexec{
- payload: internal.Payload{},
+ payload: payload.Payload{},
err: errors.E(op, errors.Str("payload can not be empty")),
}
return
@@ -76,7 +77,7 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p internal.Payload) (
if tw.w.State().Value() != internal.StateReady {
c <- wexec{
- payload: internal.Payload{},
+ payload: payload.Payload{},
err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String())),
}
return
@@ -94,7 +95,7 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p internal.Payload) (
tw.w.State().RegisterExec()
}
c <- wexec{
- payload: internal.Payload{},
+ payload: payload.Payload{},
err: errors.E(op, err),
}
return
@@ -113,18 +114,18 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p internal.Payload) (
case <-ctx.Done():
err := multierr.Combine(tw.Kill())
if err != nil {
- return internal.Payload{}, multierr.Append(err, ctx.Err())
+ return payload.Payload{}, multierr.Append(err, ctx.Err())
}
- return internal.Payload{}, ctx.Err()
+ return payload.Payload{}, ctx.Err()
case res := <-c:
if res.err != nil {
- return internal.Payload{}, res.err
+ return payload.Payload{}, res.err
}
return res.payload, nil
}
}
-func (tw *syncWorker) execPayload(p internal.Payload) (internal.Payload, error) {
+func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) {
const op = errors.Op("exec payload")
frame := goridge.NewFrame()
@@ -147,35 +148,35 @@ func (tw *syncWorker) execPayload(p internal.Payload) (internal.Payload, error)
err := tw.Relay().Send(frame)
if err != nil {
- return internal.Payload{}, err
+ return payload.Payload{}, err
}
frameR := goridge.NewFrame()
err = tw.w.Relay().Receive(frameR)
if err != nil {
- return internal.Payload{}, errors.E(op, err)
+ return payload.Payload{}, errors.E(op, err)
}
if frameR == nil {
- return internal.Payload{}, errors.E(op, errors.Str("nil frame received"))
+ return payload.Payload{}, errors.E(op, errors.Str("nil frame received"))
}
if !frameR.VerifyCRC() {
- return internal.Payload{}, errors.E(op, errors.Str("failed to verify CRC"))
+ return payload.Payload{}, errors.E(op, errors.Str("failed to verify CRC"))
}
flags := frameR.ReadFlags()
if flags&byte(goridge.ERROR) != byte(0) {
- return internal.Payload{}, errors.E(op, errors.ErrSoftJob, errors.Str(string(frameR.Payload())))
+ return payload.Payload{}, errors.E(op, errors.ErrSoftJob, errors.Str(string(frameR.Payload())))
}
options := frameR.ReadOptions()
if len(options) != 1 {
- return internal.Payload{}, errors.E(op, errors.Str("options length should be equal 1 (body offset)"))
+ return payload.Payload{}, errors.E(op, errors.Str("options length should be equal 1 (body offset)"))
}
- payload := internal.Payload{}
+ payload := payload.Payload{}
payload.Context = frameR.Payload()[:options[0]]
payload.Body = frameR.Payload()[options[0]:]
diff --git a/pkg/worker/sync_worker_test.go b/pkg/worker/sync_worker_test.go
index e224e105..40988b06 100755
--- a/pkg/worker/sync_worker_test.go
+++ b/pkg/worker/sync_worker_test.go
@@ -4,7 +4,7 @@ import (
"os/exec"
"testing"
- "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/stretchr/testify/assert"
)
@@ -27,7 +27,7 @@ func Test_NotStarted_Exec(t *testing.T) {
t.Fatal(err)
}
- res, err := syncWorker.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Body)
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index 35d3264e..95fa6e06 100755
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -17,7 +17,7 @@ import (
"github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
- events2 "github.com/spiral/roadrunner/v2/pkg/events"
+ eventsPkg "github.com/spiral/roadrunner/v2/pkg/events"
"go.uber.org/multierr"
)
@@ -88,7 +88,7 @@ func InitBaseWorker(cmd *exec.Cmd) (worker.BaseProcess, error) {
}
w := &Process{
created: time.Now(),
- events: events2.NewEventsHandler(),
+ events: eventsPkg.NewEventsHandler(),
cmd: cmd,
state: internal.NewWorkerState(internal.StateInactive),
stderr: new(bytes.Buffer),
diff --git a/plugins/http/request.go b/plugins/http/request.go
index 5df79b7d..d613bcf6 100644
--- a/plugins/http/request.go
+++ b/plugins/http/request.go
@@ -10,7 +10,7 @@ import (
j "github.com/json-iterator/go"
"github.com/spiral/roadrunner/v2/interfaces/log"
- "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/plugins/http/attributes"
)
@@ -136,17 +136,17 @@ func (r *Request) Close(log log.Logger) {
// Payload request marshaled RoadRunner payload based on PSR7 data. values encode method is JSON. Make sure to open
// files prior to calling this method.
-func (r *Request) Payload() (internal.Payload, error) {
- p := internal.Payload{}
+func (r *Request) Payload() (payload.Payload, error) {
+ p := payload.Payload{}
var err error
if p.Context, err = json.Marshal(r); err != nil {
- return internal.Payload{}, err
+ return payload.Payload{}, err
}
if r.Parsed {
if p.Body, err = json.Marshal(r.body); err != nil {
- return internal.Payload{}, err
+ return payload.Payload{}, err
}
} else if r.body != nil {
p.Body = r.body.([]byte)
diff --git a/plugins/http/response.go b/plugins/http/response.go
index 9700a16c..17049ce1 100644
--- a/plugins/http/response.go
+++ b/plugins/http/response.go
@@ -6,7 +6,7 @@ import (
"strings"
"sync"
- "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
)
// Response handles PSR7 response logic.
@@ -23,7 +23,7 @@ type Response struct {
}
// NewResponse creates new response based on given pool payload.
-func NewResponse(p internal.Payload) (*Response, error) {
+func NewResponse(p payload.Payload) (*Response, error) {
r := &Response{Body: p.Body}
if err := json.Unmarshal(p.Context, r); err != nil {
return nil, err
diff --git a/plugins/http/tests/response_test.go b/plugins/http/tests/response_test.go
index a526fe03..7901a0d1 100644
--- a/plugins/http/tests/response_test.go
+++ b/plugins/http/tests/response_test.go
@@ -6,7 +6,7 @@ import (
"net/http"
"testing"
- "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
http2 "github.com/spiral/roadrunner/v2/plugins/http"
"github.com/stretchr/testify/assert"
)
@@ -45,13 +45,13 @@ func (tw *testWriter) Push(target string, opts *http.PushOptions) error {
}
func TestNewResponse_Error(t *testing.T) {
- r, err := http2.NewResponse(internal.Payload{Context: []byte(`invalid payload`)})
+ r, err := http2.NewResponse(payload.Payload{Context: []byte(`invalid payload`)})
assert.Error(t, err)
assert.Nil(t, r)
}
func TestNewResponse_Write(t *testing.T) {
- r, err := http2.NewResponse(internal.Payload{
+ r, err := http2.NewResponse(payload.Payload{
Context: []byte(`{"headers":{"key":["value"]},"status": 301}`),
Body: []byte(`sample body`),
})
@@ -68,7 +68,7 @@ func TestNewResponse_Write(t *testing.T) {
}
func TestNewResponse_Stream(t *testing.T) {
- r, err := http2.NewResponse(internal.Payload{
+ r, err := http2.NewResponse(payload.Payload{
Context: []byte(`{"headers":{"key":["value"]},"status": 301}`),
})
@@ -92,7 +92,7 @@ func TestNewResponse_Stream(t *testing.T) {
}
func TestNewResponse_StreamError(t *testing.T) {
- r, err := http2.NewResponse(internal.Payload{
+ r, err := http2.NewResponse(payload.Payload{
Context: []byte(`{"headers":{"key":["value"]},"status": 301}`),
})
@@ -112,7 +112,7 @@ func TestNewResponse_StreamError(t *testing.T) {
}
func TestWrite_HandlesPush(t *testing.T) {
- r, err := http2.NewResponse(internal.Payload{
+ r, err := http2.NewResponse(payload.Payload{
Context: []byte(`{"headers":{"Http2-Push":["/test.js"],"content-type":["text/html"]},"status": 200}`),
})
@@ -127,7 +127,7 @@ func TestWrite_HandlesPush(t *testing.T) {
}
func TestWrite_HandlesTrailers(t *testing.T) {
- r, err := http2.NewResponse(internal.Payload{
+ r, err := http2.NewResponse(payload.Payload{
Context: []byte(`{"headers":{"Trailer":["foo, bar", "baz"],"foo":["test"],"bar":["demo"]},"status": 200}`),
})
@@ -146,7 +146,7 @@ func TestWrite_HandlesTrailers(t *testing.T) {
}
func TestWrite_HandlesHandlesWhitespacesInTrailer(t *testing.T) {
- r, err := http2.NewResponse(internal.Payload{
+ r, err := http2.NewResponse(payload.Payload{
Context: []byte(
`{"headers":{"Trailer":["foo\t,bar , baz"],"foo":["a"],"bar":["b"],"baz":["c"]},"status": 200}`),
})
diff --git a/plugins/server/tests/plugin_pipes.go b/plugins/server/tests/plugin_pipes.go
index 61c9a8f9..9a8a630c 100644
--- a/plugins/server/tests/plugin_pipes.go
+++ b/plugins/server/tests/plugin_pipes.go
@@ -7,7 +7,7 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/interfaces/pool"
"github.com/spiral/roadrunner/v2/interfaces/server"
- "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/config"
@@ -47,7 +47,7 @@ func (f *Foo) Serve() chan error {
const op = errors.Op("serve")
// test payload for echo
- r := internal.Payload{
+ r := payload.Payload{
Context: nil,
Body: []byte(Response),
}
diff --git a/plugins/server/tests/plugin_sockets.go b/plugins/server/tests/plugin_sockets.go
index 3b97efff..b1545718 100644
--- a/plugins/server/tests/plugin_sockets.go
+++ b/plugins/server/tests/plugin_sockets.go
@@ -6,7 +6,7 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/interfaces/pool"
"github.com/spiral/roadrunner/v2/interfaces/server"
- "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/config"
plugin "github.com/spiral/roadrunner/v2/plugins/server"
@@ -31,7 +31,7 @@ func (f *Foo2) Serve() chan error {
conf := &plugin.Config{}
// test payload for echo
- r := internal.Payload{
+ r := payload.Payload{
Context: nil,
Body: []byte(Response),
}
diff --git a/plugins/server/tests/plugin_tcp.go b/plugins/server/tests/plugin_tcp.go
index 2857dadc..da92288a 100644
--- a/plugins/server/tests/plugin_tcp.go
+++ b/plugins/server/tests/plugin_tcp.go
@@ -6,7 +6,7 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/interfaces/pool"
"github.com/spiral/roadrunner/v2/interfaces/server"
- "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/config"
plugin "github.com/spiral/roadrunner/v2/plugins/server"
@@ -31,7 +31,7 @@ func (f *Foo3) Serve() chan error {
conf := &plugin.Config{}
// test payload for echo
- r := internal.Payload{
+ r := payload.Payload{
Context: nil,
Body: []byte(Response),
}