diff options
author | Valery Piashchynski <[email protected]> | 2020-12-18 22:57:33 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-12-18 22:57:33 +0300 |
commit | ad39466afb39fac49977cfb97f20f682f54bf35e (patch) | |
tree | 59b630ad0ef9b52eb13e0df5c8e41d92ce277950 | |
parent | ee0cb478c74c393a35155c2bf51e1ef260e0e5e2 (diff) |
Move roadrunner payload out of internal to pkgv2.0.0-alpha25
-rw-r--r-- | interfaces/pool/pool.go | 6 | ||||
-rw-r--r-- | interfaces/worker/worker.go | 5 | ||||
-rwxr-xr-x | pkg/payload/payload.go (renamed from internal/payload.go) | 2 | ||||
-rwxr-xr-x | pkg/pipe/pipe_factory_test.go | 27 | ||||
-rwxr-xr-x | pkg/pool/static_pool.go | 35 | ||||
-rwxr-xr-x | pkg/pool/static_pool_test.go | 39 | ||||
-rwxr-xr-x | pkg/pool/supervisor_pool.go | 15 | ||||
-rw-r--r-- | pkg/pool/supervisor_test.go | 8 | ||||
-rwxr-xr-x | pkg/socket/socket_factory_test.go | 14 | ||||
-rwxr-xr-x | pkg/worker/sync_worker.go | 41 | ||||
-rwxr-xr-x | pkg/worker/sync_worker_test.go | 4 | ||||
-rwxr-xr-x | pkg/worker/worker.go | 4 | ||||
-rw-r--r-- | plugins/http/request.go | 10 | ||||
-rw-r--r-- | plugins/http/response.go | 4 | ||||
-rw-r--r-- | plugins/http/tests/response_test.go | 16 | ||||
-rw-r--r-- | plugins/server/tests/plugin_pipes.go | 4 | ||||
-rw-r--r-- | plugins/server/tests/plugin_sockets.go | 4 | ||||
-rw-r--r-- | plugins/server/tests/plugin_tcp.go | 4 |
18 files changed, 124 insertions, 118 deletions
diff --git a/interfaces/pool/pool.go b/interfaces/pool/pool.go index a1015fd6..72da9597 100644 --- a/interfaces/pool/pool.go +++ b/interfaces/pool/pool.go @@ -7,7 +7,7 @@ import ( "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/interfaces/worker" - "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/payload" ) // Pool managed set of inner worker processes. @@ -19,9 +19,9 @@ type Pool interface { GetConfig() interface{} // Exec - Exec(rqs internal.Payload) (internal.Payload, error) + Exec(rqs payload.Payload) (payload.Payload, error) - ExecWithContext(ctx context.Context, rqs internal.Payload) (internal.Payload, error) + ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) // Workers returns worker list associated with the pool. Workers() (workers []worker.BaseProcess) diff --git a/interfaces/worker/worker.go b/interfaces/worker/worker.go index edbc68d9..773dd044 100644 --- a/interfaces/worker/worker.go +++ b/interfaces/worker/worker.go @@ -8,6 +8,7 @@ import ( "github.com/spiral/goridge/v3" "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/payload" ) // Allocator is responsible for worker allocation in the pool @@ -56,7 +57,7 @@ type SyncWorker interface { // BaseProcess provides basic functionality for the SyncWorker BaseProcess // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS - Exec(rqs internal.Payload) (internal.Payload, error) + Exec(rqs payload.Payload) (payload.Payload, error) // ExecWithContext used to handle Exec with TTL - ExecWithContext(ctx context.Context, p internal.Payload) (internal.Payload, error) + ExecWithContext(ctx context.Context, p payload.Payload) (payload.Payload, error) } diff --git a/internal/payload.go b/pkg/payload/payload.go index 63983bad..fac36852 100755 --- a/internal/payload.go +++ b/pkg/payload/payload.go @@ -1,4 +1,4 @@ -package internal +package payload // Payload carries binary header and body to stack and // back to the server. diff --git a/pkg/pipe/pipe_factory_test.go b/pkg/pipe/pipe_factory_test.go index 99212ff8..9453de1d 100755 --- a/pkg/pipe/pipe_factory_test.go +++ b/pkg/pipe/pipe_factory_test.go @@ -11,6 +11,7 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/payload" workerImpl "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/stretchr/testify/assert" ) @@ -151,7 +152,7 @@ func Test_Pipe_Echo(t *testing.T) { t.Fatal(err) } - res, err := sw.Exec(internal.Payload{Body: []byte("hello")}) + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -179,7 +180,7 @@ func Test_Pipe_Broken(t *testing.T) { t.Fatal(err) } - res, err := sw.Exec(internal.Payload{Body: []byte("hello")}) + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Body) @@ -228,7 +229,7 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) { }() for n := 0; n < b.N; n++ { - if _, err := sw.Exec(internal.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -255,7 +256,7 @@ func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) { } for n := 0; n < b.N; n++ { - if _, err := sw.Exec(internal.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -282,7 +283,7 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) { } for n := 0; n < b.N; n++ { - if _, err := sw.Exec(internal.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -311,7 +312,7 @@ func Test_Echo(t *testing.T) { } }() - res, err := syncWorker.Exec(internal.Payload{Body: []byte("hello")}) + res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")}) assert.Nil(t, err) assert.NotNil(t, res) @@ -342,7 +343,7 @@ func Test_BadPayload(t *testing.T) { } }() - res, err := syncWorker.Exec(internal.Payload{}) + res, err := syncWorker.Exec(payload.Payload{}) assert.Error(t, err) assert.Nil(t, res.Body) @@ -391,7 +392,7 @@ func Test_Echo_Slow(t *testing.T) { t.Fatal(err) } - res, err := syncWorker.Exec(internal.Payload{Body: []byte("hello")}) + res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")}) assert.Nil(t, err) assert.NotNil(t, res) @@ -425,7 +426,7 @@ func Test_Broken(t *testing.T) { t.Fatal(err) } - res, err := syncWorker.Exec(internal.Payload{Body: []byte("hello")}) + res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")}) assert.NotNil(t, err) assert.Nil(t, res.Body) assert.Nil(t, res.Context) @@ -460,7 +461,7 @@ func Test_Error(t *testing.T) { t.Fatal(err) } - res, err := syncWorker.Exec(internal.Payload{Body: []byte("hello")}) + res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")}) assert.NotNil(t, err) assert.Nil(t, res.Body) assert.Nil(t, res.Context) @@ -491,19 +492,19 @@ func Test_NumExecs(t *testing.T) { t.Fatal(err) } - _, err = syncWorker.Exec(internal.Payload{Body: []byte("hello")}) + _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } assert.Equal(t, int64(1), w.State().NumExecs()) - _, err = syncWorker.Exec(internal.Payload{Body: []byte("hello")}) + _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } assert.Equal(t, int64(2), w.State().NumExecs()) - _, err = syncWorker.Exec(internal.Payload{Body: []byte("hello")}) + _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index 6cc42143..9cf79fd4 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -9,7 +9,8 @@ import ( "github.com/spiral/roadrunner/v2/interfaces/pool" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" - events2 "github.com/spiral/roadrunner/v2/pkg/events" + eventsPkg "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/payload" syncWorker "github.com/spiral/roadrunner/v2/pkg/worker" workerWatcher "github.com/spiral/roadrunner/v2/pkg/worker_watcher" ) @@ -20,13 +21,13 @@ const StopRequest = "{\"stop\":true}" var bCtx = context.Background() // ErrorEncoder encode error or make a decision based on the error type -type ErrorEncoder func(err error, w worker.BaseProcess) (internal.Payload, error) +type ErrorEncoder func(err error, w worker.BaseProcess) (payload.Payload, error) // Before is set of functions that executes BEFORE Exec -type Before func(req internal.Payload) internal.Payload +type Before func(req payload.Payload) payload.Payload // After is set of functions that executes AFTER Exec -type After func(req internal.Payload, resp internal.Payload) internal.Payload +type After func(req payload.Payload, resp payload.Payload) payload.Payload type Options func(p *StaticPool) @@ -71,7 +72,7 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory worker.Factory, cfg: cfg, cmd: cmd, factory: factory, - events: events2.NewEventsHandler(), + events: eventsPkg.NewEventsHandler(), after: make([]After, 0, 0), before: make([]Before, 0, 0), } @@ -139,7 +140,7 @@ func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error { return sp.ww.RemoveWorker(wb) } -func (sp *StaticPool) Exec(p internal.Payload) (internal.Payload, error) { +func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) { const op = errors.Op("exec") if sp.cfg.Debug { return sp.execDebug(p) @@ -148,7 +149,7 @@ func (sp *StaticPool) Exec(p internal.Payload) (internal.Payload, error) { defer cancel() w, err := sp.getWorker(ctxGetFree, op) if err != nil { - return internal.Payload{}, errors.E(op, err) + return payload.Payload{}, errors.E(op, err) } sw := w.(worker.SyncWorker) @@ -179,7 +180,7 @@ func (sp *StaticPool) Exec(p internal.Payload) (internal.Payload, error) { if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs { err = sp.ww.AllocateNew() if err != nil { - return internal.Payload{}, errors.E(op, err) + return payload.Payload{}, errors.E(op, err) } } else { sp.ww.PushWorker(sw) @@ -194,13 +195,13 @@ func (sp *StaticPool) Exec(p internal.Payload) (internal.Payload, error) { return rsp, nil } -func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs internal.Payload) (internal.Payload, error) { +func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) { const op = errors.Op("exec with context") ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout) defer cancel() w, err := sp.getWorker(ctxGetFree, op) if err != nil { - return internal.Payload{}, errors.E(op, err) + return payload.Payload{}, errors.E(op, err) } sw := w.(worker.SyncWorker) @@ -231,7 +232,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs internal.Payload) if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs { err = sp.ww.AllocateNew() if err != nil { - return internal.Payload{}, errors.E(op, err) + return payload.Payload{}, errors.E(op, err) } } else { sp.ww.PushWorker(sw) @@ -268,7 +269,7 @@ func (sp *StaticPool) Destroy(ctx context.Context) { } func defaultErrEncoder(sp *StaticPool) ErrorEncoder { - return func(err error, w worker.BaseProcess) (internal.Payload, error) { + return func(err error, w worker.BaseProcess) (payload.Payload, error) { const op = errors.Op("error encoder") // soft job errors are allowed if errors.Is(errors.ErrSoftJob, err) { @@ -287,7 +288,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { sp.ww.PushWorker(w) } - return internal.Payload{}, errors.E(op, err) + return payload.Payload{}, errors.E(op, err) } w.State().Set(internal.StateInvalid) @@ -295,10 +296,10 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { errS := w.Stop(bCtx) if errS != nil { - return internal.Payload{}, errors.E(op, errors.Errorf("%v, %v", err, errS)) + return payload.Payload{}, errors.E(op, errors.Errorf("%v, %v", err, errS)) } - return internal.Payload{}, errors.E(op, err) + return payload.Payload{}, errors.E(op, err) } } @@ -317,10 +318,10 @@ func newPoolAllocator(factory worker.Factory, cmd func() *exec.Cmd) worker.Alloc } } -func (sp *StaticPool) execDebug(p internal.Payload) (internal.Payload, error) { +func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) { sw, err := sp.allocator() if err != nil { - return internal.Payload{}, err + return payload.Payload{}, err } r, err := sw.(worker.SyncWorker).Exec(p) diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go index dd33a1a6..b96e9214 100755 --- a/pkg/pool/static_pool_test.go +++ b/pkg/pool/static_pool_test.go @@ -14,6 +14,7 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/pkg/pipe" "github.com/stretchr/testify/assert" ) @@ -80,7 +81,7 @@ func Test_StaticPool_Echo(t *testing.T) { assert.NotNil(t, p) - res, err := p.Exec(internal.Payload{Body: []byte("hello")}) + res, err := p.Exec(payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -104,7 +105,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) { assert.NotNil(t, p) - res, err := p.Exec(internal.Payload{Body: []byte("hello"), Context: nil}) + res, err := p.Exec(payload.Payload{Body: []byte("hello"), Context: nil}) assert.NoError(t, err) assert.NotNil(t, res) @@ -128,7 +129,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) { assert.NotNil(t, p) - res, err := p.Exec(internal.Payload{Body: []byte("hello"), Context: []byte("world")}) + res, err := p.Exec(payload.Payload{Body: []byte("hello"), Context: []byte("world")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -151,7 +152,7 @@ func Test_StaticPool_JobError(t *testing.T) { assert.NotNil(t, p) - res, err := p.Exec(internal.Payload{Body: []byte("hello")}) + res, err := p.Exec(payload.Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Body) @@ -189,7 +190,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { } }) - res, err := p.ExecWithContext(ctx, internal.Payload{Body: []byte("hello")}) + res, err := p.ExecWithContext(ctx, payload.Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Context) assert.Nil(t, res.Body) @@ -212,7 +213,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { assert.NotNil(t, p) - res, err := p.Exec(internal.Payload{Body: []byte("hello")}) + res, err := p.Exec(payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -287,11 +288,11 @@ func Test_StaticPool_Replace_Worker(t *testing.T) { var lastPID string lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) - res, _ := p.Exec(internal.Payload{Body: []byte("hello")}) + res, _ := p.Exec(payload.Payload{Body: []byte("hello")}) assert.Equal(t, lastPID, string(res.Body)) for i := 0; i < 10; i++ { - res, err := p.Exec(internal.Payload{Body: []byte("hello")}) + res, err := p.Exec(payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -323,14 +324,14 @@ func Test_StaticPool_Debug_Worker(t *testing.T) { assert.Len(t, p.Workers(), 0) var lastPID string - res, _ := p.Exec(internal.Payload{Body: []byte("hello")}) + res, _ := p.Exec(payload.Payload{Body: []byte("hello")}) assert.NotEqual(t, lastPID, string(res.Body)) assert.Len(t, p.Workers(), 0) for i := 0; i < 10; i++ { assert.Len(t, p.Workers(), 0) - res, err := p.Exec(internal.Payload{Body: []byte("hello")}) + res, err := p.Exec(payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -363,14 +364,14 @@ func Test_StaticPool_Stop_Worker(t *testing.T) { var lastPID string lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) - res, err := p.Exec(internal.Payload{Body: []byte("hello")}) + res, err := p.Exec(payload.Payload{Body: []byte("hello")}) if err != nil { t.Fatal(err) } assert.Equal(t, lastPID, string(res.Body)) for i := 0; i < 10; i++ { - res, err := p.Exec(internal.Payload{Body: []byte("hello")}) + res, err := p.Exec(payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -400,7 +401,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) { assert.NoError(t, err) p.Destroy(ctx) - _, err = p.Exec(internal.Payload{Body: []byte("100")}) + _, err = p.Exec(payload.Payload{Body: []byte("100")}) assert.Error(t, err) } @@ -422,7 +423,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { assert.NoError(t, err) go func() { - _, err := p.Exec(internal.Payload{Body: []byte("100")}) + _, err := p.Exec(payload.Payload{Body: []byte("100")}) if err != nil { t.Errorf("error executing payload: error %v", err) } @@ -430,7 +431,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { time.Sleep(time.Millisecond * 10) p.Destroy(ctx) - _, err = p.Exec(internal.Payload{Body: []byte("100")}) + _, err = p.Exec(payload.Payload{Body: []byte("100")}) assert.Error(t, err) } @@ -456,7 +457,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) { w.State().Set(internal.StateErrored) } - _, err = p.Exec(internal.Payload{Body: []byte("hello")}) + _, err = p.Exec(payload.Payload{Body: []byte("hello")}) assert.Error(t, err) } @@ -494,7 +495,7 @@ func Benchmark_Pool_Echo(b *testing.B) { b.ResetTimer() b.ReportAllocs() for n := 0; n < b.N; n++ { - if _, err := p.Exec(internal.Payload{Body: []byte("hello")}); err != nil { + if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -520,7 +521,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) { wg.Add(1) go func() { defer wg.Done() - if _, err := p.Exec(internal.Payload{Body: []byte("hello")}); err != nil { + if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() log.Println(err) } @@ -549,7 +550,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) { b.ReportAllocs() for n := 0; n < b.N; n++ { - if _, err := p.Exec(internal.Payload{Body: []byte("hello")}); err != nil { + if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() log.Println(err) } diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go index 6d1f0c58..6faa609c 100755 --- a/pkg/pool/supervisor_pool.go +++ b/pkg/pool/supervisor_pool.go @@ -11,6 +11,7 @@ import ( "github.com/spiral/roadrunner/v2/interfaces/pool" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/payload" ) const MB = 1024 * 1024 @@ -42,10 +43,10 @@ func newPoolWatcher(pool pool.Pool, events events.Handler, cfg *SupervisorConfig type ttlExec struct { err error - p internal.Payload + p payload.Payload } -func (sp *supervised) ExecWithContext(ctx context.Context, rqs internal.Payload) (internal.Payload, error) { +func (sp *supervised) ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) { const op = errors.Op("exec_supervised") if sp.cfg.ExecTTL == 0 { return sp.pool.Exec(rqs) @@ -59,7 +60,7 @@ func (sp *supervised) ExecWithContext(ctx context.Context, rqs internal.Payload) if err != nil { c <- ttlExec{ err: errors.E(op, err), - p: internal.Payload{}, + p: payload.Payload{}, } } @@ -72,10 +73,10 @@ func (sp *supervised) ExecWithContext(ctx context.Context, rqs internal.Payload) for { select { case <-ctx.Done(): - return internal.Payload{}, errors.E(op, errors.TimeOut, ctx.Err()) + return payload.Payload{}, errors.E(op, errors.TimeOut, ctx.Err()) case res := <-c: if res.err != nil { - return internal.Payload{}, res.err + return payload.Payload{}, res.err } return res.p, nil @@ -83,11 +84,11 @@ func (sp *supervised) ExecWithContext(ctx context.Context, rqs internal.Payload) } } -func (sp *supervised) Exec(p internal.Payload) (internal.Payload, error) { +func (sp *supervised) Exec(p payload.Payload) (payload.Payload, error) { const op = errors.Op("supervised exec") rsp, err := sp.pool.Exec(p) if err != nil { - return internal.Payload{}, errors.E(op, err) + return payload.Payload{}, errors.E(op, err) } return rsp, nil } diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go index 2e3e7fd2..7dd423b8 100644 --- a/pkg/pool/supervisor_test.go +++ b/pkg/pool/supervisor_test.go @@ -7,7 +7,7 @@ import ( "time" "github.com/spiral/roadrunner/v2" - "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/pkg/pipe" "github.com/stretchr/testify/assert" ) @@ -61,7 +61,7 @@ func TestSupervisedPool_Exec(t *testing.T) { for i := 0; i < 100; i++ { time.Sleep(time.Millisecond * 50) - _, err = p.Exec(internal.Payload{ + _, err = p.Exec(payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -98,7 +98,7 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { pid := p.Workers()[0].Pid() - resp, err := p.ExecWithContext(context.Background(), internal.Payload{ + resp, err := p.ExecWithContext(context.Background(), payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -139,7 +139,7 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) { pid := p.Workers()[0].Pid() time.Sleep(time.Millisecond * 100) - resp, err := p.Exec(internal.Payload{ + resp, err := p.Exec(payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) diff --git a/pkg/socket/socket_factory_test.go b/pkg/socket/socket_factory_test.go index f1a7d637..6a88713a 100755 --- a/pkg/socket/socket_factory_test.go +++ b/pkg/socket/socket_factory_test.go @@ -8,7 +8,7 @@ import ( "testing" "time" - "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/stretchr/testify/assert" ) @@ -211,7 +211,7 @@ func Test_Tcp_Broken(t *testing.T) { t.Fatal(err) } - res, err := sw.Exec(internal.Payload{Body: []byte("hello")}) + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Body) assert.Nil(t, res.Context) @@ -251,7 +251,7 @@ func Test_Tcp_Echo(t *testing.T) { t.Fatal(err) } - res, err := sw.Exec(internal.Payload{Body: []byte("hello")}) + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -396,7 +396,7 @@ func Test_Unix_Broken(t *testing.T) { t.Fatal(err) } - res, err := sw.Exec(internal.Payload{Body: []byte("hello")}) + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Context) @@ -439,7 +439,7 @@ func Test_Unix_Echo(t *testing.T) { t.Fatal(err) } - res, err := sw.Exec(internal.Payload{Body: []byte("hello")}) + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -515,7 +515,7 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) { } for n := 0; n < b.N; n++ { - if _, err := sw.Exec(internal.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -583,7 +583,7 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) { } for n := 0; n < b.N; n++ { - if _, err := sw.Exec(internal.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index 1eb1396e..daa07186 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -9,6 +9,7 @@ import ( "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/payload" "go.uber.org/multierr" "github.com/spiral/goridge/v3" @@ -26,14 +27,14 @@ func From(w worker.BaseProcess) (worker.SyncWorker, error) { } // Exec payload without TTL timeout. -func (tw *syncWorker) Exec(p internal.Payload) (internal.Payload, error) { +func (tw *syncWorker) Exec(p payload.Payload) (payload.Payload, error) { const op = errors.Op("sync worker Exec") if len(p.Body) == 0 && len(p.Context) == 0 { - return internal.Payload{}, errors.E(op, errors.Str("payload can not be empty")) + return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty")) } if tw.w.State().Value() != internal.StateReady { - return internal.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String())) + return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String())) } // set last used time @@ -47,7 +48,7 @@ func (tw *syncWorker) Exec(p internal.Payload) (internal.Payload, error) { tw.w.State().Set(internal.StateErrored) tw.w.State().RegisterExec() } - return internal.Payload{}, err + return payload.Payload{}, err } tw.w.State().Set(internal.StateReady) @@ -57,18 +58,18 @@ func (tw *syncWorker) Exec(p internal.Payload) (internal.Payload, error) { } type wexec struct { - payload internal.Payload + payload payload.Payload err error } // Exec payload without TTL timeout. -func (tw *syncWorker) ExecWithContext(ctx context.Context, p internal.Payload) (internal.Payload, error) { +func (tw *syncWorker) ExecWithContext(ctx context.Context, p payload.Payload) (payload.Payload, error) { const op = errors.Op("ExecWithContext") c := make(chan wexec, 1) go func() { if len(p.Body) == 0 && len(p.Context) == 0 { c <- wexec{ - payload: internal.Payload{}, + payload: payload.Payload{}, err: errors.E(op, errors.Str("payload can not be empty")), } return @@ -76,7 +77,7 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p internal.Payload) ( if tw.w.State().Value() != internal.StateReady { c <- wexec{ - payload: internal.Payload{}, + payload: payload.Payload{}, err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String())), } return @@ -94,7 +95,7 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p internal.Payload) ( tw.w.State().RegisterExec() } c <- wexec{ - payload: internal.Payload{}, + payload: payload.Payload{}, err: errors.E(op, err), } return @@ -113,18 +114,18 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p internal.Payload) ( case <-ctx.Done(): err := multierr.Combine(tw.Kill()) if err != nil { - return internal.Payload{}, multierr.Append(err, ctx.Err()) + return payload.Payload{}, multierr.Append(err, ctx.Err()) } - return internal.Payload{}, ctx.Err() + return payload.Payload{}, ctx.Err() case res := <-c: if res.err != nil { - return internal.Payload{}, res.err + return payload.Payload{}, res.err } return res.payload, nil } } -func (tw *syncWorker) execPayload(p internal.Payload) (internal.Payload, error) { +func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) { const op = errors.Op("exec payload") frame := goridge.NewFrame() @@ -147,35 +148,35 @@ func (tw *syncWorker) execPayload(p internal.Payload) (internal.Payload, error) err := tw.Relay().Send(frame) if err != nil { - return internal.Payload{}, err + return payload.Payload{}, err } frameR := goridge.NewFrame() err = tw.w.Relay().Receive(frameR) if err != nil { - return internal.Payload{}, errors.E(op, err) + return payload.Payload{}, errors.E(op, err) } if frameR == nil { - return internal.Payload{}, errors.E(op, errors.Str("nil frame received")) + return payload.Payload{}, errors.E(op, errors.Str("nil frame received")) } if !frameR.VerifyCRC() { - return internal.Payload{}, errors.E(op, errors.Str("failed to verify CRC")) + return payload.Payload{}, errors.E(op, errors.Str("failed to verify CRC")) } flags := frameR.ReadFlags() if flags&byte(goridge.ERROR) != byte(0) { - return internal.Payload{}, errors.E(op, errors.ErrSoftJob, errors.Str(string(frameR.Payload()))) + return payload.Payload{}, errors.E(op, errors.ErrSoftJob, errors.Str(string(frameR.Payload()))) } options := frameR.ReadOptions() if len(options) != 1 { - return internal.Payload{}, errors.E(op, errors.Str("options length should be equal 1 (body offset)")) + return payload.Payload{}, errors.E(op, errors.Str("options length should be equal 1 (body offset)")) } - payload := internal.Payload{} + payload := payload.Payload{} payload.Context = frameR.Payload()[:options[0]] payload.Body = frameR.Payload()[options[0]:] diff --git a/pkg/worker/sync_worker_test.go b/pkg/worker/sync_worker_test.go index e224e105..40988b06 100755 --- a/pkg/worker/sync_worker_test.go +++ b/pkg/worker/sync_worker_test.go @@ -4,7 +4,7 @@ import ( "os/exec" "testing" - "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/stretchr/testify/assert" ) @@ -27,7 +27,7 @@ func Test_NotStarted_Exec(t *testing.T) { t.Fatal(err) } - res, err := syncWorker.Exec(internal.Payload{Body: []byte("hello")}) + res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Body) diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 35d3264e..95fa6e06 100755 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -17,7 +17,7 @@ import ( "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" - events2 "github.com/spiral/roadrunner/v2/pkg/events" + eventsPkg "github.com/spiral/roadrunner/v2/pkg/events" "go.uber.org/multierr" ) @@ -88,7 +88,7 @@ func InitBaseWorker(cmd *exec.Cmd) (worker.BaseProcess, error) { } w := &Process{ created: time.Now(), - events: events2.NewEventsHandler(), + events: eventsPkg.NewEventsHandler(), cmd: cmd, state: internal.NewWorkerState(internal.StateInactive), stderr: new(bytes.Buffer), diff --git a/plugins/http/request.go b/plugins/http/request.go index 5df79b7d..d613bcf6 100644 --- a/plugins/http/request.go +++ b/plugins/http/request.go @@ -10,7 +10,7 @@ import ( j "github.com/json-iterator/go" "github.com/spiral/roadrunner/v2/interfaces/log" - "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/plugins/http/attributes" ) @@ -136,17 +136,17 @@ func (r *Request) Close(log log.Logger) { // Payload request marshaled RoadRunner payload based on PSR7 data. values encode method is JSON. Make sure to open // files prior to calling this method. -func (r *Request) Payload() (internal.Payload, error) { - p := internal.Payload{} +func (r *Request) Payload() (payload.Payload, error) { + p := payload.Payload{} var err error if p.Context, err = json.Marshal(r); err != nil { - return internal.Payload{}, err + return payload.Payload{}, err } if r.Parsed { if p.Body, err = json.Marshal(r.body); err != nil { - return internal.Payload{}, err + return payload.Payload{}, err } } else if r.body != nil { p.Body = r.body.([]byte) diff --git a/plugins/http/response.go b/plugins/http/response.go index 9700a16c..17049ce1 100644 --- a/plugins/http/response.go +++ b/plugins/http/response.go @@ -6,7 +6,7 @@ import ( "strings" "sync" - "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/payload" ) // Response handles PSR7 response logic. @@ -23,7 +23,7 @@ type Response struct { } // NewResponse creates new response based on given pool payload. -func NewResponse(p internal.Payload) (*Response, error) { +func NewResponse(p payload.Payload) (*Response, error) { r := &Response{Body: p.Body} if err := json.Unmarshal(p.Context, r); err != nil { return nil, err diff --git a/plugins/http/tests/response_test.go b/plugins/http/tests/response_test.go index a526fe03..7901a0d1 100644 --- a/plugins/http/tests/response_test.go +++ b/plugins/http/tests/response_test.go @@ -6,7 +6,7 @@ import ( "net/http" "testing" - "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/payload" http2 "github.com/spiral/roadrunner/v2/plugins/http" "github.com/stretchr/testify/assert" ) @@ -45,13 +45,13 @@ func (tw *testWriter) Push(target string, opts *http.PushOptions) error { } func TestNewResponse_Error(t *testing.T) { - r, err := http2.NewResponse(internal.Payload{Context: []byte(`invalid payload`)}) + r, err := http2.NewResponse(payload.Payload{Context: []byte(`invalid payload`)}) assert.Error(t, err) assert.Nil(t, r) } func TestNewResponse_Write(t *testing.T) { - r, err := http2.NewResponse(internal.Payload{ + r, err := http2.NewResponse(payload.Payload{ Context: []byte(`{"headers":{"key":["value"]},"status": 301}`), Body: []byte(`sample body`), }) @@ -68,7 +68,7 @@ func TestNewResponse_Write(t *testing.T) { } func TestNewResponse_Stream(t *testing.T) { - r, err := http2.NewResponse(internal.Payload{ + r, err := http2.NewResponse(payload.Payload{ Context: []byte(`{"headers":{"key":["value"]},"status": 301}`), }) @@ -92,7 +92,7 @@ func TestNewResponse_Stream(t *testing.T) { } func TestNewResponse_StreamError(t *testing.T) { - r, err := http2.NewResponse(internal.Payload{ + r, err := http2.NewResponse(payload.Payload{ Context: []byte(`{"headers":{"key":["value"]},"status": 301}`), }) @@ -112,7 +112,7 @@ func TestNewResponse_StreamError(t *testing.T) { } func TestWrite_HandlesPush(t *testing.T) { - r, err := http2.NewResponse(internal.Payload{ + r, err := http2.NewResponse(payload.Payload{ Context: []byte(`{"headers":{"Http2-Push":["/test.js"],"content-type":["text/html"]},"status": 200}`), }) @@ -127,7 +127,7 @@ func TestWrite_HandlesPush(t *testing.T) { } func TestWrite_HandlesTrailers(t *testing.T) { - r, err := http2.NewResponse(internal.Payload{ + r, err := http2.NewResponse(payload.Payload{ Context: []byte(`{"headers":{"Trailer":["foo, bar", "baz"],"foo":["test"],"bar":["demo"]},"status": 200}`), }) @@ -146,7 +146,7 @@ func TestWrite_HandlesTrailers(t *testing.T) { } func TestWrite_HandlesHandlesWhitespacesInTrailer(t *testing.T) { - r, err := http2.NewResponse(internal.Payload{ + r, err := http2.NewResponse(payload.Payload{ Context: []byte( `{"headers":{"Trailer":["foo\t,bar , baz"],"foo":["a"],"bar":["b"],"baz":["c"]},"status": 200}`), }) diff --git a/plugins/server/tests/plugin_pipes.go b/plugins/server/tests/plugin_pipes.go index 61c9a8f9..9a8a630c 100644 --- a/plugins/server/tests/plugin_pipes.go +++ b/plugins/server/tests/plugin_pipes.go @@ -7,7 +7,7 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/interfaces/pool" "github.com/spiral/roadrunner/v2/interfaces/server" - "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/payload" poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/config" @@ -47,7 +47,7 @@ func (f *Foo) Serve() chan error { const op = errors.Op("serve") // test payload for echo - r := internal.Payload{ + r := payload.Payload{ Context: nil, Body: []byte(Response), } diff --git a/plugins/server/tests/plugin_sockets.go b/plugins/server/tests/plugin_sockets.go index 3b97efff..b1545718 100644 --- a/plugins/server/tests/plugin_sockets.go +++ b/plugins/server/tests/plugin_sockets.go @@ -6,7 +6,7 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/interfaces/pool" "github.com/spiral/roadrunner/v2/interfaces/server" - "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/config" plugin "github.com/spiral/roadrunner/v2/plugins/server" @@ -31,7 +31,7 @@ func (f *Foo2) Serve() chan error { conf := &plugin.Config{} // test payload for echo - r := internal.Payload{ + r := payload.Payload{ Context: nil, Body: []byte(Response), } diff --git a/plugins/server/tests/plugin_tcp.go b/plugins/server/tests/plugin_tcp.go index 2857dadc..da92288a 100644 --- a/plugins/server/tests/plugin_tcp.go +++ b/plugins/server/tests/plugin_tcp.go @@ -6,7 +6,7 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/interfaces/pool" "github.com/spiral/roadrunner/v2/interfaces/server" - "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/config" plugin "github.com/spiral/roadrunner/v2/plugins/server" @@ -31,7 +31,7 @@ func (f *Foo3) Serve() chan error { conf := &plugin.Config{} // test payload for echo - r := internal.Payload{ + r := payload.Payload{ Context: nil, Body: []byte(Response), } |