summaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-12-18 22:57:33 +0300
committerValery Piashchynski <[email protected]>2020-12-18 22:57:33 +0300
commitad39466afb39fac49977cfb97f20f682f54bf35e (patch)
tree59b630ad0ef9b52eb13e0df5c8e41d92ce277950 /pkg
parentee0cb478c74c393a35155c2bf51e1ef260e0e5e2 (diff)
Move roadrunner payload out of internal to pkgv2.0.0-alpha25
Diffstat (limited to 'pkg')
-rwxr-xr-xpkg/payload/payload.go16
-rwxr-xr-xpkg/pipe/pipe_factory_test.go27
-rwxr-xr-xpkg/pool/static_pool.go35
-rwxr-xr-xpkg/pool/static_pool_test.go39
-rwxr-xr-xpkg/pool/supervisor_pool.go15
-rw-r--r--pkg/pool/supervisor_test.go8
-rwxr-xr-xpkg/socket/socket_factory_test.go14
-rwxr-xr-xpkg/worker/sync_worker.go41
-rwxr-xr-xpkg/worker/sync_worker_test.go4
-rwxr-xr-xpkg/worker/worker.go4
10 files changed, 112 insertions, 91 deletions
diff --git a/pkg/payload/payload.go b/pkg/payload/payload.go
new file mode 100755
index 00000000..fac36852
--- /dev/null
+++ b/pkg/payload/payload.go
@@ -0,0 +1,16 @@
+package payload
+
+// Payload carries binary header and body to stack and
+// back to the server.
+type Payload struct {
+ // Context represent payload context, might be omitted.
+ Context []byte
+
+ // body contains binary payload to be processed by WorkerProcess.
+ Body []byte
+}
+
+// String returns payload body as string
+func (p *Payload) String() string {
+ return string(p.Body)
+}
diff --git a/pkg/pipe/pipe_factory_test.go b/pkg/pipe/pipe_factory_test.go
index 99212ff8..9453de1d 100755
--- a/pkg/pipe/pipe_factory_test.go
+++ b/pkg/pipe/pipe_factory_test.go
@@ -11,6 +11,7 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
workerImpl "github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/stretchr/testify/assert"
)
@@ -151,7 +152,7 @@ func Test_Pipe_Echo(t *testing.T) {
t.Fatal(err)
}
- res, err := sw.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -179,7 +180,7 @@ func Test_Pipe_Broken(t *testing.T) {
t.Fatal(err)
}
- res, err := sw.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Body)
@@ -228,7 +229,7 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) {
}()
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(internal.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -255,7 +256,7 @@ func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) {
}
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(internal.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -282,7 +283,7 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) {
}
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(internal.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -311,7 +312,7 @@ func Test_Echo(t *testing.T) {
}
}()
- res, err := syncWorker.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")})
assert.Nil(t, err)
assert.NotNil(t, res)
@@ -342,7 +343,7 @@ func Test_BadPayload(t *testing.T) {
}
}()
- res, err := syncWorker.Exec(internal.Payload{})
+ res, err := syncWorker.Exec(payload.Payload{})
assert.Error(t, err)
assert.Nil(t, res.Body)
@@ -391,7 +392,7 @@ func Test_Echo_Slow(t *testing.T) {
t.Fatal(err)
}
- res, err := syncWorker.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")})
assert.Nil(t, err)
assert.NotNil(t, res)
@@ -425,7 +426,7 @@ func Test_Broken(t *testing.T) {
t.Fatal(err)
}
- res, err := syncWorker.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")})
assert.NotNil(t, err)
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
@@ -460,7 +461,7 @@ func Test_Error(t *testing.T) {
t.Fatal(err)
}
- res, err := syncWorker.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")})
assert.NotNil(t, err)
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
@@ -491,19 +492,19 @@ func Test_NumExecs(t *testing.T) {
t.Fatal(err)
}
- _, err = syncWorker.Exec(internal.Payload{Body: []byte("hello")})
+ _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, int64(1), w.State().NumExecs())
- _, err = syncWorker.Exec(internal.Payload{Body: []byte("hello")})
+ _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, int64(2), w.State().NumExecs())
- _, err = syncWorker.Exec(internal.Payload{Body: []byte("hello")})
+ _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 6cc42143..9cf79fd4 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -9,7 +9,8 @@ import (
"github.com/spiral/roadrunner/v2/interfaces/pool"
"github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
- events2 "github.com/spiral/roadrunner/v2/pkg/events"
+ eventsPkg "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
syncWorker "github.com/spiral/roadrunner/v2/pkg/worker"
workerWatcher "github.com/spiral/roadrunner/v2/pkg/worker_watcher"
)
@@ -20,13 +21,13 @@ const StopRequest = "{\"stop\":true}"
var bCtx = context.Background()
// ErrorEncoder encode error or make a decision based on the error type
-type ErrorEncoder func(err error, w worker.BaseProcess) (internal.Payload, error)
+type ErrorEncoder func(err error, w worker.BaseProcess) (payload.Payload, error)
// Before is set of functions that executes BEFORE Exec
-type Before func(req internal.Payload) internal.Payload
+type Before func(req payload.Payload) payload.Payload
// After is set of functions that executes AFTER Exec
-type After func(req internal.Payload, resp internal.Payload) internal.Payload
+type After func(req payload.Payload, resp payload.Payload) payload.Payload
type Options func(p *StaticPool)
@@ -71,7 +72,7 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory worker.Factory,
cfg: cfg,
cmd: cmd,
factory: factory,
- events: events2.NewEventsHandler(),
+ events: eventsPkg.NewEventsHandler(),
after: make([]After, 0, 0),
before: make([]Before, 0, 0),
}
@@ -139,7 +140,7 @@ func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error {
return sp.ww.RemoveWorker(wb)
}
-func (sp *StaticPool) Exec(p internal.Payload) (internal.Payload, error) {
+func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
const op = errors.Op("exec")
if sp.cfg.Debug {
return sp.execDebug(p)
@@ -148,7 +149,7 @@ func (sp *StaticPool) Exec(p internal.Payload) (internal.Payload, error) {
defer cancel()
w, err := sp.getWorker(ctxGetFree, op)
if err != nil {
- return internal.Payload{}, errors.E(op, err)
+ return payload.Payload{}, errors.E(op, err)
}
sw := w.(worker.SyncWorker)
@@ -179,7 +180,7 @@ func (sp *StaticPool) Exec(p internal.Payload) (internal.Payload, error) {
if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs {
err = sp.ww.AllocateNew()
if err != nil {
- return internal.Payload{}, errors.E(op, err)
+ return payload.Payload{}, errors.E(op, err)
}
} else {
sp.ww.PushWorker(sw)
@@ -194,13 +195,13 @@ func (sp *StaticPool) Exec(p internal.Payload) (internal.Payload, error) {
return rsp, nil
}
-func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs internal.Payload) (internal.Payload, error) {
+func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) {
const op = errors.Op("exec with context")
ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout)
defer cancel()
w, err := sp.getWorker(ctxGetFree, op)
if err != nil {
- return internal.Payload{}, errors.E(op, err)
+ return payload.Payload{}, errors.E(op, err)
}
sw := w.(worker.SyncWorker)
@@ -231,7 +232,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs internal.Payload)
if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs {
err = sp.ww.AllocateNew()
if err != nil {
- return internal.Payload{}, errors.E(op, err)
+ return payload.Payload{}, errors.E(op, err)
}
} else {
sp.ww.PushWorker(sw)
@@ -268,7 +269,7 @@ func (sp *StaticPool) Destroy(ctx context.Context) {
}
func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
- return func(err error, w worker.BaseProcess) (internal.Payload, error) {
+ return func(err error, w worker.BaseProcess) (payload.Payload, error) {
const op = errors.Op("error encoder")
// soft job errors are allowed
if errors.Is(errors.ErrSoftJob, err) {
@@ -287,7 +288,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
sp.ww.PushWorker(w)
}
- return internal.Payload{}, errors.E(op, err)
+ return payload.Payload{}, errors.E(op, err)
}
w.State().Set(internal.StateInvalid)
@@ -295,10 +296,10 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
errS := w.Stop(bCtx)
if errS != nil {
- return internal.Payload{}, errors.E(op, errors.Errorf("%v, %v", err, errS))
+ return payload.Payload{}, errors.E(op, errors.Errorf("%v, %v", err, errS))
}
- return internal.Payload{}, errors.E(op, err)
+ return payload.Payload{}, errors.E(op, err)
}
}
@@ -317,10 +318,10 @@ func newPoolAllocator(factory worker.Factory, cmd func() *exec.Cmd) worker.Alloc
}
}
-func (sp *StaticPool) execDebug(p internal.Payload) (internal.Payload, error) {
+func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
sw, err := sp.allocator()
if err != nil {
- return internal.Payload{}, err
+ return payload.Payload{}, err
}
r, err := sw.(worker.SyncWorker).Exec(p)
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index dd33a1a6..b96e9214 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -14,6 +14,7 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/pipe"
"github.com/stretchr/testify/assert"
)
@@ -80,7 +81,7 @@ func Test_StaticPool_Echo(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := p.Exec(payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -104,7 +105,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(internal.Payload{Body: []byte("hello"), Context: nil})
+ res, err := p.Exec(payload.Payload{Body: []byte("hello"), Context: nil})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -128,7 +129,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(internal.Payload{Body: []byte("hello"), Context: []byte("world")})
+ res, err := p.Exec(payload.Payload{Body: []byte("hello"), Context: []byte("world")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -151,7 +152,7 @@ func Test_StaticPool_JobError(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := p.Exec(payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Body)
@@ -189,7 +190,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
}
})
- res, err := p.ExecWithContext(ctx, internal.Payload{Body: []byte("hello")})
+ res, err := p.ExecWithContext(ctx, payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Context)
assert.Nil(t, res.Body)
@@ -212,7 +213,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := p.Exec(payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -287,11 +288,11 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
var lastPID string
lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
- res, _ := p.Exec(internal.Payload{Body: []byte("hello")})
+ res, _ := p.Exec(payload.Payload{Body: []byte("hello")})
assert.Equal(t, lastPID, string(res.Body))
for i := 0; i < 10; i++ {
- res, err := p.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := p.Exec(payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -323,14 +324,14 @@ func Test_StaticPool_Debug_Worker(t *testing.T) {
assert.Len(t, p.Workers(), 0)
var lastPID string
- res, _ := p.Exec(internal.Payload{Body: []byte("hello")})
+ res, _ := p.Exec(payload.Payload{Body: []byte("hello")})
assert.NotEqual(t, lastPID, string(res.Body))
assert.Len(t, p.Workers(), 0)
for i := 0; i < 10; i++ {
assert.Len(t, p.Workers(), 0)
- res, err := p.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := p.Exec(payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -363,14 +364,14 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
var lastPID string
lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
- res, err := p.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := p.Exec(payload.Payload{Body: []byte("hello")})
if err != nil {
t.Fatal(err)
}
assert.Equal(t, lastPID, string(res.Body))
for i := 0; i < 10; i++ {
- res, err := p.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := p.Exec(payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -400,7 +401,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
assert.NoError(t, err)
p.Destroy(ctx)
- _, err = p.Exec(internal.Payload{Body: []byte("100")})
+ _, err = p.Exec(payload.Payload{Body: []byte("100")})
assert.Error(t, err)
}
@@ -422,7 +423,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
assert.NoError(t, err)
go func() {
- _, err := p.Exec(internal.Payload{Body: []byte("100")})
+ _, err := p.Exec(payload.Payload{Body: []byte("100")})
if err != nil {
t.Errorf("error executing payload: error %v", err)
}
@@ -430,7 +431,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
time.Sleep(time.Millisecond * 10)
p.Destroy(ctx)
- _, err = p.Exec(internal.Payload{Body: []byte("100")})
+ _, err = p.Exec(payload.Payload{Body: []byte("100")})
assert.Error(t, err)
}
@@ -456,7 +457,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
w.State().Set(internal.StateErrored)
}
- _, err = p.Exec(internal.Payload{Body: []byte("hello")})
+ _, err = p.Exec(payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
}
@@ -494,7 +495,7 @@ func Benchmark_Pool_Echo(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for n := 0; n < b.N; n++ {
- if _, err := p.Exec(internal.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -520,7 +521,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
wg.Add(1)
go func() {
defer wg.Done()
- if _, err := p.Exec(internal.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
log.Println(err)
}
@@ -549,7 +550,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) {
b.ReportAllocs()
for n := 0; n < b.N; n++ {
- if _, err := p.Exec(internal.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
log.Println(err)
}
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index 6d1f0c58..6faa609c 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -11,6 +11,7 @@ import (
"github.com/spiral/roadrunner/v2/interfaces/pool"
"github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
)
const MB = 1024 * 1024
@@ -42,10 +43,10 @@ func newPoolWatcher(pool pool.Pool, events events.Handler, cfg *SupervisorConfig
type ttlExec struct {
err error
- p internal.Payload
+ p payload.Payload
}
-func (sp *supervised) ExecWithContext(ctx context.Context, rqs internal.Payload) (internal.Payload, error) {
+func (sp *supervised) ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) {
const op = errors.Op("exec_supervised")
if sp.cfg.ExecTTL == 0 {
return sp.pool.Exec(rqs)
@@ -59,7 +60,7 @@ func (sp *supervised) ExecWithContext(ctx context.Context, rqs internal.Payload)
if err != nil {
c <- ttlExec{
err: errors.E(op, err),
- p: internal.Payload{},
+ p: payload.Payload{},
}
}
@@ -72,10 +73,10 @@ func (sp *supervised) ExecWithContext(ctx context.Context, rqs internal.Payload)
for {
select {
case <-ctx.Done():
- return internal.Payload{}, errors.E(op, errors.TimeOut, ctx.Err())
+ return payload.Payload{}, errors.E(op, errors.TimeOut, ctx.Err())
case res := <-c:
if res.err != nil {
- return internal.Payload{}, res.err
+ return payload.Payload{}, res.err
}
return res.p, nil
@@ -83,11 +84,11 @@ func (sp *supervised) ExecWithContext(ctx context.Context, rqs internal.Payload)
}
}
-func (sp *supervised) Exec(p internal.Payload) (internal.Payload, error) {
+func (sp *supervised) Exec(p payload.Payload) (payload.Payload, error) {
const op = errors.Op("supervised exec")
rsp, err := sp.pool.Exec(p)
if err != nil {
- return internal.Payload{}, errors.E(op, err)
+ return payload.Payload{}, errors.E(op, err)
}
return rsp, nil
}
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
index 2e3e7fd2..7dd423b8 100644
--- a/pkg/pool/supervisor_test.go
+++ b/pkg/pool/supervisor_test.go
@@ -7,7 +7,7 @@ import (
"time"
"github.com/spiral/roadrunner/v2"
- "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/pipe"
"github.com/stretchr/testify/assert"
)
@@ -61,7 +61,7 @@ func TestSupervisedPool_Exec(t *testing.T) {
for i := 0; i < 100; i++ {
time.Sleep(time.Millisecond * 50)
- _, err = p.Exec(internal.Payload{
+ _, err = p.Exec(payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -98,7 +98,7 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
pid := p.Workers()[0].Pid()
- resp, err := p.ExecWithContext(context.Background(), internal.Payload{
+ resp, err := p.ExecWithContext(context.Background(), payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -139,7 +139,7 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
pid := p.Workers()[0].Pid()
time.Sleep(time.Millisecond * 100)
- resp, err := p.Exec(internal.Payload{
+ resp, err := p.Exec(payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
diff --git a/pkg/socket/socket_factory_test.go b/pkg/socket/socket_factory_test.go
index f1a7d637..6a88713a 100755
--- a/pkg/socket/socket_factory_test.go
+++ b/pkg/socket/socket_factory_test.go
@@ -8,7 +8,7 @@ import (
"testing"
"time"
- "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/stretchr/testify/assert"
)
@@ -211,7 +211,7 @@ func Test_Tcp_Broken(t *testing.T) {
t.Fatal(err)
}
- res, err := sw.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
@@ -251,7 +251,7 @@ func Test_Tcp_Echo(t *testing.T) {
t.Fatal(err)
}
- res, err := sw.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -396,7 +396,7 @@ func Test_Unix_Broken(t *testing.T) {
t.Fatal(err)
}
- res, err := sw.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Context)
@@ -439,7 +439,7 @@ func Test_Unix_Echo(t *testing.T) {
t.Fatal(err)
}
- res, err := sw.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -515,7 +515,7 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) {
}
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(internal.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -583,7 +583,7 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) {
}
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(internal.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index 1eb1396e..daa07186 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -9,6 +9,7 @@ import (
"github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
"go.uber.org/multierr"
"github.com/spiral/goridge/v3"
@@ -26,14 +27,14 @@ func From(w worker.BaseProcess) (worker.SyncWorker, error) {
}
// Exec payload without TTL timeout.
-func (tw *syncWorker) Exec(p internal.Payload) (internal.Payload, error) {
+func (tw *syncWorker) Exec(p payload.Payload) (payload.Payload, error) {
const op = errors.Op("sync worker Exec")
if len(p.Body) == 0 && len(p.Context) == 0 {
- return internal.Payload{}, errors.E(op, errors.Str("payload can not be empty"))
+ return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty"))
}
if tw.w.State().Value() != internal.StateReady {
- return internal.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String()))
+ return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String()))
}
// set last used time
@@ -47,7 +48,7 @@ func (tw *syncWorker) Exec(p internal.Payload) (internal.Payload, error) {
tw.w.State().Set(internal.StateErrored)
tw.w.State().RegisterExec()
}
- return internal.Payload{}, err
+ return payload.Payload{}, err
}
tw.w.State().Set(internal.StateReady)
@@ -57,18 +58,18 @@ func (tw *syncWorker) Exec(p internal.Payload) (internal.Payload, error) {
}
type wexec struct {
- payload internal.Payload
+ payload payload.Payload
err error
}
// Exec payload without TTL timeout.
-func (tw *syncWorker) ExecWithContext(ctx context.Context, p internal.Payload) (internal.Payload, error) {
+func (tw *syncWorker) ExecWithContext(ctx context.Context, p payload.Payload) (payload.Payload, error) {
const op = errors.Op("ExecWithContext")
c := make(chan wexec, 1)
go func() {
if len(p.Body) == 0 && len(p.Context) == 0 {
c <- wexec{
- payload: internal.Payload{},
+ payload: payload.Payload{},
err: errors.E(op, errors.Str("payload can not be empty")),
}
return
@@ -76,7 +77,7 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p internal.Payload) (
if tw.w.State().Value() != internal.StateReady {
c <- wexec{
- payload: internal.Payload{},
+ payload: payload.Payload{},
err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String())),
}
return
@@ -94,7 +95,7 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p internal.Payload) (
tw.w.State().RegisterExec()
}
c <- wexec{
- payload: internal.Payload{},
+ payload: payload.Payload{},
err: errors.E(op, err),
}
return
@@ -113,18 +114,18 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p internal.Payload) (
case <-ctx.Done():
err := multierr.Combine(tw.Kill())
if err != nil {
- return internal.Payload{}, multierr.Append(err, ctx.Err())
+ return payload.Payload{}, multierr.Append(err, ctx.Err())
}
- return internal.Payload{}, ctx.Err()
+ return payload.Payload{}, ctx.Err()
case res := <-c:
if res.err != nil {
- return internal.Payload{}, res.err
+ return payload.Payload{}, res.err
}
return res.payload, nil
}
}
-func (tw *syncWorker) execPayload(p internal.Payload) (internal.Payload, error) {
+func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) {
const op = errors.Op("exec payload")
frame := goridge.NewFrame()
@@ -147,35 +148,35 @@ func (tw *syncWorker) execPayload(p internal.Payload) (internal.Payload, error)
err := tw.Relay().Send(frame)
if err != nil {
- return internal.Payload{}, err
+ return payload.Payload{}, err
}
frameR := goridge.NewFrame()
err = tw.w.Relay().Receive(frameR)
if err != nil {
- return internal.Payload{}, errors.E(op, err)
+ return payload.Payload{}, errors.E(op, err)
}
if frameR == nil {
- return internal.Payload{}, errors.E(op, errors.Str("nil frame received"))
+ return payload.Payload{}, errors.E(op, errors.Str("nil frame received"))
}
if !frameR.VerifyCRC() {
- return internal.Payload{}, errors.E(op, errors.Str("failed to verify CRC"))
+ return payload.Payload{}, errors.E(op, errors.Str("failed to verify CRC"))
}
flags := frameR.ReadFlags()
if flags&byte(goridge.ERROR) != byte(0) {
- return internal.Payload{}, errors.E(op, errors.ErrSoftJob, errors.Str(string(frameR.Payload())))
+ return payload.Payload{}, errors.E(op, errors.ErrSoftJob, errors.Str(string(frameR.Payload())))
}
options := frameR.ReadOptions()
if len(options) != 1 {
- return internal.Payload{}, errors.E(op, errors.Str("options length should be equal 1 (body offset)"))
+ return payload.Payload{}, errors.E(op, errors.Str("options length should be equal 1 (body offset)"))
}
- payload := internal.Payload{}
+ payload := payload.Payload{}
payload.Context = frameR.Payload()[:options[0]]
payload.Body = frameR.Payload()[options[0]:]
diff --git a/pkg/worker/sync_worker_test.go b/pkg/worker/sync_worker_test.go
index e224e105..40988b06 100755
--- a/pkg/worker/sync_worker_test.go
+++ b/pkg/worker/sync_worker_test.go
@@ -4,7 +4,7 @@ import (
"os/exec"
"testing"
- "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/stretchr/testify/assert"
)
@@ -27,7 +27,7 @@ func Test_NotStarted_Exec(t *testing.T) {
t.Fatal(err)
}
- res, err := syncWorker.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Body)
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index 35d3264e..95fa6e06 100755
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -17,7 +17,7 @@ import (
"github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
- events2 "github.com/spiral/roadrunner/v2/pkg/events"
+ eventsPkg "github.com/spiral/roadrunner/v2/pkg/events"
"go.uber.org/multierr"
)
@@ -88,7 +88,7 @@ func InitBaseWorker(cmd *exec.Cmd) (worker.BaseProcess, error) {
}
w := &Process{
created: time.Now(),
- events: events2.NewEventsHandler(),
+ events: eventsPkg.NewEventsHandler(),
cmd: cmd,
state: internal.NewWorkerState(internal.StateInactive),
stderr: new(bytes.Buffer),